You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2014/12/04 10:01:52 UTC

stratos git commit: Running a thread per cluster instance for evaluating rules

Repository: stratos
Updated Branches:
  refs/heads/master 4aed87b3d -> 64e874796


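Running a thread per cluster instance for evaluating rules

Note: in the hunk shown below, one Runnable is created per cluster instance
context, but each is invoked directly via monitoringRunnable.run(), so the
rules are still evaluated synchronously on the calling thread.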


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/64e87479
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/64e87479
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/64e87479

Branch: refs/heads/master
Commit: 64e874796e1ff1fdd981995adf8da669d2c4e272
Parents: 4aed87b
Author: Lahiru Sandaruwan <la...@apache.org>
Authored: Thu Dec 4 14:33:29 2014 +0530
Committer: Lahiru Sandaruwan <la...@apache.org>
Committed: Thu Dec 4 14:33:29 2014 +0530

----------------------------------------------------------------------
 .../monitor/cluster/VMClusterMonitor.java       | 161 +++++++++----------
 1 file changed, 78 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/64e87479/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
index 3064d3b..2744361 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
@@ -184,115 +184,110 @@ public class VMClusterMonitor extends AbstractClusterMonitor {
 
     public void monitor() {
 
-        final Collection<ClusterLevelNetworkPartitionContext> clusterLevelNetworkPartitionContexts =
-                ((VMClusterContext) this.clusterContext).getNetworkPartitionCtxts().values();
+        for (ClusterLevelNetworkPartitionContext networkPartitionContext : getNetworkPartitionCtxts()) {
 
-        Runnable monitoringRunnable = new Runnable() {
-            @Override
-            public void run() {
+            final Collection<ClusterInstanceContext> clusterInstanceContexts = networkPartitionContext.
+                    getClusterInstanceContextMap().values();
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Monitor is running for [cluster] : " + getClusterId());
-                }
+            for (final ClusterInstanceContext instanceContext : clusterInstanceContexts ) {
 
-                for (ClusterLevelNetworkPartitionContext networkPartitionContext :
-                        clusterLevelNetworkPartitionContexts) {
+                Runnable monitoringRunnable = new Runnable() {
+                    @Override
+                    public void run() {
 
-                    for (ClusterInstanceContext instanceContext : networkPartitionContext.
-                            getClusterInstanceContextMap().values()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Monitor is running for [cluster] : " + getClusterId());
+                    }
 
-                        // store primary members in the cluster instance context
-                        List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
+                    // store primary members in the cluster instance context
+                    List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
 
-                        //FIXME to check the status of the instance
-                        if (true) {
+                    //FIXME to check the status of the instance
+                    if (true) {
 
-                            for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
+                        for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
 
-                                // get active primary members in this cluster instance context
-                                for (MemberContext memberContext : partitionContext.getActiveMembers()) {
-                                    if (isPrimaryMember(memberContext)) {
-                                        primaryMemberListInClusterInstance.add(memberContext.getMemberId());
-                                    }
+                            // get active primary members in this cluster instance context
+                            for (MemberContext memberContext : partitionContext.getActiveMembers()) {
+                                if (isPrimaryMember(memberContext)) {
+                                    primaryMemberListInClusterInstance.add(memberContext.getMemberId());
                                 }
+                            }
 
-                                // get pending primary members in this cluster instance context
-                                for (MemberContext memberContext : partitionContext.getPendingMembers()) {
-                                    if (isPrimaryMember(memberContext)) {
-                                        primaryMemberListInClusterInstance.add(memberContext.getMemberId());
-                                    }
+                            // get pending primary members in this cluster instance context
+                            for (MemberContext memberContext : partitionContext.getPendingMembers()) {
+                                if (isPrimaryMember(memberContext)) {
+                                    primaryMemberListInClusterInstance.add(memberContext.getMemberId());
                                 }
                             }
+                        }
 
-                            getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-                            getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
-                            getMinCheckKnowledgeSession().setGlobal("algorithmName",
-                                    networkPartitionContext.getPartitionAlgorithm());
+                        getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                        getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+                        getMinCheckKnowledgeSession().setGlobal("algorithmName",
+                                instanceContext.getPartitionAlgorithm());
 
-                            if (log.isDebugEnabled()) {
-                                log.debug(String.format("Running minimum check for cluster instance %s ",
-                                        instanceContext.getId()));
-                            }
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Running minimum check for cluster instance %s ",
+                                    instanceContext.getId()));
+                        }
 
-                            minCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getMinCheckKnowledgeSession(),
-                                    minCheckFactHandle, instanceContext);
+                        minCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getMinCheckKnowledgeSession(),
+                                minCheckFactHandle, instanceContext);
 
-                            obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
-                                    getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, instanceContext);
+                        obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
+                                getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, instanceContext);
 
-                            //checking the status of the cluster
-                            boolean rifReset = instanceContext.isRifReset();
-                            boolean memoryConsumptionReset = instanceContext.isMemoryConsumptionReset();
-                            boolean loadAverageReset = instanceContext.isLoadAverageReset();
+                        //checking the status of the cluster
+                        boolean rifReset = instanceContext.isRifReset();
+                        boolean memoryConsumptionReset = instanceContext.isMemoryConsumptionReset();
+                        boolean loadAverageReset = instanceContext.isLoadAverageReset();
 
-                            if (log.isDebugEnabled()) {
-                                log.debug("Execution point of scaling Rule, [Is rif Reset] : " + rifReset
-                                        + " [Is memoryConsumption Reset] : " + memoryConsumptionReset
-                                        + " [Is loadAverage Reset] : " + loadAverageReset);
-                            }
-                            if (rifReset || memoryConsumptionReset || loadAverageReset) {
-
-
-                                VMClusterContext vmClusterContext = (VMClusterContext) clusterContext;
-
-                                getScaleCheckKnowledgeSession().setGlobal("instance", instanceContext);
-                                getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-                                getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy",
-                                        vmClusterContext.getAutoscalePolicy());
-                                getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
-                                getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
-                                getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
-                                getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
-                                getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInClusterInstance);
-                                getMinCheckKnowledgeSession().setGlobal("algorithmName",
-                                        networkPartitionContext.getPartitionAlgorithm());
-
-                                if (log.isDebugEnabled()) {
-                                    log.debug(String.format("Running scale check for network partition %s ",
-                                            networkPartitionContext.getId()));
-                                    log.debug(" Primary members : " + primaryMemberListInClusterInstance);
-                                }
+                        if (log.isDebugEnabled()) {
+                            log.debug("Execution point of scaling Rule, [Is rif Reset] : " + rifReset
+                                    + " [Is memoryConsumption Reset] : " + memoryConsumptionReset
+                                    + " [Is loadAverage Reset] : " + loadAverageReset);
+                        }
+                        if (rifReset || memoryConsumptionReset || loadAverageReset) {
 
-                                scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getScaleCheckKnowledgeSession()
-                                        , scaleCheckFactHandle, instanceContext);
 
-                                instanceContext.setRifReset(false);
-                                instanceContext.setMemoryConsumptionReset(false);
-                                instanceContext.setLoadAverageReset(false);
-                            } else if (log.isDebugEnabled()) {
-                                log.debug(String.format("Scale rule will not run since the LB statistics have not " +
-                                                "received before this cycle for network partition %s",
-                                        networkPartitionContext.getId()));
+                            VMClusterContext vmClusterContext = (VMClusterContext) clusterContext;
+
+                            getScaleCheckKnowledgeSession().setGlobal("instance", instanceContext);
+                            getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                            getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy",
+                                    vmClusterContext.getAutoscalePolicy());
+                            getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+                            getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+                            getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+                            getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
+                            getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInClusterInstance);
+                            getMinCheckKnowledgeSession().setGlobal("algorithmName",
+                                    instanceContext.getPartitionAlgorithm());
+
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("Running scale check for [cluster instance context] %s ",
+                                        instanceContext.getId()));
+                                log.debug(" Primary members : " + primaryMemberListInClusterInstance);
                             }
 
+                            scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getScaleCheckKnowledgeSession()
+                                    , scaleCheckFactHandle, instanceContext);
 
+                            instanceContext.setRifReset(false);
+                            instanceContext.setMemoryConsumptionReset(false);
+                            instanceContext.setLoadAverageReset(false);
+                        } else if (log.isDebugEnabled()) {
+                            log.debug(String.format("Scale rule will not run since the LB statistics have not " +
+                                            "received before this cycle for [cluster instance context] %s ",
+                                    instanceContext.getId()));
                         }
                     }
-                }
-
+                    }
+                };
+                monitoringRunnable.run();
             }
-        };
-        monitoringRunnable.run();
+        }
     }
 
     @Override