You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/12/04 13:31:14 UTC

[3/6] stratos git commit: adding status checking before monitoring and fixing network partition issue

Add status checking before monitoring and fix the network partition issue.


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/ad400d98
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/ad400d98
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/ad400d98

Branch: refs/heads/master
Commit: ad400d984f9420acc1f3ed592b55cec1bf8b04df
Parents: 761b88f
Author: reka <rt...@gmail.com>
Authored: Thu Dec 4 17:05:41 2014 +0530
Committer: reka <rt...@gmail.com>
Committed: Thu Dec 4 17:05:41 2014 +0530

----------------------------------------------------------------------
 .../context/cluster/ClusterInstanceContext.java |  13 +-
 .../context/cluster/VMClusterContext.java       |  20 +-
 .../monitor/cluster/VMClusterMonitor.java       | 186 +++++++++----------
 3 files changed, 103 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/ad400d98/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java
index bf6a051..a6fe42a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java
@@ -66,31 +66,22 @@ public class ClusterInstanceContext extends InstanceContext {
     private int maxMembers;
     //details required for partition selection algorithms
     private int currentPartitionIndex;
-    private ChildLevelPartition[] partitions;
 
     private String networkPartitionId;
 
     public ClusterInstanceContext(String clusterInstanceId, String partitionAlgo,
-                                  ChildLevelPartition[] partitions,
-                                  int min, String networkPartitionId) {
+                                  int min, int max, String networkPartitionId) {
 
         super(clusterInstanceId);
         this.networkPartitionId = networkPartitionId;
         this.setMinMembers(min);
-        if (partitions == null) {
-            this.partitions = new ChildLevelPartition[0];
-        } else {
-            this.partitions = Arrays.copyOf(partitions, partitions.length);
-        }
         partitionCtxts = new ArrayList<ClusterLevelPartitionContext>();
         this.partitionAlgorithm = partitionAlgo;
         //partitionCtxts = new HashMap<String, ClusterLevelPartitionContext>();
         requestsInFlight = new RequestsInFlight();
         loadAverage = new LoadAverage();
         memoryConsumption = new MemoryConsumption();
-        for (ChildLevelPartition partition : partitions) {
-            maxInstanceCount += partition.getMax();
-        }
+        maxInstanceCount = max;
         requiredInstanceCountBasedOnStats = minInstanceCount;
         requiredInstanceCountBasedOnDependencies = minInstanceCount;
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/ad400d98/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java
index 2505cbe..f5cdb32 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java
@@ -227,9 +227,9 @@ public class VMClusterContext extends AbstractClusterContext {
             ChildLevelPartition childLevelPartition)
             throws PolicyValidationException, PartitionValidationException {
         //Getting the associated network partition
-        ChildLevelNetworkPartition networkPartition = deploymentPolicy.
-                getChildLevelNetworkPartition(clusterInstance.getNetworkPartitionId());
-        if (networkPartition == null) {
+       /* ChildLevelNetworkPartition networkPartition = deploymentPolicy.
+                getChildLevelNetworkPartition(clusterInstance.getNetworkPartitionId());*/
+        if (clusterLevelNetworkPartitionContext == null) {
             String msg =
                     "Network Partition is null in deployment policy: [policy-name]: " +
                             deploymentPolicy.getId();
@@ -237,6 +237,8 @@ public class VMClusterContext extends AbstractClusterContext {
             throw new PolicyValidationException(msg);
         }
 
+        String nPartitionId = clusterLevelNetworkPartitionContext.getId();
+
         //Getting the associated  partition
         if (clusterInstance.getPartitionId() == null || childLevelPartition == null) {
             String msg =
@@ -247,12 +249,12 @@ public class VMClusterContext extends AbstractClusterContext {
             throw new PolicyValidationException(msg);
         }
 
-        //Creating cluster level network partitionContext, if not exist
+        /*//Creating cluster level network partitionContext, if not exist
         if (clusterLevelNetworkPartitionContext == null) {
             clusterLevelNetworkPartitionContext =
                     new ClusterLevelNetworkPartitionContext(clusterInstance.getNetworkPartitionId()
                             , networkPartition.getPartitionAlgo(), networkPartition.getMin());
-        }
+        }*/
 
         ClusterInstanceContext clusterInstanceContext = clusterLevelNetworkPartitionContext.
                                         getClusterInstanceContext(clusterInstance.getInstanceId());
@@ -270,8 +272,8 @@ public class VMClusterContext extends AbstractClusterContext {
                 ApplicationHolder.releaseReadLock();
             }
             clusterInstanceContext = new ClusterInstanceContext(clusterInstance.getInstanceId(),
-                    networkPartition.getPartitionAlgo(),
-                    networkPartition.getChildLevelPartitions(), networkPartition.getMin(), networkPartition.getId());
+                    clusterLevelNetworkPartitionContext.getPartitionAlgorithm(),
+                    minInstances, maxInstances , nPartitionId);
         }
         String partitionId;
         if(childLevelPartition != null) {
@@ -283,8 +285,8 @@ public class VMClusterContext extends AbstractClusterContext {
             partitionId = clusterInstance.getPartitionId();
         }
         //Retrieving the actual partition from application
-        Partition appPartition = deploymentPolicy.getApplicationLevelNetworkPartition(networkPartition.getId()).
-                getPartition(partitionId);
+        Partition appPartition = deploymentPolicy.getApplicationLevelNetworkPartition(nPartitionId).
+                                                                        getPartition(partitionId);
         org.apache.stratos.cloud.controller.stub.domain.Partition partition =
                 convertTOCCPartition(appPartition);
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/ad400d98/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
index befe3cd..edea5cc 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
@@ -34,9 +34,9 @@ import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent;
 import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
 import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder;
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor;
 import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiveProcessor;
 import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInActiveProcessor;
+import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor;
 import org.apache.stratos.autoscaler.util.AutoScalerConstants;
 import org.apache.stratos.autoscaler.util.AutoscalerUtil;
 import org.apache.stratos.autoscaler.util.ConfUtil;
@@ -47,6 +47,7 @@ import org.apache.stratos.common.Property;
 import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
 import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.ClusterInstance;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.ClusterStatus;
 import org.apache.stratos.messaging.domain.topology.Member;
@@ -54,7 +55,6 @@ import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.event.health.stat.*;
 import org.apache.stratos.messaging.event.topology.*;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.drools.runtime.StatefulKnowledgeSession;
 
 import java.util.*;
 
@@ -138,19 +138,10 @@ public class VMClusterMonitor extends AbstractClusterMonitor {
     public void run() {
         while (!isDestroyed()) {
             try {
-                /* TODO ***********if  (((getStatus().getCode() <= ClusterStatus.Active.getCode()) ||
-                        (getStatus() == ClusterStatus.Inactive && !hasStartupDependents)) && !this.hasFaultyMember
-                        && !stop) {*/
                 if (log.isDebugEnabled()) {
                     log.debug("Cluster monitor is running.. " + this.toString());
                 }
                 monitor();
-                /*} else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Cluster monitor is suspended as the cluster is in " +
-                                ClusterStatus.Inactive + " mode......");
-                    }
-                }*/
             } catch (Exception e) {
                 log.error("Cluster monitor: Monitor failed." + this.toString(), e);
             }
@@ -189,109 +180,112 @@ public class VMClusterMonitor extends AbstractClusterMonitor {
             final Collection<ClusterInstanceContext> clusterInstanceContexts = networkPartitionContext.
                     getClusterInstanceContextMap().values();
 
-            for (final ClusterInstanceContext instanceContext : clusterInstanceContexts ) {
-                /* TODO ***********if  (((getStatus().getCode() <= ClusterStatus.Active.getCode()) ||
-                        (getStatus() == ClusterStatus.Inactive && !hasStartupDependents)) && !this.hasFaultyMember
-                        && !stop) {*/
+            for (final ClusterInstanceContext instanceContext : clusterInstanceContexts) {
+                ClusterInstance instance = (ClusterInstance) this.instanceIdToInstanceMap.
+                        get(instanceContext.getId());
+                if ((instance.getStatus().getCode() <= ClusterStatus.Active.getCode()) ||
+                        (getStatus() == ClusterStatus.Inactive && !hasStartupDependents) && !this.hasFaultyMember
+                                && !stop) {
 
-                Runnable monitoringRunnable = new Runnable() {
-                    @Override
-                    public void run() {
+                    Runnable monitoringRunnable = new Runnable() {
+                        @Override
+                        public void run() {
 
-                    if (log.isDebugEnabled()) {
-                        log.debug("Monitor is running for [cluster] : " + getClusterId());
-                    }
+                            if (log.isDebugEnabled()) {
+                                log.debug("Monitor is running for [cluster] : " + getClusterId());
+                            }
 
 
-                        // store primary members in the cluster instance context
+                            // store primary members in the cluster instance context
 
-                    // store primary members in the cluster instance context
-                    List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
+                            // store primary members in the cluster instance context
+                            List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
 
-                    //FIXME to check the status of the instance
-                    if (true) {
+                            //FIXME to check the status of the instance
+                            if (true) {
 
-                        for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
+                                for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
 
-                            // get active primary members in this cluster instance context
-                            for (MemberContext memberContext : partitionContext.getActiveMembers()) {
-                                if (isPrimaryMember(memberContext)) {
-                                    primaryMemberListInClusterInstance.add(memberContext.getMemberId());
-                                }
-                            }
+                                    // get active primary members in this cluster instance context
+                                    for (MemberContext memberContext : partitionContext.getActiveMembers()) {
+                                        if (isPrimaryMember(memberContext)) {
+                                            primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+                                        }
+                                    }
 
-                            // get pending primary members in this cluster instance context
-                            for (MemberContext memberContext : partitionContext.getPendingMembers()) {
-                                if (isPrimaryMember(memberContext)) {
-                                    primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+                                    // get pending primary members in this cluster instance context
+                                    for (MemberContext memberContext : partitionContext.getPendingMembers()) {
+                                        if (isPrimaryMember(memberContext)) {
+                                            primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+                                        }
+                                    }
                                 }
-                            }
-                        }
-
-                        getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-                        getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
-                        getMinCheckKnowledgeSession().setGlobal("algorithmName",
-                                instanceContext.getPartitionAlgorithm());
-
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("Running minimum check for cluster instance %s ",
-                                    instanceContext.getId()));
-                        }
-
-                        minCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getMinCheckKnowledgeSession(),
-                                minCheckFactHandle, instanceContext);
 
-                        obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
-                                getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, instanceContext);
+                                getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                                getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+                                getMinCheckKnowledgeSession().setGlobal("algorithmName",
+                                        instanceContext.getPartitionAlgorithm());
 
-                        //checking the status of the cluster
-                        boolean rifReset = instanceContext.isRifReset();
-                        boolean memoryConsumptionReset = instanceContext.isMemoryConsumptionReset();
-                        boolean loadAverageReset = instanceContext.isLoadAverageReset();
-
-                        if (log.isDebugEnabled()) {
-                            log.debug("Execution point of scaling Rule, [Is rif Reset] : " + rifReset
-                                    + " [Is memoryConsumption Reset] : " + memoryConsumptionReset
-                                    + " [Is loadAverage Reset] : " + loadAverageReset);
-                        }
-                        if (rifReset || memoryConsumptionReset || loadAverageReset) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug(String.format("Running minimum check for cluster instance %s ",
+                                            instanceContext.getId()));
+                                }
 
+                                minCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getMinCheckKnowledgeSession(),
+                                        minCheckFactHandle, instanceContext);
 
-                            VMClusterContext vmClusterContext = (VMClusterContext) clusterContext;
+                                obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
+                                        getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, instanceContext);
 
-                            getScaleCheckKnowledgeSession().setGlobal("instance", instanceContext);
-                            getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-                            getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy",
-                                    vmClusterContext.getAutoscalePolicy());
-                            getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
-                            getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
-                            getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
-                            getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
-                            getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInClusterInstance);
-                            getMinCheckKnowledgeSession().setGlobal("algorithmName",
-                                    instanceContext.getPartitionAlgorithm());
+                                //checking the status of the cluster
+                                boolean rifReset = instanceContext.isRifReset();
+                                boolean memoryConsumptionReset = instanceContext.isMemoryConsumptionReset();
+                                boolean loadAverageReset = instanceContext.isLoadAverageReset();
 
-                            if (log.isDebugEnabled()) {
-                                log.debug(String.format("Running scale check for [cluster instance context] %s ",
-                                        instanceContext.getId()));
-                                log.debug(" Primary members : " + primaryMemberListInClusterInstance);
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Execution point of scaling Rule, [Is rif Reset] : " + rifReset
+                                            + " [Is memoryConsumption Reset] : " + memoryConsumptionReset
+                                            + " [Is loadAverage Reset] : " + loadAverageReset);
+                                }
+                                if (rifReset || memoryConsumptionReset || loadAverageReset) {
+
+
+                                    VMClusterContext vmClusterContext = (VMClusterContext) clusterContext;
+
+                                    getScaleCheckKnowledgeSession().setGlobal("instance", instanceContext);
+                                    getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                                    getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy",
+                                            vmClusterContext.getAutoscalePolicy());
+                                    getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+                                    getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+                                    getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+                                    getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
+                                    getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInClusterInstance);
+                                    getMinCheckKnowledgeSession().setGlobal("algorithmName",
+                                            instanceContext.getPartitionAlgorithm());
+
+                                    if (log.isDebugEnabled()) {
+                                        log.debug(String.format("Running scale check for [cluster instance context] %s ",
+                                                instanceContext.getId()));
+                                        log.debug(" Primary members : " + primaryMemberListInClusterInstance);
+                                    }
+
+                                    scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getScaleCheckKnowledgeSession()
+                                            , scaleCheckFactHandle, instanceContext);
+
+                                    instanceContext.setRifReset(false);
+                                    instanceContext.setMemoryConsumptionReset(false);
+                                    instanceContext.setLoadAverageReset(false);
+                                } else if (log.isDebugEnabled()) {
+                                    log.debug(String.format("Scale rule will not run since the LB statistics have not " +
+                                                    "received before this cycle for [cluster instance context] %s ",
+                                            instanceContext.getId()));
+                                }
                             }
-
-                            scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getScaleCheckKnowledgeSession()
-                                    , scaleCheckFactHandle, instanceContext);
-
-                            instanceContext.setRifReset(false);
-                            instanceContext.setMemoryConsumptionReset(false);
-                            instanceContext.setLoadAverageReset(false);
-                        } else if (log.isDebugEnabled()) {
-                            log.debug(String.format("Scale rule will not run since the LB statistics have not " +
-                                            "received before this cycle for [cluster instance context] %s ",
-                                    instanceContext.getId()));
                         }
-                    }
-                    }
-                };
-                monitoringRunnable.run();
+                    };
+                    monitoringRunnable.run();
+                }
             }
         }
     }