You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@stratos.apache.org by re...@apache.org on 2014/12/04 13:31:14 UTC
[3/6] stratos git commit: adding status checks before monitoring and fixing a network partition issue
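Adding status checks before monitoring and fixing a network partition issue.

This commit makes three changes:
- VMClusterMonitor now checks each cluster instance's status before running the monitoring rules for it.
- VMClusterContext now takes the network partition ID and partition algorithm from the ClusterLevelNetworkPartitionContext. It no longer looks up the ChildLevelNetworkPartition in the deployment policy.
- ClusterInstanceContext now receives explicit min/max instance counts. It no longer derives the max from the child-level partitions.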
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/ad400d98
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/ad400d98
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/ad400d98
Branch: refs/heads/master
Commit: ad400d984f9420acc1f3ed592b55cec1bf8b04df
Parents: 761b88f
Author: reka <rt...@gmail.com>
Authored: Thu Dec 4 17:05:41 2014 +0530
Committer: reka <rt...@gmail.com>
Committed: Thu Dec 4 17:05:41 2014 +0530
----------------------------------------------------------------------
.../context/cluster/ClusterInstanceContext.java | 13 +-
.../context/cluster/VMClusterContext.java | 20 +-
.../monitor/cluster/VMClusterMonitor.java | 186 +++++++++----------
3 files changed, 103 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/ad400d98/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java
index bf6a051..a6fe42a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java
@@ -66,31 +66,22 @@ public class ClusterInstanceContext extends InstanceContext {
private int maxMembers;
//details required for partition selection algorithms
private int currentPartitionIndex;
- private ChildLevelPartition[] partitions;
private String networkPartitionId;
public ClusterInstanceContext(String clusterInstanceId, String partitionAlgo,
- ChildLevelPartition[] partitions,
- int min, String networkPartitionId) {
+ int min, int max, String networkPartitionId) {
super(clusterInstanceId);
this.networkPartitionId = networkPartitionId;
this.setMinMembers(min);
- if (partitions == null) {
- this.partitions = new ChildLevelPartition[0];
- } else {
- this.partitions = Arrays.copyOf(partitions, partitions.length);
- }
partitionCtxts = new ArrayList<ClusterLevelPartitionContext>();
this.partitionAlgorithm = partitionAlgo;
//partitionCtxts = new HashMap<String, ClusterLevelPartitionContext>();
requestsInFlight = new RequestsInFlight();
loadAverage = new LoadAverage();
memoryConsumption = new MemoryConsumption();
- for (ChildLevelPartition partition : partitions) {
- maxInstanceCount += partition.getMax();
- }
+ maxInstanceCount = max;
requiredInstanceCountBasedOnStats = minInstanceCount;
requiredInstanceCountBasedOnDependencies = minInstanceCount;
http://git-wip-us.apache.org/repos/asf/stratos/blob/ad400d98/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java
index 2505cbe..f5cdb32 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java
@@ -227,9 +227,9 @@ public class VMClusterContext extends AbstractClusterContext {
ChildLevelPartition childLevelPartition)
throws PolicyValidationException, PartitionValidationException {
//Getting the associated network partition
- ChildLevelNetworkPartition networkPartition = deploymentPolicy.
- getChildLevelNetworkPartition(clusterInstance.getNetworkPartitionId());
- if (networkPartition == null) {
+ /* ChildLevelNetworkPartition networkPartition = deploymentPolicy.
+ getChildLevelNetworkPartition(clusterInstance.getNetworkPartitionId());*/
+ if (clusterLevelNetworkPartitionContext == null) {
String msg =
"Network Partition is null in deployment policy: [policy-name]: " +
deploymentPolicy.getId();
@@ -237,6 +237,8 @@ public class VMClusterContext extends AbstractClusterContext {
throw new PolicyValidationException(msg);
}
+ String nPartitionId = clusterLevelNetworkPartitionContext.getId();
+
//Getting the associated partition
if (clusterInstance.getPartitionId() == null || childLevelPartition == null) {
String msg =
@@ -247,12 +249,12 @@ public class VMClusterContext extends AbstractClusterContext {
throw new PolicyValidationException(msg);
}
- //Creating cluster level network partitionContext, if not exist
+ /*//Creating cluster level network partitionContext, if not exist
if (clusterLevelNetworkPartitionContext == null) {
clusterLevelNetworkPartitionContext =
new ClusterLevelNetworkPartitionContext(clusterInstance.getNetworkPartitionId()
, networkPartition.getPartitionAlgo(), networkPartition.getMin());
- }
+ }*/
ClusterInstanceContext clusterInstanceContext = clusterLevelNetworkPartitionContext.
getClusterInstanceContext(clusterInstance.getInstanceId());
@@ -270,8 +272,8 @@ public class VMClusterContext extends AbstractClusterContext {
ApplicationHolder.releaseReadLock();
}
clusterInstanceContext = new ClusterInstanceContext(clusterInstance.getInstanceId(),
- networkPartition.getPartitionAlgo(),
- networkPartition.getChildLevelPartitions(), networkPartition.getMin(), networkPartition.getId());
+ clusterLevelNetworkPartitionContext.getPartitionAlgorithm(),
+ minInstances, maxInstances , nPartitionId);
}
String partitionId;
if(childLevelPartition != null) {
@@ -283,8 +285,8 @@ public class VMClusterContext extends AbstractClusterContext {
partitionId = clusterInstance.getPartitionId();
}
//Retrieving the actual partition from application
- Partition appPartition = deploymentPolicy.getApplicationLevelNetworkPartition(networkPartition.getId()).
- getPartition(partitionId);
+ Partition appPartition = deploymentPolicy.getApplicationLevelNetworkPartition(nPartitionId).
+ getPartition(partitionId);
org.apache.stratos.cloud.controller.stub.domain.Partition partition =
convertTOCCPartition(appPartition);
http://git-wip-us.apache.org/repos/asf/stratos/blob/ad400d98/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
index befe3cd..edea5cc 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
@@ -34,9 +34,9 @@ import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent;
import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder;
import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor;
import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiveProcessor;
import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInActiveProcessor;
+import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor;
import org.apache.stratos.autoscaler.util.AutoScalerConstants;
import org.apache.stratos.autoscaler.util.AutoscalerUtil;
import org.apache.stratos.autoscaler.util.ConfUtil;
@@ -47,6 +47,7 @@ import org.apache.stratos.common.Property;
import org.apache.stratos.common.constants.StratosConstants;
import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.ClusterInstance;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.ClusterStatus;
import org.apache.stratos.messaging.domain.topology.Member;
@@ -54,7 +55,6 @@ import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.event.health.stat.*;
import org.apache.stratos.messaging.event.topology.*;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.drools.runtime.StatefulKnowledgeSession;
import java.util.*;
@@ -138,19 +138,10 @@ public class VMClusterMonitor extends AbstractClusterMonitor {
public void run() {
while (!isDestroyed()) {
try {
- /* TODO ***********if (((getStatus().getCode() <= ClusterStatus.Active.getCode()) ||
- (getStatus() == ClusterStatus.Inactive && !hasStartupDependents)) && !this.hasFaultyMember
- && !stop) {*/
if (log.isDebugEnabled()) {
log.debug("Cluster monitor is running.. " + this.toString());
}
monitor();
- /*} else {
- if (log.isDebugEnabled()) {
- log.debug("Cluster monitor is suspended as the cluster is in " +
- ClusterStatus.Inactive + " mode......");
- }
- }*/
} catch (Exception e) {
log.error("Cluster monitor: Monitor failed." + this.toString(), e);
}
@@ -189,109 +180,112 @@ public class VMClusterMonitor extends AbstractClusterMonitor {
final Collection<ClusterInstanceContext> clusterInstanceContexts = networkPartitionContext.
getClusterInstanceContextMap().values();
- for (final ClusterInstanceContext instanceContext : clusterInstanceContexts ) {
- /* TODO ***********if (((getStatus().getCode() <= ClusterStatus.Active.getCode()) ||
- (getStatus() == ClusterStatus.Inactive && !hasStartupDependents)) && !this.hasFaultyMember
- && !stop) {*/
+ for (final ClusterInstanceContext instanceContext : clusterInstanceContexts) {
+ ClusterInstance instance = (ClusterInstance) this.instanceIdToInstanceMap.
+ get(instanceContext.getId());
+ if ((instance.getStatus().getCode() <= ClusterStatus.Active.getCode()) ||
+ (getStatus() == ClusterStatus.Inactive && !hasStartupDependents) && !this.hasFaultyMember
+ && !stop) {
- Runnable monitoringRunnable = new Runnable() {
- @Override
- public void run() {
+ Runnable monitoringRunnable = new Runnable() {
+ @Override
+ public void run() {
- if (log.isDebugEnabled()) {
- log.debug("Monitor is running for [cluster] : " + getClusterId());
- }
+ if (log.isDebugEnabled()) {
+ log.debug("Monitor is running for [cluster] : " + getClusterId());
+ }
- // store primary members in the cluster instance context
+ // store primary members in the cluster instance context
- // store primary members in the cluster instance context
- List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
+ // store primary members in the cluster instance context
+ List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
- //FIXME to check the status of the instance
- if (true) {
+ //FIXME to check the status of the instance
+ if (true) {
- for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
+ for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
- // get active primary members in this cluster instance context
- for (MemberContext memberContext : partitionContext.getActiveMembers()) {
- if (isPrimaryMember(memberContext)) {
- primaryMemberListInClusterInstance.add(memberContext.getMemberId());
- }
- }
+ // get active primary members in this cluster instance context
+ for (MemberContext memberContext : partitionContext.getActiveMembers()) {
+ if (isPrimaryMember(memberContext)) {
+ primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+ }
+ }
- // get pending primary members in this cluster instance context
- for (MemberContext memberContext : partitionContext.getPendingMembers()) {
- if (isPrimaryMember(memberContext)) {
- primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+ // get pending primary members in this cluster instance context
+ for (MemberContext memberContext : partitionContext.getPendingMembers()) {
+ if (isPrimaryMember(memberContext)) {
+ primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+ }
+ }
}
- }
- }
-
- getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
- getMinCheckKnowledgeSession().setGlobal("algorithmName",
- instanceContext.getPartitionAlgorithm());
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running minimum check for cluster instance %s ",
- instanceContext.getId()));
- }
-
- minCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getMinCheckKnowledgeSession(),
- minCheckFactHandle, instanceContext);
- obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
- getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, instanceContext);
+ getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+ getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+ getMinCheckKnowledgeSession().setGlobal("algorithmName",
+ instanceContext.getPartitionAlgorithm());
- //checking the status of the cluster
- boolean rifReset = instanceContext.isRifReset();
- boolean memoryConsumptionReset = instanceContext.isMemoryConsumptionReset();
- boolean loadAverageReset = instanceContext.isLoadAverageReset();
-
- if (log.isDebugEnabled()) {
- log.debug("Execution point of scaling Rule, [Is rif Reset] : " + rifReset
- + " [Is memoryConsumption Reset] : " + memoryConsumptionReset
- + " [Is loadAverage Reset] : " + loadAverageReset);
- }
- if (rifReset || memoryConsumptionReset || loadAverageReset) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Running minimum check for cluster instance %s ",
+ instanceContext.getId()));
+ }
+ minCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getMinCheckKnowledgeSession(),
+ minCheckFactHandle, instanceContext);
- VMClusterContext vmClusterContext = (VMClusterContext) clusterContext;
+ obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
+ getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, instanceContext);
- getScaleCheckKnowledgeSession().setGlobal("instance", instanceContext);
- getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy",
- vmClusterContext.getAutoscalePolicy());
- getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
- getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
- getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
- getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
- getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInClusterInstance);
- getMinCheckKnowledgeSession().setGlobal("algorithmName",
- instanceContext.getPartitionAlgorithm());
+ //checking the status of the cluster
+ boolean rifReset = instanceContext.isRifReset();
+ boolean memoryConsumptionReset = instanceContext.isMemoryConsumptionReset();
+ boolean loadAverageReset = instanceContext.isLoadAverageReset();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running scale check for [cluster instance context] %s ",
- instanceContext.getId()));
- log.debug(" Primary members : " + primaryMemberListInClusterInstance);
+ if (log.isDebugEnabled()) {
+ log.debug("Execution point of scaling Rule, [Is rif Reset] : " + rifReset
+ + " [Is memoryConsumption Reset] : " + memoryConsumptionReset
+ + " [Is loadAverage Reset] : " + loadAverageReset);
+ }
+ if (rifReset || memoryConsumptionReset || loadAverageReset) {
+
+
+ VMClusterContext vmClusterContext = (VMClusterContext) clusterContext;
+
+ getScaleCheckKnowledgeSession().setGlobal("instance", instanceContext);
+ getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+ getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy",
+ vmClusterContext.getAutoscalePolicy());
+ getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+ getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+ getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+ getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
+ getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInClusterInstance);
+ getMinCheckKnowledgeSession().setGlobal("algorithmName",
+ instanceContext.getPartitionAlgorithm());
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Running scale check for [cluster instance context] %s ",
+ instanceContext.getId()));
+ log.debug(" Primary members : " + primaryMemberListInClusterInstance);
+ }
+
+ scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getScaleCheckKnowledgeSession()
+ , scaleCheckFactHandle, instanceContext);
+
+ instanceContext.setRifReset(false);
+ instanceContext.setMemoryConsumptionReset(false);
+ instanceContext.setLoadAverageReset(false);
+ } else if (log.isDebugEnabled()) {
+ log.debug(String.format("Scale rule will not run since the LB statistics have not " +
+ "received before this cycle for [cluster instance context] %s ",
+ instanceContext.getId()));
+ }
}
-
- scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getScaleCheckKnowledgeSession()
- , scaleCheckFactHandle, instanceContext);
-
- instanceContext.setRifReset(false);
- instanceContext.setMemoryConsumptionReset(false);
- instanceContext.setLoadAverageReset(false);
- } else if (log.isDebugEnabled()) {
- log.debug(String.format("Scale rule will not run since the LB statistics have not " +
- "received before this cycle for [cluster instance context] %s ",
- instanceContext.getId()));
}
- }
- }
- };
- monitoringRunnable.run();
+ };
+ monitoringRunnable.run();
+ }
}
}
}