You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ga...@apache.org on 2015/08/20 08:44:45 UTC
[1/5] stratos git commit: Adding Metering and Monitoring Service
Implementation
Repository: stratos
Updated Branches:
refs/heads/tenant-isolation 4fdd431f0 -> 7575faa30
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/sparkscript/CCEvent
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/sparkscript/CCEvent b/extensions/das/artifacts/sparkscript/CCEvent
new file mode 100644
index 0000000..b8bb6fd
--- /dev/null
+++ b/extensions/das/artifacts/sparkscript/CCEvent
@@ -0,0 +1,18 @@
+CREATE TEMPORARY TABLE memberstatus
+USING CarbonAnalytics
+OPTIONS (tableName "ORG_APACHE_STRATOS_CLOUD_CONTROLLER");
+
+CREATE TEMPORARY TABLE memberstatusnew
+USING CarbonAnalytics
+OPTIONS (tableName "CLUSTER_MEMBER_NEW",
+ schema "startTime String, endTime String, clusterId STRING, activatedInstanceCount INT, terminatedInstanceCount INT, activeInstanceCount INT");
+
+;WITH InstanceCount as
+(select clusterId, count(case when status='Active' and timeStamp > current_time(null)-60000 and timeStamp <= current_time(null) then 1 else NULL end) as activatedInstanceCount, count(case when status='Terminated' and timeStamp > current_time(null)-60000 and timeStamp <= current_time(null) then 1 else NULL end) as terminatedInstanceCount, (sum(case when status='Active' then 1 else 0 end) - sum(case when status='Terminated' then 1 else 0 end))as activeInstanceCount from memberstatus group by clusterId)
+INSERT INTO table memberstatusnew select time(current_time(null)-60000),time(current_time(null)),clusterId, activatedInstanceCount, terminatedInstanceCount,activeInstanceCount from InstanceCount;
+
+CREATE TEMPORARY TABLE membersnew
+USING CarbonAnalytics
+OPTIONS (tableName "MEMBER_NEW",schema "clusterId STRING, clusterInstanceId STRING, networkId STRING, partitionId STRING, cartridgeType STRING, instanceType STRING, memberId STRING, scalingTime LONG,scalingReason STRING, timeStamp STRING");
+
+INSERT INTO TABLE membersnew select clusterId,clusterInstanceId,networkId,partitionId,cartridgeType,instanceType, memberId,scalingTime,scalingReason,time(timeStamp)as timeStamp FROM memberstatus where status='Created';
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/das/pom.xml b/extensions/das/pom.xml
new file mode 100644
index 0000000..d21d1be
--- /dev/null
+++ b/extensions/das/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>stratos-extensions</artifactId>
+ <groupId>org.apache.stratos</groupId>
+ <version>4.1.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.stratos</groupId>
+ <artifactId>strats-das-extension</artifactId>
+ <packaging>pom</packaging>
+ <name>Apache Stratos - DAS Extension</name>
+ <description>Apache Stratos extensions for DAS.</description>
+ <modules>
+ <module>spark-udf</module>
+ </modules>
+
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/spark-udf/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/das/spark-udf/pom.xml b/extensions/das/spark-udf/pom.xml
new file mode 100644
index 0000000..ced0f0a
--- /dev/null
+++ b/extensions/das/spark-udf/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>strats-das-extension</artifactId>
+ <groupId>org.apache.stratos</groupId>
+ <version>4.1.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.stratos</groupId>
+ <artifactId>apache-stratos-spark-udf</artifactId>
+ <name>Apache Stratos - Spark UDF</name>
+ <description>Apache Stratos Spark UDF for DAS</description>
+
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java
----------------------------------------------------------------------
diff --git a/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java b/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java
new file mode 100644
index 0000000..0b8f408
--- /dev/null
+++ b/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.das.extension.spark.udf;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Implementing UDF for implementing spark sql query related to time.
+ */
+public class TimeUDF {
+ /**
+ * Convert time(ms) to DateFormat
+ *
+ * @param timeStamp time in ms
+ * @return date as String
+ */
+ public String time(Long timeStamp) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+ Date date = new Date(timeStamp.longValue());
+ return sdf.format(date);
+ }
+
+ /**
+ * Get the current time in ms
+ *
+ * @param param
+ * @return
+ */
+ public long current_time(Integer param) {
+ return System.currentTimeMillis();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/pom.xml b/extensions/pom.xml
index ffbfa8e..1cc9038 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -17,7 +17,8 @@
~ specific language governing permissions and limitations
~ under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.apache.stratos</groupId>
@@ -36,6 +37,7 @@
<module>cep/stratos-cep-extension</module>
<module>cep/distribution/</module>
<module>load-balancer</module>
+ <module>das</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
index 56e9164..a8102da 100644
--- a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
+++ b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl
@@ -84,7 +84,9 @@ dialect "mvel"
log.info("[dependency-scale] [scale-up] Partition available, hence trying to spawn an instance to scale up!" );
log.debug("[dependency-scale] [scale-up] " + " [partition] " + partitionContext.getPartitionId() + " [cluster] " + clusterId );
- delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary);
+ long scalingTime = System.currentTimeMillis();
+ String scalingReason = "Dependency scaling";
+ delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary,scalingReason,scalingTime);
count++;
} else {
partitionsAvailable = false;
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
index 96b60da..4eaab2b 100755
--- a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
+++ b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl
@@ -84,7 +84,10 @@ dialect "mvel"
log.info("[min-check] Partition available, hence trying to spawn an instance to fulfil minimum count!" + " [cluster] " + clusterId);
log.debug("[min-check] " + " [partition] " + partitionContext.getPartitionId() + " [cluster] " + clusterId);
- delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary);
+ long scalingTime = System.currentTimeMillis();
+ String scalingReason = "Scaling up to fulfil minimum count, [Cluster Min Members] "+clusterInstanceContext.getMinInstanceCount()+" [Additional instances to be created] " + additionalInstances;
+ delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary,scalingReason,scalingTime);
+
count++;
} else {
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
index e6f8f67..3b4a916 100644
--- a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
+++ b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl
@@ -164,6 +164,10 @@ dialect "mvel"
boolean partitionsAvailable = true;
int count = 0;
+ String autoscalingReason = (numberOfRequiredInstances == numberOfInstancesReuquiredBasedOnRif)?"Scaling up due to RIF, [Predicted Value] "+rifPredictedValue+" [Threshold] "+rifThreshold:(numberOfRequiredInstances== numberOfInstancesReuquiredBasedOnMemoryConsumption)?"Scaling up due to MC, [Predicted Value] "+mcPredictedValue+" [Threshold] "+mcThreshold:"Scaling up due to LA, [Predicted Value] "+laPredictedValue+" [Threshold] "+laThreshold;
+ autoscalingReason += " [Number of required instances] "+numberOfRequiredInstances+" [Cluster Max Members] "+clusterMaxMembers+" [Additional instances to be created] " + additionalInstances;
+
+
while(count != additionalInstances && partitionsAvailable){
ClusterLevelPartitionContext partitionContext = (ClusterLevelPartitionContext) partitionAlgorithm.getNextScaleUpPartitionContext(clusterInstanceContext.getPartitionCtxtsAsAnArray());
@@ -182,7 +186,8 @@ dialect "mvel"
" [laPredictedValue] " + laPredictedValue + " [laThreshold] " + laThreshold);
log.debug("[scale-up] " + " [partition] " + partitionContext.getPartitionId() + " [cluster] " + clusterId );
- delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary);
+ long scalingTime = System.currentTimeMillis();
+ delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary,autoscalingReason,scalingTime);
count++;
} else {
[4/5] stratos git commit: Adding Metering and Monitoring Service
Implementation
Posted by ga...@apache.org.
Adding Metering and Monitoring Service Implementation
Conflicts:
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/54283eda
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/54283eda
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/54283eda
Branch: refs/heads/tenant-isolation
Commit: 54283eda982c5677402b1e4ac895d95c9cccebe7
Parents: 4fdd431
Author: Thanuja <th...@wso2.com>
Authored: Wed Jul 29 18:51:34 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Thu Aug 20 10:58:32 2015 +0530
----------------------------------------------------------------------
.../client/AutoscalerCloudControllerClient.java | 16 +-
.../autoscaler/rule/RuleTasksDelegator.java | 48 +-
.../publisher/HealthStatisticsNotifier.java | 10 +-
.../messaging/topology/TopologyBuilder.java | 1434 +++++++++---------
.../impl/CloudControllerServiceImpl.java | 6 +-
.../impl/CloudControllerServiceUtil.java | 15 +-
.../services/impl/InstanceCreator.java | 17 +-
.../publisher/BAMUsageDataPublisher.java | 44 +-
.../util/CloudControllerConstants.java | 4 +
.../common/constants/StratosConstants.java | 8 +-
.../publisher/HealthStatisticsPublisher.java | 3 +-
.../publisher/InFlightRequestPublisher.java | 4 +-
.../cep/WSO2CEPHealthStatisticsPublisher.java | 9 +-
.../cep/WSO2CEPInFlightRequestPublisher.java | 6 +-
.../LoadBalancerStatisticsNotifier.java | 3 +-
.../publisher/MockHealthStatisticsNotifier.java | 3 +
.../modules/healthstatspublisher/healthstats.py | 5 +-
.../HealthStatsEventFormatter.xml | 30 +
.../eventformatters/RIFEventFormatter.xml | 31 +
.../DASDefaultWSO2EventOutputAdaptor.xml | 29 +
.../streamdefinitions/stream-manager-config.xml | 486 +++---
extensions/das/README.md | 10 +
.../CloudControllerEventReceiver.xml | 29 +
.../eventreceivers/HealthStatsEventReceiver.xml | 29 +
.../eventreceivers/RIFEventReceiver.xml | 29 +
.../eventsink/cartridge_agent_health_stats.xml | 85 ++
.../artifacts/eventsink/in_flight_requests.xml | 64 +
.../org_apache_stratos_cloud_controller.xml | 211 +++
.../cartridge_agent_health_stats_1.0.0.json | 40 +
.../eventstreams/in_flight_requests_1.0.0.json | 28 +
...g.apache.stratos.cloud.controller_1.0.0.json | 112 ++
extensions/das/artifacts/sparkscript/CCEvent | 18 +
extensions/das/pom.xml | 40 +
extensions/das/spark-udf/pom.xml | 36 +
.../das/extension/spark/udf/TimeUDF.java | 49 +
extensions/pom.xml | 4 +-
.../src/main/conf/drools/dependent-scaling.drl | 4 +-
.../src/main/conf/drools/mincheck.drl | 5 +-
.../src/main/conf/drools/scaling.drl | 7 +-
39 files changed, 2007 insertions(+), 1004 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
index 65e952f..f19531b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
@@ -84,7 +84,8 @@ public class AutoscalerCloudControllerClient {
public synchronized MemberContext startInstance(PartitionRef partition,
String clusterId, String clusterInstanceId,
String networkPartitionId, boolean isPrimary,
- int minMemberCount) throws SpawningException {
+ int minMemberCount, String autoscalingReason,
+ long scalingTime) throws SpawningException {
try {
if (log.isInfoEnabled()) {
log.info(String.format("Trying to spawn an instance via cloud controller: " +
@@ -115,8 +116,18 @@ public class AutoscalerCloudControllerClient {
minCountProp.setName(StratosConstants.MIN_COUNT);
minCountProp.setValue(String.valueOf(minMemberCount));
+ Property autoscalingReasonProp = new Property();
+ autoscalingReasonProp.setName(StratosConstants.SCALING_REASON);
+ autoscalingReasonProp.setValue(autoscalingReason);
+
+ Property scalingTimeProp = new Property();
+ scalingTimeProp.setName(StratosConstants.SCALING_TIME);
+ scalingTimeProp.setValue(String.valueOf(scalingTime));
+
memberContextProps.addProperty(isPrimaryProp);
memberContextProps.addProperty(minCountProp);
+ memberContextProps.addProperty(autoscalingReasonProp);
+ memberContextProps.addProperty(scalingTimeProp);
instanceContext.setProperties(AutoscalerUtil.toStubProperties(memberContextProps));
long startTime = System.currentTimeMillis();
@@ -228,7 +239,8 @@ public class AutoscalerCloudControllerClient {
public void terminateAllInstances(String clusterId) throws RemoteException,
CloudControllerServiceInvalidClusterExceptionException {
if (log.isInfoEnabled()) {
- log.info(String.format("Terminating all instances of cluster via cloud controller: [cluster] %s", clusterId));
+ log.info(String.format("Terminating all instances of cluster via cloud controller: " +
+ "[cluster] %s", clusterId));
}
long startTime = System.currentTimeMillis();
stub.terminateInstances(clusterId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
index c2d2be6..5c335b1 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
@@ -49,7 +49,8 @@ public class RuleTasksDelegator {
private static final Log log = LogFactory.getLog(RuleTasksDelegator.class);
- public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative, int timeInterval) {
+ public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative,
+ int timeInterval) {
double predictedValue;
// s = u * t + 0.5 * a * t * t
if (log.isDebugEnabled()) {
@@ -176,19 +177,21 @@ public class RuleTasksDelegator {
* @param clusterId Cluster id
* @param clusterInstanceId Instance id
* @param isPrimary Is a primary member
+ * @param autoscalingReason scaling reason for member
+ * @param scalingTime scaling time
*/
public void delegateSpawn(ClusterLevelPartitionContext clusterMonitorPartitionContext, String clusterId,
- String clusterInstanceId, boolean isPrimary) {
+ String clusterInstanceId, boolean isPrimary, String autoscalingReason, long scalingTime) {
try {
String nwPartitionId = clusterMonitorPartitionContext.getNetworkPartitionId();
- String nwPartitionUuid=null;
- NetworkPartition[] networkPartitionList= CloudControllerServiceClient.getInstance().getNetworkPartitions();
- for(int i=0;i<networkPartitionList.length;i++){
- if(networkPartitionList[i].getUuid().equals(nwPartitionId)){
- nwPartitionUuid=networkPartitionList[i].getUuid();
- }
- }
+ String nwPartitionUuid = null;
+ NetworkPartition[] networkPartitionList = CloudControllerServiceClient.getInstance().getNetworkPartitions();
+ for (int i = 0; i < networkPartitionList.length; i++) {
+ if (networkPartitionList[i].getUuid().equals(nwPartitionId)) {
+ nwPartitionUuid = networkPartitionList[i].getUuid();
+ }
+ }
// Calculate accumulation of minimum counts of all the partition of current network partition
int minimumCountOfNetworkPartition;
@@ -204,18 +207,18 @@ public class RuleTasksDelegator {
MemberContext memberContext =
AutoscalerCloudControllerClient.getInstance()
.startInstance(clusterMonitorPartitionContext.getPartition(),
- clusterId,
- clusterInstanceId,
- nwPartitionUuid,
- isPrimary,
- minimumCountOfNetworkPartition);
+ clusterId,
+ clusterInstanceId, clusterMonitorPartitionContext.getNetworkPartitionId(),
+ isPrimary,
+ minimumCountOfNetworkPartition, autoscalingReason, scalingTime);
if (memberContext != null) {
ClusterLevelPartitionContext partitionContext = clusterInstanceContext.
getPartitionCtxt(clusterMonitorPartitionContext.getPartitionId());
partitionContext.addPendingMember(memberContext);
partitionContext.addMemberStatsContext(new MemberStatsContext(memberContext.getMemberId()));
if (log.isDebugEnabled()) {
- log.debug(String.format("Pending member added, [member] %s [partition] %s", memberContext.getMemberId(),
+ log.debug(String.format("Pending member added, [member] %s [partition] %s",
+ memberContext.getMemberId(),
memberContext.getPartition().getId()));
}
@@ -254,13 +257,14 @@ public class RuleTasksDelegator {
clusterMonitor.sendScalingOverMaxEvent(networkPartitionId, instanceId);
}
- public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId, String instanceId) {
+ public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId,
+ String instanceId) {
if (log.isDebugEnabled()) {
log.debug("Scaling down lower min notification is going to the [parentInstance] " + instanceId);
}
//Notify parent for checking scaling dependencies
ClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
- clusterMonitor.sendScalingDownBeyondMinEvent(networkPartitionId, instanceId);
+ clusterMonitor.sendScalingDownBeyondMinEvent(networkPartitionId, instanceId);
}
public void delegateTerminate(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) {
@@ -277,8 +281,8 @@ public class RuleTasksDelegator {
clusterMonitorPartitionContext.removeMemberStatsContext(memberId);
} else if (clusterMonitorPartitionContext.pendingMemberAvailable(memberId)) {
- log.info(String.format("[scale-down] Moving pending member to termination pending list [member id] %s " +
- "[partition] %s [network partition] %s", memberId,
+ log.info(String.format("[scale-down] Moving pending member to termination pending list " +
+ "[member id] %s " + "[partition] %s [network partition] %s", memberId,
clusterMonitorPartitionContext.getPartitionId(),
clusterMonitorPartitionContext.getNetworkPartitionId()));
clusterMonitorPartitionContext.movePendingMemberToObsoleteMembers(memberId);
@@ -289,7 +293,8 @@ public class RuleTasksDelegator {
}
}
- public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) {
+ public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext,
+ String memberId) {
try {
//calling SM to send the instance notification event.
if (log.isDebugEnabled()) {
@@ -374,7 +379,8 @@ public class RuleTasksDelegator {
float memberMemoryConsumptionAverage = memberStatsContext.getMemoryConsumption().getAverage();
float memberMemoryConsumptionGredient = memberStatsContext.getMemoryConsumption().getGradient();
- float memberMemoryConsumptionSecondDerivative = memberStatsContext.getMemoryConsumption().getSecondDerivative();
+ float memberMemoryConsumptionSecondDerivative =
+ memberStatsContext.getMemoryConsumption().getSecondDerivative();
double memberPredictedMemoryConsumption = getPredictedValueForNextMinute(memberMemoryConsumptionAverage,
memberMemoryConsumptionGredient, memberMemoryConsumptionSecondDerivative, 1);
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
index 74c5156..5ab2ebf 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
@@ -51,7 +51,8 @@ public class HealthStatisticsNotifier implements Runnable {
File pluginFile = new File(pluginFileName);
if ((pluginFile != null)
&& (pluginFile.exists())) {
- List<Class> pluginClass = PluginLoader.loadPluginClassesFromJar(pluginFile, IHealthStatisticsReader.class);
+ List<Class> pluginClass = PluginLoader.loadPluginClassesFromJar(pluginFile,
+ IHealthStatisticsReader.class);
if (!pluginClass.isEmpty()) {
try {
log.trace("Instantiating new instance of plugin type " + pluginClass);
@@ -63,7 +64,8 @@ public class HealthStatisticsNotifier implements Runnable {
}
}
} else {
- log.error("Plugin not found or malformed: " + pluginFileName + ((pluginFile == null) ? " NULL" : "Doesn't exist"));
+ log.error("Plugin not found or malformed: " + pluginFileName + ((pluginFile == null) ? " NULL" :
+ "Doesn't exist"));
}
}
if (this.statsReader == null) {
@@ -95,7 +97,7 @@ public class HealthStatisticsNotifier implements Runnable {
if (log.isDebugEnabled()) {
log.debug(String.format("Publishing memory consumption: %f", stats.getMemoryUsage()));
}
- statsPublisher.publish(
+ statsPublisher.publish(System.currentTimeMillis(),
CartridgeAgentConfiguration.getInstance().getClusterId(),
CartridgeAgentConfiguration.getInstance().getClusterInstanceId(),
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
@@ -108,7 +110,7 @@ public class HealthStatisticsNotifier implements Runnable {
if (log.isDebugEnabled()) {
log.debug(String.format("Publishing load average: %f", stats.getProcessorUsage()));
}
- statsPublisher.publish(
+ statsPublisher.publish(System.currentTimeMillis(),
CartridgeAgentConfiguration.getInstance().getClusterId(),
CartridgeAgentConfiguration.getInstance().getClusterInstanceId(),
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
[2/5] stratos git commit: Adding Metering and Monitoring Service
Implementation
Posted by ga...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
index 4015a77..adbe294 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
@@ -65,18 +65,21 @@ public class CloudControllerServiceUtil {
TopologyBuilder.handleMemberTerminated(memberContext.getCartridgeType(),
memberContext.getClusterId(), memberContext.getNetworkPartitionId(),
partitionId, memberContext.getMemberId());
-
+ //member terminated time
+ Long timeStamp = System.currentTimeMillis();
// Publish statistics to BAM
BAMUsageDataPublisher.publish(memberContext.getMemberId(),
partitionId,
memberContext.getNetworkPartitionId(),
+ memberContext.getClusterInstanceId(),
memberContext.getClusterId(),
memberContext.getCartridgeType(),
MemberStatus.Terminated.toString(),
- null);
+ timeStamp, null, null, null);
// Remove member context
- CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(), memberContext.getMemberId());
+ CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(),
+ memberContext.getMemberId());
// Persist cloud controller context
CloudControllerContext.getInstance().persist();
@@ -87,7 +90,8 @@ public class CloudControllerServiceUtil {
return isValid;
}
- public static IaasProvider validatePartitionAndGetIaasProvider(Partition partition, IaasProvider iaasProvider) throws InvalidPartitionException {
+ public static IaasProvider validatePartitionAndGetIaasProvider(Partition partition, IaasProvider iaasProvider)
+ throws InvalidPartitionException {
if (iaasProvider != null) {
// if this is a IaaS based partition
Iaas iaas = iaasProvider.getIaas();
@@ -104,7 +108,8 @@ public class CloudControllerServiceUtil {
}
}
- public static boolean validatePartition(Partition partition, IaasProvider iaasProvider) throws InvalidPartitionException {
+ public static boolean validatePartition(Partition partition, IaasProvider iaasProvider)
+ throws InvalidPartitionException {
validatePartitionAndGetIaasProvider(partition, iaasProvider);
return true;
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
index 34a7c93..02730cf 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
@@ -27,8 +27,6 @@ import org.apache.stratos.cloud.controller.domain.*;
import org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException;
import org.apache.stratos.cloud.controller.iaases.Iaas;
import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
-import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
import java.util.concurrent.locks.Lock;
@@ -68,7 +66,8 @@ public class InstanceCreator implements Runnable {
memberContext = startInstance(iaas, memberContext, payload);
if (log.isInfoEnabled()) {
- log.info(String.format("Instance started successfully: [cartridge-type] %s [cluster-id] %s [instance-id] %s " +
+ log.info(String.format("Instance started successfully: [cartridge-type] %s [cluster-id] %s " +
+ "[instance-id] %s " +
"[default-private-ip] %s [default-public-ip] %s",
memberContext.getCartridgeType(), memberContext.getClusterId(),
memberContext.getInstanceId(), memberContext.getDefaultPrivateIP(),
@@ -85,15 +84,6 @@ public class InstanceCreator implements Runnable {
// Update topology
TopologyBuilder.handleMemberInitializedEvent(memberContext);
- // Publish instance creation statistics to BAM
- BAMUsageDataPublisher.publish(
- memberContext.getMemberId(),
- memberContext.getPartition().getUuid(),
- memberContext.getNetworkPartitionId(),
- memberContext.getClusterId(),
- memberContext.getCartridgeType(),
- MemberStatus.Initialized.toString(),
- memberContext.getInstanceMetadata());
} catch (Exception e) {
String message = String.format("Could not start instance: [cartridge-type] %s [cluster-id] %s",
memberContext.getCartridgeType(), memberContext.getClusterId());
@@ -105,7 +95,8 @@ public class InstanceCreator implements Runnable {
}
}
- private MemberContext startInstance(Iaas iaas, MemberContext memberContext, byte[] payload) throws CartridgeNotFoundException {
+ private MemberContext startInstance(Iaas iaas, MemberContext memberContext, byte[] payload) throws
+ CartridgeNotFoundException {
memberContext = iaas.startInstance(memberContext, payload);
// Validate instance id
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
index d5aabbd..690bc59 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
@@ -52,12 +52,31 @@ public class BAMUsageDataPublisher {
private static StreamDefinition streamDefinition;
private static final String cloudControllerEventStreamVersion = "1.0.0";
+ /**
+ * Publish events to BAM
+ *
+ * @param memberId member id
+ * @param partitionId partition id
+ * @param networkId network partition id
+ * @param clusterId cluster id
+ * @param clusterInstanceId cluster instance id
+ * @param serviceName service name
+ * @param status member status
+ * @param timeStamp time
+ * @param autoscalingReason scaling reason related to member
+ * @param scalingTime scaling time
+ * @param metadata meta-data
+ */
public static void publish(String memberId,
String partitionId,
String networkId,
String clusterId,
+ String clusterInstanceId,
String serviceName,
String status,
+ Long timeStamp,
+ String autoscalingReason,
+ Long scalingTime,
InstanceMetadata metadata) {
if (!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()) {
return;
@@ -79,16 +98,23 @@ public class BAMUsageDataPublisher {
MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId);
String cartridgeType = memberContext.getCartridgeType();
Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
+ String instanceType = CloudControllerContext.getInstance().getIaasProviderOfPartition(cartridgeType,
+ partitionId).getProperty(CloudControllerConstants.INSTANCE_TYPE);
//Construct the data to be published
List<Object> payload = new ArrayList<Object>();
// Payload values
+ payload.add(timeStamp);
payload.add(memberId);
payload.add(serviceName);
payload.add(clusterId);
+ payload.add(clusterInstanceId);
payload.add(handleNull(memberContext.getLbClusterId()));
payload.add(handleNull(partitionId));
payload.add(handleNull(networkId));
+ payload.add(handleNull(instanceType));
+ payload.add(handleNull(autoscalingReason));
+ payload.add(handleNull(scalingTime));
if (cartridge != null) {
payload.add(handleNull(String.valueOf(cartridge.isMultiTenant())));
} else {
@@ -129,12 +155,14 @@ public class BAMUsageDataPublisher {
try {
if (log.isDebugEnabled()) {
- log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
+ log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(),
+ streamDefinition.getVersion()));
}
dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
} catch (AgentException e) {
if (log.isErrorEnabled()) {
- log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e);
+ log.error(String.format("Could not publish BAM event: [stream] %s [version] %s",
+ streamDefinition.getName(), streamDefinition.getVersion()), e);
}
}
}
@@ -151,12 +179,17 @@ public class BAMUsageDataPublisher {
streamDefinition.setDescription("Instances booted up by the Cloud Controller");
// Payload definition
List<Attribute> payloadData = new ArrayList<Attribute>();
+ payloadData.add(new Attribute(CloudControllerConstants.TIME_STAMP, AttributeType.LONG));
payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING));
payloadData.add(new Attribute(CloudControllerConstants.CARTRIDGE_TYPE_COL, AttributeType.STRING));
payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, AttributeType.STRING));
+ payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_INSTANCE_ID_COL, AttributeType.STRING));
payloadData.add(new Attribute(CloudControllerConstants.LB_CLUSTER_ID_COL, AttributeType.STRING));
payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING));
payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_COL, AttributeType.STRING));
+ payloadData.add(new Attribute(CloudControllerConstants.INSTANCE_TYPE, AttributeType.STRING));
+ payloadData.add(new Attribute(CloudControllerConstants.SCALING_REASON, AttributeType.STRING));
+ payloadData.add(new Attribute(CloudControllerConstants.SCALING_TIME, AttributeType.LONG));
payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING));
payloadData.add(new Attribute(CloudControllerConstants.IAAS_COL, AttributeType.STRING));
payloadData.add(new Attribute(CloudControllerConstants.STATUS_COL, AttributeType.STRING));
@@ -210,4 +243,11 @@ public class BAMUsageDataPublisher {
}
return val;
}
+
+ private static Long handleNull(Long val) {
+ if (val == null) {
+ return -1L;
+ }
+ return val;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
index 5e6115f..2cb0c31 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
@@ -103,6 +103,7 @@ public final class CloudControllerConstants {
public static final String MEMBER_ID_COL = "memberId";
public static final String CARTRIDGE_TYPE_COL = "cartridgeType";
public static final String CLUSTER_ID_COL = "clusterId";
+ public static final String CLUSTER_INSTANCE_ID_COL = "clusterInstanceId";
public static final String PARTITION_ID_COL = "partitionId";
public static final String NETWORK_ID_COL = "networkId";
public static final String ALIAS_COL = "alias";
@@ -122,6 +123,9 @@ public final class CloudControllerConstants {
public static final String PRIV_IP_COL = "privateIPAddresses";
public static final String PUB_IP_COL = "publicIPAddresses";
public static final String ALLOCATE_IP_COL = "allocateIPAddresses";
+ public static final String TIME_STAMP = "timeStamp";
+ public static final String SCALING_REASON = "scalingReason";
+ public static final String SCALING_TIME = "scalingTime";
/**
* Properties
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
index 194bd81..55e97d9 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
@@ -91,7 +91,8 @@ public class StratosConstants {
// metering constants
public static final String THROTTLING_ALL_ACTION = "all_actions";
- public static final String THROTTLING_IN_DATA_ACTION = "in_data_action"; //this covers registry capacity + registry bandwidth
+ public static final String THROTTLING_IN_DATA_ACTION =
+ "in_data_action"; //this covers registry capacity + registry bandwidth
public static final String THROTTLING_OUT_DATA_ACTION = "out_data_action"; //this covers registry bandwidth
public static final String THROTTLING_ADD_USER_ACTION = "add_user_action";
public static final String THROTTLING_SERVICE_IN_BANDWIDTH_ACTION = "service_in_bandwith_action";
@@ -158,6 +159,8 @@ public class StratosConstants {
public static final String MAX_CHECK_DROOL_FILE = "maxcheck.drl";
public static final String OBSOLETE_CHECK_DROOL_FILE = "obsoletecheck.drl";
public static final String MIN_COUNT = "MIN_COUNT";
+ public static final String SCALING_REASON = "SCALING_REASON";
+ public static final String SCALING_TIME = "SCALING_TIME";
// Policy and definition related constants
public static final int PUBLIC_DEFINITION = 0;
@@ -165,7 +168,8 @@ public class StratosConstants {
// member expiry timeout constants
public static final String PENDING_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingMemberExpiryTimeout";
public static final String OBSOLETED_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.obsoletedMemberExpiryTimeout";
- public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingTerminationMemberExpiryTimeout";
+ public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT =
+ "autoscaler.member.pendingTerminationMemberExpiryTimeout";
public static final String FILTER_VALUE_SEPARATOR = ",";
public static final String TOPOLOGY_SERVICE_FILTER = "stratos.topology.service.filter";
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
index dd7ddd4..95b04ff 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
@@ -27,6 +27,7 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
/**
* Publish health statistics to complex event processor.
*
+ * @param timeStamp time
* @param clusterId Cluster id of the member
* @param clusterInstanceId Cluster instance id of the member
* @param networkPartitionId Network partition id of the member
@@ -35,6 +36,6 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
* @param health Health type: memory_consumption | load_average
* @param value Health type value
*/
- void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
+ void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
String memberId, String partitionId, String health, double value);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
index 289be8b..af9c8e9 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
@@ -27,10 +27,12 @@ public interface InFlightRequestPublisher extends StatisticsPublisher {
/**
* Publish in-flight request count.
*
+ * @param timeStamp time
* @param clusterId Cluster id
* @param clusterInstanceId Cluster instance id
* @param networkPartitionId Network partition id of the cluster
* @param inFlightRequestCount In-flight request count of the cluster
*/
- void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount);
+ void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+ int inFlightRequestCount);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index 1dc4240..d5c9265 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -52,6 +52,7 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
// Set payload definition
List<Attribute> payloadData = new ArrayList<Attribute>();
+ payloadData.add(new Attribute("time_stamp", AttributeType.LONG));
payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
payloadData.add(new Attribute("cluster_instance_id", AttributeType.STRING));
payloadData.add(new Attribute("network_partition_id", AttributeType.STRING));
@@ -70,6 +71,7 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
/**
* Publish health statistics to cep.
*
+ * @param timeStamp
* @param clusterId
* @param clusterInstanceId
* @param networkPartitionId
@@ -79,13 +81,16 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
* @param value
*/
@Override
- public void publish(String clusterId, String clusterInstanceId, String networkPartitionId, String memberId, String partitionId, String health, double value) {
+ public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+ String memberId, String partitionId, String health, double value) {
if (log.isDebugEnabled()) {
- log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s [partition] %s [member] %s [health] %s [value] %f",
+ log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s " +
+ "[partition] %s [member] %s [health] %s [value] %f",
clusterId, networkPartitionId, partitionId, memberId, health, value));
}
// Set payload values
List<Object> payload = new ArrayList<Object>();
+ payload.add(timeStamp);
payload.add(clusterId);
payload.add(clusterInstanceId);
payload.add(networkPartitionId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index 2ed8883..f51eb91 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -51,6 +51,7 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher
List<Attribute> payloadData = new ArrayList<Attribute>();
// Set payload definition
+ payloadData.add(new Attribute("time_stamp", AttributeType.LONG));
payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
payloadData.add(new Attribute("cluster_instance_id", AttributeType.STRING));
payloadData.add(new Attribute("network_partition_id", AttributeType.STRING));
@@ -65,15 +66,18 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher
/**
* Publish in-flight request count of a cluster.
*
+ * @param timeStamp
* @param clusterId
* @param clusterInstanceId
* @param networkPartitionId
* @param inFlightRequestCount
*/
@Override
- public void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount) {
+ public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+ int inFlightRequestCount) {
// Set payload values
List<Object> payload = new ArrayList<Object>();
+ payload.add(timeStamp);
payload.add(clusterId);
payload.add(clusterInstanceId);
payload.add(networkPartitionId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
index dc2233d..1dd12c7 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
@@ -81,7 +81,8 @@ public class LoadBalancerStatisticsNotifier implements Runnable {
for (Cluster cluster : service.getClusters()) {
// Publish in-flight request count of load balancer's network partition
int requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId());
- inFlightRequestPublisher.publish(cluster.getClusterId(), clusterInstanceId,
+ inFlightRequestPublisher.publish(System.currentTimeMillis(), cluster.getClusterId(),
+ clusterInstanceId,
networkPartitionId, requestCount);
if (log.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
index c2d1c6c..0dc5e67 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
@@ -69,6 +69,7 @@ public class MockHealthStatisticsNotifier implements Runnable {
mockMemberContext.getMemberId(), memoryConsumption));
}
healthStatisticsPublisher.publish(
+ System.currentTimeMillis(),
mockMemberContext.getClusterId(),
mockMemberContext.getClusterInstanceId(),
mockMemberContext.getNetworkPartitionId(),
@@ -93,6 +94,7 @@ public class MockHealthStatisticsNotifier implements Runnable {
mockMemberContext.getMemberId(), loadAvereage));
}
healthStatisticsPublisher.publish(
+ System.currentTimeMillis(),
mockMemberContext.getClusterId(),
mockMemberContext.getClusterInstanceId(),
mockMemberContext.getNetworkPartitionId(),
@@ -116,6 +118,7 @@ public class MockHealthStatisticsNotifier implements Runnable {
mockMemberContext.getMemberId(), requestsInFlight));
}
inFlightRequestPublisher.publish(
+ System.currentTimeMillis(),
mockMemberContext.getClusterId(),
mockMemberContext.getClusterInstanceId(),
mockMemberContext.getNetworkPartitionId(),
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
index 9753c3e..aae9e9d 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
@@ -17,7 +17,7 @@
from threading import Thread
import multiprocessing
-
+import time
import psutil
from abstracthealthstatisticspublisher import *
@@ -124,6 +124,7 @@ class HealthStatisticsPublisher:
stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION
# stream_def.add_payloaddata_attribute()
+ stream_def.add_payloaddata_attribute("time_stamp", StreamDefinition.LONG)
stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING)
stream_def.add_payloaddata_attribute("cluster_instance_id", StreamDefinition.STRING)
stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING)
@@ -141,6 +142,7 @@ class HealthStatisticsPublisher:
"""
event = ThriftEvent()
+ event.payloadData.append(int(round(time.time() * 1000)))
event.payloadData.append(self.cartridge_agent_config.cluster_id)
event.payloadData.append(self.cartridge_agent_config.cluster_instance_id)
event.payloadData.append(self.cartridge_agent_config.network_partition_id)
@@ -159,6 +161,7 @@ class HealthStatisticsPublisher:
"""
event = ThriftEvent()
+ event.payloadData.append(int(round(time.time() * 1000)))
event.payloadData.append(self.cartridge_agent_config.cluster_id)
event.payloadData.append(self.cartridge_agent_config.cluster_instance_id)
event.payloadData.append(self.cartridge_agent_config.network_partition_id)
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml b/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml
new file mode 100644
index 0000000..bcef15f
--- /dev/null
+++ b/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<eventFormatter name="HealthStatsEventFormatter"
+ statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventformatter">
+ <from streamName="cartridge_agent_health_stats" version="1.0.0"/>
+ <mapping customMapping="disable" type="wso2event"/>
+ <to eventAdaptorName="DASDefaultWSO2EventOutputAdaptor" eventAdaptorType="wso2event">
+ <property name="stream">cartridge_agent_health_stats</property>
+ <property name="version">1.0.0</property>
+ </to>
+</eventFormatter>
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml b/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml
new file mode 100644
index 0000000..3cfd4a9
--- /dev/null
+++ b/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<eventFormatter name="RIFEventFormatter" statistics="disable"
+ trace="enable" xmlns="http://wso2.org/carbon/eventformatter">
+ <from streamName="in_flight_requests" version="1.0.0"/>
+ <mapping customMapping="disable" type="wso2event"/>
+ <to eventAdaptorName="DASDefaultWSO2EventOutputAdaptor" eventAdaptorType="wso2event">
+ <property name="stream">in_flight_requests</property>
+ <property name="version">1.0.0</property>
+ </to>
+</eventFormatter>
+
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml b/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml
new file mode 100755
index 0000000..5cec300
--- /dev/null
+++ b/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<outputEventAdaptor name="DASDefaultWSO2EventOutputAdaptor"
+ statistics="disable" trace="disable" type="wso2event"
+ xmlns="http://wso2.org/carbon/eventadaptormanager">
+ <property name="username">admin</property>
+ <property name="receiverURL">tcp://localhost:7612</property>
+ <property name="password">admin</property>
+ <property name="authenticatorURL">ssl://localhost:7712</property>
+</outputEventAdaptor>
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
index 4c4c7e0..a256770 100644
--- a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
+++ b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
@@ -23,287 +23,289 @@
<streamManagerConfiguration xmlns="http://wso2.org/carbon/streammanager">
<!-- in-flight requests stream definitions start -->
<streamDefinition name="in_flight_requests" version="1.0.0">
- <description>in-flight request count</description>
- <nickName>in-flight requests</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="cluster_id" type="String"/>
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="in_flight_request_count" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>in-flight request count</description>
+ <nickName>in-flight requests</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="time_stamp" type="long"/>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="in_flight_request_count" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="gradient_in_flight_requests" version="1.0.0">
- <description>gradient of in flight request count</description>
- <nickName>gradient in flight requests</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="cluster_id" type="String"/>
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="count" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>gradient of in flight request count</description>
+ <nickName>gradient in flight requests</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="count" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="average_in_flight_requests" version="1.0.0">
- <description>average of in-flight request count</description>
- <nickName>average in-flight requests</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="cluster_id" type="String"/>
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="count" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>average of in-flight request count</description>
+ <nickName>average in-flight requests</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="count" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="second_derivative_in_flight_requests" version="1.0.0">
- <description>second derivative of in-flight request count</description>
- <nickName>second derivative in-flight requests</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="cluster_id" type="String"/>
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="count" type="double"/>
- </payloadData>
+ <description>second derivative of in-flight request count</description>
+ <nickName>second derivative in-flight requests</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="count" type="double"/>
+ </payloadData>
</streamDefinition>
<!-- in-flight requests stream definitions end -->
<!-- cartridge agent health stats stream definitions start -->
<streamDefinition name="cartridge_agent_health_stats" version="1.0.0">
- <description>agent health stats</description>
- <nickName>agent health stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="cluster_id" type="String" />
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="member_id" type="String" />
- <property name="partition_id" type="String" />
- <property name="health_description" type="String"/>
- <property name="value" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>agent health stats</description>
+ <nickName>agent health stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="time_stamp" type="long"/>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="member_id" type="String"/>
+ <property name="partition_id" type="String"/>
+ <property name="health_description" type="String"/>
+ <property name="value" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="average_load_average_stats" version="1.0.0">
- <description>average load average stats</description>
- <nickName>average load average stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="cluster_id" type="String" />
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="average_load_average" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>average load average stats</description>
+ <nickName>average load average stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="average_load_average" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="average_memory_consumption_stats" version="1.0.0">
- <description>average memory consumption stats</description>
- <nickName>average memory consumption stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="cluster_id" type="String"/>
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="average_memory_consumption" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>average memory consumption stats</description>
+ <nickName>average memory consumption stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="average_memory_consumption" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="gradient_load_average_stats" version="1.0.0">
- <description>gradient load average stats</description>
- <nickName>gradient load average stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="cluster_id" type="String" />
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="gradient_load_average" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>gradient load average stats</description>
+ <nickName>gradient load average stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="gradient_load_average" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="gradient_memory_consumption_stats" version="1.0.0">
- <description>gradient memoryconsumption stats</description>
- <nickName>gradient memoryconsumption stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="cluster_id" type="String" />
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="gradient_memory_consumption" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>gradient memoryconsumption stats</description>
+ <nickName>gradient memoryconsumption stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="gradient_memory_consumption" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="second_derivative_memory_consumption_stats" version="1.0.0">
- <description>second derivative memory consumption stats</description>
- <nickName>second derivative memory consumption stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="cluster_id" type="String" />
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="second_derivative_memory_consumption" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>second derivative memory consumption stats</description>
+ <nickName>second derivative memory consumption stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="second_derivative_memory_consumption" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="second_derivative_load_average_stats" version="1.0.0">
- <description>second derivative load average stats</description>
- <nickName>second derivative load average stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="cluster_id" type="String" />
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="second_derivative_load_average" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>second derivative load average stats</description>
+ <nickName>second derivative load average stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="second_derivative_load_average" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="fault_message" version="1.0.0">
- <description>fault message</description>
- <nickName>fault message</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="cluster_id" type="String"/>
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="member_id" type="String"/>
- <property name="partition_id" type="String"/>
- </payloadData>
+ <description>fault message</description>
+ <nickName>fault message</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="member_id" type="String"/>
+ <property name="partition_id" type="String"/>
+ </payloadData>
</streamDefinition>
<!-- cartridge agent health stats stream definitions end -->
<!-- This is for member_id wise grouping-->
<streamDefinition name="member_average_load_average_stats" version="1.0.0">
- <description>average load average stats</description>
- <nickName>average load average stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="member_id" type="String" />
- <property name="cluster_id" type="String" />
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="member_average_load_average" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>average load average stats</description>
+ <nickName>average load average stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="member_id" type="String"/>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="member_average_load_average" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="member_average_memory_consumption_stats" version="1.0.0">
- <description>average memory consumption stats</description>
- <nickName>average memory consumption stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="member_id" type="String"/>
- <property name="cluster_id" type="String"/>
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="member_average_memory_consumption" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>average memory consumption stats</description>
+ <nickName>average memory consumption stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="member_id" type="String"/>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="member_average_memory_consumption" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="member_gradient_load_average_stats" version="1.0.0">
- <description>gradient load average stats</description>
- <nickName>gradient load average stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="member_id" type="String" />
- <property name="cluster_id" type="String" />
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="member_gradient_load_average" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>gradient load average stats</description>
+ <nickName>gradient load average stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="member_id" type="String"/>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="member_gradient_load_average" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="member_gradient_memory_consumption_stats" version="1.0.0">
- <description>gradient memoryconsumption stats</description>
- <nickName>gradient memoryconsumption stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="member_id" type="String" />
- <property name="cluster_id" type="String" />
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="member_gradient_memory_consumption" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>gradient memoryconsumption stats</description>
+ <nickName>gradient memoryconsumption stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="member_id" type="String"/>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="member_gradient_memory_consumption" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="member_second_derivative_memory_consumption_stats" version="1.0.0">
- <description>second derivative memory consumption stats</description>
- <nickName>second derivative memory consumption stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="member_id" type="String" />
- <property name="cluster_id" type="String" />
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="member_second_derivative_memory_consumption" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>second derivative memory consumption stats</description>
+ <nickName>second derivative memory consumption stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="member_id" type="String"/>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="member_second_derivative_memory_consumption" type="double"/>
+ </payloadData>
+ </streamDefinition>
<streamDefinition name="member_second_derivative_load_average_stats" version="1.0.0">
- <description>second derivative load average stats</description>
- <nickName>second derivative load average stats</nickName>
- <metaData>
- </metaData>
- <correlationData>
- </correlationData>
- <payloadData>
- <property name="member_id" type="String" />
- <property name="cluster_id" type="String" />
- <property name="cluster_instance_id" type="String"/>
- <property name="network_partition_id" type="String"/>
- <property name="member_second_derivative_load_average" type="double"/>
- </payloadData>
- </streamDefinition>
+ <description>second derivative load average stats</description>
+ <nickName>second derivative load average stats</nickName>
+ <metaData>
+ </metaData>
+ <correlationData>
+ </correlationData>
+ <payloadData>
+ <property name="member_id" type="String"/>
+ <property name="cluster_id" type="String"/>
+ <property name="cluster_instance_id" type="String"/>
+ <property name="network_partition_id" type="String"/>
+ <property name="member_second_derivative_load_average" type="double"/>
+ </payloadData>
+ </streamDefinition>
</streamManagerConfiguration>
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/README.md
----------------------------------------------------------------------
diff --git a/extensions/das/README.md b/extensions/das/README.md
new file mode 100644
index 0000000..00be297
--- /dev/null
+++ b/extensions/das/README.md
@@ -0,0 +1,10 @@
+# Apache Stratos DAS Extensions
+
+Apache Stratos Data Analytics Server (DAS) extensions include DAS artifacts and spark udf to run spark script.
+These extensions need to be deployed manually when running DAS externally.
+
+Please refer below link for more information on WSO2 DAS.
+https://docs.wso2.com/display/DAS300/WSO2+Data+Analytics+Server+Documentation
+
+Thank you for using Apache Stratos!
+The Stratos Team
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventreceivers/CloudControllerEventReceiver.xml
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventreceivers/CloudControllerEventReceiver.xml b/extensions/das/artifacts/eventreceivers/CloudControllerEventReceiver.xml
new file mode 100644
index 0000000..05789db
--- /dev/null
+++ b/extensions/das/artifacts/eventreceivers/CloudControllerEventReceiver.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<eventReceiver name="CloudControllerEventReceiver" statistics="disable"
+ trace="enable" xmlns="http://wso2.org/carbon/eventreceiver">
+ <from eventAdapterType="wso2event">
+ <property name="events.duplicated.in.cluster">false</property>
+ </from>
+ <mapping customMapping="disable" type="wso2event"/>
+ <to streamName="org.apache.stratos.cloud.controller" version="1.0.0"/>
+</eventReceiver>
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventreceivers/HealthStatsEventReceiver.xml
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventreceivers/HealthStatsEventReceiver.xml b/extensions/das/artifacts/eventreceivers/HealthStatsEventReceiver.xml
new file mode 100644
index 0000000..7e0a5ce
--- /dev/null
+++ b/extensions/das/artifacts/eventreceivers/HealthStatsEventReceiver.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<eventReceiver name="HealthStatsEventReceiver" statistics="disable"
+ trace="enable" xmlns="http://wso2.org/carbon/eventreceiver">
+ <from eventAdapterType="wso2event">
+ <property name="events.duplicated.in.cluster">false</property>
+ </from>
+ <mapping customMapping="disable" type="wso2event"/>
+ <to streamName="cartridge_agent_health_stats" version="1.0.0"/>
+</eventReceiver>
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventreceivers/RIFEventReceiver.xml
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventreceivers/RIFEventReceiver.xml b/extensions/das/artifacts/eventreceivers/RIFEventReceiver.xml
new file mode 100644
index 0000000..b11c016
--- /dev/null
+++ b/extensions/das/artifacts/eventreceivers/RIFEventReceiver.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<eventReceiver name="RIFEventReceiver" statistics="disable"
+ trace="enable" xmlns="http://wso2.org/carbon/eventreceiver">
+ <from eventAdapterType="wso2event">
+ <property name="events.duplicated.in.cluster">false</property>
+ </from>
+ <mapping customMapping="disable" type="wso2event"/>
+ <to streamName="in_flight_requests" version="1.0.0"/>
+</eventReceiver>
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventsink/cartridge_agent_health_stats.xml
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventsink/cartridge_agent_health_stats.xml b/extensions/das/artifacts/eventsink/cartridge_agent_health_stats.xml
new file mode 100644
index 0000000..b870bc2
--- /dev/null
+++ b/extensions/das/artifacts/eventsink/cartridge_agent_health_stats.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<EventStoreConfiguration>
+ <TableSchema>
+ <ColumnDefinition>
+ <Name>time_stamp</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>LONG</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>cluster_id</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>cluster_instance_id</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>network_partition_id</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>member_id</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>partition_id</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>health_description</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>value</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>DOUBLE</Type>
+ </ColumnDefinition>
+ </TableSchema>
+ <Source>
+ <StreamId>cartridge_agent_health_stats:1.0.0</StreamId>
+ </Source>
+ <RecordStoreName>EVENT_STORE</RecordStoreName>
+</EventStoreConfiguration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventsink/in_flight_requests.xml
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventsink/in_flight_requests.xml b/extensions/das/artifacts/eventsink/in_flight_requests.xml
new file mode 100644
index 0000000..d4ca48b
--- /dev/null
+++ b/extensions/das/artifacts/eventsink/in_flight_requests.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<EventStoreConfiguration>
+ <TableSchema>
+ <ColumnDefinition>
+ <Name>time_stamp</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>LONG</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>cluster_id</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>cluster_instance_id</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>network_partition_id</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>in_flight_request_count</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>DOUBLE</Type>
+ </ColumnDefinition>
+ </TableSchema>
+ <Source>
+ <StreamId>in_flight_requests:1.0.0</StreamId>
+ </Source>
+ <RecordStoreName>EVENT_STORE</RecordStoreName>
+</EventStoreConfiguration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventsink/org_apache_stratos_cloud_controller.xml
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventsink/org_apache_stratos_cloud_controller.xml b/extensions/das/artifacts/eventsink/org_apache_stratos_cloud_controller.xml
new file mode 100644
index 0000000..f0dae09
--- /dev/null
+++ b/extensions/das/artifacts/eventsink/org_apache_stratos_cloud_controller.xml
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<EventStoreConfiguration>
+ <TableSchema>
+ <ColumnDefinition>
+ <Name>timeStamp</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>LONG</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>memberId</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>cartridgeType</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>clusterId</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>clusterInstanceId</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>lbclusterId</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>partitionId</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>networkId</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>instanceType</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>scalingReason</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>scalingTime</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>LONG</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>isMultiTenant</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>iaas</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>status</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>hostName</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>hypervisor</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>ram</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>imageId</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>loginPort</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>INTEGER</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>osName</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>osVersion</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>osArch</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>is64bitOS</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>privateIPAddresses</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>publicIPAddresses</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ <ColumnDefinition>
+ <Name>allocateIPAddresses</Name>
+ <EnableIndexing>false</EnableIndexing>
+ <IsPrimaryKey>false</IsPrimaryKey>
+ <EnableScoreParam>false</EnableScoreParam>
+ <Type>STRING</Type>
+ </ColumnDefinition>
+ </TableSchema>
+ <Source>
+ <StreamId>org.apache.stratos.cloud.controller:1.0.0</StreamId>
+ </Source>
+ <RecordStoreName>EVENT_STORE</RecordStoreName>
+</EventStoreConfiguration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventstreams/cartridge_agent_health_stats_1.0.0.json
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventstreams/cartridge_agent_health_stats_1.0.0.json b/extensions/das/artifacts/eventstreams/cartridge_agent_health_stats_1.0.0.json
new file mode 100644
index 0000000..ec61229
--- /dev/null
+++ b/extensions/das/artifacts/eventstreams/cartridge_agent_health_stats_1.0.0.json
@@ -0,0 +1,40 @@
+{
+ "name": "cartridge_agent_health_stats",
+ "version": "1.0.0",
+ "nickName": "",
+ "description": "",
+ "payloadData": [
+ {
+ "name": "time_stamp",
+ "type": "LONG"
+ },
+ {
+ "name": "cluster_id",
+ "type": "STRING"
+ },
+ {
+ "name": "cluster_instance_id",
+ "type": "STRING"
+ },
+ {
+ "name": "network_partition_id",
+ "type": "STRING"
+ },
+ {
+ "name": "member_id",
+ "type": "STRING"
+ },
+ {
+ "name": "partition_id",
+ "type": "STRING"
+ },
+ {
+ "name": "health_description",
+ "type": "STRING"
+ },
+ {
+ "name": "value",
+ "type": "DOUBLE"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventstreams/in_flight_requests_1.0.0.json
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventstreams/in_flight_requests_1.0.0.json b/extensions/das/artifacts/eventstreams/in_flight_requests_1.0.0.json
new file mode 100644
index 0000000..8c5232a
--- /dev/null
+++ b/extensions/das/artifacts/eventstreams/in_flight_requests_1.0.0.json
@@ -0,0 +1,28 @@
+{
+ "name": "in_flight_requests",
+ "version": "1.0.0",
+ "nickName": "",
+ "description": "",
+ "payloadData": [
+ {
+ "name": "time_stamp",
+ "type": "LONG"
+ },
+ {
+ "name": "cluster_id",
+ "type": "STRING"
+ },
+ {
+ "name": "cluster_instance_id",
+ "type": "STRING"
+ },
+ {
+ "name": "network_partition_id",
+ "type": "STRING"
+ },
+ {
+ "name": "in_flight_request_count",
+ "type": "DOUBLE"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/extensions/das/artifacts/eventstreams/org.apache.stratos.cloud.controller_1.0.0.json
----------------------------------------------------------------------
diff --git a/extensions/das/artifacts/eventstreams/org.apache.stratos.cloud.controller_1.0.0.json b/extensions/das/artifacts/eventstreams/org.apache.stratos.cloud.controller_1.0.0.json
new file mode 100644
index 0000000..de1025f
--- /dev/null
+++ b/extensions/das/artifacts/eventstreams/org.apache.stratos.cloud.controller_1.0.0.json
@@ -0,0 +1,112 @@
+{
+ "name": "org.apache.stratos.cloud.controller",
+ "version": "1.0.0",
+ "nickName": "cloud.controller",
+ "description": "Instances booted up by the Cloud Controller",
+ "payloadData": [
+ {
+ "name": "timeStamp",
+ "type": "LONG"
+ },
+ {
+ "name": "memberId",
+ "type": "STRING"
+ },
+ {
+ "name": "cartridgeType",
+ "type": "STRING"
+ },
+ {
+ "name": "clusterId",
+ "type": "STRING"
+ },
+ {
+ "name": "clusterInstanceId",
+ "type": "STRING"
+ },
+ {
+ "name": "lbclusterId",
+ "type": "STRING"
+ },
+ {
+ "name": "partitionId",
+ "type": "STRING"
+ },
+ {
+ "name": "networkId",
+ "type": "STRING"
+ },
+ {
+ "name": "instanceType",
+ "type": "STRING"
+ },
+ {
+ "name": "scalingReason",
+ "type": "STRING"
+ },
+ {
+ "name": "scalingTime",
+ "type": "LONG"
+ },
+ {
+ "name": "isMultiTenant",
+ "type": "STRING"
+ },
+ {
+ "name": "iaas",
+ "type": "STRING"
+ },
+ {
+ "name": "status",
+ "type": "STRING"
+ },
+ {
+ "name": "hostName",
+ "type": "STRING"
+ },
+ {
+ "name": "hypervisor",
+ "type": "STRING"
+ },
+ {
+ "name": "ram",
+ "type": "STRING"
+ },
+ {
+ "name": "imageId",
+ "type": "STRING"
+ },
+ {
+ "name": "loginPort",
+ "type": "INT"
+ },
+ {
+ "name": "osName",
+ "type": "STRING"
+ },
+ {
+ "name": "osVersion",
+ "type": "STRING"
+ },
+ {
+ "name": "osArch",
+ "type": "STRING"
+ },
+ {
+ "name": "is64bitOS",
+ "type": "STRING"
+ },
+ {
+ "name": "privateIPAddresses",
+ "type": "STRING"
+ },
+ {
+ "name": "publicIPAddresses",
+ "type": "STRING"
+ },
+ {
+ "name": "allocateIPAddresses",
+ "type": "STRING"
+ }
+ ]
+}
\ No newline at end of file
[5/5] stratos git commit: Refactoring Monitoring Service Classes
Posted by ga...@apache.org.
Refactoring Monitoring Service Classes
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7575faa3
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7575faa3
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7575faa3
Branch: refs/heads/tenant-isolation
Commit: 7575faa30b90a477931c857768b70b369f7fa9c3
Parents: 54283ed
Author: Thanuja <th...@wso2.com>
Authored: Wed Aug 12 18:16:54 2015 +0530
Committer: Thanuja <th...@wso2.com>
Committed: Thu Aug 20 10:59:16 2015 +0530
----------------------------------------------------------------------
.../publisher/HealthStatisticsPublisher.java | 4 +-
.../publisher/InFlightRequestPublisher.java | 4 +-
.../publisher/StatisticsPublisherType.java | 2 +-
.../publisher/ThriftClientConfig.java | 81 +++++++++++
.../publisher/ThriftClientConfigParser.java | 139 +++++++++++++++++++
.../statistics/publisher/ThriftClientInfo.java | 63 +++++++++
.../publisher/ThriftStatisticsPublisher.java | 115 +++++++++++++++
.../publisher/wso2/cep/ThriftClientConfig.java | 81 -----------
.../wso2/cep/ThriftClientConfigParser.java | 139 -------------------
.../publisher/wso2/cep/ThriftClientInfo.java | 63 ---------
.../cep/WSO2CEPHealthStatisticsPublisher.java | 26 ++--
.../cep/WSO2CEPInFlightRequestPublisher.java | 28 ++--
.../wso2/cep/WSO2CEPStatisticsPublisher.java | 114 ---------------
.../test/ThriftClientConfigParserTest.java | 13 +-
.../modules/healthstatspublisher/healthstats.py | 4 +-
.../streamdefinitions/stream-manager-config.xml | 4 +-
16 files changed, 447 insertions(+), 433 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
index 95b04ff..6af1317 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
@@ -27,7 +27,7 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
/**
* Publish health statistics to complex event processor.
*
- * @param timeStamp time
+ * @param timestamp Time
* @param clusterId Cluster id of the member
* @param clusterInstanceId Cluster instance id of the member
* @param networkPartitionId Network partition id of the member
@@ -36,6 +36,6 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
* @param health Health type: memory_consumption | load_average
* @param value Health type value
*/
- void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+ void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId,
String memberId, String partitionId, String health, double value);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
index af9c8e9..e4e65c0 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
@@ -27,12 +27,12 @@ public interface InFlightRequestPublisher extends StatisticsPublisher {
/**
* Publish in-flight request count.
*
- * @param timeStamp time
+ * @param timestamp Time
* @param clusterId Cluster id
* @param clusterInstanceId Cluster instance id
* @param networkPartitionId Network partition id of the cluster
* @param inFlightRequestCount In-flight request count of the cluster
*/
- void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+ void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId,
int inFlightRequestCount);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java
index 77c5b78..d4b9a87 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/StatisticsPublisherType.java
@@ -23,5 +23,5 @@ package org.apache.stratos.common.statistics.publisher;
* Statistics publisher type enumneration.
*/
public enum StatisticsPublisherType {
- WSO2CEP
+ WSO2CEP, WSO2DAS
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
new file mode 100644
index 0000000..25ee897
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfig.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.statistics.publisher;
+
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Thrift Client configuration.
+ */
+public class ThriftClientConfig {
+
+ public static final String THRIFT_CLIENT_CONFIG_FILE_PATH = "thrift.client.config.file.path";
+
+ private static volatile ThriftClientConfig instance;
+ private ThriftClientInfo thriftClientInfo;
+
+ /*
+ * A private Constructor prevents any other
+ * class from instantiating.
+ */
+ ThriftClientConfig() {
+ }
+
+ public static ThriftClientConfig getInstance() {
+ if (instance == null) {
+ synchronized (ThriftClientConfig.class) {
+ if (instance == null) {
+ String configFilePath = System.getProperty(THRIFT_CLIENT_CONFIG_FILE_PATH);
+ if (StringUtils.isBlank(configFilePath)) {
+ throw new RuntimeException(String.format("Thrift client configuration file path system " +
+ "property is not set: %s", THRIFT_CLIENT_CONFIG_FILE_PATH));
+ }
+ instance = ThriftClientConfigParser.parse(configFilePath);
+ }
+ }
+ }
+ return instance;
+ }
+
+ /**
+ * Returns an ThriftClientInfo Object that stores the credential information.
+ * Thrift client credential information can be found under thrift-client-config.xml file
+ * These credential information then get parsed and assigned into ThriftClientInfo
+ * Object.
+ * <p/>
+ * This method is used to return the assigned values in ThriftClientInfo Object
+ *
+ * @return ThriftClientInfo object which consists of username,password,ip and port values
+ */
+ public ThriftClientInfo getThriftClientInfo() {
+ return thriftClientInfo;
+ }
+
+ /**
+ * Parsed values will be assigned to ThriftClientInfo object. Required fields will be taken
+ * from thrift-client-config.xml file.
+ *
+ * @param thriftClientInfo Object of the ThriftClientInfo
+ */
+ public void setThriftClientInfo(ThriftClientInfo thriftClientInfo) {
+ this.thriftClientInfo = thriftClientInfo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
new file mode 100644
index 0000000..aa05d6f
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientConfigParser.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.statistics.publisher;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.util.AxiomXpathParserUtil;
+import org.wso2.securevault.SecretResolver;
+import org.wso2.securevault.SecretResolverFactory;
+
+import java.io.File;
+import java.util.Iterator;
+
+/**
+ * Thrift client config parser.
+ */
+public class ThriftClientConfigParser {
+
+ private static final Log log = LogFactory.getLog(ThriftClientConfigParser.class);
+
+ /**
+ * Fields to be read from the thrift-client-config.xml file
+ */
+ private static final String USERNAME_ELEMENT = "username";
+ private static final String PASSWORD_ELEMENT = "password";
+ private static final String IP_ELEMENT = "ip";
+ private static final String PORT_ELEMENT = "port";
+
+ /**
+ * This method reads thrift-client-config.xml file and assign necessary credential
+ * values into thriftClientInfo object. A singleton design has been implemented
+ * with the use of thriftClientIConfig class.
+ * <p/>
+ * The filePath argument is the path to thrift-client-config.xml file
+ *
+ * @param filePath the path to thrift-client-config.xml file
+ * @return ThriftClientConfig object
+ */
+ public static ThriftClientConfig parse(String filePath) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Parsing thrift client config file: %s", filePath));
+ }
+
+ ThriftClientConfig thriftClientIConfig = new ThriftClientConfig();
+ ThriftClientInfo thriftClientInfo = new ThriftClientInfo();
+ thriftClientIConfig.setThriftClientInfo(thriftClientInfo);
+
+ File configFile = new File(filePath);
+ if (!configFile.exists()) {
+ throw new RuntimeException(String.format("Thrift client config file does not exist: %s", filePath));
+ }
+ OMElement document = AxiomXpathParserUtil.parse(configFile);
+ Iterator thriftClientIterator = document.getChildElements();
+
+ //Initialize the SecretResolver providing the configuration element.
+ SecretResolver secretResolver = SecretResolverFactory.create(document, false);
+
+ String userNameValuesStr = null;
+ String passwordValueStr = null;
+ String ipValuesStr = null;
+ String portValueStr = null;
+
+ //same entry used in cipher-text.properties and cipher-tool.properties.
+ String secretAlias = "thrift.client.configuration.password";
+
+ // Iterate the thrift-client-config.xml file and read child element
+ // consists of credential information necessary for ThriftStatisticsPublisher
+ while (thriftClientIterator.hasNext()) {
+ OMElement thriftClientElement = (OMElement) thriftClientIterator.next();
+
+ if (USERNAME_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
+ userNameValuesStr = thriftClientElement.getText();
+ thriftClientInfo.setUsername(userNameValuesStr);
+ }
+ //password field protected using Secure vault
+ if (PASSWORD_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
+ if ((secretResolver != null) && (secretResolver.isInitialized())) {
+ if (secretResolver.isTokenProtected(secretAlias)) {
+ passwordValueStr = secretResolver.resolve(secretAlias);
+ } else {
+ passwordValueStr = thriftClientElement.getText();
+ }
+ } else {
+ passwordValueStr = thriftClientElement.getText();
+ }
+ thriftClientInfo.setPassword(passwordValueStr);
+ }
+
+ if (IP_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
+ ipValuesStr = thriftClientElement.getText();
+ thriftClientInfo.setIp(ipValuesStr);
+ }
+
+ if (PORT_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
+ portValueStr = thriftClientElement.getText();
+ thriftClientInfo.setPort(portValueStr);
+ }
+ }
+
+ if (userNameValuesStr == null) {
+ throw new RuntimeException("Username value not found in thrift client configuration");
+ }
+ if (passwordValueStr == null) {
+ throw new RuntimeException("Password not found in thrift client configuration ");
+ }
+
+ if (ipValuesStr == null) {
+ throw new RuntimeException("Ip values not found in thrift client configuration ");
+ }
+
+ if (portValueStr == null) {
+ throw new RuntimeException("Port not found in thrift client configuration ");
+ }
+
+ return thriftClientIConfig;
+ } catch (Exception e) {
+ throw new RuntimeException("Could not parse thrift client configuration", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
new file mode 100644
index 0000000..514d907
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftClientInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.statistics.publisher;
+
+/**
+ * Thrift Client Info
+ */
+public class ThriftClientInfo {
+ private String username;
+ private String password;
+ private String ip;
+ private String port;
+
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getIp() {
+ return ip;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+
+ public String getPort() {
+ return port;
+ }
+
+ public void setPort(String port) {
+ this.port = port;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
new file mode 100644
index 0000000..6a9e955
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/ThriftStatisticsPublisher.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.statistics.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.databridge.agent.thrift.Agent;
+import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
+import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
+import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
+import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.HashMap;
+
+/**
+ * Thrift statistics publisher.
+ */
+public class ThriftStatisticsPublisher implements StatisticsPublisher {
+
+ private static final Log log = LogFactory.getLog(ThriftStatisticsPublisher.class);
+
+ private StreamDefinition streamDefinition;
+ private AsyncDataPublisher asyncDataPublisher;
+ private String ip;
+ private String port;
+ private String username;
+ private String password;
+ private boolean enabled = false;
+
+ /**
+ * Credential information stored inside thrift-client-config.xml file
+ * is parsed and assigned into ip,port,username and password fields
+ *
+ * @param streamDefinition Thrift Event Stream Definition
+ */
+ public ThriftStatisticsPublisher(StreamDefinition streamDefinition, String statsPublisherEnabled) {
+ ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance();
+ ThriftClientInfo thriftClientInfo = thriftClientConfig.getThriftClientInfo();
+
+ this.streamDefinition = streamDefinition;
+ this.ip = thriftClientInfo.getIp();
+ this.port = thriftClientInfo.getPort();
+ this.username = thriftClientInfo.getUsername();
+ this.password = thriftClientInfo.getPassword();
+
+ enabled = Boolean.getBoolean(statsPublisherEnabled);
+ if (enabled) {
+ init();
+ }
+ }
+
+ private void init() {
+ AgentConfiguration agentConfiguration = new AgentConfiguration();
+ Agent agent = new Agent(agentConfiguration);
+
+ // Initialize asynchronous data publisher
+ asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port + "", username, password, agent);
+ asyncDataPublisher.addStreamDefinition(streamDefinition);
+ }
+
+ @Override
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ if (this.enabled) {
+ init();
+ }
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ @Override
+ public void publish(Object[] payload) {
+ if (!isEnabled()) {
+ throw new RuntimeException("Statistics publisher is not enabled");
+ }
+
+ Event event = new Event();
+ event.setPayloadData(payload);
+ event.setArbitraryDataMap(new HashMap<String, String>());
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Publishing thrift event: [stream] %s [version] %s",
+ streamDefinition.getName(), streamDefinition.getVersion()));
+ }
+ asyncDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
+ } catch (AgentException e) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Could not publish thrift event: [stream] %s [version] %s",
+ streamDefinition.getName(), streamDefinition.getVersion()), e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java
deleted file mode 100644
index 178c5a6..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfig.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.common.statistics.publisher.wso2.cep;
-
-
-import org.apache.commons.lang.StringUtils;
-
-/**
- * Thrift Client configuration.
- */
-public class ThriftClientConfig {
-
- public static final String THRIFT_CLIENT_CONFIG_FILE_PATH = "thrift.client.config.file.path";
-
- private static volatile ThriftClientConfig instance;
- private ThriftClientInfo thriftClientInfo;
-
- /*
- * A private Constructor prevents any other
- * class from instantiating.
- */
- ThriftClientConfig() {
- }
-
- public static ThriftClientConfig getInstance() {
- if (instance == null) {
- synchronized (ThriftClientConfig.class) {
- if (instance == null) {
- String configFilePath = System.getProperty(THRIFT_CLIENT_CONFIG_FILE_PATH);
- if (StringUtils.isBlank(configFilePath)) {
- throw new RuntimeException(String.format("Thrift client configuration file path system " +
- "property is not set: %s", THRIFT_CLIENT_CONFIG_FILE_PATH));
- }
- instance = ThriftClientConfigParser.parse(configFilePath);
- }
- }
- }
- return instance;
- }
-
- /**
- * Returns an ThriftClientInfo Object that stores the credential information.
- * CEP credential information can be found under thrift-client-config.xml file
- * These credential information then get parsed and assigned into ThriftClientInfo
- * Object.
- * <p/>
- * This method is used to return the assigned values in ThriftClientInfo Object
- *
- * @return ThriftClientInfo object which consists of username,password,ip and port values
- */
- public ThriftClientInfo getThriftClientInfo() {
- return thriftClientInfo;
- }
-
- /**
- * Parsed values will be assigned to ThriftClientInfo object. Required fields will be taken
- * from thrift-client-config.xml file.
- *
- * @param thriftClientInfo Object of the ThriftClientInfo
- */
- public void setThriftClientInfo(ThriftClientInfo thriftClientInfo) {
- this.thriftClientInfo = thriftClientInfo;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java
deleted file mode 100644
index ae3c692..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientConfigParser.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.common.statistics.publisher.wso2.cep;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.util.AxiomXpathParserUtil;
-import org.wso2.securevault.SecretResolver;
-import org.wso2.securevault.SecretResolverFactory;
-
-import java.io.File;
-import java.util.Iterator;
-
-/**
- * Thrift client config parser.
- */
-public class ThriftClientConfigParser {
-
- private static final Log log = LogFactory.getLog(ThriftClientConfigParser.class);
-
- /**
- * Fields to be read from the thrift-client-config.xml file
- */
- private static final String USERNAME_ELEMENT = "username";
- private static final String PASSWORD_ELEMENT = "password";
- private static final String IP_ELEMENT = "ip";
- private static final String PORT_ELEMENT = "port";
-
- /**
- * This method reads thrift-client-config.xml file and assign necessary credential
- * values into thriftClientInfo object. A singleton design has been implemented
- * with the use of thriftClientIConfig class.
- * <p/>
- * The filePath argument is the path to thrift-client-config.xml file
- *
- * @param filePath the path to thrift-client-config.xml file
- * @return ThriftClientConfig object
- */
- public static ThriftClientConfig parse(String filePath) {
- try {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Parsing thrift client config file: %s", filePath));
- }
-
- ThriftClientConfig thriftClientIConfig = new ThriftClientConfig();
- ThriftClientInfo thriftClientInfo = new ThriftClientInfo();
- thriftClientIConfig.setThriftClientInfo(thriftClientInfo);
-
- File configFile = new File(filePath);
- if (!configFile.exists()) {
- throw new RuntimeException(String.format("Thrift client config file does not exist: %s", filePath));
- }
- OMElement document = AxiomXpathParserUtil.parse(configFile);
- Iterator thriftClientIterator = document.getChildElements();
-
- //Initialize the SecretResolver providing the configuration element.
- SecretResolver secretResolver = SecretResolverFactory.create(document, false);
-
- String userNameValuesStr = null;
- String passwordValueStr = null;
- String ipValuesStr = null;
- String portValueStr = null;
-
- //same entry used in cipher-text.properties and cipher-tool.properties.
- String secretAlias = "thrift.client.configuration.password";
-
- // Iterate the thrift-client-config.xml file and read child element
- // consists of credential information necessary for WSO2CEPStatisticsPublisher
- while (thriftClientIterator.hasNext()) {
- OMElement thriftClientElement = (OMElement) thriftClientIterator.next();
-
- if (USERNAME_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
- userNameValuesStr = thriftClientElement.getText();
- thriftClientInfo.setUsername(userNameValuesStr);
- }
- //password field protected using Secure vault
- if (PASSWORD_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
- if ((secretResolver != null) && (secretResolver.isInitialized())) {
- if (secretResolver.isTokenProtected(secretAlias)) {
- passwordValueStr = secretResolver.resolve(secretAlias);
- } else {
- passwordValueStr = thriftClientElement.getText();
- }
- } else {
- passwordValueStr = thriftClientElement.getText();
- }
- thriftClientInfo.setPassword(passwordValueStr);
- }
-
- if (IP_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
- ipValuesStr = thriftClientElement.getText();
- thriftClientInfo.setIp(ipValuesStr);
- }
-
- if (PORT_ELEMENT.equals(thriftClientElement.getQName().getLocalPart())) {
- portValueStr = thriftClientElement.getText();
- thriftClientInfo.setPort(portValueStr);
- }
- }
-
- if (userNameValuesStr == null) {
- throw new RuntimeException("Username value not found in thrift client configuration");
- }
- if (passwordValueStr == null) {
- throw new RuntimeException("Password not found in thrift client configuration ");
- }
-
- if (ipValuesStr == null) {
- throw new RuntimeException("Ip values not found in thrift client configuration ");
- }
-
- if (portValueStr == null) {
- throw new RuntimeException("Port not found in thrift client configuration ");
- }
-
- return thriftClientIConfig;
- } catch (Exception e) {
- throw new RuntimeException("Could not parse thrift client configuration", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java
deleted file mode 100644
index 1a9ba81..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/ThriftClientInfo.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.common.statistics.publisher.wso2.cep;
-
-/**
- * Thrift Client Info
- */
-public class ThriftClientInfo {
- private String username;
- private String password;
- private String ip;
- private String port;
-
-
- public String getUsername() {
- return username;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getIp() {
- return ip;
- }
-
- public void setIp(String ip) {
- this.ip = ip;
- }
-
- public String getPort() {
- return port;
- }
-
- public void setPort(String port) {
- this.port = port;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index d5c9265..33cf0b5 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -22,6 +22,7 @@ package org.apache.stratos.common.statistics.publisher.wso2.cep;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.statistics.publisher.HealthStatisticsPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -32,15 +33,16 @@ import java.util.List;
/**
* Health statistics publisher for publishing statistics to WSO2 CEP.
*/
-public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher implements HealthStatisticsPublisher {
+public class WSO2CEPHealthStatisticsPublisher extends ThriftStatisticsPublisher implements HealthStatisticsPublisher {
private static final Log log = LogFactory.getLog(WSO2CEPHealthStatisticsPublisher.class);
+ private static final String STATS_PUBLISHER_ENABLED = "cep.stats.publisher.enabled";
private static final String DATA_STREAM_NAME = "cartridge_agent_health_stats";
private static final String VERSION = "1.0.0";
public WSO2CEPHealthStatisticsPublisher() {
- super(createStreamDefinition());
+ super(createStreamDefinition(), STATS_PUBLISHER_ENABLED);
}
private static StreamDefinition createStreamDefinition() {
@@ -71,17 +73,17 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
/**
* Publish health statistics to cep.
*
- * @param timeStamp
- * @param clusterId
- * @param clusterInstanceId
- * @param networkPartitionId
- * @param memberId
- * @param partitionId
- * @param health
- * @param value
+ * @param timestamp Time
+ * @param clusterId Cluster id of the member
+ * @param clusterInstanceId Cluster instance id of the member
+ * @param networkPartitionId Network partition id of the member
+ * @param memberId Member id
+ * @param partitionId Partition id of the member
+ * @param health Health type: memory_consumption | load_average
+ * @param value Health type value
*/
@Override
- public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+ public void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId,
String memberId, String partitionId, String health, double value) {
if (log.isDebugEnabled()) {
log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s " +
@@ -90,7 +92,7 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
}
// Set payload values
List<Object> payload = new ArrayList<Object>();
- payload.add(timeStamp);
+ payload.add(timestamp);
payload.add(clusterId);
payload.add(clusterInstanceId);
payload.add(networkPartitionId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index f51eb91..f853f23 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -19,7 +19,10 @@
package org.apache.stratos.common.statistics.publisher.wso2.cep;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisher;
+import org.apache.stratos.common.statistics.publisher.ThriftStatisticsPublisher;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.AttributeType;
import org.wso2.carbon.databridge.commons.StreamDefinition;
@@ -33,13 +36,15 @@ import java.util.List;
* In-flight request count:
* Number of requests being served at a given moment could be identified as in-flight request count.
*/
-public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher implements InFlightRequestPublisher {
+public class WSO2CEPInFlightRequestPublisher extends ThriftStatisticsPublisher implements InFlightRequestPublisher {
+ private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestPublisher.class);
+ private static final String STATS_PUBLISHER_ENABLED = "cep.stats.publisher.enabled";
private static final String DATA_STREAM_NAME = "in_flight_requests";
private static final String VERSION = "1.0.0";
public WSO2CEPInFlightRequestPublisher() {
- super(createStreamDefinition());
+ super(createStreamDefinition(), STATS_PUBLISHER_ENABLED);
}
private static StreamDefinition createStreamDefinition() {
@@ -66,18 +71,23 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher
/**
* Publish in-flight request count of a cluster.
*
- * @param timeStamp
- * @param clusterId
- * @param clusterInstanceId
- * @param networkPartitionId
- * @param inFlightRequestCount
+ * @param timestamp Time
+ * @param clusterId Cluster id
+ * @param clusterInstanceId Cluster instance id
+ * @param networkPartitionId Network partition id of the cluster
+ * @param inFlightRequestCount In-flight request count of the cluster
*/
@Override
- public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+ public void publish(Long timestamp, String clusterId, String clusterInstanceId, String networkPartitionId,
int inFlightRequestCount) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Publishing health statistics: [timestamp] %d [cluster] %s " +
+ "[cluster-instance] %s [network-partition] %s [in-flight-request-count] %d",
+ timestamp, clusterId, clusterInstanceId, networkPartitionId, inFlightRequestCount));
+ }
// Set payload values
List<Object> payload = new ArrayList<Object>();
- payload.add(timeStamp);
+ payload.add(timestamp);
payload.add(clusterId);
payload.add(clusterInstanceId);
payload.add(networkPartitionId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java
deleted file mode 100644
index 653288d..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPStatisticsPublisher.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.common.statistics.publisher.wso2.cep;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.statistics.publisher.StatisticsPublisher;
-import org.wso2.carbon.databridge.agent.thrift.Agent;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
-import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
-import org.wso2.carbon.databridge.commons.Event;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-
-import java.util.HashMap;
-
-/**
- * WSO2 CEP statistics publisher.
- */
-public class WSO2CEPStatisticsPublisher implements StatisticsPublisher {
-
- private static final Log log = LogFactory.getLog(WSO2CEPStatisticsPublisher.class);
-
- private StreamDefinition streamDefinition;
- private AsyncDataPublisher asyncDataPublisher;
- private String ip;
- private String port;
- private String username;
- private String password;
- private boolean enabled = false;
-
- /**
- * Credential information stored inside thrift-client-config.xml file
- * is parsed and assigned into ip,port,username and password fields
- *
- * @param streamDefinition
- */
- public WSO2CEPStatisticsPublisher(StreamDefinition streamDefinition) {
- ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance();
- thriftClientConfig.getThriftClientInfo();
-
- this.streamDefinition = streamDefinition;
- this.ip = thriftClientConfig.getThriftClientInfo().getIp();
- this.port = thriftClientConfig.getThriftClientInfo().getPort();
- this.username = thriftClientConfig.getThriftClientInfo().getUsername();
- this.password = thriftClientConfig.getThriftClientInfo().getPassword();
-
- enabled = Boolean.getBoolean("cep.stats.publisher.enabled");
- if (enabled) {
- init();
- }
- }
-
- private void init() {
- AgentConfiguration agentConfiguration = new AgentConfiguration();
- Agent agent = new Agent(agentConfiguration);
-
- // Initialize asynchronous data publisher
- asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port + "", username, password, agent);
- asyncDataPublisher.addStreamDefinition(streamDefinition);
- }
-
- @Override
- public void setEnabled(boolean enabled) {
- this.enabled = enabled;
- if (this.enabled) {
- init();
- }
- }
-
- @Override
- public boolean isEnabled() {
- return enabled;
- }
-
- @Override
- public void publish(Object[] payload) {
- if (!isEnabled()) {
- throw new RuntimeException("Statistics publisher is not enabled");
- }
-
- Event event = new Event();
- event.setPayloadData(payload);
- event.setArbitraryDataMap(new HashMap<String, String>());
-
- try {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Publishing cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
- }
- asyncDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
- } catch (AgentException e) {
- if (log.isErrorEnabled()) {
- log.error(String.format("Could not publish cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
index 09d5b5e..352041d 100644
--- a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
+++ b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/ThriftClientConfigParserTest.java
@@ -20,7 +20,8 @@
package org.apache.stratos.common.test;
import junit.framework.TestCase;
-import org.apache.stratos.common.statistics.publisher.wso2.cep.ThriftClientConfig;
+import org.apache.stratos.common.statistics.publisher.ThriftClientConfig;
+import org.apache.stratos.common.statistics.publisher.ThriftClientInfo;
import org.junit.Test;
import java.net.URL;
@@ -41,11 +42,11 @@ public class ThriftClientConfigParserTest extends TestCase {
URL configFileUrl = ThriftClientConfigParserTest.class.getResource("/thrift-client-config.xml");
System.setProperty(ThriftClientConfig.THRIFT_CLIENT_CONFIG_FILE_PATH, configFileUrl.getPath());
ThriftClientConfig thriftClientConfig = ThriftClientConfig.getInstance();
- thriftClientConfig.getThriftClientInfo();
+ ThriftClientInfo thriftClientInfo = thriftClientConfig.getThriftClientInfo();
- assertEquals("Incorrect Password", "admin", thriftClientConfig.getThriftClientInfo().getUsername());
- assertEquals("Incorrect Password", "1234", thriftClientConfig.getThriftClientInfo().getPassword());
- assertEquals("Incorrect IP", "192.168.10.10", thriftClientConfig.getThriftClientInfo().getIp());
- assertEquals("Incorrect Port", "9300", thriftClientConfig.getThriftClientInfo().getPort());
+ assertEquals("Incorrect Username", "admin", thriftClientInfo.getUsername());
+ assertEquals("Incorrect Password", "1234", thriftClientInfo.getPassword());
+ assertEquals("Incorrect IP", "192.168.10.10", thriftClientInfo.getIp());
+ assertEquals("Incorrect Port", "9300", thriftClientInfo.getPort());
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
index aae9e9d..c0b3981 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
@@ -124,7 +124,7 @@ class HealthStatisticsPublisher:
stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION
# stream_def.add_payloaddata_attribute()
- stream_def.add_payloaddata_attribute("time_stamp", StreamDefinition.LONG)
+ stream_def.add_payloaddata_attribute("timestamp", StreamDefinition.LONG)
stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING)
stream_def.add_payloaddata_attribute("cluster_instance_id", StreamDefinition.STRING)
stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING)
@@ -199,7 +199,7 @@ class DefaultHealthStatisticsReader(AbstractHealthStatisticsReader):
(one, five, fifteen) = os.getloadavg()
cores = multiprocessing.cpu_count()
- return (one/cores) * 100
+ return (one / cores) * 100
class CEPPublisherConfiguration:
http://git-wip-us.apache.org/repos/asf/stratos/blob/7575faa3/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
index a256770..9e0a833 100644
--- a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
+++ b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml
@@ -30,7 +30,7 @@
<correlationData>
</correlationData>
<payloadData>
- <property name="time_stamp" type="long"/>
+ <property name="timestamp" type="long"/>
<property name="cluster_id" type="String"/>
<property name="cluster_instance_id" type="String"/>
<property name="network_partition_id" type="String"/>
@@ -93,7 +93,7 @@
<correlationData>
</correlationData>
<payloadData>
- <property name="time_stamp" type="long"/>
+ <property name="timestamp" type="long"/>
<property name="cluster_id" type="String"/>
<property name="cluster_instance_id" type="String"/>
<property name="network_partition_id" type="String"/>
[3/5] stratos git commit: Adding Metering and Monitoring Service
Implementation
Posted by ga...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index dab6827..f76c928 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -31,6 +31,7 @@ import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPubl
import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
import org.apache.stratos.common.Property;
+import org.apache.stratos.common.constants.StratosConstants;
import org.apache.stratos.messaging.domain.application.ClusterDataHolder;
import org.apache.stratos.messaging.domain.instance.ClusterInstance;
import org.apache.stratos.messaging.domain.topology.*;
@@ -67,8 +68,11 @@ public class TopologyBuilder {
TopologyManager.acquireWriteLock();
for (Cartridge cartridge : cartridgeList) {
if (!topology.serviceExists(cartridge.getType())) {
- ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant;
+
+ ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant :
+ ServiceType.SingleTenant;
service = new Service(cartridge.getType(), serviceType, cartridge.getUuid());
+
Properties properties = new Properties();
try {
@@ -199,867 +203,899 @@ public class TopologyBuilder {
}
log.debug("Creating cluster port mappings: [application-id] " + appUuid);
- for(Cluster cluster : appClusters) {
+ for (Cluster cluster : appClusters) {
String cartridgeUuid = cluster.getServiceUuid();
Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeUuid);
- if(cartridge == null) {
+ if (cartridge == null) {
throw new CloudControllerException("Cartridge not found: [cartridge-uuid] " + cartridgeUuid);
}
- for(PortMapping portMapping : cartridge.getPortMappings()) {
+ for (PortMapping portMapping : cartridge.getPortMappings()) {
ClusterPortMapping clusterPortMapping = new ClusterPortMapping(appUuid,
cluster.getClusterId(), portMapping.getName(), portMapping.getProtocol(), portMapping.getPort(),
- portMapping.getProxyPort());
- CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping);
- log.debug("Cluster port mapping created: " + clusterPortMapping.toString());
- }
- }
-
- // Persist cluster port mappings
- CloudControllerContext.getInstance().persist();
-
- // Send application clusters created event
- TopologyEventPublisher.sendApplicationClustersCreated(appUuid, appClusters);
- }
-
- public static void handleApplicationClustersRemoved(String appId,
- Set<ClusterDataHolder> clusterData) {
- TopologyManager.acquireWriteLock();
-
- List<Cluster> removedClusters = new ArrayList<Cluster>();
- CloudControllerContext context = CloudControllerContext.getInstance();
- try {
- Topology topology = TopologyManager.getTopology();
-
- if (clusterData != null) {
- // remove clusters from CC topology model and remove runtime information
- for (ClusterDataHolder aClusterData : clusterData) {
- Service aService = topology.getService(aClusterData.getServiceUuid());
- if (aService != null) {
- removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
- } else {
- log.warn("Service " + aClusterData.getServiceType() + " not found, " +
- "unable to remove Cluster " + aClusterData.getClusterId());
+ portMapping.getProxyPort());
+ CloudControllerContext.getInstance().addClusterPortMapping(clusterPortMapping);
+ log.debug("Cluster port mapping created: " + clusterPortMapping.toString());
}
- // remove runtime data
- context.removeClusterContext(aClusterData.getClusterId());
-
- log.info("Removed application [ " + appId + " ]'s Cluster " +
- "[ " + aClusterData.getClusterId() + " ] from the topology");
}
- // persist runtime data changes
- CloudControllerContext.getInstance().persist();
- } else {
- log.info("No cluster data found for application " + appId + " to remove");
- }
- TopologyManager.updateTopology(topology);
+ // Persist cluster port mappings
+ CloudControllerContext.getInstance().persist();
- } finally {
- TopologyManager.releaseWriteLock();
- }
+ // Send application clusters created event
+ TopologyEventPublisher.sendApplicationClustersCreated(appUuid, appClusters);
+ }
- // Remove cluster port mappings of application
- CloudControllerContext.getInstance().removeClusterPortMappings(appId);
- CloudControllerContext.getInstance().persist();
+ public static void handleApplicationClustersRemoved (String appId,
+ Set < ClusterDataHolder > clusterData){
+ TopologyManager.acquireWriteLock();
- TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData);
+ List<Cluster> removedClusters = new ArrayList<Cluster>();
+ CloudControllerContext context = CloudControllerContext.getInstance();
+ try {
+ Topology topology = TopologyManager.getTopology();
+
+ if (clusterData != null) {
+ // remove clusters from CC topology model and remove runtime information
+ for (ClusterDataHolder aClusterData : clusterData) {
+ Service aService = topology.getService(aClusterData.getServiceUuid());
+ if (aService != null) {
+ removedClusters.add(aService.removeCluster(aClusterData.getClusterId()));
+ } else {
+ log.warn("Service " + aClusterData.getServiceType() + " not found, " +
+ "unable to remove Cluster " + aClusterData.getClusterId());
+ }
+ // remove runtime data
+ context.removeClusterContext(aClusterData.getClusterId());
- }
+ log.info("Removed application [ " + appId + " ]'s Cluster " +
+ "[ " + aClusterData.getClusterId() + " ] from the topology");
+ }
+ // persist runtime data changes
+ CloudControllerContext.getInstance().persist();
+ } else {
+ log.info("No cluster data found for application " + appId + " to remove");
+ }
- public static void handleClusterReset(ClusterStatusClusterResetEvent event) {
- TopologyManager.acquireWriteLock();
+ TopologyManager.updateTopology(topology);
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(event.getServiceName());
- if (service == null) {
- log.error("Service " + event.getServiceName() +
- " not found in Topology, unable to update the cluster status to Created");
- return;
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- Cluster cluster = service.getCluster(event.getClusterId());
- if (cluster == null) {
- log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " +
- "status to Created");
- return;
- }
+ // Remove cluster port mappings of application
+ CloudControllerContext.getInstance().removeClusterPortMappings(appId);
+ CloudControllerContext.getInstance().persist();
- ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
- }
- ClusterStatus status = ClusterStatus.Created;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Created adding status started for" + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(),
- event.getClusterId(), event.getInstanceId());
- } else {
- log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
- }
+ TopologyEventPublisher.sendApplicationClustersRemoved(appId, clusterData);
- } finally {
- TopologyManager.releaseWriteLock();
}
+ public static void handleClusterReset (ClusterStatusClusterResetEvent event){
+ TopologyManager.acquireWriteLock();
- }
-
- public static void handleClusterInstanceCreated(String serviceUuid, String clusterId,
- String alias, String instanceId, String partitionId,
- String networkPartitionUuid) {
-
- TopologyManager.acquireWriteLock();
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(event.getServiceName());
+ if (service == null) {
+ log.error("Service " + event.getServiceName() +
+ " not found in Topology, unable to update the cluster status to Created");
+ return;
+ }
- try {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(serviceUuid);
- if (service == null) {
- log.error("Service " + serviceUuid +
- " not found in Topology, unable to update the cluster status to Created");
- return;
- }
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ log.error("Cluster " + event.getClusterId() + " not found in Topology, unable to update " +
+ "status to Created");
+ return;
+ }
- Cluster cluster = service.getCluster(clusterId);
- if (cluster == null) {
- log.error("Cluster " + clusterId + " not found in Topology, unable to update " +
- "status to Created");
- return;
- }
+ ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found for [cluster] " +
+ event.getClusterId() + " [instance-id] " +
+ event.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Created;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Created adding status started for" + cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ TopologyEventPublisher.sendClusterResetEvent(event.getAppId(), event.getServiceName(),
+ event.getClusterId(), event.getInstanceId());
+ } else {
+ log.warn(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s [status-requested] %s",
+ event.getClusterId(), event.getInstanceId(),
+ context.getStatus(), status));
+ }
- if (cluster.getInstanceContexts(instanceId) != null) {
- log.warn("The Instance context for the cluster already exists for [cluster] " +
- clusterId + " [instance-id] " + instanceId);
- return;
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- ClusterInstance clusterInstance = new ClusterInstance(alias, clusterId, instanceId);
- clusterInstance.setNetworkPartitionUuid(networkPartitionUuid);
- clusterInstance.setPartitionId(partitionId);
- cluster.addInstanceContext(instanceId, clusterInstance);
- TopologyManager.updateTopology(topology);
-
- ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
- new ClusterInstanceCreatedEvent(serviceUuid, clusterId,
- clusterInstance);
- clusterInstanceCreatedEvent.setPartitionId(partitionId);
- TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
-
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-
- public static void handleClusterRemoved(ClusterContext ctxt) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(ctxt.getCartridgeUuid());
- String deploymentPolicy;
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- ctxt.getCartridgeUuid()));
- return;
}
- if (!service.clusterExists(ctxt.getClusterId())) {
- log.warn(String.format("Cluster %s does not exist for service %s",
- ctxt.getClusterId(),
- ctxt.getCartridgeUuid()));
- return;
- }
+ public static void handleClusterInstanceCreated (String serviceUuid, String clusterId,
+ String alias, String instanceId, String partitionId,
+ String networkPartitionUuid){
- try {
TopologyManager.acquireWriteLock();
- Cluster cluster = service.removeCluster(ctxt.getClusterId());
- deploymentPolicy = cluster.getDeploymentPolicyUuid();
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
- }
- /**
- * Add member object to the topology and publish member created event
- *
- * @param memberContext
- */
- public static void handleMemberCreatedEvent(MemberContext memberContext) {
- Topology topology = TopologyManager.getTopology();
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(serviceUuid);
+ if (service == null) {
+ log.error("Service " + serviceUuid +
+ " not found in Topology, unable to update the cluster status to Created");
+ return;
+ }
- Service service = topology.getService(memberContext.getCartridgeType());
- String clusterId = memberContext.getClusterId();
- Cluster cluster = service.getCluster(clusterId);
- String memberId = memberContext.getMemberId();
- String clusterInstanceId = memberContext.getClusterInstanceId();
- String networkPartitionId = memberContext.getNetworkPartitionId();
- String partitionId = memberContext.getPartition().getUuid();
- String lbClusterId = memberContext.getLbClusterId();
- long initTime = memberContext.getInitTime();
-
- if (cluster.memberExists(memberId)) {
- log.warn(String.format("Member %s already exists", memberId));
- return;
- }
+ Cluster cluster = service.getCluster(clusterId);
+ if (cluster == null) {
+ log.error("Cluster " + clusterId + " not found in Topology, unable to update " +
+ "status to Created");
+ return;
+ }
- try {
- TopologyManager.acquireWriteLock();
- Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId,
- networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime);
- member.setStatus(MemberStatus.Created);
- member.setLbClusterId(lbClusterId);
- member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
- cluster.addMember(member);
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
+ if (cluster.getInstanceContexts(instanceId) != null) {
+ log.warn("The Instance context for the cluster already exists for [cluster] " +
+ clusterId + " [instance-id] " + instanceId);
+ return;
+ }
- TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
- }
+ ClusterInstance clusterInstance = new ClusterInstance(alias, clusterId, instanceId);
+ clusterInstance.setNetworkPartitionUuid(networkPartitionUuid);
+ clusterInstance.setPartitionId(partitionId);
+ cluster.addInstanceContext(instanceId, clusterInstance);
+ TopologyManager.updateTopology(topology);
- /**
- * Update member status to initialized and publish member initialized event
- *
- * @param memberContext
- */
- public static void handleMemberInitializedEvent(MemberContext memberContext) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(memberContext.getCartridgeType());
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- memberContext.getCartridgeType()));
- return;
- }
- if (!service.clusterExists(memberContext.getClusterId())) {
- log.warn(String.format("Cluster %s does not exist in service %s",
- memberContext.getClusterId(),
- memberContext.getCartridgeType()));
- return;
- }
+ ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
+ new ClusterInstanceCreatedEvent(serviceUuid, clusterId,
+ clusterInstance);
+ clusterInstanceCreatedEvent.setPartitionId(partitionId);
+ TopologyEventPublisher.sendClusterInstanceCreatedEvent(clusterInstanceCreatedEvent);
- Member member = service.getCluster(memberContext.getClusterId()).
- getMember(memberContext.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- memberContext.getMemberId()));
- return;
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
}
- try {
- TopologyManager.acquireWriteLock();
- // Set ip addresses
- member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
- if (memberContext.getPrivateIPs() != null) {
- member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
- }
- member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
- if (memberContext.getPublicIPs() != null) {
- member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
+ public static void handleClusterRemoved (ClusterContext ctxt){
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(ctxt.getCartridgeUuid());
+ String deploymentPolicy;
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ ctxt.getCartridgeUuid()));
+ return;
}
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Initialized)) {
- log.error("Invalid state transition from " + member.getStatus() + " to " +
- MemberStatus.Initialized);
+ if (!service.clusterExists(ctxt.getClusterId())) {
+ log.warn(String.format("Cluster %s does not exist for service %s",
+ ctxt.getClusterId(),
+ ctxt.getCartridgeUuid()));
return;
- } else {
- member.setStatus(MemberStatus.Initialized);
- log.info("Member status updated to initialized");
+ }
+ try {
+ TopologyManager.acquireWriteLock();
+ Cluster cluster = service.removeCluster(ctxt.getClusterId());
+ deploymentPolicy = cluster.getDeploymentPolicyUuid();
TopologyManager.updateTopology(topology);
-
- TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
- //publishing data
- BAMUsageDataPublisher.publish(memberContext.getMemberId(),
- memberContext.getPartition().getUuid(),
- memberContext.getNetworkPartitionId(),
- memberContext.getClusterId(),
- memberContext.getCartridgeType(),
- MemberStatus.Initialized.toString(),
- null);
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- } finally {
- TopologyManager.releaseWriteLock();
+ TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy);
}
- }
- private static int findKubernetesServicePort(String clusterId, List<KubernetesService> kubernetesServices,
- PortMapping portMapping) {
- for (KubernetesService kubernetesService : kubernetesServices) {
- if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
- return kubernetesService.getPort();
+ /**
+ * Add member object to the topology and publish member created event
+ *
+ * @param memberContext
+ */
+ public static void handleMemberCreatedEvent (MemberContext memberContext){
+ Topology topology = TopologyManager.getTopology();
+
+ Service service = topology.getService(memberContext.getCartridgeType());
+ String clusterId = memberContext.getClusterId();
+ Cluster cluster = service.getCluster(clusterId);
+ String memberId = memberContext.getMemberId();
+ String clusterInstanceId = memberContext.getClusterInstanceId();
+ String networkPartitionId = memberContext.getNetworkPartitionId();
+ String partitionId = memberContext.getPartition().getUuid();
+ String lbClusterId = memberContext.getLbClusterId();
+ long initTime = memberContext.getInitTime();
+ String autoscalingReason = memberContext.getProperties().getProperty(
+ StratosConstants.SCALING_REASON).getValue();
+ long scalingTime = Long.parseLong(memberContext.getProperties().getProperty(
+ StratosConstants.SCALING_TIME).getValue());
+
+
+ if (cluster.memberExists(memberId)) {
+ log.warn(String.format("Member %s already exists", memberId));
+ return;
}
+
+ try {
+ TopologyManager.acquireWriteLock();
+ Member member = new Member(service.getServiceName(), clusterId, memberId, clusterInstanceId,
+ networkPartitionId, partitionId, memberContext.getLoadBalancingIPType(), initTime);
+ member.setStatus(MemberStatus.Created);
+ member.setLbClusterId(lbClusterId);
+ member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
+ cluster.addMember(member);
+ TopologyManager.updateTopology(topology);
+ //member created time
+ Long timeStamp = System.currentTimeMillis();
+ //publishing to BAM
+ BAMUsageDataPublisher
+ .publish(memberContext.getMemberId(),
+ memberContext.getPartition().getId(),
+ memberContext.getNetworkPartitionId(),
+ memberContext.getClusterId(),
+ memberContext.getClusterInstanceId(),
+ memberContext.getCartridgeType(),
+ MemberStatus.Created.toString(),
+ timeStamp, autoscalingReason,
+ scalingTime, null);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+
+ TopologyEventPublisher.sendMemberCreatedEvent(memberContext);
}
- throw new RuntimeException("Kubernetes service port not found: [cluster-id] " + clusterId + " [port] "
- + portMapping.getPort());
- }
- public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) {
- try {
+ /**
+ * Update member status to initialized and publish member initialized event
+ *
+ * @param memberContext
+ */
+ public static void handleMemberInitializedEvent (MemberContext memberContext){
Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(instanceStartedEvent.getServiceName());
+ Service service = topology.getService(memberContext.getCartridgeType());
if (service == null) {
log.warn(String.format("Service %s does not exist",
- instanceStartedEvent.getServiceName()));
+ memberContext.getCartridgeType()));
return;
}
- if (!service.clusterExists(instanceStartedEvent.getClusterId())) {
+ if (!service.clusterExists(memberContext.getClusterId())) {
log.warn(String.format("Cluster %s does not exist in service %s",
- instanceStartedEvent.getClusterId(),
- instanceStartedEvent.getServiceName()));
+ memberContext.getClusterId(),
+ memberContext.getCartridgeType()));
return;
}
- Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId());
- Member member = cluster.getMember(instanceStartedEvent.getMemberId());
+ Member member = service.getCluster(memberContext.getClusterId()).
+ getMember(memberContext.getMemberId());
if (member == null) {
log.warn(String.format("Member %s does not exist",
- instanceStartedEvent.getMemberId()));
+ memberContext.getMemberId()));
return;
}
try {
TopologyManager.acquireWriteLock();
+
+ // Set ip addresses
+ member.setDefaultPrivateIP(memberContext.getDefaultPrivateIP());
+ if (memberContext.getPrivateIPs() != null) {
+ member.setMemberPrivateIPs(Arrays.asList(memberContext.getPrivateIPs()));
+ }
+ member.setDefaultPublicIP(memberContext.getDefaultPublicIP());
+ if (memberContext.getPublicIPs() != null) {
+ member.setMemberPublicIPs(Arrays.asList(memberContext.getPublicIPs()));
+ }
+
// try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Starting)) {
- log.error("Invalid State Transition from " + member.getStatus() + " to " +
- MemberStatus.Starting);
+ if (!member.isStateTransitionValid(MemberStatus.Initialized)) {
+ log.error("Invalid state transition from " + member.getStatus() + " to " +
+ MemberStatus.Initialized);
return;
} else {
- member.setStatus(MemberStatus.Starting);
- log.info("member started event adding status started");
+ member.setStatus(MemberStatus.Initialized);
+ log.info("Member status updated to initialized");
TopologyManager.updateTopology(topology);
- //memberStartedEvent.
- TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
+ //member intialized time
+ Long timeStamp = System.currentTimeMillis();
+ TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
//publishing data
- BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
- instanceStartedEvent.getPartitionId(),
- instanceStartedEvent.getNetworkPartitionId(),
- instanceStartedEvent.getClusterId(),
- instanceStartedEvent.getServiceName(),
- MemberStatus.Starting.toString(),
- null);
+ BAMUsageDataPublisher.publish(memberContext.getMemberId(),
+ memberContext.getPartition().getUuid(),
+ memberContext.getNetworkPartitionId(),
+ memberContext.getClusterInstanceId(),
+ memberContext.getClusterId(),
+ memberContext.getCartridgeType(),
+ MemberStatus.Initialized.toString(),
+ timeStamp, null, null, null);
}
} finally {
TopologyManager.releaseWriteLock();
}
- } catch (Exception e) {
- String message = String.format("Could not handle member started event: [application-id] %s " +
- "[service-name] %s [member-id] %s", instanceStartedEvent.getApplicationId(),
- instanceStartedEvent.getServiceName(), instanceStartedEvent.getMemberId());
- log.warn(message, e);
- }
- }
-
- public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(instanceActivatedEvent.getServiceName());
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceActivatedEvent.getServiceName()));
- return;
}
- Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceActivatedEvent.getClusterId()));
- return;
+ private static int findKubernetesServicePort (String clusterId, List < KubernetesService > kubernetesServices,
+ PortMapping portMapping){
+ for (KubernetesService kubernetesService : kubernetesServices) {
+ if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
+ return kubernetesService.getPort();
+ }
+ }
+ throw new RuntimeException("Kubernetes service port not found: [cluster-id] " + clusterId + " [port] "
+ + portMapping.getPort());
}
- Member member = cluster.getMember(instanceActivatedEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceActivatedEvent.getMemberId()));
- return;
- }
+ public static void handleMemberStarted (InstanceStartedEvent instanceStartedEvent){
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(instanceStartedEvent.getServiceName());
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ instanceStartedEvent.getServiceName()));
+ return;
+ }
+ if (!service.clusterExists(instanceStartedEvent.getClusterId())) {
+ log.warn(String.format("Cluster %s does not exist in service %s",
+ instanceStartedEvent.getClusterId(),
+ instanceStartedEvent.getServiceName()));
+ return;
+ }
- MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent(
- instanceActivatedEvent.getServiceName(),
- instanceActivatedEvent.getClusterId(),
- instanceActivatedEvent.getClusterInstanceId(),
- instanceActivatedEvent.getMemberId(),
- instanceActivatedEvent.getNetworkPartitionId(),
- instanceActivatedEvent.getPartitionId());
-
- // grouping - set grouid
- //TODO
- memberActivatedEvent.setApplicationId(null);
- try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.Active)) {
- log.error("Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]");
- return;
- } else {
- member.setStatus(MemberStatus.Active);
+ Cluster cluster = service.getCluster(instanceStartedEvent.getClusterId());
+ Member member = cluster.getMember(instanceStartedEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ instanceStartedEvent.getMemberId()));
+ return;
+ }
- // Set member ports
try {
- Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceUuid());
- if (cartridge == null) {
- throw new RuntimeException(String.format("Cartridge not found: [cartridge-type] %s",
- service.getServiceName()));
- }
-
- Port port;
- int portValue;
- List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings());
- String clusterId = cluster.getClusterId();
- ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
- List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
+ TopologyManager.acquireWriteLock();
+ // try update lifecycle state
+ if (!member.isStateTransitionValid(MemberStatus.Starting)) {
+ log.error("Invalid State Transition from " + member.getStatus() + " to " +
+ MemberStatus.Starting);
+ return;
+ } else {
+ member.setStatus(MemberStatus.Starting);
+ log.info("member started event adding status started");
- for (PortMapping portMapping : portMappings) {
- if (kubernetesServices != null) {
- portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping);
- } else {
- portValue = portMapping.getPort();
- }
- port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort());
- member.addPort(port);
- memberActivatedEvent.addPort(port);
+ TopologyManager.updateTopology(topology);
+ //member started time
+ Long timeStamp = System.currentTimeMillis();
+ //memberStartedEvent.
+ TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
+ //publishing data
+ BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
+ instanceStartedEvent.getPartitionId(),
+ instanceStartedEvent.getNetworkPartitionId(),
+ instanceStartedEvent.getClusterInstanceId(),
+ instanceStartedEvent.getClusterId(),
+ instanceStartedEvent.getServiceName(),
+ MemberStatus.Starting.toString(),
+ timeStamp, null, null, null);
}
- } catch (Exception e) {
- String message = String.format("Could not add member ports: [service-name] %s [member-id] %s",
- memberActivatedEvent.getServiceName(), memberActivatedEvent.getMemberId());
- log.error(message, e);
+ } finally {
+ TopologyManager.releaseWriteLock();
}
-
- // Set member ip addresses
- memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
- memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
- memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
- memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
- TopologyManager.updateTopology(topology);
-
- // Publish member activated event
- TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
-
- // Publish statistics data
- BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
- memberActivatedEvent.getPartitionId(),
- memberActivatedEvent.getNetworkPartitionId(),
- memberActivatedEvent.getClusterId(),
- memberActivatedEvent.getServiceName(),
- MemberStatus.Active.toString(),
- null);
+ } catch (Exception e) {
+ String message = String.format("Could not handle member started event: [application-id] %s " +
+ "[service-name] %s [member-id] %s", instanceStartedEvent.getApplicationId(),
+ instanceStartedEvent.getServiceName(), instanceStartedEvent.getMemberId());
+ log.warn(message, e);
}
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
-
- public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent)
- throws InvalidMemberException, InvalidCartridgeTypeException {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName());
- //update the status of the member
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceReadyToShutdownEvent.getServiceName()));
- return;
- }
-
- Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceReadyToShutdownEvent.getClusterId()));
- return;
}
+ public static void handleMemberActivated (InstanceActivatedEvent instanceActivatedEvent){
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(instanceActivatedEvent.getServiceName());
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ instanceActivatedEvent.getServiceName()));
+ return;
+ }
- Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceReadyToShutdownEvent.getMemberId()));
- return;
- }
- MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent(
- instanceReadyToShutdownEvent.getServiceName(),
- instanceReadyToShutdownEvent.getClusterId(),
- instanceReadyToShutdownEvent.getClusterInstanceId(),
- instanceReadyToShutdownEvent.getMemberId(),
- instanceReadyToShutdownEvent.getNetworkPartitionId(),
- instanceReadyToShutdownEvent.getPartitionId());
- try {
- TopologyManager.acquireWriteLock();
+ Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ instanceActivatedEvent.getClusterId()));
+ return;
+ }
- if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
- log.error("Invalid State Transition from " + member.getStatus() + " to " +
- MemberStatus.ReadyToShutDown);
+ Member member = cluster.getMember(instanceActivatedEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ instanceActivatedEvent.getMemberId()));
return;
}
- member.setStatus(MemberStatus.ReadyToShutDown);
- log.info("Member Ready to shut down event adding status started");
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
- //publishing data
- BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
- instanceReadyToShutdownEvent.getPartitionId(),
- instanceReadyToShutdownEvent.getNetworkPartitionId(),
- instanceReadyToShutdownEvent.getClusterId(),
- instanceReadyToShutdownEvent.getServiceName(),
- MemberStatus.ReadyToShutDown.toString(),
- null);
- //termination of particular instance will be handled by autoscaler
- }
+ MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent(
+ instanceActivatedEvent.getServiceName(),
+ instanceActivatedEvent.getClusterId(),
+ instanceActivatedEvent.getClusterInstanceId(),
+ instanceActivatedEvent.getMemberId(),
+ instanceActivatedEvent.getNetworkPartitionId(),
+ instanceActivatedEvent.getPartitionId());
+
+ // grouping - set grouid
+ //TODO
+ memberActivatedEvent.setApplicationId(null);
+ try {
+ TopologyManager.acquireWriteLock();
+ // try update lifecycle state
+ if (!member.isStateTransitionValid(MemberStatus.Active)) {
+ log.error("Invalid state transition from [" + member.getStatus() + "] to [" +
+ MemberStatus.Active + "]");
+ return;
+ } else {
+ member.setStatus(MemberStatus.Active);
- public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent)
- throws InvalidMemberException, InvalidCartridgeTypeException {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName());
- //update the status of the member
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- instanceMaintenanceModeEvent.getServiceName()));
- return;
- }
+ // Set member ports
+ try {
+ Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(service.getServiceUuid());
+ if (cartridge == null) {
+ throw new RuntimeException(String.format("Cartridge not found: [cartridge-type] %s",
+ service.getServiceName()));
+ }
- Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- instanceMaintenanceModeEvent.getClusterId()));
- return;
- }
+ Port port;
+ int portValue;
+ List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings());
+ String clusterId = cluster.getClusterId();
+ ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
+ List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
- Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- instanceMaintenanceModeEvent.getMemberId()));
- return;
+ for (PortMapping portMapping : portMappings) {
+ if (kubernetesServices != null) {
+ portValue = findKubernetesServicePort(clusterId, kubernetesServices, portMapping);
+ } else {
+ portValue = portMapping.getPort();
+ }
+ port = new Port(portMapping.getProtocol(), portValue, portMapping.getProxyPort());
+ member.addPort(port);
+ memberActivatedEvent.addPort(port);
+ }
+ } catch (Exception e) {
+ String message = String.format("Could not add member ports: [service-name] %s [member-id] %s",
+ memberActivatedEvent.getServiceName(), memberActivatedEvent.getMemberId());
+ log.error(message, e);
+ }
+
+ // Set member ip addresses
+ memberActivatedEvent.setDefaultPrivateIP(member.getDefaultPrivateIP());
+ memberActivatedEvent.setMemberPrivateIPs(member.getMemberPrivateIPs());
+ memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
+ memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
+ TopologyManager.updateTopology(topology);
+ //member activated time
+ Long timeStamp = System.currentTimeMillis();
+ // Publish member activated event
+ TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
+
+ // Publish statistics data
+ BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
+ memberActivatedEvent.getPartitionId(),
+ memberActivatedEvent.getNetworkPartitionId(),
+ memberActivatedEvent.getClusterInstanceId(),
+ memberActivatedEvent.getClusterId(),
+ memberActivatedEvent.getServiceName(),
+ MemberStatus.Active.toString(),
+ timeStamp, null, null, null);
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
}
+ public static void handleMemberReadyToShutdown (InstanceReadyToShutdownEvent instanceReadyToShutdownEvent)
+ throws InvalidMemberException, InvalidCartridgeTypeException {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName());
+ //update the status of the member
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ instanceReadyToShutdownEvent.getServiceName()));
+ return;
+ }
- MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent(
- instanceMaintenanceModeEvent.getServiceName(),
- instanceMaintenanceModeEvent.getClusterId(),
- instanceMaintenanceModeEvent.getClusterInstanceId(),
- instanceMaintenanceModeEvent.getMemberId(),
- instanceMaintenanceModeEvent.getNetworkPartitionId(),
- instanceMaintenanceModeEvent.getPartitionId());
- try {
- TopologyManager.acquireWriteLock();
- // try update lifecycle state
- if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
- log.error("Invalid State Transition from " + member.getStatus() + " to "
- + MemberStatus.In_Maintenance);
+ Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ instanceReadyToShutdownEvent.getClusterId()));
return;
}
- member.setStatus(MemberStatus.In_Maintenance);
- log.info("member maintenance mode event adding status started");
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- //publishing data
- TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
- }
+ Member member = cluster.getMember(instanceReadyToShutdownEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ instanceReadyToShutdownEvent.getMemberId()));
+ return;
+ }
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent(
+ instanceReadyToShutdownEvent.getServiceName(),
+ instanceReadyToShutdownEvent.getClusterId(),
+ instanceReadyToShutdownEvent.getClusterInstanceId(),
+ instanceReadyToShutdownEvent.getMemberId(),
+ instanceReadyToShutdownEvent.getNetworkPartitionId(),
+ instanceReadyToShutdownEvent.getPartitionId());
+ //member ReadyToShutDown state change time
+ Long timeStamp = null;
+ try {
+ TopologyManager.acquireWriteLock();
- /**
- * Remove member from topology and send member terminated event.
- *
- * @param serviceName
- * @param clusterId
- * @param networkPartitionId
- * @param partitionId
- * @param memberId
- */
- public static void handleMemberTerminated(String serviceName, String clusterId,
- String networkPartitionId, String partitionId,
- String memberId) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(serviceName);
- Properties properties;
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- serviceName));
- return;
- }
- Cluster cluster = service.getCluster(clusterId);
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterId));
- return;
- }
+ if (!member.isStateTransitionValid(MemberStatus.ReadyToShutDown)) {
+ log.error("Invalid State Transition from " + member.getStatus() + " to " +
+ MemberStatus.ReadyToShutDown);
+ return;
+ }
+ member.setStatus(MemberStatus.ReadyToShutDown);
+ log.info("Member Ready to shut down event adding status started");
- Member member = cluster.getMember(memberId);
- if (member == null) {
- log.warn(String.format("Member %s does not exist",
- memberId));
- return;
+ TopologyManager.updateTopology(topology);
+ timeStamp = System.currentTimeMillis();
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+ //publishing data
+ BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
+ instanceReadyToShutdownEvent.getPartitionId(),
+ instanceReadyToShutdownEvent.getNetworkPartitionId(),
+ instanceReadyToShutdownEvent.getClusterInstanceId(),
+ instanceReadyToShutdownEvent.getClusterId(),
+ instanceReadyToShutdownEvent.getServiceName(),
+ MemberStatus.ReadyToShutDown.toString(),
+ timeStamp, null, null, null);
+ //termination of particular instance will be handled by autoscaler
}
- String clusterInstanceId = member.getClusterInstanceId();
+ public static void handleMemberMaintenance (InstanceMaintenanceModeEvent instanceMaintenanceModeEvent)
+ throws InvalidMemberException, InvalidCartridgeTypeException {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName());
+ //update the status of the member
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ instanceMaintenanceModeEvent.getServiceName()));
+ return;
+ }
- try {
- TopologyManager.acquireWriteLock();
- properties = member.getProperties();
- cluster.removeMember(member);
- TopologyManager.updateTopology(topology);
- } finally {
- TopologyManager.releaseWriteLock();
- }
- /* @TODO leftover from grouping_poc*/
- String groupAlias = null;
- TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId,
- clusterInstanceId, networkPartitionId,
- partitionId, properties, groupAlias);
- }
+ Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ instanceMaintenanceModeEvent.getClusterId()));
+ return;
+ }
- public static void handleMemberSuspended() {
- //TODO
- try {
- TopologyManager.acquireWriteLock();
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
+ Member member = cluster.getMember(instanceMaintenanceModeEvent.getMemberId());
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ instanceMaintenanceModeEvent.getMemberId()));
+ return;
+ }
- public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
- //update the status of the cluster
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- clusterStatusClusterActivatedEvent.getServiceName()));
- return;
- }
+ MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent(
+ instanceMaintenanceModeEvent.getServiceName(),
+ instanceMaintenanceModeEvent.getClusterId(),
+ instanceMaintenanceModeEvent.getClusterInstanceId(),
+ instanceMaintenanceModeEvent.getMemberId(),
+ instanceMaintenanceModeEvent.getNetworkPartitionId(),
+ instanceMaintenanceModeEvent.getPartitionId());
+ try {
+ TopologyManager.acquireWriteLock();
+ // try update lifecycle state
+ if (!member.isStateTransitionValid(MemberStatus.In_Maintenance)) {
+ log.error("Invalid State Transition from " + member.getStatus() + " to "
+ + MemberStatus.In_Maintenance);
+ return;
+ }
+ member.setStatus(MemberStatus.In_Maintenance);
+ log.info("member maintenance mode event adding status started");
- Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterStatusClusterActivatedEvent.getClusterId()));
- return;
- }
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ //publishing data
+ TopologyEventPublisher.sendMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
- String clusterId = cluster.getClusterId();
- ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
- if (clusterContext == null) {
- log.warn("Cluster context not found: [cluster-id] " + clusterId);
- return;
}
- ClusterInstanceActivatedEvent clusterInstanceActivatedEvent =
- new ClusterInstanceActivatedEvent(
- clusterStatusClusterActivatedEvent.getAppId(),
- clusterStatusClusterActivatedEvent.getServiceName(),
- clusterStatusClusterActivatedEvent.getClusterId(),
- clusterStatusClusterActivatedEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
- List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
- cluster.setKubernetesServices(kubernetesServices);
- clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices);
-
- ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId());
- if (context == null) {
- log.warn("Cluster instance context is not found for [cluster] " +
- clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " +
- clusterStatusClusterActivatedEvent.getInstanceId());
+ /**
+ * Remove member from topology and send member terminated event.
+ *
+ * @param serviceName
+ * @param clusterId
+ * @param networkPartitionId
+ * @param partitionId
+ * @param memberId
+ */
+ public static void handleMemberTerminated (String serviceName, String clusterId,
+ String networkPartitionId, String partitionId,
+ String memberId){
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(serviceName);
+ Properties properties;
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ serviceName));
return;
}
- ClusterStatus status = ClusterStatus.Active;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster activated adding status started for " + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- // publish event
- TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
- } else {
- log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- clusterStatusClusterActivatedEvent.getClusterId(), clusterStatusClusterActivatedEvent.getInstanceId(),
- context.getStatus(), status));
+ Cluster cluster = service.getCluster(clusterId);
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ clusterId));
return;
}
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
+ Member member = cluster.getMember(memberId);
+ if (member == null) {
+ log.warn(String.format("Member %s does not exist",
+ memberId));
+ return;
+ }
- public static void handleClusterInactivateEvent(
- ClusterStatusClusterInactivateEvent clusterInactivateEvent) {
- Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(clusterInactivateEvent.getServiceName());
- //update the status of the cluster
- if (service == null) {
- log.warn(String.format("Service %s does not exist",
- clusterInactivateEvent.getServiceName()));
- return;
+ String clusterInstanceId = member.getClusterInstanceId();
+
+ try {
+ TopologyManager.acquireWriteLock();
+ properties = member.getProperties();
+ cluster.removeMember(member);
+ TopologyManager.updateTopology(topology);
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
+ /* @TODO leftover from grouping_poc*/
+ String groupAlias = null;
+ TopologyEventPublisher.sendMemberTerminatedEvent(serviceName, clusterId, memberId,
+ clusterInstanceId, networkPartitionId,
+ partitionId, properties, groupAlias);
}
- Cluster cluster = service.getCluster(clusterInactivateEvent.getClusterId());
- if (cluster == null) {
- log.warn(String.format("Cluster %s does not exist",
- clusterInactivateEvent.getClusterId()));
- return;
+ public static void handleMemberSuspended () {
+ //TODO
+ try {
+ TopologyManager.acquireWriteLock();
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
}
- ClusterInstanceInactivateEvent clusterInactivatedEvent1 =
- new ClusterInstanceInactivateEvent(
- clusterInactivateEvent.getAppId(),
- clusterInactivateEvent.getServiceName(),
- clusterInactivateEvent.getClusterId(),
- clusterInactivateEvent.getInstanceId());
- try {
- TopologyManager.acquireWriteLock();
- ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- clusterInactivateEvent.getClusterId() + " [instance-id] " +
- clusterInactivateEvent.getInstanceId());
+ public static void handleClusterActivatedEvent (ClusterStatusClusterActivatedEvent
+ clusterStatusClusterActivatedEvent){
+
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
+ //update the status of the cluster
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ clusterStatusClusterActivatedEvent.getServiceName()));
return;
}
- ClusterStatus status = ClusterStatus.Inactive;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Inactive adding status started for" + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
- } else {
- log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(),
- context.getStatus(), status));
+
+ Cluster cluster = service.getCluster(clusterStatusClusterActivatedEvent.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ clusterStatusClusterActivatedEvent.getClusterId()));
return;
}
- } finally {
- TopologyManager.releaseWriteLock();
- }
- }
+ String clusterId = cluster.getClusterId();
+ ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
+ if (clusterContext == null) {
+ log.warn("Cluster context not found: [cluster-id] " + clusterId);
+ return;
+ }
- private static void deleteAppResourcesFromMetadataService(ApplicationInstanceTerminatedEvent event) {
- try {
- MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient();
- metadataClient.deleteApplicationProperties(event.getAppId());
- } catch (Exception e) {
- log.error("Error occurred while deleting the application resources frm metadata service ", e);
- }
- }
-
- public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEvent event) {
+ ClusterInstanceActivatedEvent clusterInstanceActivatedEvent =
+ new ClusterInstanceActivatedEvent(
+ clusterStatusClusterActivatedEvent.getAppId(),
+ clusterStatusClusterActivatedEvent.getServiceName(),
+ clusterStatusClusterActivatedEvent.getClusterId(),
+ clusterStatusClusterActivatedEvent.getInstanceId());
+ try {
+ TopologyManager.acquireWriteLock();
+ List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
+ cluster.setKubernetesServices(kubernetesServices);
+ clusterInstanceActivatedEvent.setKubernetesServices(kubernetesServices);
+
+ ClusterInstance context = cluster.getInstanceContexts(clusterStatusClusterActivatedEvent.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster instance context is not found for [cluster] " +
+ clusterStatusClusterActivatedEvent.getClusterId() + " [instance-id] " +
+ clusterStatusClusterActivatedEvent.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Active;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster activated adding status started for " + cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ // publish event
+ TopologyEventPublisher.sendClusterActivatedEvent(clusterInstanceActivatedEvent);
+ } else {
+ log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s [status-requested] %s",
+ clusterStatusClusterActivatedEvent.getClusterId(),
+ clusterStatusClusterActivatedEvent.getInstanceId(),
+ context.getStatus(), status));
+ return;
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
+ }
- TopologyManager.acquireWriteLock();
+ }
- try {
+ public static void handleClusterInactivateEvent (
+ ClusterStatusClusterInactivateEvent clusterInactivateEvent){
Topology topology = TopologyManager.getTopology();
- Service service = topology.getService(event.getServiceName());
-
+ Service service = topology.getService(clusterInactivateEvent.getServiceName());
//update the status of the cluster
if (service == null) {
log.warn(String.format("Service %s does not exist",
- event.getServiceName()));
+ clusterInactivateEvent.getServiceName()));
return;
}
- Cluster cluster = service.getCluster(event.getClusterId());
+ Cluster cluster = service.getCluster(clusterInactivateEvent.getClusterId());
if (cluster == null) {
log.warn(String.format("Cluster %s does not exist",
- event.getClusterId()));
+ clusterInactivateEvent.getClusterId()));
return;
}
- ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
+ ClusterInstanceInactivateEvent clusterInactivatedEvent1 =
+ new ClusterInstanceInactivateEvent(
+ clusterInactivateEvent.getAppId(),
+ clusterInactivateEvent.getServiceName(),
+ clusterInactivateEvent.getClusterId(),
+ clusterInactivateEvent.getInstanceId());
+ try {
+ TopologyManager.acquireWriteLock();
+ ClusterInstance context = cluster.getInstanceContexts(clusterInactivateEvent.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found for [cluster] " +
+ clusterInactivateEvent.getClusterId() + " [instance-id] " +
+ clusterInactivateEvent.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Inactive;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Inactive adding status started for" + cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ TopologyEventPublisher.sendClusterInactivateEvent(clusterInactivatedEvent1);
+ } else {
+ log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s [status-requested] %s",
+ clusterInactivateEvent.getClusterId(), clusterInactivateEvent.getInstanceId(),
+ context.getStatus(), status));
+ return;
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- ClusterStatus status = ClusterStatus.Terminated;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Terminated adding status started for and removing the cluster instance"
- + cluster.getClusterId());
- cluster.removeInstanceContext(event.getInstanceId());
- TopologyManager.updateTopology(topology);
- //publishing data
- ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent(event.getAppId(),
- event.getServiceName(), event.getClusterId(), event.getInstanceId());
+ }
- TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
- } else {
- log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
- return;
+
+ private static void deleteAppResourcesFromMetadataService (ApplicationInstanceTerminatedEvent event){
+ try {
+ MetaDataServiceClient metadataClient = new DefaultMetaDataServiceClient();
+ metadataClient.deleteApplicationProperties(event.getAppId());
+ } catch (Exception e) {
+ log.error("Error occurred while deleting the application resources frm metadata service ", e);
}
- } finally {
- TopologyManager.releaseWriteLock();
}
+ public static void handleClusterTerminatedEvent (ClusterStatusClusterTerminatedEvent event){
- }
+ TopologyManager.acquireWriteLock();
- public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminatingEvent event) {
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Service service = topology.getService(event.getServiceName());
- TopologyManager.acquireWriteLock();
+ //update the status of the cluster
+ if (service == null) {
+ log.warn(String.format("Service %s does not exist",
+ event.getServiceName()));
+ return;
+ }
- try {
- Topology topology = TopologyManager.getTopology();
- Cluster cluster = topology.getService(event.getServiceName()).
- getCluster(event.getClusterId());
+ Cluster cluster = service.getCluster(event.getClusterId());
+ if (cluster == null) {
+ log.warn(String.format("Cluster %s does not exist",
+ event.getClusterId()));
+ return;
+ }
- if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, event.getInstanceId())) {
- log.error("Invalid state transfer from " + cluster.getStatus(event.getInstanceId()) + " to " +
- ClusterStatus.Terminating);
- }
- ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
- if (context == null) {
- log.warn("Cluster Instance Context is not found for [cluster] " +
- event.getClusterId() + " [instance-id] " +
- event.getInstanceId());
- return;
+ ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found for [cluster] " +
+ event.getClusterId() + " [instance-id] " +
+ event.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Terminated;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Terminated adding status started for and removing the cluster instance"
+ + cluster.getClusterId());
+ cluster.removeInstanceContext(event.getInstanceId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent(
+ event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId());
+
+ TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
+ } else {
+ log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s [status-requested] %s",
+ event.getClusterId(), event.getInstanceId(),
+ context.getStatus(), status));
+ return;
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- ClusterStatus status = ClusterStatus.Terminating;
- if (context.isStateTransitionValid(status)) {
- context.setStatus(status);
- log.info("Cluster Terminating started for " + cluster.getClusterId());
- TopologyManager.updateTopology(topology);
- //publishing data
- ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent(event.getAppId(),
- event.getServiceName(), event.getClusterId(), event.getInstanceId());
- TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
- // Remove kubernetes services if available
- ClusterContext clusterContext =
- CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
- if(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
- KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId());
+ }
+
+ public static void handleClusterTerminatingEvent (ClusterStatusClusterTerminatingEvent event){
+
+ TopologyManager.acquireWriteLock();
+
+ try {
+ Topology topology = TopologyManager.getTopology();
+ Cluster cluster = topology.getService(event.getServiceName()).
+ getCluster(event.getClusterId());
+
+ if (!cluster.isStateTransitionValid(ClusterStatus.Terminating, event.getInstanceId())) {
+ log.error("Invalid state transfer from " + cluster.getStatus(event.getInstanceId()) + " to " +
+ ClusterStatus.Terminating);
}
- } else {
- log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
- " [instance-id] %s [current-status] %s [status-requested] %s",
- event.getClusterId(), event.getInstanceId(),
- context.getStatus(), status));
+ ClusterInstance context = cluster.getInstanceContexts(event.getInstanceId());
+ if (context == null) {
+ log.warn("Cluster Instance Context is not found for [cluster] " +
+ event.getClusterId() + " [instance-id] " +
+ event.getInstanceId());
+ return;
+ }
+ ClusterStatus status = ClusterStatus.Terminating;
+ if (context.isStateTransitionValid(status)) {
+ context.setStatus(status);
+ log.info("Cluster Terminating started for " + cluster.getClusterId());
+ TopologyManager.updateTopology(topology);
+ //publishing data
+ ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent(
+ event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId());
+
+ TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
+
+ // Remove kubernetes services if available
+ ClusterContext clusterContext =
+ CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
+ if (StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
+ KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId());
+ }
+ } else {
+ log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " +
+ " [instance-id] %s [current-status] %s [status-requested] %s",
+ event.getClusterId(), event.getInstanceId(),
+ context.getStatus(), status));
+ }
+ } finally {
+ TopologyManager.releaseWriteLock();
}
- } finally {
- TopologyManager.releaseWriteLock();
}
}
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/54283eda/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 316984a..96bb38c 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -460,13 +460,13 @@ public class CloudControllerServiceImpl implements CloudControllerService {
clusterContext.setVolumes(volumes);
}
- // Handle member created event
- TopologyBuilder.handleMemberCreatedEvent(memberContext);
-
// Persist member context
CloudControllerContext.getInstance().addMemberContext(memberContext);
CloudControllerContext.getInstance().persist();
+ // Handle member created event
+ TopologyBuilder.handleMemberCreatedEvent(memberContext);
+
// Start instance in a new thread
if (log.isDebugEnabled()) {
log.debug(String.format("Starting instance creator thread: [cluster] %s [cluster-instance] %s " +