You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2015/09/28 15:15:17 UTC
[45/69] [abbrv] stratos git commit: Publishing metering service event
streams
Publishing metering service event streams
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b38e27c9
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b38e27c9
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b38e27c9
Branch: refs/heads/stratos-4.1.x
Commit: b38e27c9adfa5e28d62e8ba842118e0cf6daafcd
Parents: a34decc
Author: Thanuja <th...@wso2.com>
Authored: Tue Sep 22 22:25:24 2015 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Mon Sep 28 18:44:17 2015 +0530
----------------------------------------------------------------------
.../client/AutoscalerCloudControllerClient.java | 9 +-
.../monitor/cluster/ClusterMonitor.java | 16 +-
.../autoscaler/rule/RuleTasksDelegator.java | 5 +-
.../publisher/AutoscalerPublisherFactory.java | 37 +++
.../publisher/DASScalingDecisionPublisher.java | 161 +++++++++++++
.../publisher/ScalingDecisionPublisher.java | 57 +++++
.../autoscaler/util/AutoscalerConstants.java | 19 ++
.../messaging/topology/TopologyBuilder.java | 227 +++++++++++++++----
.../impl/CloudControllerServiceUtil.java | 13 +-
.../services/impl/InstanceCreator.java | 13 --
.../publisher/BAMUsageDataPublisher.java | 213 -----------------
.../CloudControllerPublisherFactory.java | 55 +++++
.../DASMemberInformationPublisher.java | 161 +++++++++++++
.../publisher/DASMemberStatusPublisher.java | 127 +++++++++++
.../publisher/MemberInformationPublisher.java | 38 ++++
.../publisher/MemberStatusPublisher.java | 45 ++++
.../util/CloudControllerConstants.java | 32 +++
.../controller/util/CloudControllerUtil.java | 16 +-
.../common/client/AutoscalerServiceClient.java | 20 +-
.../common/constants/StratosConstants.java | 1 +
.../src/main/conf/drools/dependent-scaling.drl | 21 +-
.../src/main/conf/drools/mincheck.drl | 22 +-
.../src/main/conf/drools/scaling.drl | 24 +-
.../src/test/resources/common/scaling.drl | 25 +-
.../resources/common/thrift-client-config.xml | 22 +-
25 files changed, 1064 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
index 0124206..9f380d0 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
@@ -75,8 +75,8 @@ public class AutoscalerCloudControllerClient {
public synchronized MemberContext startInstance(PartitionRef partition,
String clusterId, String clusterInstanceId,
- String networkPartitionId,
- int minMemberCount) throws SpawningException {
+ String networkPartitionId, int minMemberCount,
+ String scalingDecisionId) throws SpawningException {
try {
if (log.isInfoEnabled()) {
log.info(String.format("Trying to spawn an instance via cloud controller: " +
@@ -102,8 +102,11 @@ public class AutoscalerCloudControllerClient {
Property minCountProp = new Property();
minCountProp.setName(StratosConstants.MIN_COUNT);
minCountProp.setValue(String.valueOf(minMemberCount));
-
memberContextProps.addProperty(minCountProp);
+ Property scalingDecisionIdProp = new Property();
+ scalingDecisionIdProp.setName(StratosConstants.SCALING_DECISION_ID);
+ scalingDecisionIdProp.setValue(String.valueOf(scalingDecisionId));
+ memberContextProps.addProperty(scalingDecisionIdProp);
instanceContext.setProperties(AutoscalerUtil.toStubProperties(memberContextProps));
long startTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
index 43493bd..aadad54 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
@@ -45,12 +45,10 @@ import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiv
import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInactiveProcessor;
import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor;
import org.apache.stratos.autoscaler.util.AutoscalerConstants;
-import org.apache.stratos.autoscaler.util.AutoscalerObjectConverter;
import org.apache.stratos.autoscaler.util.ConfUtil;
import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
import org.apache.stratos.common.Properties;
-import org.apache.stratos.common.Property;
import org.apache.stratos.common.client.CloudControllerServiceClient;
import org.apache.stratos.common.constants.StratosConstants;
import org.apache.stratos.common.threading.StratosThreadPool;
@@ -328,8 +326,8 @@ public class ClusterMonitor extends Monitor {
public void run() {
if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster monitor is running: [application-id] %s [cluster-id]: " +
- "%s", getAppId(), getClusterId()));
+ log.debug(String.format("Cluster monitor is running: [application-id] " +
+ "%s [cluster-id]: %s", getAppId(), getClusterId()));
}
instanceContext.getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
@@ -698,8 +696,8 @@ public class ClusterMonitor extends Monitor {
Float floatValue = averageRequestsServingCapabilityEvent.getValue();
if (log.isDebugEnabled()) {
- log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
+ log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s " +
+ "[value] %s", clusterId, networkPartitionId, floatValue));
}
ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
@@ -1231,7 +1229,8 @@ public class ClusterMonitor extends Monitor {
// active members
if (AutoscalerContext.getInstance().getAppMonitor(getAppId()).isForce()) {
- log.info(String.format("Terminating all remaining members of partition [partition-id] %s [application-id] %s", partitionContext.getPartitionId(), getAppId()));
+ log.info(String.format("Terminating all remaining members of partition [partition-id] %s" +
+ " [application-id] %s", partitionContext.getPartitionId(), getAppId()));
partitionContext.terminateAllRemainingInstances();
}
@@ -1301,7 +1300,8 @@ public class ClusterMonitor extends Monitor {
}
if (AutoscalerContext.getInstance().getAppMonitor(getAppId()).isForce()) {
- log.info(String.format("Terminating all remaining members of partition [partition-id] %s [application-id] %s", partitionContext.getPartitionId(), getAppId()));
+ log.info(String.format("Terminating all remaining members of partition [partition-id] %s " +
+ "[application-id] %s", partitionContext.getPartitionId(), getAppId()));
partitionContext.terminateAllRemainingInstances();
}
//Need to terminate pending members
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
index 1a8bfc9..132306a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
@@ -171,9 +171,10 @@ public class RuleTasksDelegator {
* @param clusterMonitorPartitionContext Cluster monitor partition context
* @param clusterId Cluster id
* @param clusterInstanceId Instance id
+ * @param scalingDecisionId Scaling Decision id
*/
public void delegateSpawn(ClusterLevelPartitionContext clusterMonitorPartitionContext, String clusterId,
- String clusterInstanceId) {
+ String clusterInstanceId, String scalingDecisionId) {
try {
String nwPartitionId = clusterMonitorPartitionContext.getNetworkPartitionId();
@@ -194,7 +195,7 @@ public class RuleTasksDelegator {
.startInstance(clusterMonitorPartitionContext.getPartition(),
clusterId,
clusterInstanceId, clusterMonitorPartitionContext.getNetworkPartitionId(),
- minimumCountOfNetworkPartition);
+ minimumCountOfNetworkPartition, scalingDecisionId);
if (memberContext != null) {
ClusterLevelPartitionContext partitionContext = clusterInstanceContext.
getPartitionCtxt(clusterMonitorPartitionContext.getPartitionId());
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
new file mode 100644
index 0000000..d057108
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/AutoscalerPublisherFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.statistics.publisher;
+
+import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
+
+/**
+ * Factory for creating ScalingDecisionPublisher instances.
+ */
+public class AutoscalerPublisherFactory {
+
+ public static ScalingDecisionPublisher createScalingDecisionPublisher(StatisticsPublisherType type) {
+
+ if (type == StatisticsPublisherType.WSO2DAS) {
+ return new DASScalingDecisionPublisher();
+ } else {
+ throw new RuntimeException("Unknown statistics publisher type");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
new file mode 100644
index 0000000..a907043
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/DASScalingDecisionPublisher.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.statistics.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.util.AutoscalerConstants;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * ScalingDecisionPublisher implementation that publishes autoscaler scaling decisions to DAS.
+ */
+public class DASScalingDecisionPublisher extends ThriftStatisticsPublisher implements ScalingDecisionPublisher {
+
+ private static final Log log = LogFactory.getLog(DASScalingDecisionPublisher.class);
+ private static final String DATA_STREAM_NAME = "scaling_decision";
+ private static final String VERSION = "1.0.0";
+ private static final String DAS_THRIFT_CLIENT_NAME = "das";
+ private ExecutorService executorService;
+
+ public DASScalingDecisionPublisher() {
+ super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
+ executorService = StratosThreadPool.getExecutorService("autoscaler.stats.publisher.thread.pool", 10);
+ }
+
+ private static StreamDefinition createStreamDefinition() {
+ try {
+ // Create stream definition
+ StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+ streamDefinition.setNickName("Member Information");
+ streamDefinition.setDescription("Member Information");
+ List<Attribute> payloadData = new ArrayList<Attribute>();
+
+ // Set payload definition
+ payloadData.add(new Attribute(AutoscalerConstants.TIMESTAMP, AttributeType.LONG));
+ payloadData.add(new Attribute(AutoscalerConstants.SCALING_DECISION_ID, AttributeType.STRING));
+ payloadData.add(new Attribute(AutoscalerConstants.CLUSTER_ID, AttributeType.STRING));
+ payloadData.add(new Attribute(AutoscalerConstants.MIN_INSTANCE_COUNT, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.MAX_INSTANCE_COUNT, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.RIF_PREDICTED, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.RIF_THRESHOLD, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.RIF_REQUIRED_INSTANCES, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.MC_PREDICTED, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.MC_THRESHOLD, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.MC_REQUIRED_INSTANCES, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.LA_PREDICTED, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.LA_THRESHOLD, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.LA_REQUIRED_INSTANCES, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.REQUIRED_INSTANCE_COUNT, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.ACTIVE_INSTANCE_COUNT, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.ADDITIONAL_INSTANCE_COUNT, AttributeType.INT));
+ payloadData.add(new Attribute(AutoscalerConstants.SCALING_REASON, AttributeType.STRING));
+ streamDefinition.setPayloadData(payloadData);
+ return streamDefinition;
+ } catch (Exception e) {
+ throw new RuntimeException("Could not create stream definition", e);
+ }
+ }
+
+ /**
+ * Publishes a scaling decision to DAS; the publish call is handed to the publisher executor service.
+ *
+ * @param timestamp Scaling Time
+ * @param scalingDecisionId Scaling Decision Id
+ * @param clusterId Cluster Id
+ * @param minInstanceCount Minimum Instance Count
+ * @param maxInstanceCount Maximum Instance Count
+ * @param rifPredicted RIF Predicted
+ * @param rifThreshold RIF Threshold
+ * @param rifRequiredInstances RIF Required Instances
+ * @param mcPredicted MC Predicted
+ * @param mcThreshold MC Threshold
+ * @param mcRequiredInstances MC Required Instances
+ * @param laPredicted LA Predicted
+ * @param laThreshold LA Threshold
+ * @param laRequiredInstance LA Required Instance
+ * @param requiredInstanceCount Required Instance Count
+ * @param activeInstanceCount Active Instance Count
+ * @param additionalInstanceCount Additional Instance Needed
+ * @param scalingReason Scaling Reason
+ */
+ @Override
+ public void publish(final Long timestamp, final String scalingDecisionId, final String clusterId,
+ final int minInstanceCount, final int maxInstanceCount,
+ final int rifPredicted, final int rifThreshold, final int rifRequiredInstances,
+ final int mcPredicted, final int mcThreshold, final int mcRequiredInstances,
+ final int laPredicted, final int laThreshold, final int laRequiredInstance,
+ final int requiredInstanceCount, final int activeInstanceCount,
+ final int additionalInstanceCount, final String scalingReason) {
+ Runnable publisher = new Runnable() {
+ @Override
+ public void run() {
+ if (log.isDebugEnabled())
+
+ {
+ log.debug(String.format("Publishing scaling decision: [timestamp] %d [scaling_decision_id] %s " +
+ "[cluster_id] %s [min_instance_count] %d [max_instance_count] %d " +
+ "[rif_predicted] %d [rif_threshold] %d [rif_required_instances] %d " +
+ "[mc_predicted] %d [mc_threshold] %d [mc_required_instances] %d " +
+ "[la_predicted] %d [la_threshold] %d [la_required_instances] %d " +
+ "[required_instance_count] %d [active_instance_count] %d " +
+ "[addtitional_instance_count] %d [scaling_reason] %s",
+ timestamp, scalingDecisionId, clusterId, minInstanceCount, maxInstanceCount, rifPredicted,
+ rifThreshold, rifRequiredInstances, mcPredicted, mcThreshold, mcRequiredInstances,
+ laPredicted, laThreshold, laRequiredInstance, requiredInstanceCount, activeInstanceCount,
+ additionalInstanceCount, scalingReason));
+ }
+
+ //adding payload data
+ List<Object> payload = new ArrayList<Object>();
+ payload.add(timestamp);
+ payload.add(scalingDecisionId);
+ payload.add(clusterId);
+ payload.add(minInstanceCount);
+ payload.add(maxInstanceCount);
+ payload.add(rifPredicted);
+ payload.add(rifThreshold);
+ payload.add(rifRequiredInstances);
+ payload.add(mcPredicted);
+ payload.add(mcThreshold);
+ payload.add(mcRequiredInstances);
+ payload.add(laPredicted);
+ payload.add(laThreshold);
+ payload.add(laRequiredInstance);
+ payload.add(requiredInstanceCount);
+ payload.add(activeInstanceCount);
+ payload.add(additionalInstanceCount);
+ payload.add(scalingReason);
+ DASScalingDecisionPublisher.super.publish(payload.toArray());
+ }
+
+ };
+ executorService.execute(publisher);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
new file mode 100644
index 0000000..f7b0087
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/statistics/publisher/ScalingDecisionPublisher.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.statistics.publisher;
+
+import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
+
+/**
+ * Scaling Decision Publisher interface.
+ */
+public interface ScalingDecisionPublisher extends StatisticsPublisher {
+ /**
+ * Publishing scaling decision to DAS.
+ *
+ * @param timestamp Scaling Time
+ * @param scalingDecisionId Scaling Decision Id
+ * @param clusterId Cluster Id
+ * @param minInstanceCount Minimum Instance Count
+ * @param maxInstanceCount Maximum Instance Count
+ * @param rifPredicted RIF Predicted
+ * @param rifThreshold RIF Threshold
+ * @param rifRequiredInstances RIF Required Instances
+ * @param mcPredicted MC Predicted
+ * @param mcThreshold MC Threshold
+ * @param mcRequiredInstances MC Required Instances
+ * @param laPredicted LA Predicted
+ * @param laThreshold LA Threshold
+ * @param laRequiredInstance LA Required Instance
+ * @param requiredInstanceCount Required Instance Count
+ * @param activeInstanceCount Active Instance Count
+ * @param additionalInstanceCount Additional Instance Needed
+ * @param scalingReason Scaling Reason
+ */
+ public void publish(Long timestamp, String scalingDecisionId, String clusterId,
+ int minInstanceCount, int maxInstanceCount,
+ int rifPredicted, int rifThreshold, int rifRequiredInstances,
+ int mcPredicted, int mcThreshold, int mcRequiredInstances,
+ int laPredicted, int laThreshold, int laRequiredInstance,
+ int requiredInstanceCount, int activeInstanceCount, int additionalInstanceCount,
+ String scalingReason);
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
index 788c79b..caea65a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
@@ -118,4 +118,23 @@ public final class AutoscalerConstants {
public static final String IDENTITY_APPLICATION_SERVICE_SFX = "services/IdentityApplicationManagementService";
public static final String TOKEN_ENDPOINT_SFX = "oauth2/token";
public static final String TERMINATE_DEPENDENTS = "terminate-dependents";
+ //scaling decision payload values
+ public static final String TIMESTAMP = "timestamp";
+ public static final String SCALING_DECISION_ID = "scaling_decision_id";
+ public static final String CLUSTER_ID = "cluster_id";
+ public static final String MIN_INSTANCE_COUNT = "min_instance_count";
+ public static final String MAX_INSTANCE_COUNT = "max_instance_count";
+ public static final String RIF_PREDICTED = "rif_predicted";
+ public static final String RIF_THRESHOLD = "rif_threshold";
+ public static final String RIF_REQUIRED_INSTANCES = "rif_required_instances";
+ public static final String MC_PREDICTED = "mc_predicted";
+ public static final String MC_THRESHOLD = "mc_threshold";
+ public static final String MC_REQUIRED_INSTANCES = "mc_required_instances";
+ public static final String LA_PREDICTED = "la_predicted";
+ public static final String LA_THRESHOLD = "la_threshold";
+ public static final String LA_REQUIRED_INSTANCES = "la_required_instances";
+ public static final String REQUIRED_INSTANCE_COUNT = "required_instance_count";
+ public static final String ACTIVE_INSTANCE_COUNT = "active_instance_count";
+ public static final String ADDITIONAL_INSTANCE_COUNT = "additional_instance_count";
+ public static final String SCALING_REASON = "scaling_reason";
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index 936f9ec..7348b81 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -28,9 +28,13 @@ import org.apache.stratos.cloud.controller.exception.InvalidCartridgeTypeExcepti
import org.apache.stratos.cloud.controller.exception.InvalidMemberException;
import org.apache.stratos.cloud.controller.iaases.kubernetes.KubernetesIaas;
import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPublisher;
-import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
+import org.apache.stratos.cloud.controller.statistics.publisher.CloudControllerPublisherFactory;
+import org.apache.stratos.cloud.controller.statistics.publisher.MemberInformationPublisher;
+import org.apache.stratos.cloud.controller.statistics.publisher.MemberStatusPublisher;
import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
import org.apache.stratos.common.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
import org.apache.stratos.kubernetes.client.KubernetesConstants;
import org.apache.stratos.messaging.domain.application.ClusterDataHolder;
import org.apache.stratos.messaging.domain.instance.ClusterInstance;
@@ -148,7 +152,6 @@ public class TopologyBuilder {
}
-
public static void handleApplicationClustersCreated(String appId, List<Cluster> appClusters) {
TopologyManager.acquireWriteLock();
@@ -371,14 +374,15 @@ public class TopologyBuilder {
*/
public static void handleMemberCreatedEvent(MemberContext memberContext) {
Topology topology = TopologyManager.getTopology();
-
Service service = topology.getService(memberContext.getCartridgeType());
String clusterId = memberContext.getClusterId();
Cluster cluster = service.getCluster(clusterId);
+ String applicationId = service.getCluster(memberContext.getClusterId()).getAppId();
String memberId = memberContext.getMemberId();
String clusterInstanceId = memberContext.getClusterInstanceId();
String networkPartitionId = memberContext.getNetworkPartitionId();
String partitionId = memberContext.getPartition().getId();
+ String clusterAlias = CloudControllerUtil.getAliasFromClusterId(memberContext.getClusterId());
String lbClusterId = memberContext.getLbClusterId();
long initTime = memberContext.getInitTime();
@@ -396,6 +400,31 @@ public class TopologyBuilder {
member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
cluster.addMember(member);
TopologyManager.updateTopology(topology);
+
+ //member created time
+ Long timestamp = System.currentTimeMillis();
+ //publishing member status to DAS
+ MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+ createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
+
+ if (memStatusPublisher.isEnabled()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Publishing Member Status to DAS");
+ }
+ memStatusPublisher.publish(timestamp,
+ applicationId,
+ memberContext.getClusterId(),
+ clusterAlias,
+ memberContext.getClusterInstanceId(),
+ memberContext.getCartridgeType(),
+ memberContext.getNetworkPartitionId(),
+ memberContext.getPartition().getId(),
+ memberContext.getMemberId(),
+ MemberStatus.Created.toString());
+ } else {
+ log.warn("Member Status Publisher is not enabled");
+ }
+
} finally {
TopologyManager.releaseWriteLock();
}
@@ -411,6 +440,7 @@ public class TopologyBuilder {
public static void handleMemberInitializedEvent(MemberContext memberContext) {
Topology topology = TopologyManager.getTopology();
Service service = topology.getService(memberContext.getCartridgeType());
+
if (service == null) {
log.warn(String.format("Service %s does not exist",
memberContext.getCartridgeType()));
@@ -423,6 +453,8 @@ public class TopologyBuilder {
return;
}
+ String applicationId = service.getCluster(memberContext.getClusterId()).getAppId();
+ String clusterAlias = CloudControllerUtil.getAliasFromClusterId(memberContext.getClusterId());
Member member = service.getCluster(memberContext.getClusterId()).
getMember(memberContext.getMemberId());
if (member == null) {
@@ -464,18 +496,48 @@ public class TopologyBuilder {
log.info("Member status updated to initialized");
TopologyManager.updateTopology(topology);
-
+ //member initialized time
+ Long timestamp = System.currentTimeMillis();
TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
- //publishing data
- BAMUsageDataPublisher.publish(memberContext.getMemberId(),
- memberContext.getPartition().getId(),
- memberContext.getNetworkPartitionId(),
- memberContext.getClusterId(),
- memberContext.getCartridgeType(),
- MemberStatus.Initialized.toString(),
- null);
+ //publishing member information and status to DAS
+ MemberInformationPublisher memInfoPublisher = CloudControllerPublisherFactory.
+ createMemberInformationPublisher(StatisticsPublisherType.WSO2DAS);
+
+ MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+ createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
+
+ if (memInfoPublisher.isEnabled()) {
+ if (log.isDebugEnabled()) {
+ log.info("Publishing Member Information");
+ }
+ String scalingDecisionId = memberContext.getProperties().getProperty(
+ StratosConstants.SCALING_DECISION_ID).getValue();
+ memInfoPublisher.publish(memberContext.getMemberId(), scalingDecisionId,
+ memberContext.getInstanceMetadata());
+ } else {
+ log.warn("Member Information Publisher is not enabled");
+ }
+ if (memStatusPublisher.isEnabled()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Publishing Member Status to DAS");
+ }
+ memStatusPublisher.publish(timestamp,
+ applicationId,
+ memberContext.getClusterId(),
+ clusterAlias,
+ memberContext.getClusterInstanceId(),
+ memberContext.getCartridgeType(),
+ memberContext.getNetworkPartitionId(),
+ memberContext.getPartition().getId(),
+ memberContext.getMemberId(),
+ MemberStatus.Initialized.toString());
+ } else {
+ log.warn("Member Status Publisher is not enabled");
+ }
}
- } finally {
+ } finally
+
+ {
TopologyManager.releaseWriteLock();
}
}
@@ -495,6 +557,7 @@ public class TopologyBuilder {
try {
Topology topology = TopologyManager.getTopology();
Service service = topology.getService(instanceStartedEvent.getServiceName());
+
if (service == null) {
log.warn(String.format("Service %s does not exist",
instanceStartedEvent.getServiceName()));
@@ -507,6 +570,8 @@ public class TopologyBuilder {
return;
}
+ String applicationId = service.getCluster(instanceStartedEvent.getClusterId()).getAppId();
+ String clusterAlias = CloudControllerUtil.getAliasFromClusterId(instanceStartedEvent.getClusterId());
Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId());
Member member = cluster.getMember(instanceStartedEvent.getMemberId());
if (member == null) {
@@ -527,16 +592,31 @@ public class TopologyBuilder {
log.info("member started event adding status started");
TopologyManager.updateTopology(topology);
+ //member started time
+ Long timestamp = System.currentTimeMillis();
//memberStartedEvent.
TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
- //publishing data
- BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
- instanceStartedEvent.getPartitionId(),
- instanceStartedEvent.getNetworkPartitionId(),
- instanceStartedEvent.getClusterId(),
- instanceStartedEvent.getServiceName(),
- MemberStatus.Starting.toString(),
- null);
+ //publishing member status to DAS
+ MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+ createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
+
+ if (memStatusPublisher.isEnabled()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Publishing Member Status to DAS");
+ }
+ memStatusPublisher.publish(timestamp,
+ applicationId,
+ instanceStartedEvent.getClusterId(),
+ clusterAlias,
+ instanceStartedEvent.getClusterInstanceId(),
+ instanceStartedEvent.getServiceName(),
+ instanceStartedEvent.getNetworkPartitionId(),
+ instanceStartedEvent.getPartitionId(),
+ instanceStartedEvent.getMemberId(),
+ MemberStatus.Starting.toString());
+ } else {
+ log.warn("Member Status Publisher is not enabled");
+ }
}
} finally {
TopologyManager.releaseWriteLock();
@@ -549,9 +629,11 @@ public class TopologyBuilder {
}
}
+
public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) {
Topology topology = TopologyManager.getTopology();
Service service = topology.getService(instanceActivatedEvent.getServiceName());
+
if (service == null) {
log.warn(String.format("Service %s does not exist",
instanceActivatedEvent.getServiceName()));
@@ -565,6 +647,9 @@ public class TopologyBuilder {
return;
}
+ String applicationId = service.getCluster(instanceActivatedEvent.getClusterId()).getAppId();
+ String clusterAlias = CloudControllerUtil.getAliasFromClusterId(instanceActivatedEvent.getClusterId());
+
Member member = cluster.getMember(instanceActivatedEvent.getMemberId());
if (member == null) {
log.warn(String.format("Member %s does not exist",
@@ -587,7 +672,8 @@ public class TopologyBuilder {
TopologyManager.acquireWriteLock();
// try update lifecycle state
if (!member.isStateTransitionValid(MemberStatus.Active)) {
- log.error("Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]");
+ log.error("Invalid state transition from [" + member.getStatus() + "] to [" +
+ MemberStatus.Active + "]");
return;
} else {
member.setStatus(MemberStatus.Active);
@@ -632,15 +718,31 @@ public class TopologyBuilder {
// Publish member activated event
TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
+ //member activated time
+ Long timestamp = System.currentTimeMillis();
+ // Publish member activated event
+ TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
- // Publish statistics data
- BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
- memberActivatedEvent.getPartitionId(),
- memberActivatedEvent.getNetworkPartitionId(),
- memberActivatedEvent.getClusterId(),
- memberActivatedEvent.getServiceName(),
- MemberStatus.Active.toString(),
- null);
+ //publishing member status to DAS
+ MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+ createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
+ if (memStatusPublisher.isEnabled()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Publishing Member Status to DAS");
+ }
+ memStatusPublisher.publish(timestamp,
+ applicationId,
+ memberActivatedEvent.getClusterId(),
+ clusterAlias,
+ memberActivatedEvent.getClusterInstanceId(),
+ memberActivatedEvent.getServiceName(),
+ memberActivatedEvent.getNetworkPartitionId(),
+ memberActivatedEvent.getPartitionId(),
+ memberActivatedEvent.getMemberId(),
+ MemberStatus.Active.toString());
+ } else {
+ log.warn("Member Status Publisher is not enabled");
+ }
}
} finally {
TopologyManager.releaseWriteLock();
@@ -651,6 +753,7 @@ public class TopologyBuilder {
throws InvalidMemberException, InvalidCartridgeTypeException {
Topology topology = TopologyManager.getTopology();
Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName());
+
//update the status of the member
if (service == null) {
log.warn(String.format("Service %s does not exist",
@@ -665,6 +768,8 @@ public class TopologyBuilder {
return;
}
+ String applicationId = service.getCluster(instanceReadyToShutdownEvent.getClusterId()).getAppId();
+ String clusterAlias = CloudControllerUtil.getAliasFromClusterId(instanceReadyToShutdownEvent.getClusterId());
Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
if (member == null) {
@@ -679,6 +784,8 @@ public class TopologyBuilder {
instanceReadyToShutdownEvent.getMemberId(),
instanceReadyToShutdownEvent.getNetworkPartitionId(),
instanceReadyToShutdownEvent.getPartitionId());
+ //member ReadyToShutDown state change time
+ Long timestamp = null;
try {
TopologyManager.acquireWriteLock();
@@ -691,18 +798,31 @@ public class TopologyBuilder {
log.info("Member Ready to shut down event adding status started");
TopologyManager.updateTopology(topology);
+ timestamp = System.currentTimeMillis();
} finally {
TopologyManager.releaseWriteLock();
}
TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
- //publishing data
- BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
- instanceReadyToShutdownEvent.getPartitionId(),
- instanceReadyToShutdownEvent.getNetworkPartitionId(),
- instanceReadyToShutdownEvent.getClusterId(),
- instanceReadyToShutdownEvent.getServiceName(),
- MemberStatus.ReadyToShutDown.toString(),
- null);
+ //publishing member status to DAS.
+ MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+ createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
+ if (memStatusPublisher.isEnabled()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Publishing Member Status to DAS");
+ }
+ memStatusPublisher.publish(timestamp,
+ applicationId,
+ instanceReadyToShutdownEvent.getClusterId(),
+ clusterAlias,
+ instanceReadyToShutdownEvent.getClusterInstanceId(),
+ instanceReadyToShutdownEvent.getServiceName(),
+ instanceReadyToShutdownEvent.getNetworkPartitionId(),
+ instanceReadyToShutdownEvent.getPartitionId(),
+ instanceReadyToShutdownEvent.getMemberId(),
+ MemberStatus.ReadyToShutDown.toString());
+ } else {
+ log.warn("Member Status Publisher is not enabled");
+ }
//termination of particular instance will be handled by autoscaler
}
@@ -786,6 +906,8 @@ public class TopologyBuilder {
return;
}
+ String applicationId = service.getCluster(clusterId).getAppId();
+ String clusterAlias = CloudControllerUtil.getAliasFromClusterId(clusterId);
Member member = cluster.getMember(memberId);
if (member == null) {
log.warn(String.format("Member %s does not exist",
@@ -795,6 +917,8 @@ public class TopologyBuilder {
String clusterInstanceId = member.getClusterInstanceId();
+ //member terminated time
+ Long timestamp = null;
try {
TopologyManager.acquireWriteLock();
properties = member.getProperties();
@@ -802,12 +926,34 @@ public class TopologyBuilder {
TopologyManager.updateTopology(topology);
} finally {
TopologyManager.releaseWriteLock();
+ timestamp = System.currentTimeMillis();
}
/* @TODO leftover from grouping_poc*/
String groupAlias = null;
TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId,
clusterInstanceId, networkPartitionId,
partitionId, properties, groupAlias);
+
+ //publishing member status to DAS.
+ MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory.
+ createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS);
+ if (memStatusPublisher.isEnabled()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Publishing Member Status to DAS");
+ }
+ memStatusPublisher.publish(timestamp,
+ applicationId,
+ member.getClusterId(),
+ clusterAlias,
+ member.getClusterInstanceId(),
+ member.getServiceName(),
+ member.getNetworkPartitionId(),
+ member.getPartitionId(),
+ member.getMemberId(),
+ MemberStatus.Terminated.toString());
+ } else {
+ log.warn("Member Status Publisher is not enabled");
+ }
}
public static void handleMemberSuspended() {
@@ -819,7 +965,8 @@ public class TopologyBuilder {
}
}
- public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) {
+ public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent
+ clusterStatusClusterActivatedEvent) {
Topology topology = TopologyManager.getTopology();
Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
@@ -855,7 +1002,7 @@ public class TopologyBuilder {
Collection<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices(clusterStatusClusterActivatedEvent.getInstanceId());
if (kubernetesServices != null) {
-
+
try {
// Generate access URLs for kubernetes services
for (KubernetesService kubernetesService : kubernetesServices) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
index 37580eb..da23598 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
@@ -31,9 +31,7 @@ import org.apache.stratos.cloud.controller.exception.InvalidPartitionException;
import org.apache.stratos.cloud.controller.iaases.Iaas;
import org.apache.stratos.cloud.controller.iaases.PartitionValidator;
import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
-import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
import java.util.Properties;
@@ -49,7 +47,7 @@ public class CloudControllerServiceUtil {
}
/**
- * Update the topology, publish statistics to BAM, remove member context
+ * Update the topology, publish statistics to DAS, remove member context
* and persist cloud controller context.
*
* @param memberContext
@@ -66,15 +64,6 @@ public class CloudControllerServiceUtil {
memberContext.getClusterId(), memberContext.getNetworkPartitionId(),
partitionId, memberContext.getMemberId());
- // Publish statistics to BAM
- BAMUsageDataPublisher.publish(memberContext.getMemberId(),
- partitionId,
- memberContext.getNetworkPartitionId(),
- memberContext.getClusterId(),
- memberContext.getCartridgeType(),
- MemberStatus.Terminated.toString(),
- null);
-
// Remove member context
CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(), memberContext.getMemberId());
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
index 77cfea2..afd7a23 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
@@ -27,9 +27,6 @@ import org.apache.stratos.cloud.controller.domain.*;
import org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException;
import org.apache.stratos.cloud.controller.iaases.Iaas;
import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
-import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-
import java.util.concurrent.locks.Lock;
/**
@@ -84,16 +81,6 @@ public class InstanceCreator implements Runnable {
// Update topology
TopologyBuilder.handleMemberInitializedEvent(memberContext);
-
- // Publish instance creation statistics to BAM
- BAMUsageDataPublisher.publish(
- memberContext.getMemberId(),
- memberContext.getPartition().getId(),
- memberContext.getNetworkPartitionId(),
- memberContext.getClusterId(),
- memberContext.getCartridgeType(),
- MemberStatus.Initialized.toString(),
- memberContext.getInstanceMetadata());
} catch (Exception e) {
String message = String.format("Could not start instance: [cartridge-type] %s [cluster-id] %s",
memberContext.getCartridgeType(), memberContext.getClusterId());
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
deleted file mode 100644
index 56c5f87..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.statistics.publisher;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
-import org.apache.stratos.cloud.controller.context.CloudControllerContext;
-import org.apache.stratos.cloud.controller.domain.Cartridge;
-import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
-import org.apache.stratos.cloud.controller.domain.MemberContext;
-import org.apache.stratos.cloud.controller.exception.CloudControllerException;
-import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.wso2.carbon.base.ServerConfiguration;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
-import org.wso2.carbon.databridge.commons.Attribute;
-import org.wso2.carbon.databridge.commons.AttributeType;
-import org.wso2.carbon.databridge.commons.Event;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-import org.wso2.carbon.utils.CarbonUtils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * Usage data publisher for publishing instance usage data to BAM.
- */
-public class BAMUsageDataPublisher {
-
- private static final Log log = LogFactory.getLog(BAMUsageDataPublisher.class);
-
- private static AsyncDataPublisher dataPublisher;
- private static StreamDefinition streamDefinition;
- private static final String cloudControllerEventStreamVersion = "1.0.0";
-
- public static void publish(String memberId,
- String partitionId,
- String networkId,
- String clusterId,
- String serviceName,
- String status,
- InstanceMetadata metadata) {
- if (!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()) {
- return;
- }
- log.debug(CloudControllerConstants.DATA_PUB_TASK_NAME + " cycle started.");
-
- if (dataPublisher == null) {
- createDataPublisher();
-
- //If we cannot create a data publisher we should give up
- //this means data will not be published
- if (dataPublisher == null) {
- log.error("Data Publisher cannot be created or found.");
- release();
- return;
- }
- }
-
- MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId);
- String cartridgeType = memberContext.getCartridgeType();
- Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
-
- //Construct the data to be published
- List<Object> payload = new ArrayList<Object>();
- // Payload values
- payload.add(memberId);
- payload.add(serviceName);
- payload.add(clusterId);
- payload.add(handleNull(memberContext.getLbClusterId()));
- payload.add(handleNull(partitionId));
- payload.add(handleNull(networkId));
- if (cartridge != null) {
- payload.add(handleNull(String.valueOf(cartridge.isMultiTenant())));
- } else {
- payload.add("");
- }
- payload.add(handleNull(memberContext.getPartition().getProvider()));
- payload.add(handleNull(status));
-
- if (metadata != null) {
- payload.add(metadata.getHostname());
- payload.add(metadata.getHypervisor());
- payload.add(String.valueOf(metadata.getRam()));
- payload.add(metadata.getImageId());
- payload.add(metadata.getLoginPort());
- payload.add(metadata.getOperatingSystemName());
- payload.add(metadata.getOperatingSystemVersion());
- payload.add(metadata.getOperatingSystemArchitecture());
- payload.add(String.valueOf(metadata.isOperatingSystem64bit()));
- } else {
- payload.add("");
- payload.add("");
- payload.add("");
- payload.add("");
- payload.add(0);
- payload.add("");
- payload.add("");
- payload.add("");
- payload.add("");
- }
-
- payload.add(handleNull(Arrays.toString(memberContext.getPrivateIPs())));
- payload.add(handleNull(Arrays.toString(memberContext.getPublicIPs())));
- payload.add(handleNull(Arrays.toString(memberContext.getAllocatedIPs())));
-
- Event event = new Event();
- event.setPayloadData(payload.toArray());
- event.setArbitraryDataMap(new HashMap<String, String>());
-
- try {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
- }
- dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
- } catch (AgentException e) {
- if (log.isErrorEnabled()) {
- log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e);
- }
- }
- }
-
- private static void release() {
- CloudControllerContext.getInstance().setPublisherRunning(false);
- }
-
- private static StreamDefinition initializeStream() throws Exception {
- streamDefinition = new StreamDefinition(
- CloudControllerConstants.CLOUD_CONTROLLER_EVENT_STREAM,
- cloudControllerEventStreamVersion);
- streamDefinition.setNickName("cloud.controller");
- streamDefinition.setDescription("Instances booted up by the Cloud Controller");
- // Payload definition
- List<Attribute> payloadData = new ArrayList<Attribute>();
- payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.CARTRIDGE_TYPE_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.LB_CLUSTER_ID_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.IAAS_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.STATUS_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.HOST_NAME_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.HYPERVISOR_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.RAM_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.IMAGE_ID_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.LOGIN_PORT_LABEL, AttributeType.INT));
- payloadData.add(new Attribute(CloudControllerConstants.OS_NAME_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.OS_VERSION_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.OS_ARCH_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.OS_BIT_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.PRIV_IP_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.PUB_IP_LABEL, AttributeType.STRING));
- payloadData.add(new Attribute(CloudControllerConstants.ALLOCATE_IP_LABEL, AttributeType.STRING));
- streamDefinition.setPayloadData(payloadData);
- return streamDefinition;
- }
-
-
- private static void createDataPublisher() {
- //creating the agent
-
- ServerConfiguration serverConfig = CarbonUtils.getServerConfiguration();
- String trustStorePath = serverConfig.getFirstProperty("Security.TrustStore.Location");
- String trustStorePassword = serverConfig.getFirstProperty("Security.TrustStore.Password");
- String bamServerUrl = serverConfig.getFirstProperty("BamServerURL");
- String adminUsername = CloudControllerConfig.getInstance().getDataPubConfig().getBamUsername();
- String adminPassword = CloudControllerConfig.getInstance().getDataPubConfig().getBamPassword();
-
- System.setProperty("javax.net.ssl.trustStore", trustStorePath);
- System.setProperty("javax.net.ssl.trustStorePassword", trustStorePassword);
-
-
- try {
- dataPublisher = new AsyncDataPublisher("tcp://" + bamServerUrl + "", adminUsername, adminPassword);
- CloudControllerContext.getInstance().setDataPublisher(dataPublisher);
- initializeStream();
- dataPublisher.addStreamDefinition(streamDefinition);
- } catch (Exception e) {
- String msg = "Unable to create a data publisher to " + bamServerUrl +
- ". Usage Agent will not function properly. ";
- log.error(msg, e);
- throw new CloudControllerException(msg, e);
- }
- }
-
- private static String handleNull(String val) {
- if (val == null) {
- return "";
- }
- return val;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
new file mode 100644
index 0000000..db68396
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/CloudControllerPublisherFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.statistics.publisher;
+
+import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
+
+/**
+ * Factory for the cloud controller statistics publishers (member information and member status).
+ */
+public class CloudControllerPublisherFactory {
+    /**
+     * Creates the member information publisher for the given publisher type.
+     *
+     * @param type statistics publisher type; only {@code WSO2DAS} is handled here
+     * @return a new {@link DASMemberInformationPublisher} when the type is {@code WSO2DAS}
+     * @throws RuntimeException if the type is anything other than {@code WSO2DAS} (including null)
+     */
+    public static MemberInformationPublisher createMemberInformationPublisher(StatisticsPublisherType type) {
+        if (type == StatisticsPublisherType.WSO2DAS) {
+            return new DASMemberInformationPublisher();
+        }
+        throw new RuntimeException("Unknown statistics publisher type: " + type);
+    }
+
+    /**
+     * Creates the member status publisher for the given publisher type.
+     *
+     * @param type statistics publisher type; only {@code WSO2DAS} is handled here
+     * @return a new {@link DASMemberStatusPublisher} when the type is {@code WSO2DAS}
+     * @throws RuntimeException if the type is anything other than {@code WSO2DAS} (including null)
+     */
+    public static MemberStatusPublisher createMemberStatusPublisher(StatisticsPublisherType type) {
+        if (type == StatisticsPublisherType.WSO2DAS) {
+            return new DASMemberStatusPublisher();
+        }
+        throw new RuntimeException("Unknown statistics publisher type: " + type);
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
new file mode 100644
index 0000000..2bab194
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberInformationPublisher.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.statistics.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.context.CloudControllerContext;
+import org.apache.stratos.cloud.controller.domain.Cartridge;
+import org.apache.stratos.cloud.controller.domain.IaasProvider;
+import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
+import org.apache.stratos.cloud.controller.domain.MemberContext;
+import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Publishes member information/metadata to DAS through the "member_info" data stream.
+ */
+public class DASMemberInformationPublisher extends ThriftStatisticsPublisher implements MemberInformationPublisher {
+
+    private static final Log log = LogFactory.getLog(DASMemberInformationPublisher.class);
+
+    private static final String DATA_STREAM_NAME = "member_info";
+    private static final String VERSION = "1.0.0";
+    private static final String DAS_THRIFT_CLIENT_NAME = "das";
+    private static final String NULL_VALUE = "Value Not Found";
+    private final ExecutorService executorService;
+
+    public DASMemberInformationPublisher() {
+        super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
+        executorService = StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool", 10);
+    }
+
+    /** Builds the stream definition; publish() must supply its values in this attribute order. */
+    private static StreamDefinition createStreamDefinition() {
+        try {
+            StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+            streamDefinition.setNickName("Member Information");
+            streamDefinition.setDescription("Member Information");
+            List<Attribute> payloadData = new ArrayList<Attribute>();
+
+            // Set payload definition
+            payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.INSTANCE_TYPE_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.SCALING_DECISION_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.PRIV_IP_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.PUB_IP_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.ALLOCATED_IP_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.HOST_NAME_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.HYPERVISOR_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.CPU_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.RAM_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.IMAGE_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.LOGIN_PORT_COL, AttributeType.INT));
+            payloadData.add(new Attribute(CloudControllerConstants.OS_NAME_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.OS_VERSION_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.OS_ARCH_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.OS_BIT_COL, AttributeType.BOOL));
+            streamDefinition.setPayloadData(payloadData);
+            return streamDefinition;
+        } catch (Exception e) {
+            throw new RuntimeException("Could not create stream definition", e);
+        }
+    }
+
+    /**
+     * Publishes member information to DAS from a task submitted to the executor service. Nothing is
+     * published when metadata is null or the member context, its partition or its cartridge is missing.
+     *
+     * @param memberId          Member Id
+     * @param scalingDecisionId Scaling Decision Id
+     * @param metadata          InstanceMetadata
+     */
+    @Override
+    public void publish(final String memberId, final String scalingDecisionId, final InstanceMetadata metadata) {
+        if (metadata == null) {
+            return;
+        }
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                CloudControllerContext context = CloudControllerContext.getInstance();
+                MemberContext memberContext = context.getMemberContextOfMemberId(memberId);
+                Cartridge cartridge = (memberContext == null) ? null :
+                        context.getCartridge(memberContext.getCartridgeType());
+                if (cartridge == null || memberContext.getPartition() == null) {
+                    // Skip with a warning rather than failing on a NullPointerException in the pool thread
+                    log.warn("Could not publish member information, member context, partition or " +
+                            "cartridge not found: [member_id] " + memberId);
+                    return;
+                }
+                IaasProvider iaasProvider = context.getIaasProviderOfPartition(
+                        cartridge.getType(), memberContext.getPartition().getId());
+                String instanceType = (iaasProvider == null) ? null :
+                        iaasProvider.getProperty(CloudControllerConstants.INSTANCE_TYPE);
+                String multiTenant = String.valueOf(cartridge.isMultiTenant());
+                // Arrays.toString(null) returns the text "null", so null arrays are mapped explicitly
+                String privateIPs = arrayToString(memberContext.getPrivateIPs());
+                String publicIPs = arrayToString(memberContext.getPublicIPs());
+                String allocatedIPs = arrayToString(memberContext.getAllocatedIPs());
+                // Payload values, in the attribute order declared by createStreamDefinition()
+                Object[] payload = new Object[]{memberId, handleNull(instanceType), scalingDecisionId, multiTenant,
+                        privateIPs, publicIPs, allocatedIPs, handleNull(metadata.getHostname()),
+                        handleNull(metadata.getHypervisor()), handleNull(metadata.getCpu()),
+                        handleNull(metadata.getRam()), handleNull(metadata.getImageId()), metadata.getLoginPort(),
+                        handleNull(metadata.getOperatingSystemName()), handleNull(metadata.getOperatingSystemVersion()),
+                        handleNull(metadata.getOperatingSystemArchitecture()),
+                        Boolean.valueOf(metadata.isOperatingSystem64bit())};
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Publishing member information: [member_id] %s [instance_type] %s " +
+                                    "[scaling_decision_id] %s [is_multi_tenant] %s [private_IPs] %s " +
+                                    "[public_IPs] %s [allocated_IPs] %s [host_name] %s [hypervisor] %s [cpu] %s " +
+                                    "[ram] %s [image_id] %s [login_port] %d [os_name] %s " +
+                                    "[os_version] %s [os_arch] %s [is_os_64bit] %b",
+                            memberId, instanceType, scalingDecisionId, multiTenant, privateIPs, publicIPs,
+                            allocatedIPs, metadata.getHostname(), metadata.getHypervisor(),
+                            metadata.getCpu(), metadata.getRam(), metadata.getImageId(), metadata.getLoginPort(),
+                            metadata.getOperatingSystemName(), metadata.getOperatingSystemVersion(),
+                            metadata.getOperatingSystemArchitecture(), metadata.isOperatingSystem64bit()));
+                }
+                DASMemberInformationPublisher.super.publish(payload);
+            }
+        });
+    }
+
+    /** Renders the array contents as text, or the "Value Not Found" placeholder for a null array. */
+    private static String arrayToString(Object[] values) {
+        return (values == null) ? NULL_VALUE : Arrays.toString(values);
+    }
+
+    /** Returns param, or the "Value Not Found" placeholder when param is null. */
+    public static String handleNull(String param) {
+        return (null != param) ? param : NULL_VALUE;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/b38e27c9/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
new file mode 100644
index 0000000..877256d
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/DASMemberStatusPublisher.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.statistics.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Publishes member status changes to DAS through the "member_lifecycle" data stream.
+ */
+public class DASMemberStatusPublisher extends ThriftStatisticsPublisher implements MemberStatusPublisher {
+
+    private static final Log log = LogFactory.getLog(DASMemberStatusPublisher.class);
+    private static final String DATA_STREAM_NAME = "member_lifecycle";
+    private static final String VERSION = "1.0.0";
+    private static final String DAS_THRIFT_CLIENT_NAME = "das";
+    private final ExecutorService executorService;
+
+    public DASMemberStatusPublisher() {
+        super(createStreamDefinition(), DAS_THRIFT_CLIENT_NAME);
+        executorService = StratosThreadPool.getExecutorService("cloud.controller.stats.publisher.thread.pool", 10);
+    }
+
+    /** Builds the stream definition; publish() must supply its values in this attribute order. */
+    private static StreamDefinition createStreamDefinition() {
+        try {
+            StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+            streamDefinition.setNickName("Member Lifecycle");
+            streamDefinition.setDescription("Member Lifecycle");
+            List<Attribute> payloadData = new ArrayList<Attribute>();
+
+            // Set payload definition
+            payloadData.add(new Attribute(CloudControllerConstants.TIMESTAMP_COL, AttributeType.LONG));
+            payloadData.add(new Attribute(CloudControllerConstants.APPLICATION_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ALIAS_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_INSTANCE_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.SERVICE_NAME_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.NETWORK_PARTITION_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING));
+            payloadData.add(new Attribute(CloudControllerConstants.MEMBER_STATUS_COL, AttributeType.STRING));
+            streamDefinition.setPayloadData(payloadData);
+            return streamDefinition;
+        } catch (Exception e) {
+            throw new RuntimeException("Could not create stream definition", e);
+        }
+    }
+
+    /**
+     * Publishes a member status change to DAS. The payload is built and passed to the thrift
+     * publisher from a task submitted to the executor service.
+     *
+     * @param timestamp          Status changed time
+     * @param applicationId      Application Id
+     * @param clusterId          Cluster Id
+     * @param clusterAlias       Cluster Alias
+     * @param clusterInstanceId  Cluster Instance Id
+     * @param serviceName        Service Name
+     * @param networkPartitionId Network Partition Id
+     * @param partitionId        Partition Id
+     * @param memberId           Member Id
+     * @param status             Member Status
+     */
+    @Override
+    public void publish(final Long timestamp, final String applicationId, final String clusterId,
+                        final String clusterAlias, final String clusterInstanceId,
+                        final String serviceName, final String networkPartitionId, final String partitionId,
+                        final String memberId, final String status) {
+
+        Runnable publisher = new Runnable() {
+            @Override
+            public void run() {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Publishing member status: [timestamp] %d [application_id] %s " +
+                                    "[cluster_id] %s [cluster_alias] %s [cluster_instance_id] %s [service_name] %s " +
+                                    "[network_partition_id] %s [partition_id] %s " +
+                                    "[member_id] %s [member_status] %s ",
+                            timestamp, applicationId, clusterId, clusterAlias, clusterInstanceId, serviceName,
+                            networkPartitionId, partitionId, memberId, status));
+                }
+                // Payload values, in the attribute order declared by createStreamDefinition()
+                List<Object> payload = new ArrayList<Object>();
+                payload.add(timestamp);
+                payload.add(applicationId);
+                payload.add(clusterId);
+                payload.add(clusterAlias);
+                payload.add(clusterInstanceId);
+                payload.add(serviceName);
+                payload.add(networkPartitionId);
+                payload.add(partitionId);
+                payload.add(memberId);
+                payload.add(status);
+                DASMemberStatusPublisher.super.publish(payload.toArray());
+            }
+        };
+        executorService.execute(publisher);
+    }
+
+}