You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2013/11/01 18:38:49 UTC
[2/3] git commit: Changes in Autoscaler for M1
Changes in Autoscaler for M1
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/79e07c13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/79e07c13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/79e07c13
Branch: refs/heads/master
Commit: 79e07c133ba517be447dcb1ae188560514b1b85c
Parents: c1af995
Author: Lahiru Sandaruwan <la...@apache.org>
Authored: Fri Nov 1 22:50:55 2013 +0530
Committer: Lahiru Sandaruwan <la...@apache.org>
Committed: Fri Nov 1 22:50:55 2013 +0530
----------------------------------------------------------------------
.../stratos/autoscaler/ClusterContext.java | 80 ++++---
.../apache/stratos/autoscaler/Constants.java | 76 ++++++
.../algorithm/AutoscaleAlgorithm.java | 10 +-
.../autoscaler/algorithm/OneAfterAnother.java | 198 +++++++++-------
.../autoscaler/algorithm/RoundRobin.java | 230 +++++++++++--------
.../cloud/controller/CloudControllerClient.java | 30 +--
.../AverageRequestInFlightEventProcessor.java | 92 --------
...GradientOfRequestInFlightEventProcessor.java | 86 -------
.../processor/HealthStatEventProcessor.java | 31 ---
...rivativeOfRequestInFlightEventProcessor.java | 89 -------
.../message/receiver/TopologyManager.java | 3 +-
.../health/HealthEventMessageDelegator.java | 133 ++++++++---
.../health/HealthEventMessageReceiver.java | 1 -
.../topology/TopologyEventMessageReceiver.java | 8 +-
.../autoscaler/policy/model/Partition.java | 7 +-
.../rule/AutoscalerRuleEvaluator.java | 42 ++--
.../stratos/autoscaler/rule/Evaluator.java | 190 ---------------
.../autoscaler/rule/ExecutorTaskScheduler.java | 23 +-
.../src/main/resources/autoscaler.drl | 188 ++++++++-------
19 files changed, 668 insertions(+), 849 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
index 5b9c423..152afab 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
@@ -37,22 +37,25 @@ public class ClusterContext {
private float requestsInFlightSecondDerivative;
private float requestsInFlightGradient;
+
+ private int memberCount;
//This map will keep number of instance count against partitionId
- private Map<String, Integer> partitionCountMap;
+// TODO private Map<String, Integer> partitionCountMap;
private int currentPartitionIndex;
private Properties properties;
- private Map<String, MemberContext> memberStatMap;
+ private Map<String, MemberContext> memberContextMap;
public ClusterContext(String clusterId, String serviceId) {
this.clusterId = clusterId;
this.serviceId = serviceId;
- memberStatMap = new HashMap<String, MemberContext>();
- partitionCountMap = new HashMap<String, Integer>();
+ memberContextMap = new HashMap<String, MemberContext>();
+ //TODO partitionCountMap = new HashMap<String, Integer>();
+ memberCount = 0;
}
public String getClusterId() {
@@ -105,7 +108,7 @@ public class ClusterContext {
*/
public void addMemberContext(MemberContext memberContext) {
- memberStatMap.put(memberContext.getMemberId(), memberContext);
+ memberContextMap.put(memberContext.getMemberId(), memberContext);
}
/**
@@ -114,40 +117,49 @@ public class ClusterContext {
*/
public void removeMemberContext(String memberId){
- memberStatMap.remove(memberId);
+ memberContextMap.remove(memberId);
}
- public void increaseMemberCountInPartition(String partitionId, int count){
+ public void increaseMemberCount(int count){
+ memberCount += count;
- partitionCountMap.put(partitionId, partitionCountMap.get(partitionId) + count);
}
+ public void decreaseMemberCount(){
+ memberCount --;
- public void decreaseMemberCountInPartition(String partitionId, int count){
-
- partitionCountMap.put(partitionId, partitionCountMap.get(partitionId) - count);
- }
-
- public void addPartitionCount(String partitionId, int count){
-
- partitionCountMap.put(partitionId, count);
- }
-
- public void removePartitionCount(String partitionId){
-
- partitionCountMap.remove(partitionId);
}
- public boolean partitionCountExists(String partitionId){
- return partitionCountMap.containsKey(partitionId);
- }
-
- public int getPartitionCount(String partitionId){
- return partitionCountMap.get(partitionId);
- }
+// TODO public void increaseMemberCountInPartition(String partitionId, int count){
+//
+// partitionCountMap.put(partitionId, partitionCountMap.get(partitionId) + count);
+// }
+//
+// public void decreaseMemberCountInPartition(String partitionId, int count){
+//
+// partitionCountMap.put(partitionId, partitionCountMap.get(partitionId) - count);
+// }
+//
+// public void addPartitionCount(String partitionId, int count){
+//
+// partitionCountMap.put(partitionId, count);
+// }
+//
+// public void removePartitionCount(String partitionId){
+//
+// partitionCountMap.remove(partitionId);
+// }
+//
+// public boolean partitionCountExists(String partitionId){
+// return partitionCountMap.containsKey(partitionId);
+// }
+//
+// public int getPartitionCount(String partitionId){
+// return partitionCountMap.get(partitionId);
+// }
- public void setMemberStatMap(Map<String, MemberContext> memberStatMap) {
+ public void setMemberContextMap(Map<String, MemberContext> memberContextMap) {
- this.memberStatMap = memberStatMap;
+ this.memberContextMap = memberContextMap;
}
public String getServiceId() {
@@ -161,4 +173,12 @@ public class ClusterContext {
public void setCurrentPartitionIndex(int currentPartitionIndex) {
this.currentPartitionIndex = currentPartitionIndex;
}
+
+ public int getMemberCount() {
+ return memberCount;
+ }
+
+ public void setMemberCount(int memberCount) {
+ this.memberCount = memberCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
index ce78b84..18e215e 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
@@ -7,4 +7,80 @@ public class Constants {
public static String ROUND_ROBIN_ALGORITHM_ID = "round-robin";
public static String ONE_AFTER_ANOTHER_ALGORITHM_ID = "one-after-another";
+
+ public static String GRADIENT_OF_REQUESTS_IN_FLIGHT = "gradient_of_requests_in_flight";
+ public static String AVERAGE_REQUESTS_IN_FLIGHT = "average_requests_in_flight";
+ public static String SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT = "second_derivative_of_requests_in_flight";
+
+// public void a(){
+// Cluster cluster = null;
+//// log.info("cluster " + clusterId);
+// String clusterId = cluster.getClusterId();
+//
+//
+// ClusterContext clusterContext = null;
+//// = $context.getClusterContext(clusterId);
+// AutoscalerContext context = null;
+// if(null==clusterContext){
+//
+// clusterContext = new ClusterContext(cluster.getClusterId(), cluster.getServiceName()) ;
+// AutoscalePolicy policy = PolicyManager.getInstance().getPolicy(cluster.getAutoscalePolicyName());
+//
+// for(Partition partition: policy.getHAPolicy().getPartitions()){
+// clusterContext.addPartitionCount(partition.getId(), 0);
+// }
+// context.addClusterContext(clusterContext);
+//
+// }
+//
+//
+// float lbStatAverage = clusterContext.getAverageRequestsInFlight();
+// float lbStatGradient = clusterContext.getRequestsInFlightGradient();
+// float lbStatSecondDerivative = clusterContext.getRequestsInFlightSecondDerivative();
+//
+// LoadThresholds loadThresholds = manager.getPolicy(cluster.autoscalePolicyName).getLoadThresholds();
+// float averageLimit = loadThresholds.getRequestsInFlight().getAverage();
+// float gradientLimit = loadThresholds.getRequestsInFlight().getGradient();
+// float secondDerivative = loadThresholds.getRequestsInFlight().getSecondDerivative()
+// String partitionAlgorithm = manager.getPolicy(cluster.autoscalePolicyName).getHAPolicy().getPartitionAlgo();
+// log.info("partitionAlgorithm " + partitionAlgorithm);
+//
+// AutoscaleAlgorithm autoscaleAlgorithm = null;
+// if(Constants.ROUND_ROBIN_ALGORITHM_ID.equals(partitionAlgorithm)){
+//
+// autoscaleAlgorithm = new RoundRobin();
+// } else if(Constants.ONE_AFTER_ANOTHER_ALGORITHM_ID.equals(partitionAlgorithm)){
+//
+// autoscaleAlgorithm = new OneAfterAnother();
+// }
+//
+//
+// if(lbStatAverage > averageLimit && lbStatGradient > gradientLimit){
+//// log.info("in scale up " );
+// int i = 0;
+// Partition partition = autoscaleAlgorithm.getScaleUpPartition(clusterId);
+// if(partition != null){
+//// log.info("gonna scale up " );
+// if(lbStatSecondDerivative > secondDerivative){
+// int numberOfInstancesToBeSpawned = 2; // take from a config
+// log.info("gonna scale up by 2 " );
+// evaluator.delegateSpawn(partition,clusterId, numberOfInstancesToBeSpawned);
+// //spawnInstances Two
+//
+// } else {
+// int numberOfInstancesToBeSpawned = 1;
+// evaluator.delegateSpawn(partition,clusterId, numberOfInstancesToBeSpawned);
+// //spawnInstances one
+// }
+// }
+// } else if(lbStatAverage < averageLimit && lbStatGradient < gradientLimit){
+//// log.info("in scale down " );
+// //terminate one
+// Partition partition = autoscaleAlgorithm.getScaleDownPartition(clusterId);
+// if(partition != null){
+//// log.info("gonna scale down " );
+// evaluator.delegateTerminate(partition,clusterId);
+// }
+// }
+// }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java
index 25a9fd5..8bccfe5 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java
@@ -19,12 +19,14 @@
package org.apache.stratos.autoscaler.algorithm;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-
/**
*
*/
public interface AutoscaleAlgorithm {
- public Partition getNextScaleUpPartition(String clusterId);
- public Partition getNextScaleDownPartition(String clusterId);
+// public Partition getNextScaleUpPartition(String clusterId);
+// public Partition getNextScaleDownPartition(String clusterId);
+// public Partition getScaleUpPartition(String clusterId);
+// public Partition getScaleDownPartition(String clusterId);
+// public boolean scaleUpPartitionAvailable(String clusterId);
+// public boolean scaleDownPartitionAvailable(String clusterId);
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java
index 3dab3a6..92329cf 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java
@@ -19,90 +19,126 @@
package org.apache.stratos.autoscaler.algorithm;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.ClusterContext;
-import org.apache.stratos.autoscaler.message.receiver.TopologyManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-
/**
* Completes partitions in the order defined in autoscaler policy, go to next if current one reached the max limit
*/
public class OneAfterAnother implements AutoscaleAlgorithm{
- public Partition getNextScaleUpPartition(String clusterId){
-
- String policyId = null;
- int nextPartitionIndex;
- ClusterContext clusterContext = AutoscalerContext.getInstance().getClusterContext(clusterId);
- int currentPartitionIndex = clusterContext.getCurrentPartitionIndex();
-
- String serviceId = AutoscalerContext.getInstance().getClusterContext(clusterId).getServiceId();
-
- //Find relevant policyId using topology
- policyId = TopologyManager.getTopology().getService(serviceId).getCluster(clusterId).getAutoscalePolicyName();
-
-
- int noOfPartitions = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions().size();
-
- //Here in "one after another" algorithm, next partition is also the current partition unless it reached its max
- nextPartitionIndex = currentPartitionIndex;
-
- //Set next partition as current partition in Autoscaler Context
- AutoscalerContext.getInstance().getClusterContext(clusterId).setCurrentPartitionIndex(nextPartitionIndex);
-
- //Find next partition
- Partition nextPartition = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions()
- .get(nextPartitionIndex);
- String nextPartitionId = nextPartition.getId();
-
- if(clusterContext.partitionCountExists(nextPartitionId)){
-
- //If the current partitions max is reached, it will try next partition
- if(clusterContext.getPartitionCount(nextPartitionId) >= nextPartition.getPartitionMax()){
-
- nextPartition = getNextScaleUpPartition(clusterId);
- }
- } else {
-
- //Add the partition count entry to cluster context
- AutoscalerContext.getInstance().getClusterContext(clusterId).addPartitionCount(nextPartitionId, 1);
- }
- return nextPartition;
- }
-
- public Partition getNextScaleDownPartition(String clusterId){
-
- String policyId = null;
- int nextPartitionIndex;
- ClusterContext clusterContext = AutoscalerContext.getInstance().getClusterContext(clusterId);
- int currentPartitionIndex = clusterContext.getCurrentPartitionIndex();
-
- String serviceId = AutoscalerContext.getInstance().getClusterContext(clusterId).getServiceId();
-
- //Find relevant policyId using topology
- policyId = TopologyManager.getTopology().getService(serviceId).getCluster(clusterId).getAutoscalePolicyName();
-
-
- int noOfPartitions = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions().size();
-
- //Here in "one after another" algorithm, next partition is also the current partition unless it reached its max
- nextPartitionIndex = currentPartitionIndex;
-
- //Set next partition as current partition in Autoscaler Context
- AutoscalerContext.getInstance().getClusterContext(clusterId).setCurrentPartitionIndex(nextPartitionIndex);
-
- //Find next partition
- Partition nextPartition = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions()
- .get(nextPartitionIndex);
- String nextPartitionId = nextPartition.getId();
-
- if(clusterContext.partitionCountExists(nextPartitionId) &&
- (clusterContext.getPartitionCount(nextPartitionId) <= nextPartition.getPartitionMin())){
-
- //If the current partitions max is reached, it will try next partition
- nextPartition = getNextScaleDownPartition(clusterId);
- }
- return nextPartition;
- }
+// public Partition getNextScaleUpPartition(String clusterId){
+//
+// String policyId = null;
+// int nextPartitionIndex;
+// ClusterContext clusterContext = AutoscalerContext.getInstance().getClusterContext(clusterId);
+// int currentPartitionIndex = clusterContext.getCurrentPartitionIndex();
+//
+// String serviceId = AutoscalerContext.getInstance().getClusterContext(clusterId).getServiceId();
+//
+// //Find relevant policyId using topology
+// policyId = TopologyManager.getTopology().getService(serviceId).getCluster(clusterId).getAutoscalePolicyName();
+//
+//
+// int noOfPartitions = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions().size();
+//
+// //Here in "one after another" algorithm, next partition is also the current partition unless it reached its max
+// nextPartitionIndex = currentPartitionIndex;
+//
+// //Set next partition as current partition in Autoscaler Context
+// AutoscalerContext.getInstance().getClusterContext(clusterId).setCurrentPartitionIndex(nextPartitionIndex);
+//
+// //Find next partition
+// Partition nextPartition = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions()
+// .get(nextPartitionIndex);
+// String nextPartitionId = nextPartition.getId();
+//
+// if(clusterContext.partitionCountExists(nextPartitionId)){
+//
+// //If the current partitions max is reached, it will try next partition
+// if(clusterContext.getPartitionCount(nextPartitionId) >= nextPartition.getPartitionMembersMax()){
+//
+// nextPartition = getNextScaleUpPartition(clusterId);
+// }
+// } else {
+//
+// //Add the partition count entry to cluster context
+// AutoscalerContext.getInstance().getClusterContext(clusterId).addPartitionCount(nextPartitionId, 1);
+// }
+// return nextPartition;
+// }
+//
+// public Partition getNextScaleDownPartition(String clusterId){
+//
+// String policyId = null;
+// int nextPartitionIndex;
+// Partition nextPartition = null;
+// ClusterContext clusterContext = AutoscalerContext.getInstance().getClusterContext(clusterId);
+// int currentPartitionIndex = clusterContext.getCurrentPartitionIndex();
+//
+// String serviceId = AutoscalerContext.getInstance().getClusterContext(clusterId).getServiceId();
+//
+// //Find relevant policyId using topology
+// policyId = TopologyManager.getTopology().getService(serviceId).getCluster(clusterId).getAutoscalePolicyName();
+//
+// //Find number of partitions relevant for this clusters policy
+// int noOfPartitions = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions().size();
+//
+// //Here in "one after another" algorithm, next partition is also the current partition unless it reached its max
+// nextPartitionIndex = currentPartitionIndex;
+//
+// //Set next partition as current partition in Autoscaler Context
+// AutoscalerContext.getInstance().getClusterContext(clusterId).setCurrentPartitionIndex(nextPartitionIndex);
+//
+// //Find next partition
+// nextPartition = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions()
+// .get(nextPartitionIndex);
+// String nextPartitionId = nextPartition.getId();
+//
+// if(clusterContext.partitionCountExists(nextPartitionId) &&
+// (clusterContext.getPartitionCount(nextPartitionId) <= nextPartition.getPartitionMembersMin())){
+//
+// //If the current partitions max is reached, it will try next partition
+// nextPartition = getNextScaleDownPartition(clusterId);
+// } else {
+//
+// }
+// return nextPartition;
+// }
+
+// public Partition getScaleDownPartition(String clusterId){
+// Partition partition = PolicyManager.getInstance().getPolicy("economyPolicy").getHAPolicy().getPartitions()
+// .get(0);
+//
+// ClusterContext clusterContext = AutoscalerContext.getInstance().getClusterContext(clusterId);
+// int currentPartitionMemberCount = 0;
+//
+// if(clusterContext.partitionCountExists(partition.getId())){
+//
+// currentPartitionMemberCount = clusterContext.getPartitionCount(partition.getId());
+// }
+// if(currentPartitionMemberCount <= partition.getPartitionMembersMin()) {
+//
+// partition = null;
+// }
+//
+// return partition;
+// }
+//
+//
+//
+// public Partition getScaleUpPartition(String clusterId){
+// Partition partition = PolicyManager.getInstance().getPolicy("economyPolicy").getHAPolicy().getPartitions()
+// .get(0);
+//
+// ClusterContext clusterContext = AutoscalerContext.getInstance().getClusterContext(clusterId);
+// int currentPartitionMemberCount = 0;
+//
+// if(clusterContext.partitionCountExists(partition.getId())){
+//
+// currentPartitionMemberCount = clusterContext.getPartitionCount(partition.getId());
+// }
+// if(currentPartitionMemberCount >= partition.getPartitionMembersMax()) {
+// partition = null;
+// }
+//
+// return partition;
+// }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java
index e9faf27..e62affe 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java
@@ -19,101 +19,147 @@
package org.apache.stratos.autoscaler.algorithm;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.ClusterContext;
-import org.apache.stratos.autoscaler.message.receiver.TopologyManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-
/**
* Select partition in round robin manner and return
*/
public class RoundRobin implements AutoscaleAlgorithm{
- public Partition getNextScaleUpPartition(String clusterId){
-
- String policyId;
- int nextPartitionIndex;
- ClusterContext clusterContext = AutoscalerContext.getInstance().getClusterContext(clusterId);
- int currentPartitionIndex = clusterContext.getCurrentPartitionIndex();
-
- String serviceId = AutoscalerContext.getInstance().getClusterContext(clusterId).getServiceId();
-
- //Find relevant policyId using topology
- policyId = TopologyManager.getTopology().getService(serviceId).getCluster(clusterId).getAutoscalePolicyName();
-
-
- int noOfPartitions = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions().size();
-
- if (currentPartitionIndex + 1 >= noOfPartitions) {
-
- nextPartitionIndex = 0;
- } else {
-
- nextPartitionIndex = currentPartitionIndex++;
- }
-
- //Set next partition as current partition in Autoscaler Context
- AutoscalerContext.getInstance().getClusterContext(clusterId).setCurrentPartitionIndex(nextPartitionIndex);
-
- //Find next partition
- Partition nextPartition = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions()
- .get(nextPartitionIndex);
- String nextPartitionId = nextPartition.getId();
-
- if(clusterContext.partitionCountExists(nextPartitionId)){
-
- //If the current partitions max is reached, it will try next partition
- if(clusterContext.getPartitionCount(nextPartitionId) >= nextPartition.getPartitionMax()){
-
- nextPartition = getNextScaleUpPartition(clusterId);
- }
- } else {
-
- //Add the partition count entry to cluster context
- AutoscalerContext.getInstance().getClusterContext(clusterId).addPartitionCount(nextPartitionId, 1);
- }
- return nextPartition;
- }
-
-
- public Partition getNextScaleDownPartition(String clusterId){
-
- String policyId;
- int nextPartitionIndex;
- ClusterContext clusterContext = AutoscalerContext.getInstance().getClusterContext(clusterId);
- int currentPartitionIndex = clusterContext.getCurrentPartitionIndex();
-
- String serviceId = AutoscalerContext.getInstance().getClusterContext(clusterId).getServiceId();
-
- //Find relevant policyId using topology
- policyId = TopologyManager.getTopology().getService(serviceId).getCluster(clusterId).getAutoscalePolicyName();
-
-
- int noOfPartitions = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions().size();
-
- if (currentPartitionIndex - 1 >= noOfPartitions) {
-
- nextPartitionIndex = 0;
- } else {
-
- nextPartitionIndex = currentPartitionIndex--;
- }
-
- //Set next partition as current partition in Autoscaler Context
- AutoscalerContext.getInstance().getClusterContext(clusterId).setCurrentPartitionIndex(nextPartitionIndex);
-
- //Find next partition
- Partition nextPartition = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions()
- .get(nextPartitionIndex);
- String nextPartitionId = nextPartition.getId();
-
- if(clusterContext.partitionCountExists(nextPartitionId)
- && (clusterContext.getPartitionCount(nextPartitionId) <= nextPartition.getPartitionMin())){
-
- //If the current partitions max is reached, it will try next partition
- nextPartition = getNextScaleDownPartition(clusterId);
- }
- return nextPartition;
- }
+// public Partition getNextScaleUpPartition(String clusterId){
+//
+// String policyId;
+// int nextPartitionIndex;
+// ClusterContext clusterContext = AutoscalerContext.getInstance().getClusterContext(clusterId);
+// int currentPartitionIndex = clusterContext.getCurrentPartitionIndex();
+//
+// String serviceId = AutoscalerContext.getInstance().getClusterContext(clusterId).getServiceId();
+//
+// //Find relevant policyId using topology
+// policyId = TopologyManager.getTopology().getService(serviceId).getCluster(clusterId).getAutoscalePolicyName();
+//
+//
+// int noOfPartitions = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions().size();
+//
+// if (currentPartitionIndex + 1 >= noOfPartitions) {
+//
+// nextPartitionIndex = 0;
+// } else {
+//
+// nextPartitionIndex = currentPartitionIndex++;
+// }
+//
+// //Set next partition as current partition in Autoscaler Context
+// AutoscalerContext.getInstance().getClusterContext(clusterId).setCurrentPartitionIndex(nextPartitionIndex);
+//
+// //Find next partition
+// Partition nextPartition = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions()
+// .get(nextPartitionIndex);
+// String nextPartitionId = nextPartition.getId();
+//
+// if(clusterContext.partitionCountExists(nextPartitionId)){
+//
+// //If the current partitions max is reached, it will try next partition
+// if(clusterContext.getPartitionCount(nextPartitionId) >= nextPartition.getPartitionMembersMax()){
+//
+// nextPartition = getNextScaleUpPartition(clusterId);
+// }
+// } else {
+//
+// //Add the partition count entry to cluster context
+// AutoscalerContext.getInstance().getClusterContext(clusterId).addPartitionCount(nextPartitionId, 1);
+// }
+// return nextPartition;
+// }
+//
+//
+// public Partition getNextScaleDownPartition(String clusterId){
+//
+// String policyId;
+// int nextPartitionIndex;
+// ClusterContext clusterContext = AutoscalerContext.getInstance().getClusterContext(clusterId);
+// int currentPartitionIndex = clusterContext.getCurrentPartitionIndex();
+//
+// String serviceId = AutoscalerContext.getInstance().getClusterContext(clusterId).getServiceId();
+//
+// //Find relevant policyId using topology
+// policyId = TopologyManager.getTopology().getService(serviceId).getCluster(clusterId).getAutoscalePolicyName();
+//
+//
+// int noOfPartitions = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions().size();
+//
+// if (currentPartitionIndex - 1 >= noOfPartitions) {
+//
+// nextPartitionIndex = 0;
+// } else {
+//
+// nextPartitionIndex = currentPartitionIndex--;
+// }
+//
+// //Set next partition as current partition in Autoscaler Context
+// AutoscalerContext.getInstance().getClusterContext(clusterId).setCurrentPartitionIndex(nextPartitionIndex);
+//
+// //Find next partition
+// Partition nextPartition = PolicyManager.getInstance().getPolicy(policyId).getHAPolicy().getPartitions()
+// .get(nextPartitionIndex);
+// String nextPartitionId = nextPartition.getId();
+//
+// if(clusterContext.partitionCountExists(nextPartitionId)
+// && (clusterContext.getPartitionCount(nextPartitionId) <= nextPartition.getPartitionMembersMin())){
+//
+//
+// //If the current partitions max is reached, it will try next partition
+// nextPartition = getNextScaleDownPartition(clusterId);
+// }
+// return nextPartition;
+// }
+
+
+// public Partition getScaleDownPartition(String clusterId){
+// Partition partition = PolicyManager.getInstance().getPolicy("economyPolicy").getHAPolicy().getPartitions()
+// .get(0);
+//
+// ClusterContext clusterContext = AutoscalerContext.getInstance().getClusterContext(clusterId);
+// int partitionMemberCount = clusterContext.getPartitionCount(partition.getId());
+//
+// if(partitionMemberCount >= partition.getPartitionMembersMin()) {
+//
+// clusterContext.increaseMemberCountInPartition(partition.getId(), partitionMemberCount - 1);
+// } else{
+// partition = null;
+// }
+// return partition;
+// }
+
+
+// @Override
+// public boolean scaleUpPartitionAvailable(String clusterId) {
+// return false; //To change body of implemented methods use File | Settings | File Templates.
+// }
+//
+// @Override
+// public boolean scaleDownPartitionAvailable(String clusterId) {
+// return false; //To change body of implemented methods use File | Settings | File Templates.
+// } partition = null;
+// }
+//
+// return partition;
+// }
+
+
+
+// public Partition getScaleUpPartition(String clusterId){
+// Partition partition = PolicyManager.getInstance().getPolicy("economyPolicy").getHAPolicy().getPartitions()
+// .get(0);
+//
+// ClusterContext clusterContext = AutoscalerContext.getInstance().getClusterContext(clusterId);
+// int partitionMemberCount = clusterContext.getPartitionCount(partition.getId());
+//
+// if(partitionMemberCount <= partition.getPartitionMembersMax()) {
+//
+// clusterContext.increaseMemberCountInPartition(partition.getId(), partitionMemberCount + 1);
+// } else{
+// partition = null;
+// }
+//
+// return partition;
+// }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
index 7603b12..dea4915 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
@@ -50,20 +50,25 @@ public class CloudControllerClient {
public void spawnInstances(Partition partition, String clusterId, int memberCountToBeIncreased) throws SpawningException {
//call CC spawnInstances method
+ log.info("Calling CC for spawning instances in cluster " + clusterId);
LocationScope locationScope = new LocationScope();
locationScope.setCloud(partition.getIaas());
locationScope.setRegion(partition.getZone());
try {
- stub.startInstances(clusterId, locationScope, memberCountToBeIncreased);
+ for(int i =0; i< memberCountToBeIncreased; i++){
+ stub.startInstance(clusterId, locationScope);
+ }
} catch (RemoteException e) {
- throw new SpawningException("Error occurred in cloud controller side while spawning instance", e );
+ log.error("Error occurred in cloud controller side while spawning instance");
+// throw new SpawningException("Error occurred in cloud controller side while spawning instance", e );
}
}
public void spawnAnInstance(Partition partition, String clusterId) throws SpawningException {
//call CC spawnInstances method
-
+
+ log.info("Calling CC for spawning an instance in cluster " + clusterId);
LocationScope locationScope = new LocationScope();
locationScope.setCloud(partition.getIaas());
locationScope.setRegion(partition.getZone());
@@ -71,14 +76,16 @@ public class CloudControllerClient {
try {
stub.startInstance(clusterId, locationScope);
} catch (RemoteException e) {
- throw new SpawningException("Error occurred in cloud controller side while spawning instance", e );
+
+ log.error("Error occurred in cloud controller side while spawning instance");
+// throw new SpawningException("Error occurred in cloud controller side while spawning instance", e );
}
}
public void terminate(Partition partition, String clusterId) throws TerminationException {
//call CC terminate method
-
+ log.info("Calling CC for terminating an instance in cluster " + clusterId);
LocationScope locationScope = new LocationScope();
locationScope.setCloud(partition.getIaas());
locationScope.setRegion(partition.getZone());
@@ -86,19 +93,12 @@ public class CloudControllerClient {
try {
stub.terminateInstance(clusterId, locationScope);
} catch (RemoteException e) {
- throw new TerminationException("Error occurred in cloud controller side while terminating instance", e );
+
+ log.error("Error occurred in cloud controller side while terminating instance");
+// throw new TerminationException("Error occurred in cloud controller side while terminating instance", e );
}
}
- public void terminateAll(String clusterId) throws TerminationException {
-
- try {
- stub.terminateAllInstances(clusterId);
- } catch (RemoteException e) {
- throw new TerminationException("Error occurred in cloud controller side while terminating instances", e );
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/AverageRequestInFlightEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/AverageRequestInFlightEventProcessor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/AverageRequestInFlightEventProcessor.java
deleted file mode 100644
index 4bdbd24..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/AverageRequestInFlightEventProcessor.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.event.processor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.ClusterContext;
-import org.apache.stratos.autoscaler.message.receiver.TopologyManager;
-import org.apache.stratos.autoscaler.event.AverageRequestsInFlightEvent;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.util.Util;
-
-import java.util.Map;
-
-public class AverageRequestInFlightEventProcessor implements HealthStatEventProcessor {
-
- private static final Log log = LogFactory.getLog(AverageRequestInFlightEventProcessor.class);
- private HealthStatEventProcessor nextMsgProcessor;
-
- @Override
- public void setNext(HealthStatEventProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message) {
- try {
- if (AverageRequestsInFlightEvent.class.getName().equals(type)) {
-
- // Parse complete message and build event
- AverageRequestsInFlightEvent event = (AverageRequestsInFlightEvent) Util.jsonToObject(message,
- AverageRequestsInFlightEvent.class);
-
- String clusterId = event.getClusterId();
- //Get all values of services Map from topology
- for (Service service : ((Map<String, Service>) TopologyManager.getTopology().getServices()).values()){
-
- if(service.clusterExists(clusterId)){
-
- AutoscalerContext autoscalerContext = AutoscalerContext.getInstance();
- if(!autoscalerContext.clusterExists(clusterId)){
-
- ClusterContext clusterContext = new ClusterContext(clusterId, service.getServiceName());
- autoscalerContext.addClusterContext(clusterContext);
- }
- autoscalerContext.getClusterContext(clusterId).setAverageRequestsInFlight(event.getValue());
- break;
- }
- }
-
- return true;
-
- } else {
-
- if (nextMsgProcessor != null) {
-
- // ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message);
- }
- }
- } catch (Exception e) {
-
- if (nextMsgProcessor != null) {
-
- // ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message);
- } else {
-
- throw new RuntimeException(String.format("Failed to process the message: %s of type %s using any of the" +
- " available processors.", message, type));
- }
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/GradientOfRequestInFlightEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/GradientOfRequestInFlightEventProcessor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/GradientOfRequestInFlightEventProcessor.java
deleted file mode 100644
index 4d1dcb6..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/GradientOfRequestInFlightEventProcessor.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.event.processor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.ClusterContext;
-import org.apache.stratos.autoscaler.event.GradientOfRequestsInFlightEvent;
-import org.apache.stratos.autoscaler.message.receiver.TopologyManager;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.util.Util;
-
-import java.util.Map;
-
-public class GradientOfRequestInFlightEventProcessor implements HealthStatEventProcessor {
-
- private static final Log log = LogFactory.getLog(GradientOfRequestInFlightEventProcessor.class);
- private HealthStatEventProcessor nextMsgProcessor;
-
- @Override
- public void setNext(HealthStatEventProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message) {
- try {
- if (GradientOfRequestsInFlightEvent.class.getName().equals(type)) {
-
- // Parse complete message and build event
- GradientOfRequestsInFlightEvent event
- = (GradientOfRequestsInFlightEvent) Util.jsonToObject(message, GradientOfRequestsInFlightEvent.class);
-
- String clusterId = event.getClusterId();
- //Get all values of services Map from topology
- for (Service service : ((Map<String, Service>) TopologyManager.getTopology().getServices()).values()){
-
- if(service.clusterExists(clusterId)){
-
- AutoscalerContext autoscalerContext = AutoscalerContext.getInstance();
- if(!autoscalerContext.clusterExists(clusterId)){
-
- ClusterContext clusterContext = new ClusterContext(clusterId, service.getServiceName());
- autoscalerContext.addClusterContext(clusterContext);
- }
- autoscalerContext.getClusterContext(clusterId).setRequestsInFlightGradient(event.getValue());
- break;
- }
- }
- return true;
-
- } else {
- if (nextMsgProcessor != null) {
- // ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message);
- }
- }
- } catch (Exception e) {
- if (nextMsgProcessor != null) {
- // ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message);
- } else {
- throw new RuntimeException(String.format("Failed to process the message: %s of type %s using any of the available processors.",
- message, type));
- }
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/HealthStatEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/HealthStatEventProcessor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/HealthStatEventProcessor.java
deleted file mode 100644
index de7c23f..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/HealthStatEventProcessor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.event.processor;
-
-/**
- * Interface which defined health event processor interface
- */
-public interface HealthStatEventProcessor {
-
- public void setNext(HealthStatEventProcessor nextProcessor);
-
- public boolean process(String type, String message);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/SecondDerivativeOfRequestInFlightEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/SecondDerivativeOfRequestInFlightEventProcessor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/SecondDerivativeOfRequestInFlightEventProcessor.java
deleted file mode 100644
index f986a86..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/processor/SecondDerivativeOfRequestInFlightEventProcessor.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.event.processor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.ClusterContext;
-//import org.apache.stratos.autoscaler.event.AverageRequestsInFlightEvent;
-import org.apache.stratos.autoscaler.event.SecondDerivativeOfRequestsInFlightEvent;
-import org.apache.stratos.autoscaler.message.receiver.TopologyManager;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.util.Util;
-
-import java.util.Map;
-
-public class SecondDerivativeOfRequestInFlightEventProcessor implements HealthStatEventProcessor {
-
- private static final Log log = LogFactory.getLog(SecondDerivativeOfRequestInFlightEventProcessor.class);
- private HealthStatEventProcessor nextMsgProcessor;
-
- @Override
- public void setNext(HealthStatEventProcessor nextProcessor) {
- nextMsgProcessor = nextProcessor;
- }
-
- @Override
- public boolean process(String type, String message) {
- try {
- if (SecondDerivativeOfRequestsInFlightEvent.class.getName().equals(type)) {
-
- // Parse complete message and build event
- SecondDerivativeOfRequestsInFlightEvent event
- = (SecondDerivativeOfRequestsInFlightEvent) Util.jsonToObject(message,
- SecondDerivativeOfRequestsInFlightEvent.class);
-
- String clusterId = event.getClusterId();
- //Get all values of services Map from topology
- for (Service service : ((Map<String, Service>) TopologyManager.getTopology().getServices()).values()){
-
- if(service.clusterExists(clusterId)){
-
- AutoscalerContext autoscalerContext = AutoscalerContext.getInstance();
- if(!autoscalerContext.clusterExists(clusterId)){
-
- ClusterContext clusterContext = new ClusterContext(clusterId, service.getServiceName());
- autoscalerContext.addClusterContext(clusterContext);
- }
- autoscalerContext.getClusterContext(clusterId).setRequestsInFlightSecondDerivative(event.getValue());
- break;
- }
- }
- return true;
-
-
- } else {
- if (nextMsgProcessor != null) {
- // ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message);
- }
- }
- } catch (Exception e) {
- if (nextMsgProcessor != null) {
- // ask the next processor to take care of the message.
- return nextMsgProcessor.process(type, message);
- } else {
- throw new RuntimeException(String.format("Failed to process the message: %s of type %s using any of the available processors.",
- message, type));
- }
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/TopologyManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/TopologyManager.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/TopologyManager.java
index 2bd42c2..93d1f24 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/TopologyManager.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/TopologyManager.java
@@ -2,6 +2,7 @@ package org.apache.stratos.autoscaler.message.receiver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.message.receiver.health.HealthEventMessageReceiver;
import org.apache.stratos.autoscaler.message.receiver.topology.TopologyEventMessageReceiver;
import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
import org.apache.stratos.messaging.domain.topology.Topology;
@@ -58,7 +59,7 @@ public class TopologyManager {
}
TopicSubscriber healthStatTopicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
- healthStatTopicSubscriber.setMessageListener(new TopologyEventMessageReceiver());
+ healthStatTopicSubscriber.setMessageListener(new HealthEventMessageReceiver());
Thread healthStatTopicSubscriberThread = new Thread(healthStatTopicSubscriber);
healthStatTopicSubscriberThread.start();
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
index bae1370..92b7960 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
@@ -18,15 +18,22 @@
*/
package org.apache.stratos.autoscaler.message.receiver.health;
+import com.google.gson.stream.JsonReader;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.event.processor.AverageRequestInFlightEventProcessor;
-import org.apache.stratos.autoscaler.event.processor.GradientOfRequestInFlightEventProcessor;
-import org.apache.stratos.autoscaler.event.processor.SecondDerivativeOfRequestInFlightEventProcessor;
+import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.ClusterContext;
+import org.apache.stratos.autoscaler.Constants;
import org.apache.stratos.autoscaler.message.receiver.TopologyManager;
-import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.policy.model.LoadThresholds;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
import javax.jms.TextMessage;
+import java.io.BufferedReader;
+import java.io.StringReader;
/**
@@ -35,45 +42,111 @@ import javax.jms.TextMessage;
public class HealthEventMessageDelegator implements Runnable {
private static final Log log = LogFactory.getLog(HealthEventMessageDelegator.class);
-
+ private String eventName;
+ private float value;
+ private String clusterId;
+
@Override
public void run() {
log.info("Health stat event message processor started");
- // instantiate all the relevant processors
- AverageRequestInFlightEventProcessor processor1 = new AverageRequestInFlightEventProcessor();
- GradientOfRequestInFlightEventProcessor processor2 = new GradientOfRequestInFlightEventProcessor();
- SecondDerivativeOfRequestInFlightEventProcessor processor3 = new SecondDerivativeOfRequestInFlightEventProcessor();
-
- // link all the relevant processors in the required order
- processor1.setNext(processor2);
- processor2.setNext(processor3);
-
while (true) {
try {
TextMessage message = HealthEventQueue.getInstance().take();
- // retrieve the header
- String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);
- // retrieve the actual message
- String json = message.getText();
+ String messageText = message.getText();
+ messageText = messageText.substring(messageText.indexOf('>') +1, messageText.lastIndexOf('<'));
+
+ setEventValues(messageText);
+
+ log.info(clusterId);
+ log.info(value);
+ log.info(eventName);
+ for (Service service : TopologyManager.getTopology().getServices()){
+
+ if(service.clusterExists(clusterId)){
+
+ if(!AutoscalerContext.getInstance().clusterExists(clusterId)){
+
+ Cluster cluster = service.getCluster(clusterId);
+ AutoscalePolicy autoscalePolicy = PolicyManager.getInstance().getPolicy(cluster.getAutoscalePolicyName());
+
+ ClusterContext clusterContext = new ClusterContext(clusterId, service.getServiceName());
+
+ LoadThresholds loadThresholds = autoscalePolicy.getLoadThresholds();
+ float averageLimit = loadThresholds.getRequestsInFlight().getAverage();
+ float gradientLimit = loadThresholds.getRequestsInFlight().getGradient();
+ float secondDerivative = loadThresholds.getRequestsInFlight().getSecondDerivative();
+
+ clusterContext.setAverageRequestsInFlight(averageLimit);
+ clusterContext.setRequestsInFlightGradient(gradientLimit);
+ clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
+
+ AutoscalerContext.getInstance().addClusterContext(clusterContext);
+ }
+ break;
+ }
+ }
+ if(Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)){
+ AutoscalerContext.getInstance().getClusterContext(clusterId).setAverageRequestsInFlight(value);
+
+ } else if(Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)){
+ AutoscalerContext.getInstance().getClusterContext(clusterId).setRequestsInFlightGradient(value);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Event message received from queue: %s", type));
- }
+ } else if(Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(eventName)){
+ AutoscalerContext.getInstance().getClusterContext(clusterId).setRequestsInFlightSecondDerivative(value);
- try {
- TopologyManager.acquireWriteLock();
- processor1.process(type, json);
- } finally {
- TopologyManager.releaseWriteLock();
- }
+ }
} catch (Exception e) {
- String error = "Failed to retrieve the topology event message.";
- log.error(error, e);
- throw new RuntimeException(error, e);
+ String error = "Failed to retrieve the health stat event message.";
+ log.error(error);
}
}
}
+
+ public void setEventValues(String json) {
+
+ try {
+
+ BufferedReader bufferedReader = new BufferedReader(new StringReader(json));
+ JsonReader reader = new JsonReader(bufferedReader);
+ reader.beginObject();
+
+ if(reader.hasNext()) {
+
+ eventName = reader.nextName();
+ reader.beginObject();
+ if("cluster_id".equals(reader.nextName())) {
+
+ if(reader.hasNext()){
+
+ clusterId = reader.nextString();
+ }
+ }
+ if(reader.hasNext()) {
+
+ if ("value".equals(reader.nextName())) {
+
+ if(reader.hasNext()){
+
+ String stringValue = reader.nextString();
+ try {
+
+ value = Float.parseFloat(stringValue);
+ } catch (NumberFormatException ex) {
+ log.error("Error while converting health stat message value to float", ex);
+ }
+ }
+ }
+ }
+ }
+ reader.close();
+
+ } catch (Exception e) {
+ log.error( "Could not extract message header");
+// throw new RuntimeException("Could not extract message header", e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java
index 85c04dd..866cd87 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java
@@ -34,7 +34,6 @@ public class HealthEventMessageReceiver implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage receivedMessage = (TextMessage) message;
- log.info(receivedMessage);
try {
if (log.isDebugEnabled()) {
log.debug("Message received: " + ((TextMessage) message).getText());
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/TopologyEventMessageReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/TopologyEventMessageReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/TopologyEventMessageReceiver.java
index ca5fc5b..2b1f4c9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/TopologyEventMessageReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/TopologyEventMessageReceiver.java
@@ -20,6 +20,7 @@ package org.apache.stratos.autoscaler.message.receiver.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.util.Constants;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -34,7 +35,12 @@ public class TopologyEventMessageReceiver implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage receivedMessage = (TextMessage) message;
- log.info(receivedMessage);
+ String header = null;
+ try {
+ header = receivedMessage.getStringProperty(Constants.EVENT_CLASS_NAME);
+ } catch (JMSException e) {
+ log.info(e.getMessage());
+ }
try {
if (log.isDebugEnabled()) {
log.debug("Message received: " + ((TextMessage) message).getText());
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/model/Partition.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/model/Partition.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/model/Partition.java
index 403c968..d8cb78d 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/model/Partition.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/model/Partition.java
@@ -29,12 +29,15 @@ public class Partition {
private String id;
private String iaas;
private String zone;
+ private boolean maxReached;
+ private boolean minReached;
+
/**
* Gets the value of the partitionMax property.
*
*/
- public int getPartitionMax() {
+ public int getPartitionMembersMax() {
return partitionMax;
}
@@ -50,7 +53,7 @@ public class Partition {
* Gets the value of the partitionMin property.
*
*/
- public int getPartitionMin() {
+ public int getPartitionMembersMin() {
return partitionMin;
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
index 01ac657..2766a83 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
@@ -44,6 +44,7 @@ public class AutoscalerRuleEvaluator {
private static AutoscalerRuleEvaluator instance = null;
private static final String DRL_FILE_NAME = "autoscaler.drl";
+ //TODO move .drl file outside jar
private KnowledgeBase kbase;
private StatefulKnowledgeSession ksession;
@@ -59,7 +60,6 @@ public class AutoscalerRuleEvaluator {
public void evaluate(Service service){
try {
- log.info("Evaluating rule for service " + service.getServiceName());
ksession = kbase.newStatefulKnowledgeSession();
ksession.setGlobal("$context", AutoscalerContext.getInstance());
@@ -77,28 +77,34 @@ public class AutoscalerRuleEvaluator {
public boolean delegateSpawn(Partition partition, String clusterId) {
CloudControllerClient cloudControllerClient = new CloudControllerClient();
try {
- cloudControllerClient.spawnAnInstance(partition, clusterId);
+
+ Partition partition1 = PolicyManager.getInstance().getPolicy("economyPolicy").getHAPolicy().getPartitions().get(0);
+
+ log.info("partition1.getId() " + partition1.getId());
+ int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
+ log.info("Current member count is " + currentMemberCount );
+
+ if(currentMemberCount < partition.getPartitionMembersMax()) {
+ AutoscalerContext.getInstance().getClusterContext(clusterId).increaseMemberCount(1);
+ cloudControllerClient.spawnAnInstance(partition, clusterId);
+ }
+
} catch (Throwable e) {
log.error("Cannot spawn an instance", e);
}
return false;
}
-
- public boolean delegateTerminateAll(String clusterId) {
- CloudControllerClient cloudControllerClient = new CloudControllerClient();
- try {
- cloudControllerClient.terminateAll(clusterId);
- return true;
- } catch (Throwable e) {
- log.error("Cannot terminate instance", e);
- }
- return false;
- }
public boolean delegateTerminate(Partition partition, String clusterId) {
CloudControllerClient cloudControllerClient = new CloudControllerClient();
try {
- cloudControllerClient.terminate(partition, clusterId);
+
+ int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
+ log.info("Current member count is " + currentMemberCount );
+ if(currentMemberCount > partition.getPartitionMembersMin()) {
+ AutoscalerContext.getInstance().getClusterContext(clusterId).decreaseMemberCount();
+ cloudControllerClient.terminate(partition, clusterId);
+ }
return true;
} catch (Throwable e) {
log.error("Cannot terminate instance", e);
@@ -109,7 +115,13 @@ public class AutoscalerRuleEvaluator {
public boolean delegateSpawn(Partition partition, String clusterId, int memberCountToBeIncreased) {
CloudControllerClient cloudControllerClient = new CloudControllerClient();
try {
- cloudControllerClient.spawnAnInstance(partition, clusterId);
+ int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
+ log.info("Current member count is " + currentMemberCount );
+
+ if(currentMemberCount < partition.getPartitionMembersMax()) {
+ AutoscalerContext.getInstance().getClusterContext(clusterId).increaseMemberCount(memberCountToBeIncreased);
+ cloudControllerClient.spawnInstances(partition, clusterId, memberCountToBeIncreased);
+ }
return true;
} catch (Throwable e) {
log.error("Cannot spawn an instance", e);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/Evaluator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/Evaluator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/Evaluator.java
deleted file mode 100644
index e9acbe3..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/Evaluator.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.rule;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.ClusterContext;
-import org.apache.stratos.autoscaler.Constants;
-import org.apache.stratos.autoscaler.algorithm.AutoscaleAlgorithm;
-import org.apache.stratos.autoscaler.algorithm.OneAfterAnother;
-import org.apache.stratos.autoscaler.algorithm.RoundRobin;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.exception.SpawningException;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.message.receiver.TopologyManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.policy.model.LoadThresholds;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
-
-/**
- * This is the rules evaluator
- */
-public class Evaluator {
-
- private static final Log log = LogFactory.getLog(Evaluator.class);
-
- CloudControllerClient cloudControllerClient = new CloudControllerClient();
-
- public void evaluatePojos() {
-
- // Get all values of services Map from topology
- Topology topology = TopologyManager.getTopology();
- for (String clusterId : AutoscalerContext.getInstance().getClusterContexes().keySet()) {
- boolean clusterAvailable = false;
- for (Service service : topology.getServices()) {
- if (service.clusterExists(clusterId)) {
- clusterAvailable = true;
- }
- }
- if (!clusterAvailable) {
- try {
- cloudControllerClient.terminateAll(clusterId);
- AutoscalerContext.getInstance().removeClusterContext(clusterId);
- } catch (Throwable e) {
- log.info("Error occurred " ,e);
- }
-
- }
- }
-
- for (Service service : topology.getServices()) {
- // Get all values of clusters from service
- for (Cluster cluster : service.getClusters()) {
- if (AutoscalerContext.getInstance().getClusterContext(cluster.getClusterId()) == null) {
- ClusterContext cC = new ClusterContext(cluster.getClusterId(),
- cluster.getServiceName());
-
- AutoscalePolicy policy = PolicyManager.getInstance().getPolicy(
- cluster.getAutoscalePolicyName());
- for (Partition partition : policy.getHAPolicy().getPartitions()) {
- cC.addPartitionCount(partition.getId(), 0);
- }
- AutoscalerContext.getInstance().addClusterContext(cC);
- }
- minimumCheck(cluster.getClusterId(), cluster.getServiceName(),
- cluster.getAutoscalePolicyName());
-
- healthCheck(cluster.getClusterId(), cluster.getAutoscalePolicyName());
-
- }
- }
- }
-
- public void minimumCheck(String clusterId, String serviceId, String autoscalePolicyId) {
-
- AutoscalePolicy policy = PolicyManager.getInstance().getPolicy(autoscalePolicyId);
-
- for (Partition partition : policy.getHAPolicy().getPartitions()) {
- String partitionId = partition.getId();
- int partitionMin = partition.getPartitionMin();
-
- int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId)
- .getPartitionCount(partitionId);
-
- if (currentMemberCount < partitionMin) {
-
- int memberCountToBeIncreased = partitionMin - currentMemberCount;
- try {
- cloudControllerClient.spawnInstances(partition, clusterId,
- memberCountToBeIncreased);
- } catch (SpawningException e) {
- log.info("Error occurred " ,e);
- }
- // catch if there is error, and try to spawnInstances again
- // update current member count if success
- AutoscalerContext.getInstance().getClusterContext(clusterId)
- .increaseMemberCountInPartition(partitionId, memberCountToBeIncreased);
- }
-
- }
- }
-
- public void healthCheck(String clusterId, String autoscalePolicyId) {
-
- ClusterContext clusterContext = AutoscalerContext.getInstance()
- .getClusterContext(clusterId);
-
- float lbStatAverage = clusterContext.getAverageRequestsInFlight();
- float lbStatGradient = clusterContext.getRequestsInFlightGradient();
- float lbStatSecondDerivative = clusterContext.getRequestsInFlightSecondDerivative();
-
- LoadThresholds loadThresholds = PolicyManager.getInstance().getPolicy(autoscalePolicyId)
- .getLoadThresholds();
- float averageLimit = loadThresholds.getRequestsInFlight().getAverage();
- float gradientLimit = loadThresholds.getRequestsInFlight().getGradient();
- float secondDerivative = loadThresholds.getRequestsInFlight().getSecondDerivative();
-
- String partitionAlgorithm = PolicyManager.getInstance().getPolicy(autoscalePolicyId)
- .getHAPolicy().getPartitionAlgo();
-
- AutoscaleAlgorithm autoscaleAlgorithm = null;
- if (Constants.ROUND_ROBIN_ALGORITHM_ID.equals(partitionAlgorithm)) {
-
- autoscaleAlgorithm = new RoundRobin();
- } else if (Constants.ONE_AFTER_ANOTHER_ALGORITHM_ID.equals(partitionAlgorithm)) {
-
- autoscaleAlgorithm = new OneAfterAnother();
- }
-
- if (lbStatAverage > averageLimit && lbStatGradient > gradientLimit) {
-
- Partition partition = autoscaleAlgorithm.getNextScaleUpPartition(clusterId);
-
- if (lbStatSecondDerivative > secondDerivative) {
-
- int numberOfInstancesToBeSpawned = 2; // take from a config
-
- try {
- cloudControllerClient.spawnInstances(partition, clusterId,
- numberOfInstancesToBeSpawned);
- } catch (SpawningException e) {
- log.info("Error occurred " ,e);
- }
- // spawnInstances Two
-
- } else {
-
- try {
- cloudControllerClient.spawnAnInstance(partition, clusterId);
- } catch (SpawningException e) {
- log.info("Error occurred " ,e);
- }
- // spawnInstances one
- }
- } else if (lbStatAverage < averageLimit && lbStatGradient < gradientLimit) {
-
- // terminate one
- Partition partition = autoscaleAlgorithm.getNextScaleDownPartition(clusterId);
- try {
- cloudControllerClient.terminate(partition, clusterId);
- } catch (TerminationException e) {
- log.info("Error occurred " ,e);
- }
-
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/79e07c13/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java
index bcc2296..54ab376 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java
@@ -19,15 +19,17 @@
package org.apache.stratos.autoscaler.rule;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.AutoscalerContext;
import org.apache.stratos.autoscaler.message.receiver.TopologyManager;
+import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
/**
* This class is responsible for scheduling the task of evaluating the current details of topology, statistics, and health
* status against the rules set(written in Drools)
@@ -41,6 +43,19 @@ public class ExecutorTaskScheduler {
try {
for (Service service : TopologyManager.getTopology().getServices()) {
+
+ //Remove cluster context if its already removed from Topology
+ for(String clusterContextId : AutoscalerContext.getInstance().getClusterContexes().keySet()){
+ boolean clusterAvailable = false;
+ for (Cluster cluster: service.getClusters()) {
+ if(cluster.getClusterId().equals(clusterContextId)){
+ clusterAvailable = true;
+ }
+ }
+ if(!clusterAvailable){
+ AutoscalerContext.getInstance().removeClusterContext(clusterContextId);
+ }
+ }
AutoscalerRuleEvaluator.getInstance().evaluate(service);
}