You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2015/09/28 15:15:17 UTC

[45/69] [abbrv] stratos git commit: Publishing metering service event streams

Publishing metering service event streams


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b38e27c9
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b38e27c9
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b38e27c9

Branch: refs/heads/stratos-4.1.x
Commit: b38e27c9adfa5e28d62e8ba842118e0cf6daafcd
Parents: a34decc
Author: Thanuja <th...@wso2.com>
Authored: Tue Sep 22 22:25:24 2015 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Mon Sep 28 18:44:17 2015 +0530

----------------------------------------------------------------------
 .../client/AutoscalerCloudControllerClient.java |   9 +-
 .../monitor/cluster/ClusterMonitor.java         |  16 +-
 .../autoscaler/rule/RuleTasksDelegator.java     |   5 +-
 .../publisher/AutoscalerPublisherFactory.java   |  37 +++
 .../publisher/DASScalingDecisionPublisher.java  | 161 +++++++++++++
 .../publisher/ScalingDecisionPublisher.java     |  57 +++++
 .../autoscaler/util/AutoscalerConstants.java    |  19 ++
 .../messaging/topology/TopologyBuilder.java     | 227 +++++++++++++++----
 .../impl/CloudControllerServiceUtil.java        |  13 +-
 .../services/impl/InstanceCreator.java          |  13 --
 .../publisher/BAMUsageDataPublisher.java        | 213 -----------------
 .../CloudControllerPublisherFactory.java        |  55 +++++
 .../DASMemberInformationPublisher.java          | 161 +++++++++++++
 .../publisher/DASMemberStatusPublisher.java     | 127 +++++++++++
 .../publisher/MemberInformationPublisher.java   |  38 ++++
 .../publisher/MemberStatusPublisher.java        |  45 ++++
 .../util/CloudControllerConstants.java          |  32 +++
 .../controller/util/CloudControllerUtil.java    |  16 +-
 .../common/client/AutoscalerServiceClient.java  |  20 +-
 .../common/constants/StratosConstants.java      |   1 +
 .../src/main/conf/drools/dependent-scaling.drl  |  21 +-
 .../src/main/conf/drools/mincheck.drl           |  22 +-
 .../src/main/conf/drools/scaling.drl            |  24 +-
 .../src/test/resources/common/scaling.drl       |  25 +-
 .../resources/common/thrift-client-config.xml   |  22 +-
 25 files changed, 1064 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
index 0124206..9f380d0 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
@@ -75,8 +75,8 @@ public class AutoscalerCloudControllerClient {
 
     public synchronized MemberContext startInstance(PartitionRef partition,
                                                     String clusterId, String clusterInstanceId,
-                                                    String networkPartitionId,
-                                                    int minMemberCount) throws SpawningException {
+                                                    String networkPartitionId, int minMemberCount,
+                                                    String scalingDecisionId) throws SpawningException {
         try {
             if (log.isInfoEnabled()) {
                 log.info(String.format("Trying to spawn an instance via cloud controller: " +
@@ -102,8 +102,11 @@ public class AutoscalerCloudControllerClient {
             Property minCountProp = new Property();
             minCountProp.setName(StratosConstants.MIN_COUNT);
             minCountProp.setValue(String.valueOf(minMemberCount));
-
             memberContextProps.addProperty(minCountProp);
+            Property scalingDecisionIdProp = new Property();
+            scalingDecisionIdProp.setName(StratosConstants.SCALING_DECISION_ID);
+            scalingDecisionIdProp.setValue(String.valueOf(scalingDecisionId));
+            memberContextProps.addProperty(scalingDecisionIdProp);
             instanceContext.setProperties(AutoscalerUtil.toStubProperties(memberContextProps));
 
             long startTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
index 43493bd..aadad54 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
@@ -45,12 +45,10 @@ import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiv
 import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInactiveProcessor;
 import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor;
 import org.apache.stratos.autoscaler.util.AutoscalerConstants;
-import org.apache.stratos.autoscaler.util.AutoscalerObjectConverter;
 import org.apache.stratos.autoscaler.util.ConfUtil;
 import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
 import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
 import org.apache.stratos.common.Properties;
-import org.apache.stratos.common.Property;
 import org.apache.stratos.common.client.CloudControllerServiceClient;
 import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.common.threading.StratosThreadPool;
@@ -328,8 +326,8 @@ public class ClusterMonitor extends Monitor {
                             public void run() {
 
                                 if (log.isDebugEnabled()) {
-                                    log.debug(String.format("Cluster monitor is running: [application-id] %s [cluster-id]: " +
-                                            "%s", getAppId(), getClusterId()));
+                                    log.debug(String.format("Cluster monitor is running: [application-id] " +
+                                            "%s [cluster-id]: %s", getAppId(), getClusterId()));
                                 }
 
                                 instanceContext.getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
@@ -698,8 +696,8 @@ public class ClusterMonitor extends Monitor {
         Float floatValue = averageRequestsServingCapabilityEvent.getValue();
 
         if (log.isDebugEnabled()) {
-            log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s [value] %s",
-                    clusterId, networkPartitionId, floatValue));
+            log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s " +
+                    "[value] %s", clusterId, networkPartitionId, floatValue));
         }
 
         ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
@@ -1231,7 +1229,8 @@ public class ClusterMonitor extends Monitor {
                     // active members
 
                     if (AutoscalerContext.getInstance().getAppMonitor(getAppId()).isForce()) {
-                        log.info(String.format("Terminating all remaining members of partition [partition-id] %s [application-id] %s", partitionContext.getPartitionId(), getAppId()));
+                        log.info(String.format("Terminating all remaining members of partition [partition-id] %s" +
+                                " [application-id] %s", partitionContext.getPartitionId(), getAppId()));
                         partitionContext.terminateAllRemainingInstances();
                     }
 
@@ -1301,7 +1300,8 @@ public class ClusterMonitor extends Monitor {
                         }
 
                         if (AutoscalerContext.getInstance().getAppMonitor(getAppId()).isForce()) {
-                            log.info(String.format("Terminating all remaining members of partition [partition-id] %s [application-id] %s", partitionContext.getPartitionId(), getAppId()));
+                            log.info(String.format("Terminating all remaining members of partition [partition-id] %s " +
+                                    "[application-id] %s", partitionContext.getPartitionId(), getAppId()));
                             partitionContext.terminateAllRemainingInstances();
                         }
                         //Need to terminate pending members

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
index 1a8bfc9..132306a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
@@ -171,9 +171,10 @@ public class RuleTasksDelegator {
      * @param clusterMonitorPartitionContext Cluster monitor partition context
      * @param clusterId                      Cluster id
      * @param clusterInstanceId              Instance id
+     * @param scalingDecisionId              Scaling Decision id
      */
     public void delegateSpawn(ClusterLevelPartitionContext clusterMonitorPartitionContext, String clusterId,
-                              String clusterInstanceId) {
+                              String clusterInstanceId, String scalingDecisionId) {
 
         try {
             String nwPartitionId = clusterMonitorPartitionContext.getNetworkPartitionId();
@@ -194,7 +195,7 @@ public class RuleTasksDelegator {
                             .startInstance(clusterMonitorPartitionContext.getPartition(),
                                     clusterId,
                                     clusterInstanceId, clusterMonitorPartitionContext.getNetworkPartitionId(),
-                                    minimumCountOfNetworkPartition);
+                                    minimumCountOfNetworkPartition, scalingDecisionId);
             if (memberContext != null) {
                 ClusterLevelPartitionContext partitionContext = clusterInstanceContext.
                         getPartitionCtxt(clusterMonitorPartitionContext.getPartitionId());

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
new file mode 100644
index 0000000..d057108
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.statistics.publisher;
+
+import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
+
+/**
+ * Factory for creating ScalingDecisionPublisher instances.
+ */
+public class AutoscalerPublisherFactory {
+
+    public static ScalingDecisionPublisher createScalingDecisionPublisher(StatisticsPublisherType type) {
+
+        if (type == StatisticsPublisherType.WSO2DAS) {
+            return new DASScalingDecisionPublisher();
+        } else {
+            throw new RuntimeException("Unknown statistics publisher type");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
new file mode 100644
index 0000000..a907043
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.statistics.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.util.AutoscalerConstants;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * ScalingDecisionPublisher implementation that publishes scaling decisions to DAS.
+ */
+public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher implements ScalingDecisionPublisher {
+
+    private static final Log log = LogFactory.getLog(DASScalingDecisionPublisher.class);
+    private static final String DATA_STREAM_NAME = "scaling_decision";
+    private static final String VERSION = "1.0.0";
+    private static final String DAS_THRIFT_CLIENT_NAME = "das";
+    private ExecutorService executorService;
+
+    public DASScalingDecisionPublisher() {
+        super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
+        executorService = StratosThreadPool.getExecutorService("autoscaler.stats.publisher.thread.pool", 10);
+    }
+
+    private static StreamDefinition createStreamDefinition() {
+        try {
+            // Create stream definition
+            StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+            streamDefinition.setNickName("Member Information");
+            streamDefinition.setDescription("Member Information");
+            List<Attribute> payloadData = new ArrayList<Attribute>();
+
+            // Set payload definition
+            payloadData.add(new Attribute(AutoscalerConstants.TIMESTAMP, AttributeType.LONG));
+            payloadData.add(new Attribute(AutoscalerConstants.SCALING_DECISION_ID, AttributeType.STRING));
+            payloadData.add(new Attribute(AutoscalerConstants.CLUSTER_ID, AttributeType.STRING));
+            payloadData.add(new Attribute(AutoscalerConstants.MIN_INSTANCE_COUNT, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.MAX_INSTANCE_COUNT, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.RIF_PREDICTED, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.RIF_THRESHOLD, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.RIF_REQUIRED_INSTANCES, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.MC_PREDICTED, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.MC_THRESHOLD, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.MC_REQUIRED_INSTANCES, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.LA_PREDICTED, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.LA_THRESHOLD, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.LA_REQUIRED_INSTANCES, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.REQUIRED_INSTANCE_COUNT, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.ACTIVE_INSTANCE_COUNT, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.ADDITIONAL_INSTANCE_COUNT, AttributeType.INT));
+            payloadData.add(new Attribute(AutoscalerConstants.SCALING_REASON, AttributeType.STRING));
+            streamDefinition.setPayloadData(payloadData);
+            return streamDefinition;
+        } catch (Exception e) {
+            throw new RuntimeException("Could not create stream definition", e);
+        }
+    }
+
+    /**
+     * Publishing scaling decision to DAS.
+     *
+     * @param timestamp               Scaling Time
+     * @param scalingDecisionId       Scaling Decision Id
+     * @param clusterId               Cluster Id
+     * @param minInstanceCount        Minimum Instance Count
+     * @param maxInstanceCount        Maximum Instance Count
+     * @param rifPredicted            RIF Predicted
+     * @param rifThreshold            RIF Threshold
+     * @param rifRequiredInstances    RIF Required Instances
+     * @param mcPredicted             MC Predicted
+     * @param mcThreshold             MC Threshold
+     * @param mcRequiredInstances     MC Required Instances
+     * @param laPredicted             LA Predicted
+     * @param laThreshold             LA Threshold
+     * @param laRequiredInstance      LA Required Instance
+     * @param requiredInstanceCount   Required Instance Count
+     * @param activeInstanceCount     Active Instance Count
+     * @param additionalInstanceCount Additional Instance Needed
+     * @param scalingReason           Scaling Reason
+     */
+    @Override
+    public void publish(final Long timestamp, final String scalingDecisionId, final String clusterId,
+                        final int minInstanceCount, final int maxInstanceCount,
+                        final int rifPredicted, final int rifThreshold, final int rifRequiredInstances,
+                        final int mcPredicted, final int mcThreshold, final int mcRequiredInstances,
+                        final int laPredicted, final int laThreshold, final int laRequiredInstance,
+                        final int requiredInstanceCount, final int activeInstanceCount,
+                        final int additionalInstanceCount, final String scalingReason) {
+        Runnable publisher = new Runnable() {
+            @Override
+            public void run() {
+                if (log.isDebugEnabled())
+
+                {
+                    log.debug(String.format("Publishing scaling decision: [timestamp] %d [scaling_decision_id] %s " +
+                                    "[cluster_id] %s [min_instance_count] %d [max_instance_count] %d " +
+                                    "[rif_predicted] %d [rif_threshold] %d [rif_required_instances] %d " +
+                                    "[mc_predicted] %d [mc_threshold] %d [mc_required_instances] %d " +
+                                    "[la_predicted] %d [la_threshold] %d [la_required_instances] %d " +
+                                    "[required_instance_count] %d [active_instance_count] %d " +
+                                    "[addtitional_instance_count] %d [scaling_reason] %s",
+                            timestamp, scalingDecisionId, clusterId, minInstanceCount, maxInstanceCount, rifPredicted,
+                            rifThreshold, rifRequiredInstances, mcPredicted, mcThreshold, mcRequiredInstances,
+                            laPredicted, laThreshold, laRequiredInstance, requiredInstanceCount, activeInstanceCount,
+                            additionalInstanceCount, scalingReason));
+                }
+
+                //adding payload data
+                List<Object> payload = new ArrayList<Object>();
+                payload.add(timestamp);
+                payload.add(scalingDecisionId);
+                payload.add(clusterId);
+                payload.add(minInstanceCount);
+                payload.add(maxInstanceCount);
+                payload.add(rifPredicted);
+                payload.add(rifThreshold);
+                payload.add(rifRequiredInstances);
+                payload.add(mcPredicted);
+                payload.add(mcThreshold);
+                payload.add(mcRequiredInstances);
+                payload.add(laPredicted);
+                payload.add(laThreshold);
+                payload.add(laRequiredInstance);
+                payload.add(requiredInstanceCount);
+                payload.add(activeInstanceCount);
+                payload.add(additionalInstanceCount);
+                payload.add(scalingReason);
+                DASScalingDecisionPublisher.super.publish(payload.toArray());
+            }
+
+        };
+        executorService.execute(publisher);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
new file mode 100644
index 0000000..f7b0087
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.statistics.publisher;
+
+import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+
+/**
+ * Scaling Decision Publisher interface.
+ */
+public interface ScalingDecisionPublisher extends StatisticsPublisher {
+    /**
+     * Publishing scaling decision to DAS.
+     *
+     * @param timestamp               Scaling Time
+     * @param scalingDecisionId       Scaling Decision Id
+     * @param clusterId               Cluster Id
+     * @param minInstanceCount        Minimum Instance Count
+     * @param maxInstanceCount        Maximum Instance Count
+     * @param rifPredicted            RIF Predicted
+     * @param rifThreshold            RIF Threshold
+     * @param rifRequiredInstances    RIF Required Instances
+     * @param mcPredicted             MC Predicted
+     * @param mcThreshold             MC Threshold
+     * @param mcRequiredInstances     MC Required Instances
+     * @param laPredicted             LA Predicted
+     * @param laThreshold             LA Threshold
+     * @param laRequiredInstance      LA Required Instance
+     * @param requiredInstanceCount   Required Instance Count
+     * @param activeInstanceCount     Active Instance Count
+     * @param additionalInstanceCount Additional Instance Needed
+     * @param scalingReason           Scaling Reason
+     */
+    public void publish(Long timestamp, String scalingDecisionId, String clusterId,
+                        int minInstanceCount, int maxInstanceCount,
+                        int rifPredicted, int rifThreshold, int rifRequiredInstances,
+                        int mcPredicted, int mcThreshold, int mcRequiredInstances,
+                        int laPredicted, int laThreshold, int laRequiredInstance,
+                        int requiredInstanceCount, int activeInstanceCount, int additionalInstanceCount,
+                        String scalingReason);
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
index 788c79b..caea65a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
@@ -118,4 +118,23 @@ public final class AutoscalerConstants {
     public static final String IDENTITY_APPLICATION_SERVICE_SFX = "services/IdentityApplicationManagementService";
     public static final String TOKEN_ENDPOINT_SFX = "oauth2/token";
     public static final String TERMINATE_DEPENDENTS = "terminate-dependents";
+    //scaling decision payload attribute names
+    public static final String TIMESTAMP = "timestamp";
+    public static final String SCALING_DECISION_ID = "scaling_decision_id";
+    public static final String CLUSTER_ID = "cluster_id";
+    public static final String MIN_INSTANCE_COUNT = "min_instance_count";
+    public static final String MAX_INSTANCE_COUNT = "max_instance_count";
+    public static final String RIF_PREDICTED = "rif_predicted";
+    public static final String RIF_THRESHOLD = "rif_threshold";
+    public static final String RIF_REQUIRED_INSTANCES = "rif_required_instances";
+    public static final String MC_PREDICTED = "mc_predicted";
+    public static final String MC_THRESHOLD = "mc_threshold";
+    public static final String MC_REQUIRED_INSTANCES = "mc_required_instances";
+    public static final String LA_PREDICTED = "la_predicted";
+    public static final String LA_THRESHOLD = "la_threshold";
+    public static final String LA_REQUIRED_INSTANCES = "la_required_instances";
+    public static final String REQUIRED_INSTANCE_COUNT = "required_instance_count";
+    public static final String ACTIVE_INSTANCE_COUNT = "active_instance_count";
+    public static final String ADDITIONAL_INSTANCE_COUNT = "additional_instance_count";
+    public static final String SCALING_REASON = "scaling_reason";
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index 936f9ec..7348b81 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -28,9 +28,13 @@ import org.apache.stratos.cloud.controller.exception.InvalidCartridgeTypeExcepti
 import org.apache.stratos.cloud.controller.exception.InvalidMemberException;
 import org.apache.stratos.cloud.controller.iaases.kubernetes.KubernetesIaas;
 import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPublisher;
-import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
+import org.apache.stratos.cloud.controller.statistics.publisher.CloudControllerPublisherFactory;
+import org.apache.stratos.cloud.controller.statistics.publisher.MemberInformationPublisher;
+import org.apache.stratos.cloud.controller.statistics.publisher.MemberStatusPublisher;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.common.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
 import org.apache.stratos.kubernetes.client.KubernetesConstants;
 import org.apache.stratos.messaging.domain.application.ClusterDataHolder;
 import org.apache.stratos.messaging.domain.instance.ClusterInstance;
@@ -148,7 +152,6 @@ public class TopologyBuilder {
     }
 
 
-
     public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters) {
 
         TopologyManager.acquireWriteLock();
@@ -371,14 +374,15 @@ public class TopologyBuilder {
      */
     public static void handleMemberCreatedEvent(MemberContext memberContext) {
         Topology topology = TopologyManager.getTopology();
-
         Service service = topology.getService(memberContext.getCartridgeType());
         String clusterId = memberContext.getClusterId();
         Cluster cluster = service.getCluster(clusterId);
+        String applicationId = service.getCluster(memberContext.getClusterId()).getAppId();
         String memberId = memberContext.getMemberId();
         String clusterInstanceId = memberContext.getClusterInstanceId();
         String networkPartitionId = memberContext.getNetworkPartitionId();
         String partitionId = memberContext.getPartition().getId();
+        String clusterAlias = CloudControllerUtil.getAliasFromClusterId(memberContext.getClusterId());
         String lbClusterId = memberContext.getLbClusterId();
         long initTime = memberContext.getInitTime();
 
@@ -396,6 +400,31 @@ public class TopologyBuilder {
             member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
             cluster.addMember(member);
             TopologyManager.updateTopology(topology);
+
+            //member created time
+            Long timestamp = System.currentTimeMillis();
+            //publishing member status to DAS
+            MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+                    createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
+
+            if (memStatusPublisher.isEnabled()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Publishing Member Status to DAS");
+                }
+                memStatusPublisher.publish(timestamp,
+                        applicationId,
+                        memberContext.getClusterId(),
+                        clusterAlias,
+                        memberContext.getClusterInstanceId(),
+                        memberContext.getCartridgeType(),
+                        memberContext.getNetworkPartitionId(),
+                        memberContext.getPartition().getId(),
+                        memberContext.getMemberId(),
+                        MemberStatus.Created.toString());
+            } else {
+                log.warn("Member Status Publisher is not enabled");
+            }
+
         } finally {
             TopologyManager.releaseWriteLock();
         }
@@ -411,6 +440,7 @@ public class TopologyBuilder {
     public static void handleMemberInitializedEvent(MemberContext memberContext) {
         Topology topology = TopologyManager.getTopology();
         Service service = topology.getService(memberContext.getCartridgeType());
+
         if (service == null) {
             log.warn(String.format("Service %s does not exist",
                     memberContext.getCartridgeType()));
@@ -423,6 +453,8 @@ public class TopologyBuilder {
             return;
         }
 
+        String applicationId = service.getCluster(memberContext.getClusterId()).getAppId();
+        String clusterAlias = CloudControllerUtil.getAliasFromClusterId(memberContext.getClusterId());
         Member member = service.getCluster(memberContext.getClusterId()).
                 getMember(memberContext.getMemberId());
         if (member == null) {
@@ -464,18 +496,48 @@ public class TopologyBuilder {
                 log.info("Member status updated to initialized");
 
                 TopologyManager.updateTopology(topology);
-
+                //member initialized time
+                Long timestamp = System.currentTimeMillis();
                 TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
-                //publishing data
-                BAMUsageDataPublisher.publish(memberContext.getMemberId(),
-                        memberContext.getPartition().getId(),
-                        memberContext.getNetworkPartitionId(),
-                        memberContext.getClusterId(),
-                        memberContext.getCartridgeType(),
-                        MemberStatus.Initialized.toString(),
-                        null);
+                //publishing member information and status to DAS
+                MemberInformationPublisher memInfoPublisher = CloudControllerPublisherFactory.
+                        createMemberInformationPublisher(StatisticsPublisherType.WSO2DAS);
+
+                MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+                        createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
+
+                if (memInfoPublisher.isEnabled()) {
+                    if (log.isDebugEnabled()) {
+                        log.info("Publishing Member Information");
+                    }
+                    String scalingDecisionId = memberContext.getProperties().getProperty(
+                            StratosConstants.SCALING_DECISION_ID).getValue();
+                    memInfoPublisher.publish(memberContext.getMemberId(), scalingDecisionId,
+                            memberContext.getInstanceMetadata());
+                } else {
+                    log.warn("Member Information Publisher is not enabled");
+                }
+                if (memStatusPublisher.isEnabled()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Publishing Member Status to DAS");
+                    }
+                    memStatusPublisher.publish(timestamp,
+                            applicationId,
+                            memberContext.getClusterId(),
+                            clusterAlias,
+                            memberContext.getClusterInstanceId(),
+                            memberContext.getCartridgeType(),
+                            memberContext.getNetworkPartitionId(),
+                            memberContext.getPartition().getId(),
+                            memberContext.getMemberId(),
+                            MemberStatus.Initialized.toString());
+                } else {
+                    log.warn("Member Status Publisher is not enabled");
+                }
             }
-        } finally {
+        } finally
+
+        {
             TopologyManager.releaseWriteLock();
         }
     }
@@ -495,6 +557,7 @@ public class TopologyBuilder {
         try {
             Topology topology = TopologyManager.getTopology();
             Service service = topology.getService(instanceStartedEvent.getServiceName());
+
             if (service == null) {
                 log.warn(String.format("Service %s does not exist",
                         instanceStartedEvent.getServiceName()));
@@ -507,6 +570,8 @@ public class TopologyBuilder {
                 return;
             }
 
+            String applicationId = service.getCluster(instanceStartedEvent.getClusterId()).getAppId();
+            String clusterAlias = CloudControllerUtil.getAliasFromClusterId(instanceStartedEvent.getClusterId());
             Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId());
             Member member = cluster.getMember(instanceStartedEvent.getMemberId());
             if (member == null) {
@@ -527,16 +592,31 @@ public class TopologyBuilder {
                     log.info("member started event adding status started");
 
                     TopologyManager.updateTopology(topology);
+                    //member started time
+                    Long timestamp = System.currentTimeMillis();
                     //memberStartedEvent.
                     TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
-                    //publishing data
-                    BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
-                            instanceStartedEvent.getPartitionId(),
-                            instanceStartedEvent.getNetworkPartitionId(),
-                            instanceStartedEvent.getClusterId(),
-                            instanceStartedEvent.getServiceName(),
-                            MemberStatus.Starting.toString(),
-                            null);
+                    //publishing member status to DAS
+                    MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+                            createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
+
+                    if (memStatusPublisher.isEnabled()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Publishing Member Status to DAS");
+                        }
+                        memStatusPublisher.publish(timestamp,
+                                applicationId,
+                                instanceStartedEvent.getClusterId(),
+                                clusterAlias,
+                                instanceStartedEvent.getClusterInstanceId(),
+                                instanceStartedEvent.getServiceName(),
+                                instanceStartedEvent.getNetworkPartitionId(),
+                                instanceStartedEvent.getPartitionId(),
+                                instanceStartedEvent.getMemberId(),
+                                MemberStatus.Starting.toString());
+                    } else {
+                        log.warn("Member Status Publisher is not enabled");
+                    }
                 }
             } finally {
                 TopologyManager.releaseWriteLock();
@@ -549,9 +629,11 @@ public class TopologyBuilder {
         }
     }
 
+
     public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) {
         Topology topology = TopologyManager.getTopology();
         Service service = topology.getService(instanceActivatedEvent.getServiceName());
+
         if (service == null) {
             log.warn(String.format("Service %s does not exist",
                     instanceActivatedEvent.getServiceName()));
@@ -565,6 +647,9 @@ public class TopologyBuilder {
             return;
         }
 
+        String applicationId = service.getCluster(instanceActivatedEvent.getClusterId()).getAppId();
+        String clusterAlias = CloudControllerUtil.getAliasFromClusterId(instanceActivatedEvent.getClusterId());
+
         Member member = cluster.getMember(instanceActivatedEvent.getMemberId());
         if (member == null) {
             log.warn(String.format("Member %s does not exist",
@@ -587,7 +672,8 @@ public class TopologyBuilder {
             TopologyManager.acquireWriteLock();
             // try update lifecycle state
             if (!member.isStateTransitionValid(MemberStatus.Active)) {
-                log.error("Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]");
+                log.error("Invalid state transition from [" + member.getStatus() + "] to [" +
+                        MemberStatus.Active + "]");
                 return;
             } else {
                 member.setStatus(MemberStatus.Active);
@@ -632,15 +718,31 @@ public class TopologyBuilder {
 
                 // Publish member activated event
                 TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
+                //member activated time
+                Long timestamp = System.currentTimeMillis();
+                // Publish member activated event
+                TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
 
-                // Publish statistics data
-                BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
-                        memberActivatedEvent.getPartitionId(),
-                        memberActivatedEvent.getNetworkPartitionId(),
-                        memberActivatedEvent.getClusterId(),
-                        memberActivatedEvent.getServiceName(),
-                        MemberStatus.Active.toString(),
-                        null);
+                //publishing member status to DAS
+                MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+                        createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
+                if (memStatusPublisher.isEnabled()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Publishing Member Status to DAS");
+                    }
+                    memStatusPublisher.publish(timestamp,
+                            applicationId,
+                            memberActivatedEvent.getClusterId(),
+                            clusterAlias,
+                            memberActivatedEvent.getClusterInstanceId(),
+                            memberActivatedEvent.getServiceName(),
+                            memberActivatedEvent.getNetworkPartitionId(),
+                            memberActivatedEvent.getPartitionId(),
+                            memberActivatedEvent.getMemberId(),
+                            MemberStatus.Active.toString());
+                } else {
+                    log.warn("Member Status Publisher is not enabled");
+                }
             }
         } finally {
             TopologyManager.releaseWriteLock();
@@ -651,6 +753,7 @@ public class TopologyBuilder {
             throws InvalidMemberException, InvalidCartridgeTypeException {
         Topology topology = TopologyManager.getTopology();
         Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName());
+
         //update the status of the member
         if (service == null) {
             log.warn(String.format("Service %s does not exist",
@@ -665,6 +768,8 @@ public class TopologyBuilder {
             return;
         }
 
+        String applicationId = service.getCluster(instanceReadyToShutdownEvent.getClusterId()).getAppId();
+        String clusterAlias = CloudControllerUtil.getAliasFromClusterId(instanceReadyToShutdownEvent.getClusterId());
 
         Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
         if (member == null) {
@@ -679,6 +784,8 @@ public class TopologyBuilder {
                 instanceReadyToShutdownEvent.getMemberId(),
                 instanceReadyToShutdownEvent.getNetworkPartitionId(),
                 instanceReadyToShutdownEvent.getPartitionId());
+        //member ReadyToShutDown state change time
+        Long timestamp = null;
         try {
             TopologyManager.acquireWriteLock();
 
@@ -691,18 +798,31 @@ public class TopologyBuilder {
             log.info("Member Ready to shut down event adding status started");
 
             TopologyManager.updateTopology(topology);
+            timestamp = System.currentTimeMillis();
         } finally {
             TopologyManager.releaseWriteLock();
         }
         TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
-        //publishing data
-        BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
-                instanceReadyToShutdownEvent.getPartitionId(),
-                instanceReadyToShutdownEvent.getNetworkPartitionId(),
-                instanceReadyToShutdownEvent.getClusterId(),
-                instanceReadyToShutdownEvent.getServiceName(),
-                MemberStatus.ReadyToShutDown.toString(),
-                null);
+        //publishing member status to DAS.
+        MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+                createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
+        if (memStatusPublisher.isEnabled()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Publishing Member Status to DAS");
+            }
+            memStatusPublisher.publish(timestamp,
+                    applicationId,
+                    instanceReadyToShutdownEvent.getClusterId(),
+                    clusterAlias,
+                    instanceReadyToShutdownEvent.getClusterInstanceId(),
+                    instanceReadyToShutdownEvent.getServiceName(),
+                    instanceReadyToShutdownEvent.getNetworkPartitionId(),
+                    instanceReadyToShutdownEvent.getPartitionId(),
+                    instanceReadyToShutdownEvent.getMemberId(),
+                    MemberStatus.ReadyToShutDown.toString());
+        } else {
+            log.warn("Member Status Publisher is not enabled");
+        }
         //termination of particular instance will be handled by autoscaler
     }
 
@@ -786,6 +906,8 @@ public class TopologyBuilder {
             return;
         }
 
+        String applicationId = service.getCluster(clusterId).getAppId();
+        String clusterAlias = CloudControllerUtil.getAliasFromClusterId(clusterId);
         Member member = cluster.getMember(memberId);
         if (member == null) {
             log.warn(String.format("Member %s does not exist",
@@ -795,6 +917,8 @@ public class TopologyBuilder {
 
         String clusterInstanceId = member.getClusterInstanceId();
 
+        //member terminated time
+        Long timestamp = null;
         try {
             TopologyManager.acquireWriteLock();
             properties = member.getProperties();
@@ -802,12 +926,34 @@ public class TopologyBuilder {
             TopologyManager.updateTopology(topology);
         } finally {
             TopologyManager.releaseWriteLock();
+            timestamp = System.currentTimeMillis();
         }
         /* @TODO leftover from grouping_poc*/
         String groupAlias = null;
         TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId,
                 clusterInstanceId, networkPartitionId,
                 partitionId, properties, groupAlias);
+
+        //publishing member status to DAS.
+        MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+                createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
+        if (memStatusPublisher.isEnabled()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Publishing Member Status to DAS");
+            }
+            memStatusPublisher.publish(timestamp,
+                    applicationId,
+                    member.getClusterId(),
+                    clusterAlias,
+                    member.getClusterInstanceId(),
+                    member.getServiceName(),
+                    member.getNetworkPartitionId(),
+                    member.getPartitionId(),
+                    member.getMemberId(),
+                    MemberStatus.Terminated.toString());
+        } else {
+            log.warn("Member Status Publisher is not enabled");
+        }
     }
 
     public static void handleMemberSuspended() {
@@ -819,7 +965,8 @@ public class TopologyBuilder {
         }
     }
 
-    public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) {
+    public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent
+                                                           clusterStatusClusterActivatedEvent) {
 
         Topology topology = TopologyManager.getTopology();
         Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
@@ -855,7 +1002,7 @@ public class TopologyBuilder {
             Collection<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(clusterStatusClusterActivatedEvent.getInstanceId());
 
             if (kubernetesServices != null) {
-               
+
                 try {
                     // Generate access URLs for kubernetes services
                     for (KubernetesService kubernetesService : kubernetesServices) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
index 37580eb..da23598 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
@@ -31,9 +31,7 @@ import org.apache.stratos.cloud.controller.exception.InvalidPartitionException;
 import org.apache.stratos.cloud.controller.iaases.Iaas;
 import org.apache.stratos.cloud.controller.iaases.PartitionValidator;
 import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
-import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
 
 import java.util.Properties;
 
@@ -49,7 +47,7 @@ public class CloudControllerServiceUtil {
     }
 
     /**
-     * Update the topology, publish statistics to BAM, remove member context
+     * Update the topology, publish statistics to DAS, remove member context
      * and persist cloud controller context.
      *
      * @param memberContext
@@ -66,15 +64,6 @@ public class CloudControllerServiceUtil {
                 memberContext.getClusterId(), memberContext.getNetworkPartitionId(),
                 partitionId, memberContext.getMemberId());
 
-        // Publish statistics to BAM
-        BAMUsageDataPublisher.publish(memberContext.getMemberId(),
-                partitionId,
-                memberContext.getNetworkPartitionId(),
-                memberContext.getClusterId(),
-                memberContext.getCartridgeType(),
-                MemberStatus.Terminated.toString(),
-                null);
-
         // Remove member context
         CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(), memberContext.getMemberId());
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
index 77cfea2..afd7a23 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
@@ -27,9 +27,6 @@ import org.apache.stratos.cloud.controller.domain.*;
 import org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException;
 import org.apache.stratos.cloud.controller.iaases.Iaas;
 import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
-import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-
 import java.util.concurrent.locks.Lock;
 
 /**
@@ -84,16 +81,6 @@ public class InstanceCreator implements Runnable {
 
             // Update topology
             TopologyBuilder.handleMemberInitializedEvent(memberContext);
-
-            // Publish instance creation statistics to BAM
-            BAMUsageDataPublisher.publish(
-                    memberContext.getMemberId(),
-                    memberContext.getPartition().getId(),
-                    memberContext.getNetworkPartitionId(),
-                    memberContext.getClusterId(),
-                    memberContext.getCartridgeType(),
-                    MemberStatus.Initialized.toString(),
-                    memberContext.getInstanceMetadata());
         } catch (Exception e) {
             String message = String.format("Could not start instance: [cartridge-type] %s [cluster-id] %s",
                     memberContext.getCartridgeType(), memberContext.getClusterId());

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
deleted file mode 100644
index 56c5f87..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.statistics.publisher;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
-import org.apache.stratos.cloud.controller.context.CloudControllerContext;
-import org.apache.stratos.cloud.controller.domain.Cartridge;
-import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
-import org.apache.stratos.cloud.controller.domain.MemberContext;
-import org.apache.stratos.cloud.controller.exception.CloudControllerException;
-import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.wso2.carbon.base.ServerConfiguration;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
-import org.wso2.carbon.databridge.commons.Attribute;
-import org.wso2.carbon.databridge.commons.AttributeType;
-import org.wso2.carbon.databridge.commons.Event;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-import org.wso2.carbon.utils.CarbonUtils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * Usage data publisher for publishing instance usage data to BAM.
- */
-public class BAMUsageDataPublisher {
-
-    private static final Log log = LogFactory.getLog(BAMUsageDataPublisher.class);
-
-    private static AsyncDataPublisher dataPublisher;
-    private static StreamDefinition streamDefinition;
-    private static final String cloudControllerEventStreamVersion = "1.0.0";
-
-    public static void publish(String memberId,
-                               String partitionId,
-                               String networkId,
-                               String clusterId,
-                               String serviceName,
-                               String status,
-                               InstanceMetadata metadata) {
-        if (!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()) {
-            return;
-        }
-        log.debug(CloudControllerConstants.DATA_PUB_TASK_NAME + " cycle started.");
-
-        if (dataPublisher == null) {
-            createDataPublisher();
-
-            //If we cannot create a data publisher we should give up
-            //this means data will not be published
-            if (dataPublisher == null) {
-                log.error("Data Publisher cannot be created or found.");
-                release();
-                return;
-            }
-        }
-
-        MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId);
-        String cartridgeType = memberContext.getCartridgeType();
-        Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
-
-        //Construct the data to be published
-        List<Object> payload = new ArrayList<Object>();
-        // Payload values
-        payload.add(memberId);
-        payload.add(serviceName);
-        payload.add(clusterId);
-        payload.add(handleNull(memberContext.getLbClusterId()));
-        payload.add(handleNull(partitionId));
-        payload.add(handleNull(networkId));
-        if (cartridge != null) {
-            payload.add(handleNull(String.valueOf(cartridge.isMultiTenant())));
-        } else {
-            payload.add("");
-        }
-        payload.add(handleNull(memberContext.getPartition().getProvider()));
-        payload.add(handleNull(status));
-
-        if (metadata != null) {
-            payload.add(metadata.getHostname());
-            payload.add(metadata.getHypervisor());
-            payload.add(String.valueOf(metadata.getRam()));
-            payload.add(metadata.getImageId());
-            payload.add(metadata.getLoginPort());
-            payload.add(metadata.getOperatingSystemName());
-            payload.add(metadata.getOperatingSystemVersion());
-            payload.add(metadata.getOperatingSystemArchitecture());
-            payload.add(String.valueOf(metadata.isOperatingSystem64bit()));
-        } else {
-            payload.add("");
-            payload.add("");
-            payload.add("");
-            payload.add("");
-            payload.add(0);
-            payload.add("");
-            payload.add("");
-            payload.add("");
-            payload.add("");
-        }
-
-        payload.add(handleNull(Arrays.toString(memberContext.getPrivateIPs())));
-        payload.add(handleNull(Arrays.toString(memberContext.getPublicIPs())));
-        payload.add(handleNull(Arrays.toString(memberContext.getAllocatedIPs())));
-
-        Event event = new Event();
-        event.setPayloadData(payload.toArray());
-        event.setArbitraryDataMap(new HashMap<String, String>());
-
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
-            }
-            dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
-        } catch (AgentException e) {
-            if (log.isErrorEnabled()) {
-                log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e);
-            }
-        }
-    }
-
-    private static void release() {
-        CloudControllerContext.getInstance().setPublisherRunning(false);
-    }
-
-    private static StreamDefinition initializeStream() throws Exception { // Builds the cloud controller event stream definition, stores it in the class-level streamDefinition and returns it.
-        streamDefinition = new StreamDefinition( // assigns the class-level field, not a local variable
-                CloudControllerConstants.CLOUD_CONTROLLER_EVENT_STREAM,
-                cloudControllerEventStreamVersion);
-        streamDefinition.setNickName("cloud.controller");
-        streamDefinition.setDescription("Instances booted up by the Cloud Controller");
-        // Payload definition: every attribute is declared as STRING except the login port, which is INT
-        List<Attribute> payloadData = new ArrayList<Attribute>();
-        payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.CARTRIDGE_TYPE_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.LB_CLUSTER_ID_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.IAAS_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.STATUS_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.HOST_NAME_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.HYPERVISOR_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.RAM_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.IMAGE_ID_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.LOGIN_PORT_LABEL, AttributeType.INT));
-        payloadData.add(new Attribute(CloudControllerConstants.OS_NAME_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.OS_VERSION_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.OS_ARCH_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.OS_BIT_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.PRIV_IP_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.PUB_IP_LABEL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.ALLOCATE_IP_LABEL, AttributeType.STRING));
-        streamDefinition.setPayloadData(payloadData);
-        return streamDefinition;
-    }
-
-
-    private static void createDataPublisher() { // Creates the AsyncDataPublisher for the BAM server, stores it in the CloudControllerContext and registers the stream definition on it.
-        //creating the agent
-
-        ServerConfiguration serverConfig = CarbonUtils.getServerConfiguration(); // trust store settings and BAM URL are read from the server configuration
-        String trustStorePath = serverConfig.getFirstProperty("Security.TrustStore.Location");
-        String trustStorePassword = serverConfig.getFirstProperty("Security.TrustStore.Password");
-        String bamServerUrl = serverConfig.getFirstProperty("BamServerURL");
-        String adminUsername = CloudControllerConfig.getInstance().getDataPubConfig().getBamUsername(); // credentials come from the data publisher config, not the server configuration
-        String adminPassword = CloudControllerConfig.getInstance().getDataPubConfig().getBamPassword();
-
-        System.setProperty("javax.net.ssl.trustStore", trustStorePath); // system properties: set for the whole JVM, not only for this publisher
-        System.setProperty("javax.net.ssl.trustStorePassword", trustStorePassword);
-
-
-        try {
-            dataPublisher = new AsyncDataPublisher("tcp://" + bamServerUrl + "", adminUsername, adminPassword); // NOTE(review): "BamServerURL" is presumably host:port without a scheme, since tcp:// is prepended here - confirm
-            CloudControllerContext.getInstance().setDataPublisher(dataPublisher);
-            initializeStream(); // return value unused; the call populates streamDefinition, which is read on the next line
-            dataPublisher.addStreamDefinition(streamDefinition);
-        } catch (Exception e) { // any failure is logged and rethrown as CloudControllerException with the original cause
-            String msg = "Unable to create a data publisher to " + bamServerUrl +
-                    ". Usage Agent will not function properly. ";
-            log.error(msg, e);
-            throw new CloudControllerException(msg, e);
-        }
-    }
-
-    private static String handleNull(String val) { // Returns an empty string when val is null, otherwise val unchanged.
-        if (val == null) {
-            return "";
-        }
-        return val;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
new file mode 100644
index 0000000..db68396
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.statistics.publisher;
+
+import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
+
+/**
+ * Factory for the statistics publishers used by the cloud controller:
+ * {@link MemberInformationPublisher} and {@link MemberStatusPublisher}.
+ * <p>
+ * Only {@link StatisticsPublisherType#WSO2DAS} is handled; any other type (including
+ * {@code null}) is rejected with an {@link IllegalArgumentException}.
+ */
+public class CloudControllerPublisherFactory {
+    /**
+     * Create member information publisher
+     *
+     * @param type StatisticsPublisherType
+     * @return MemberInformationPublisher
+     * @throws IllegalArgumentException if the given publisher type is not supported
+     */
+    public static MemberInformationPublisher createMemberInformationPublisher(StatisticsPublisherType type) {
+        if (type == StatisticsPublisherType.WSO2DAS) {
+            return new DASMemberInformationPublisher();
+        }
+        // Name the rejected type so that a misconfiguration can be diagnosed from the message alone
+        throw new IllegalArgumentException("Unknown statistics publisher type: " + type);
+    }
+
+    /**
+     * Create member status publisher
+     *
+     * @param type StatisticsPublisherType
+     * @return MemberStatusPublisher
+     * @throws IllegalArgumentException if the given publisher type is not supported
+     */
+    public static MemberStatusPublisher createMemberStatusPublisher(StatisticsPublisherType type) {
+        if (type == StatisticsPublisherType.WSO2DAS) {
+            return new DASMemberStatusPublisher();
+        }
+        // Name the rejected type so that a misconfiguration can be diagnosed from the message alone
+        throw new IllegalArgumentException("Unknown statistics publisher type: " + type);
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
new file mode 100644
index 0000000..2bab194
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.statistics.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.context.CloudControllerContext;
+import org.apache.stratos.cloud.controller.domain.Cartridge;
+import org.apache.stratos.cloud.controller.domain.IaasProvider;
+import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
+import org.apache.stratos.cloud.controller.domain.MemberContext;
+import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Publishes member information/metadata to DAS.
+ * <p>
+ * Events are sent on the {@code member_info} stream (version {@code 1.0.0}) through the thrift
+ * client named {@code das}. Every publish request is handed to an executor service; the payload
+ * is assembled and sent inside the submitted task.
+ */
+public class DASMemberInformationPublisher extends ThriftStatisticsPublisher implements MemberInformationPublisher {
+
+    private static final Log log = LogFactory.getLog(DASMemberInformationPublisher.class);
+
+    private static final String DATA_STREAM_NAME = "member_info";
+    private static final String VERSION = "1.0.0";
+    private static final String DAS_THRIFT_CLIENT_NAME = "das";
+    /** Placeholder published in place of a missing (null) value. */
+    private static final String NULL_VALUE = "Value Not Found";
+    private final ExecutorService executorService;
+
+    public DASMemberInformationPublisher() {
+        super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
+        // NOTE(review): the second argument is presumably the thread pool size - confirm against StratosThreadPool
+        executorService = StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool", 10);
+    }
+
+    /**
+     * Builds the definition of the member information stream. The attributes are declared in the
+     * same order in which the payload values are added when publishing.
+     *
+     * @return stream definition of the member information stream
+     * @throws RuntimeException if the stream definition could not be created
+     */
+    private static StreamDefinition createStreamDefinition() {
+        try {
+            // Create stream definition
+            StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+            streamDefinition.setNickName("Member Information");
+            streamDefinition.setDescription("Member Information");
+            List<Attribute> payloadData = new ArrayList<Attribute>();
+
+            // Set payload definition
+            payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.INSTANCE_TYPE_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.SCALING_DECISION_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.PRIV_IP_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.PUB_IP_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.ALLOCATED_IP_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.HOST_NAME_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.HYPERVISOR_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.CPU_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.RAM_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.IMAGE_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.LOGIN_PORT_COL, AttributeType.INT));
+            payloadData.add(new Attribute(CloudControllerConstants.OS_NAME_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.OS_VERSION_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.OS_ARCH_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.OS_BIT_COL, AttributeType.BOOL));
+            streamDefinition.setPayloadData(payloadData);
+            return streamDefinition;
+        } catch (Exception e) {
+            throw new RuntimeException("Could not create stream definition", e);
+        }
+    }
+
+    /**
+     * Publishing member info to DAS.
+     * <p>
+     * The work is submitted to the executor service. Nothing is published when the metadata is
+     * null, or when the member context, cartridge, partition or IaaS provider of the member
+     * cannot be resolved (a warning is logged in the latter cases).
+     *
+     * @param memberId          Member Id
+     * @param scalingDecisionId Scaling Decision Id
+     * @param metadata          InstanceMetadata
+     */
+    @Override
+    public void publish(final String memberId, final String scalingDecisionId, final InstanceMetadata metadata) {
+
+        Runnable publisher = new Runnable() {
+            @Override
+            public void run() {
+                publishMemberInformation(memberId, scalingDecisionId, metadata);
+            }
+        };
+        executorService.execute(publisher);
+    }
+
+    /**
+     * Resolves the member's context, assembles the event payload and publishes it.
+     *
+     * @param memberId          Member Id
+     * @param scalingDecisionId Scaling Decision Id
+     * @param metadata          InstanceMetadata, may be null
+     */
+    private void publishMemberInformation(String memberId, String scalingDecisionId, InstanceMetadata metadata) {
+        if (metadata == null) {
+            // No instance metadata, nothing to publish
+            return;
+        }
+
+        // Each lookup below is checked: this runs inside an executor task, where a
+        // NullPointerException would surface far away from the caller of publish().
+        MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId);
+        if (memberContext == null) {
+            logNotPublished("member context not found", memberId);
+            return;
+        }
+        String cartridgeType = memberContext.getCartridgeType();
+        Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
+        if (cartridge == null) {
+            logNotPublished("cartridge not found for cartridge type " + cartridgeType, memberId);
+            return;
+        }
+        if (memberContext.getPartition() == null) {
+            logNotPublished("partition not found in member context", memberId);
+            return;
+        }
+        IaasProvider iaasProvider = CloudControllerContext.getInstance().getIaasProviderOfPartition(
+                cartridge.getType(), memberContext.getPartition().getId());
+        if (iaasProvider == null) {
+            logNotPublished("IaaS provider not found for partition " + memberContext.getPartition().getId(),
+                    memberId);
+            return;
+        }
+        String instanceType = iaasProvider.getProperty(CloudControllerConstants.INSTANCE_TYPE);
+
+        String multiTenant = String.valueOf(cartridge.isMultiTenant());
+        String privateIps = formatIps(memberContext.getPrivateIPs());
+        String publicIps = formatIps(memberContext.getPublicIPs());
+        String allocatedIps = formatIps(memberContext.getAllocatedIPs());
+
+        //adding payload data
+        List<Object> payload = new ArrayList<Object>();
+        payload.add(memberId);
+        payload.add(handleNull(instanceType));
+        payload.add(scalingDecisionId);
+        payload.add(multiTenant);
+        payload.add(privateIps);
+        payload.add(publicIps);
+        payload.add(allocatedIps);
+        payload.add(handleNull(metadata.getHostname()));
+        payload.add(handleNull(metadata.getHypervisor()));
+        payload.add(handleNull(metadata.getCpu()));
+        payload.add(handleNull(metadata.getRam()));
+        payload.add(handleNull(metadata.getImageId()));
+        payload.add(metadata.getLoginPort());
+        payload.add(handleNull(metadata.getOperatingSystemName()));
+        payload.add(handleNull(metadata.getOperatingSystemVersion()));
+        payload.add(handleNull(metadata.getOperatingSystemArchitecture()));
+        payload.add(Boolean.valueOf(metadata.isOperatingSystem64bit()));
+        if (log.isDebugEnabled()) {
+            // The IP arrays are logged in their formatted form; passing an array directly to %s
+            // would print its identity hash instead of the addresses.
+            log.debug(String.format("Publishing member information: [member_id] %s [instance_type] %s " +
+                            "[scaling_decision_id] %s [is_multi_tenant] %s [private_IPs] %s " +
+                            "[public_IPs] %s [allocated_IPs] %s [host_name] %s [hypervisor] %s [cpu] %s " +
+                            "[ram] %s [image_id] %s [login_port] %d [os_name] %s " +
+                            "[os_version] %s [os_arch] %s [is_os_64bit] %b",
+                    memberId, instanceType, scalingDecisionId, multiTenant,
+                    privateIps, publicIps, allocatedIps,
+                    metadata.getHostname(), metadata.getHypervisor(),
+                    metadata.getCpu(), metadata.getRam(), metadata.getImageId(), metadata.getLoginPort(),
+                    metadata.getOperatingSystemName(), metadata.getOperatingSystemVersion(),
+                    metadata.getOperatingSystemArchitecture(), metadata.isOperatingSystem64bit()));
+        }
+        super.publish(payload.toArray());
+    }
+
+    /**
+     * Logs a warning explaining why the member information of a member was not published.
+     *
+     * @param reason   short description of what could not be resolved
+     * @param memberId Member Id
+     */
+    private static void logNotPublished(String reason, String memberId) {
+        log.warn(String.format("Member information not published, %s: [member_id] %s", reason, memberId));
+    }
+
+    /**
+     * Formats an IP address array for publishing. The null check is done on the array itself
+     * because {@link Arrays#toString(Object[])} returns the string "null" for a null array, which
+     * a null check on its result would never catch.
+     *
+     * @param ips IP addresses, may be null
+     * @return the formatted addresses, or the "Value Not Found" placeholder if the array is null
+     */
+    private static String formatIps(Object[] ips) {
+        if (ips == null) {
+            return NULL_VALUE;
+        }
+        return Arrays.toString(ips);
+    }
+
+    /**
+     * Replaces a null value with the "Value Not Found" placeholder.
+     *
+     * @param param value to check, may be null
+     * @return the given value if it is not null, the placeholder otherwise
+     */
+    public static String handleNull(String param) {
+        if (null != param) {
+            return param;
+        }
+        return NULL_VALUE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
new file mode 100644
index 0000000..877256d
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.statistics.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Publishing member status to DAS.
+ * <p>
+ * Events are sent on the {@code member_lifecycle} stream (version {@code 1.0.0}) through the
+ * thrift client named {@code das}. Every publish request is handed to an executor service; the
+ * payload is assembled and sent inside the submitted task.
+ */
+public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implements MemberStatusPublisher {
+
+    private static final Log log = LogFactory.getLog(DASMemberStatusPublisher.class);
+    private static final String DATA_STREAM_NAME = "member_lifecycle";
+    private static final String VERSION = "1.0.0";
+    private static final String DAS_THRIFT_CLIENT_NAME = "das";
+    private final ExecutorService executorService;
+
+    public DASMemberStatusPublisher() {
+        super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
+        // NOTE(review): the second argument is presumably the thread pool size - confirm against StratosThreadPool
+        executorService = StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool", 10);
+    }
+
+    /**
+     * Builds the definition of the member lifecycle stream. The attributes are declared in the
+     * same order in which the payload values are added when publishing.
+     *
+     * @return stream definition of the member lifecycle stream
+     * @throws RuntimeException if the stream definition could not be created
+     */
+    private static StreamDefinition createStreamDefinition() {
+        try {
+            // Create stream definition
+            StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+            streamDefinition.setNickName("Member Lifecycle");
+            streamDefinition.setDescription("Member Lifecycle");
+            List<Attribute> payloadData = new ArrayList<Attribute>();
+
+            // Set payload definition
+            payloadData.add(new Attribute(CloudControllerConstants.TIMESTAMP_COL, AttributeType.LONG));
+            payloadData.add(new Attribute(CloudControllerConstants.APPLICATION_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ALIAS_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_INSTANCE_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.SERVICE_NAME_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.NETWORK_PARTITION_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.MEMBER_STATUS_COL, AttributeType.STRING));
+            streamDefinition.setPayloadData(payloadData);
+            return streamDefinition;
+        } catch (Exception e) {
+            throw new RuntimeException("Could not create stream definition", e);
+        }
+    }
+
+    /**
+     * publishing Member Status to DAS.
+     * <p>
+     * The work is submitted to the executor service; the values are published as given,
+     * without any null handling.
+     *
+     * @param timestamp          Status changed time
+     * @param applicationId      Application Id
+     * @param clusterId          Cluster Id
+     * @param clusterAlias       Cluster Alias
+     * @param clusterInstanceId  Cluster Instance Id
+     * @param serviceName        Service Name
+     * @param networkPartitionId Network Partition Id
+     * @param partitionId        Partition Id
+     * @param memberId           Member Id
+     * @param status             Member Status
+     */
+    @Override
+    public void publish(final Long timestamp, final String applicationId, final String clusterId,
+                        final String clusterAlias, final String clusterInstanceId,
+                        final String serviceName, final String networkPartitionId, final String partitionId,
+                        final String memberId, final String status) {
+
+        Runnable publisher = new Runnable() {
+            @Override
+            public void run() {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Publishing member status: [timestamp] %d [application_id] %s " +
+                                    "[cluster_id] %s [cluster_alias] %s [cluster_instance_id] %s [service_name] %s " +
+                                    "[network_partition_id] %s [partition_id] %s " +
+                                    "[member_id] %s [member_status] %s ",
+                            timestamp, applicationId, clusterId, clusterAlias, clusterInstanceId, serviceName,
+                            networkPartitionId, partitionId, memberId, status));
+                }
+                //adding payload data, in the order of the attributes of the stream definition
+                List<Object> payload = new ArrayList<Object>();
+                payload.add(timestamp);
+                payload.add(applicationId);
+                payload.add(clusterId);
+                payload.add(clusterAlias);
+                payload.add(clusterInstanceId);
+                payload.add(serviceName);
+                payload.add(networkPartitionId);
+                payload.add(partitionId);
+                payload.add(memberId);
+                payload.add(status);
+                DASMemberStatusPublisher.super.publish(payload.toArray());
+            }
+        };
+        executorService.execute(publisher);
+    }
+
+}