You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/10/30 10:01:05 UTC

git commit: Fixing autoscaler component merge issues

Repository: stratos
Updated Branches:
  refs/heads/docker-grouping-merge 25340242f -> be8885862


Fixing autoscaler component merge issues


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/be888586
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/be888586
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/be888586

Branch: refs/heads/docker-grouping-merge
Commit: be8885862eb7c18230b7c1a066f06a35fee42c53
Parents: 2534024
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Oct 30 14:30:55 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Oct 30 14:30:55 2014 +0530

----------------------------------------------------------------------
 .../org.apache.stratos.autoscaler/pom.xml       |   1 -
 .../internal/AutoscalerServerComponent.java     |  22 +-
 .../monitor/AbstractClusterMonitor.java         |  46 ++-
 .../monitor/ApplicationMonitorFactory.java      | 127 +-------
 .../KubernetesServiceClusterMonitor.java        |   8 +-
 .../autoscaler/monitor/VMLbClusterMonitor.java  |  30 +-
 .../monitor/VMServiceClusterMonitor.java        |  40 +--
 .../monitor/cluster/ClusterMonitor.java         | 293 -------------------
 .../monitor/cluster/LbClusterMonitor.java       | 129 --------
 .../status/checker/StatusChecker.java           |  15 +-
 10 files changed, 108 insertions(+), 603 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/pom.xml b/components/org.apache.stratos.autoscaler/pom.xml
index c188021..6fcd1f0 100644
--- a/components/org.apache.stratos.autoscaler/pom.xml
+++ b/components/org.apache.stratos.autoscaler/pom.xml
@@ -149,7 +149,6 @@
 			<groupId>org.apache.stratos</groupId>
 			<artifactId>org.apache.stratos.messaging</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
 		</dependency>
         <dependency>
             <groupId>org.apache.stratos</groupId>

http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 5296635..4a8b269 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -53,9 +53,9 @@ import java.util.List;
 public class AutoscalerServerComponent {
 
     private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class);
-    AutoscalerTopologyEventReceiver asTopologyReceiver;
-//    TopicSubscriber healthStatTopicSubscriber;
-    AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
+
+    private AutoscalerTopologyEventReceiver asTopologyReceiver;
+    private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
 
     protected void activate(ComponentContext componentContext) throws Exception {
         try {
@@ -66,21 +66,13 @@ public class AutoscalerServerComponent {
             if (log.isDebugEnabled()) {
                 log.debug("Topology receiver thread started");
             }
-//            healthStatTopicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
-//            healthStatTopicSubscriber.setMessageListener(new HealthEventMessageReceiver());
-//            Thread healthStatTopicSubscriberThread = new Thread(healthStatTopicSubscriber);
-//            healthStatTopicSubscriberThread.start();
-//            if (log.isDebugEnabled()) {
-//                log.debug("Health event message receiver thread started");
-//            }
-
 
             // Start health stat receiver
             autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
             Thread healthDelegatorThread = new Thread(autoscalerHealthStatEventReceiver);
             healthDelegatorThread.start();
             if (log.isDebugEnabled()) {
-                log.debug("Health message processor thread started");
+                log.debug("Health statistics receiver thread started");
             }
 
             // Adding the registry stored partitions to the information model
@@ -122,10 +114,10 @@ public class AutoscalerServerComponent {
             }
 
             if (log.isInfoEnabled()) {
-                log.info("Autoscaler Server Component activated");
+                log.info("Autoscaler server Component activated");
             }
         } catch (Throwable e) {
-            log.error("Error in activating the autoscaler component ", e);
+            log.error("Error in activating autoscaler component", e);
         }
     }
 
@@ -149,7 +141,7 @@ public class AutoscalerServerComponent {
 
     protected void unsetRegistryService(RegistryService registryService) {
         if (log.isDebugEnabled()) {
-            log.debug("Unsetting the Registry Service");
+            log.debug("Un-setting the Registry Service");
         }
         ServiceReferenceHolder.getInstance().setRegistry(null);
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
index 972ddad..030bc53 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
@@ -23,9 +23,15 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
+import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent;
+import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import org.apache.stratos.autoscaler.monitor.events.MonitorTerminateAllEvent;
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
 import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
 import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.GroupStatus;
 import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
 import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
 import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
@@ -54,11 +60,11 @@ import org.drools.runtime.rule.FactHandle;
 /*
  * Every cluster monitor, which are monitoring a cluster, should extend this class.
  */
-public abstract class AbstractClusterMonitor implements Runnable {
+public abstract class AbstractClusterMonitor extends Monitor implements Runnable {
 
     private String clusterId;
     private String serviceId;
-    private ClusterStatus status;
+    protected ClusterStatus status;
     private int monitoringIntervalMilliseconds;
 
     protected FactHandle minCheckFactHandle;
@@ -68,6 +74,7 @@ public abstract class AbstractClusterMonitor implements Runnable {
     private boolean isDestroyed;
 
     private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+    protected boolean hasFaultyMember = false;
 
     private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
 
@@ -159,7 +166,7 @@ public abstract class AbstractClusterMonitor implements Runnable {
     public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent memberTerminatedEvent);
 
     public abstract void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent);
-    
+
     public abstract void handleDynamicUpdates(Properties properties) throws InvalidArgumentException;
 
     public String getClusterId() {
@@ -244,4 +251,37 @@ public abstract class AbstractClusterMonitor implements Runnable {
             AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
         this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
     }
+
+
+    @Override
+    public void onParentEvent(MonitorStatusEvent statusEvent) {
+        // send the ClusterTerminating event
+        if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() ==
+                ApplicationStatus.Terminating) {
+            StatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId);
+        }
+    }
+
+    @Override
+    public void onChildEvent(MonitorStatusEvent statusEvent) {
+
+    }
+
+    @Override
+    public void onEvent(MonitorTerminateAllEvent terminateAllEvent) {
+
+    }
+
+    @Override
+    public void onEvent(MonitorScalingEvent scalingEvent) {
+
+    }
+
+    public void setHasFaultyMember(boolean hasFaultyMember) {
+        this.hasFaultyMember = hasFaultyMember;
+    }
+
+    public boolean isHasFaultyMember() {
+        return hasFaultyMember;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
index 2ead896..9cf3709 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
@@ -34,7 +34,7 @@ import org.apache.stratos.autoscaler.grouping.dependency.context.ApplicationCont
 import org.apache.stratos.autoscaler.grouping.dependency.context.ClusterContext;
 import org.apache.stratos.autoscaler.grouping.dependency.context.GroupContext;
 import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.group.GroupMonitor;
 import org.apache.stratos.autoscaler.partition.PartitionGroup;
 import org.apache.stratos.autoscaler.policy.PolicyManager;
@@ -172,7 +172,7 @@ public class ApplicationMonitorFactory {
      * @throws org.apache.stratos.autoscaler.exception.PolicyValidationException
      * @throws org.apache.stratos.autoscaler.exception.PartitionValidationException
      */
-    public static ClusterMonitor getClusterMonitor(ParentComponentMonitor parentMonitor,
+    public static VMClusterMonitor getClusterMonitor(ParentComponentMonitor parentMonitor,
                                                    ClusterContext context, String appId)
             throws PolicyValidationException,
             PartitionValidationException,
@@ -182,7 +182,7 @@ public class ApplicationMonitorFactory {
         String serviceName = context.getServiceName();
 
         Cluster cluster;
-        ClusterMonitor clusterMonitor;
+        AbstractClusterMonitor clusterMonitor;
         //acquire read lock for the service and cluster
         TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
         try {
@@ -207,126 +207,17 @@ public class ApplicationMonitorFactory {
 
             }
 
-            String autoscalePolicyName = cluster.getAutoscalePolicyName();
-            String deploymentPolicyName = cluster.getDeploymentPolicyName();
 
-            if (log.isDebugEnabled()) {
-                log.debug("Deployment policy name: " + deploymentPolicyName);
-                log.debug("Autoscaler policy name: " + autoscalePolicyName);
-            }
-
-            AutoscalePolicy policy =
-                    PolicyManager.getInstance()
-                            .getAutoscalePolicy(autoscalePolicyName);
-            DeploymentPolicy deploymentPolicy =
-                    PolicyManager.getInstance()
-                            .getDeploymentPolicy(deploymentPolicyName);
-
-            if (deploymentPolicy == null) {
-                String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
-                log.error(msg);
-                throw new PolicyValidationException(msg);
-            }
-
-            Partition[] allPartitions = deploymentPolicy.getAllPartitions();
-            if (allPartitions == null) {
-                String msg =
-                        "Deployment Policy's Partitions are null. Policy name: " +
-                                deploymentPolicyName;
-                log.error(msg);
-                throw new PolicyValidationException(msg);
-            }
-
-            CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
-
-            clusterMonitor = new ClusterMonitor(cluster.getClusterId(), cluster.getServiceName(),
-                    deploymentPolicy, policy);
-
-            for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
-
-                NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-                        partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions());
-
-                for (Partition partition : partitionGroup.getPartitions()) {
-                    PartitionContext partitionContext = new PartitionContext(partition);
-                    partitionContext.setServiceName(cluster.getServiceName());
-                    partitionContext.setProperties(cluster.getProperties());
-                    partitionContext.setNetworkPartitionId(partitionGroup.getId());
-
-                    for (Member member : cluster.getMembers()) {
-                        String memberId = member.getMemberId();
-                        if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
-                            MemberContext memberContext = new MemberContext();
-                            memberContext.setClusterId(member.getClusterId());
-                            memberContext.setMemberId(memberId);
-                            memberContext.setPartition(partition);
-                            memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
-
-                            if (MemberStatus.Activated.equals(member.getStatus())) {
-                                partitionContext.addActiveMember(memberContext);
-                                //triggering the status checker
-//                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-//                            partitionContext.incrementCurrentActiveMemberCount(1);
-
-                            } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
-                                partitionContext.addPendingMember(memberContext);
-
-//                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-                            } else if (MemberStatus.Suspended.equals(member.getStatus())) {
-//                            partitionContext.addFaultyMember(memberId);
-                            }
-                            partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                            if (log.isInfoEnabled()) {
-                                log.info(String.format("Member stat context has been added: [member] %s", memberId));
-                            }
-                        }
-
-                    }
-                    networkPartitionContext.addPartitionContext(partitionContext);
-                    if (log.isInfoEnabled()) {
-                        log.info(String.format("Partition context has been added: [partition] %s",
-                                partitionContext.getPartitionId()));
-                    }
-                }
-
-                clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-                clusterMonitor.setParent(parentMonitor);
-                if(parentMonitor.isDependent() || (context.isDependent() && context.hasChild())) {
-                    clusterMonitor.setHasDependent(true);
-                } else {
-                    clusterMonitor.setHasDependent(false);
-                }
-                AutoscalerContext.getInstance().addMonitor(clusterMonitor);
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Network partition context has been added: [network partition] %s",
-                            networkPartitionContext.getId()));
-                }
-            }
-            //TODO to make sure when group monitor is async
-            //if cluster is not in created state then notify the parent monitor
-            if (cluster.getStatus() != clusterMonitor.getStatus()) {
-                //updating the status, so that it will notify the parent
-                clusterMonitor.setStatus(cluster.getStatus());
-            }
-
-            if (!cluster.hasMembers()) {
-                //triggering the status checker if cluster has members to decide
-                // on the current status of the cluster
-                StatusChecker.getInstance().onMemberStatusChange(clusterId);
+            clusterMonitor = ClusterMonitorFactory.getMonitor(cluster);
+            if (clusterMonitor instanceof VMClusterMonitor) {
+                return (VMClusterMonitor) clusterMonitor;
+            } else if (clusterMonitor != null) {
+                log.warn("Unknown cluster monitor found: " + clusterMonitor.getClass().toString());
             }
+            return null;
         } finally {
-            //release read lock for the service and cluster
             TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
         }
-
-        // set hasPrimary property
-        // hasPrimary is true if there are primary members available in that cluster
-        if (cluster.getProperties() != null) {
-            clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
-        }
-
-        log.info("Cluster monitor created: " + clusterMonitor.toString());
-        return clusterMonitor;
     }
 
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
index 67850ba..15b14b6 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
@@ -64,19 +64,19 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni
     public void run() {
 
         if (log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
+            log.debug("KubernetesServiceClusterMonitor is running..." + this.toString());
         }
         try {
-            if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
+            if (!ClusterStatus.Active.getNextStates().contains(getStatus())) {
                 monitor();
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
-                              + ClusterStatus.In_Maintenance + " mode......");
+                            + getStatus() + "state");
                 }
             }
         } catch (Exception e) {
-            log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
+            log.error("KubernetesServiceClusterMonitor: Monitor failed." + this.toString(),
                       e);
         }
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
index 2ed78f1..3e6cddc 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
@@ -63,20 +63,26 @@ public class VMLbClusterMonitor extends VMClusterMonitor {
     @Override
     public void run() {
 
-        if (log.isDebugEnabled()) {
-            log.debug("VMLbClusterMonitor is running.. " + this.toString());
-        }
-        try {
-            if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
-                monitor();
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("VMLbClusterMonitor is suspended as the cluster is in " +
-                              ClusterStatus.In_Maintenance + " mode......");
+        while (!isDestroyed()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Cluster monitor is running.. " + this.toString());
+            }
+            try {
+                if (!ClusterStatus.Inactive.equals(status)) {
+                    monitor();
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("LB Cluster monitor is suspended as the cluster is in " +
+                                ClusterStatus.Inactive + " mode......");
+                    }
                 }
+            } catch (Exception e) {
+                log.error("Cluster monitor: Monitor failed. " + this.toString(), e);
+            }
+            try {
+                Thread.sleep(getMonitorIntervalMilliseconds());
+            } catch (InterruptedException ignore) {
             }
-        } catch (Exception e) {
-            log.error("VMLbClusterMonitor : Monitor failed. " + this.toString(), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
index 9aec279..9a26b42 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
@@ -63,28 +63,28 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
 
     @Override
     public void run() {
-
-        try {
-            // TODO make this configurable,
-            // this is the delay the min check of normal cluster monitor to wait until LB monitor is added
-            Thread.sleep(60000);
-        } catch (InterruptedException ignore) {
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("VMServiceClusterMonitor is running.. " + this.toString());
-        }
-        try {
-            if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
-                monitor();
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("VMServiceClusterMonitor is suspended as the cluster is in " +
-                              ClusterStatus.In_Maintenance + " mode......");
+        while (!isDestroyed()) {
+            try {
+                if ((this.status.getCode() <= ClusterStatus.Active.getCode()) ||
+                        (this.status == ClusterStatus.Inactive && !hasDependent) ||
+                        !this.hasFaultyMember) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Cluster monitor is running.. " + this.toString());
+                    }
+                    monitor();
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Cluster monitor is suspended as the cluster is in " +
+                                ClusterStatus.Inactive + " mode......");
+                    }
                 }
+            } catch (Exception e) {
+                log.error("Cluster monitor: Monitor failed." + this.toString(), e);
+            }
+            try {
+                Thread.sleep(getMonitorIntervalMilliseconds());
+            } catch (InterruptedException ignore) {
             }
-        } catch (Exception e) {
-            log.error("VMServiceClusterMonitor : Monitor failed." + this.toString(), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
deleted file mode 100644
index 6d7e8ca..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor.cluster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.GroupStatus;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-public class ClusterMonitor extends AbstractClusterMonitor {
-
-    private static final Log log = LogFactory.getLog(ClusterMonitor.class);
-    private String lbReferenceType;
-    private boolean hasPrimary;
-
-
-    public ClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
-                          AutoscalePolicy autoscalePolicy) {
-        this.clusterId = clusterId;
-        this.id = clusterId;
-        this.serviceId = serviceId;
-
-        this.autoscalerRuleEvaluator = new AutoscalerRuleEvaluator();
-        this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
-        this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
-        this.terminateAllKnowledgeSession = autoscalerRuleEvaluator.getTerminateAllStatefulSession();
-
-        this.deploymentPolicy = deploymentPolicy;
-        this.autoscalePolicy = autoscalePolicy;
-        if (log.isDebugEnabled()) {
-            log.debug("ClusterMonitor:autoScalePolicy:" + autoscalePolicy);
-        }
-        networkPartitionCtxts = new ConcurrentHashMap<String, NetworkPartitionContext>();
-        //status = ClusterStatus.Created;
-    }
-
-
-    @Override
-    public void run() {
-        while (!isDestroyed()) {
-            try {
-                if ((this.status.getCode() <= ClusterStatus.Active.getCode()) ||
-                        (this.status == ClusterStatus.Inactive && !hasDependent) ||
-                        !this.hasFaultyMember) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Cluster monitor is running.. " + this.toString());
-                    }
-                    monitor();
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Cluster monitor is suspended as the cluster is in " +
-                                ClusterStatus.Inactive + " mode......");
-                    }
-                }
-            } catch (Exception e) {
-                log.error("Cluster monitor: Monitor failed." + this.toString(), e);
-            }
-            try {
-                Thread.sleep(monitorInterval);
-            } catch (InterruptedException ignore) {
-            }
-        }
-
-
-    }
-
-    @Override
-    public void terminateAllMembers() {
-
-        Thread memberTerminator = new Thread(new Runnable(){
-            public void run(){
-
-                for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
-                    for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) {
-                        //if (log.isDebugEnabled()) {
-                        log.info("Starting to terminate all members in Network Partition [ " +
-                                networkPartitionContext.getId() + " ], Partition [ " +
-                                partitionContext.getPartitionId() + " ]");
-                        // }
-                        // need to terminate active, pending and obsolete members
-
-                        // active members
-                        for (MemberContext activeMemberCtxt : partitionContext.getActiveMembers()) {
-                            log.info("Terminating active member [member id] " + activeMemberCtxt.getMemberId());
-                            terminateMember(activeMemberCtxt.getMemberId());
-                        }
-
-                        // pending members
-                        for  (MemberContext pendingMemberCtxt : partitionContext.getPendingMembers()) {
-                            log.info("Terminating pending member [member id] " + pendingMemberCtxt.getMemberId());
-                            terminateMember(pendingMemberCtxt.getMemberId());
-                        }
-
-                        // obsolete members
-                        for (String obsoleteMemberId : partitionContext.getObsoletedMembers()) {
-                            log.info("Terminating obsolete member [member id] " + obsoleteMemberId);
-                            terminateMember(obsoleteMemberId);
-                        }
-
-//                terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll
-//                        (terminateAllKnowledgeSession, terminateAllFactHandle, partitionContext);
-                    }
-                }
-            }
-        }, "Member Terminator - [cluster id] " + this.clusterId);
-
-        memberTerminator.start();
-    }
-
-    private static void terminateMember (String memberId) {
-        try {
-            CloudControllerClient.getInstance().terminate(memberId);
-
-        } catch (TerminationException e) {
-            log.error("Unable to terminate member [member id ] " + memberId, e);
-        }
-    }
-
-    private boolean isPrimaryMember(MemberContext memberContext) {
-        Properties props = memberContext.getProperties();
-        if (log.isDebugEnabled()) {
-            log.debug(" Properties [" + props + "] ");
-        }
-        if (props != null && props.getProperties() != null) {
-            for (Property prop : props.getProperties()) {
-                if (prop.getName().equals("PRIMARY")) {
-                    if (Boolean.parseBoolean(prop.getValue())) {
-                        log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
-                                "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
-                        return true;
-                    }
-                }
-            }
-        }
-        return false;
-    }
-
-    public void monitor() {
-        //TODO make this concurrent
-
-        for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
-            // store primary members in the network partition context
-            List<String> primaryMemberListInNetworkPartition = new ArrayList<String>();
-            //minimum check per partition
-            for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) {
-                // store primary members in the partition context
-                List<String> primaryMemberListInPartition = new ArrayList<String>();
-                // get active primary members in this partition context
-                for (MemberContext memberContext : partitionContext.getActiveMembers()) {
-                    if (isPrimaryMember(memberContext)) {
-                        primaryMemberListInPartition.add(memberContext.getMemberId());
-                    }
-                }
-
-                // get pending primary members in this partition context
-                for (MemberContext memberContext : partitionContext.getPendingMembers()) {
-                    if (isPrimaryMember(memberContext)) {
-                        primaryMemberListInPartition.add(memberContext.getMemberId());
-                    }
-                }
-                primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition);
-                minCheckKnowledgeSession.setGlobal("clusterId", clusterId);
-                minCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType);
-                minCheckKnowledgeSession.setGlobal("isPrimary", hasPrimary);
-
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Running minimum check for partition %s ", partitionContext.getPartitionId()));
-                }
-
-                minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(minCheckKnowledgeSession
-                        , minCheckFactHandle, partitionContext);
-
-                //checking the status of the cluster
-
-
-            }
-
-        	/*boolean rifReset = networkPartitionContext.isRifReset();
-            boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset();
-            boolean loadAverageReset = networkPartitionContext.isLoadAverageReset();
-
-            if (log.isDebugEnabled()) {
-                log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset
-                        + " flag of loadAverageReset" + loadAverageReset);
-            }
-            if (rifReset || memoryConsumptionReset || loadAverageReset) {
-
-                scaleCheckKnowledgeSession.setGlobal("clusterId", clusterId);
-                //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy);
-                scaleCheckKnowledgeSession.setGlobal("autoscalePolicy", autoscalePolicy);
-                scaleCheckKnowledgeSession.setGlobal("rifReset", rifReset);
-                scaleCheckKnowledgeSession.setGlobal("mcReset", memoryConsumptionReset);
-                scaleCheckKnowledgeSession.setGlobal("laReset", loadAverageReset);
-                scaleCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType);
-                scaleCheckKnowledgeSession.setGlobal("isPrimary", false);
-                scaleCheckKnowledgeSession.setGlobal("primaryMembers", primaryMemberListInNetworkPartition);
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId()));
-                    log.debug(" Primary members : " + primaryMemberListInNetworkPartition);
-                }
-
-                scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(scaleCheckKnowledgeSession
-                        , scaleCheckFactHandle, networkPartitionContext);
-
-                networkPartitionContext.setRifReset(false);
-                networkPartitionContext.setMemoryConsumptionReset(false);
-                networkPartitionContext.setLoadAverageReset(false);
-            } else if (log.isDebugEnabled()) {
-                log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " +
-                        "cycle for network partition %s", networkPartitionContext.getId()));
-            }*/
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "ClusterMonitor [clusterId=" + clusterId + ", serviceId=" + serviceId +
-                ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy +
-                ", lbReferenceType=" + lbReferenceType +
-                ", hasPrimary=" + hasPrimary + " ]";
-    }
-
-    public String getLbReferenceType() {
-        return lbReferenceType;
-    }
-
-    public void setLbReferenceType(String lbReferenceType) {
-        this.lbReferenceType = lbReferenceType;
-    }
-
-    public boolean isHasPrimary() {
-        return hasPrimary;
-    }
-
-    public void setHasPrimary(boolean hasPrimary) {
-        this.hasPrimary = hasPrimary;
-    }
-
-    @Override
-    public void onChildEvent(MonitorStatusEvent statusEvent) {
-
-    }
-
-    @Override
-    public void onParentEvent(MonitorStatusEvent statusEvent) {
-        // send the ClusterTerminating event
-        if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() ==
-                ApplicationStatus.Terminating) {
-            StatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/LbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/LbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/LbClusterMonitor.java
deleted file mode 100644
index 6697d73..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/LbClusterMonitor.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor.cluster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-public class LbClusterMonitor extends AbstractClusterMonitor {
-
-    private static final Log log = LogFactory.getLog(LbClusterMonitor.class);
-
-    public LbClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
-                            AutoscalePolicy autoscalePolicy) {
-        this.clusterId = clusterId;
-        this.serviceId = serviceId;
-
-        this.autoscalerRuleEvaluator = new AutoscalerRuleEvaluator();
-        this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
-        this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
-        this.terminateAllKnowledgeSession = autoscalerRuleEvaluator.getTerminateAllStatefulSession();
-
-        this.deploymentPolicy = deploymentPolicy;
-        this.deploymentPolicy = deploymentPolicy;
-        networkPartitionCtxts = new ConcurrentHashMap<String, NetworkPartitionContext>();
-    }
-
-    @Override
-    public void run() {
-
-        while (!isDestroyed()) {
-            if (log.isDebugEnabled()) {
-                log.debug("Cluster monitor is running.. " + this.toString());
-            }
-            try {
-                if (!ClusterStatus.Inactive.equals(status)) {
-                    monitor();
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("LB Cluster monitor is suspended as the cluster is in " +
-                                ClusterStatus.Inactive + " mode......");
-                    }
-                }
-            } catch (Exception e) {
-                log.error("Cluster monitor: Monitor failed. " + this.toString(), e);
-            }
-            try {
-                Thread.sleep(monitorInterval);
-            } catch (InterruptedException ignore) {
-            }
-        }
-    }
-
-    @Override
-    public void terminateAllMembers() {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    private void monitor() {
-        // TODO make this concurrent
-        for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
-
-            // minimum check per partition
-            for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts()
-                    .values()) {
-
-                if (partitionContext != null) {
-                    minCheckKnowledgeSession.setGlobal("clusterId", clusterId);
-                    minCheckKnowledgeSession.setGlobal("isPrimary", false);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Running minimum check for partition %s ",
-                                partitionContext.getPartitionId()));
-                    }
-
-                    minCheckFactHandle =
-                            AutoscalerRuleEvaluator.evaluateMinCheck(minCheckKnowledgeSession,
-                                    minCheckFactHandle,
-                                    partitionContext);
-                    // start only in the first partition context
-                    break;
-                }
-
-            }
-
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "LbClusterMonitor [clusterId=" + clusterId + ", serviceId=" + serviceId + "]";
-    }
-
-
-    @Override
-    public void onParentEvent(MonitorStatusEvent statusEvent) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
index 3a925d1..5f3b590 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
@@ -24,8 +24,7 @@ import org.apache.stratos.autoscaler.AutoscalerContext;
 import org.apache.stratos.autoscaler.NetworkPartitionContext;
 import org.apache.stratos.autoscaler.PartitionContext;
 import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
@@ -57,7 +56,7 @@ public class StatusChecker {
     public void onMemberStatusChange(final String clusterId) {
         Runnable group = new Runnable() {
             public void run() {
-                ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId);
+                VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
                 boolean clusterActive = false;
                 if (monitor != null) {
                     clusterActive = clusterActive(monitor);
@@ -80,7 +79,7 @@ public class StatusChecker {
     public void onMemberTermination(final String clusterId) {
         Runnable group = new Runnable() {
             public void run() {
-                ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId);
+                VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
                 boolean clusterMonitorHasMembers = clusterMonitorHasMembers(monitor);
                 boolean clusterActive = clusterActive(monitor);
 
@@ -121,7 +120,7 @@ public class StatusChecker {
 
     }
 
-    private boolean clusterActive(AbstractClusterMonitor monitor) {
+    private boolean clusterActive(VMClusterMonitor monitor) {
         boolean clusterActive = false;
         for (NetworkPartitionContext networkPartitionContext : monitor.getNetworkPartitionCtxts().values()) {
             //minimum check per partition
@@ -140,7 +139,7 @@ public class StatusChecker {
         return clusterActive;
     }
 
-    private boolean clusterMonitorHasMembers(AbstractClusterMonitor monitor) {
+    private boolean clusterMonitorHasMembers(VMClusterMonitor monitor) {
         boolean hasMember = false;
         for (NetworkPartitionContext networkPartitionContext : monitor.getNetworkPartitionCtxts().values()) {
             //minimum check per partition
@@ -161,7 +160,7 @@ public class StatusChecker {
     public void onMemberFaultEvent(final String clusterId, final String partitionId) {
         Runnable group = new Runnable() {
             public void run() {
-                ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId);
+                VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
                 boolean clusterInActive = getClusterInActive(monitor, partitionId);
                 String appId = monitor.getAppId();
                 if (clusterInActive) {
@@ -185,7 +184,7 @@ public class StatusChecker {
         groupThread.start();
     }
 
-    private boolean getClusterInActive(AbstractClusterMonitor monitor, String  partitionId) {
+    private boolean getClusterInActive(VMClusterMonitor monitor, String  partitionId) {
         boolean clusterInActive = false;
         for (NetworkPartitionContext networkPartitionContext : monitor.getNetworkPartitionCtxts().values()) {
             for (PartitionContext partition : networkPartitionContext.getPartitionCtxts().values()) {