You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/10/31 05:04:04 UTC

[1/5] Adding autoscaler topology event listeners introduced by service grouping

Repository: stratos
Updated Branches:
  refs/heads/docker-grouping-merge a5dcbaa7d -> e1f37d63f


http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
new file mode 100644
index 0000000..d4b6a25
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
@@ -0,0 +1,692 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * Is responsible for monitoring a service cluster. This runs periodically
+ * and perform minimum instance check and scaling check using the underlying
+ * rules engine.
+ */
+abstract public class VMClusterMonitor extends AbstractClusterMonitor {
+
+    private static final Log log = LogFactory.getLog(VMClusterMonitor.class);
+    // Map<NetworkpartitionId, Network Partition Context>
+    protected Map<String, NetworkPartitionContext> networkPartitionCtxts;
+    protected DeploymentPolicy deploymentPolicy;
+    protected AutoscalePolicy autoscalePolicy;
+
+    protected VMClusterMonitor(String clusterId, String serviceId,
+                               AutoscalerRuleEvaluator autoscalerRuleEvaluator,
+                               DeploymentPolicy deploymentPolicy, AutoscalePolicy autoscalePolicy,
+                               Map<String, NetworkPartitionContext> networkPartitionCtxts) {
+        super(clusterId, serviceId, autoscalerRuleEvaluator);
+        this.deploymentPolicy = deploymentPolicy;
+        this.autoscalePolicy = autoscalePolicy;
+        this.networkPartitionCtxts = networkPartitionCtxts;
+    }
+
+    @Override
+    public void handleAverageLoadAverageEvent(
+            AverageLoadAverageEvent averageLoadAverageEvent) {
+
+        String networkPartitionId = averageLoadAverageEvent.getNetworkPartitionId();
+        String clusterId = averageLoadAverageEvent.getClusterId();
+        float value = averageLoadAverageEvent.getValue();
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
+                                    clusterId, networkPartitionId, value));
+        }
+
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setAverageLoadAverage(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+
+    }
+
+    @Override
+    public void handleGradientOfLoadAverageEvent(
+            GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+        String networkPartitionId = gradientOfLoadAverageEvent.getNetworkPartitionId();
+        String clusterId = gradientOfLoadAverageEvent.getClusterId();
+        float value = gradientOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
+                                    clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setLoadAverageGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfLoadAverageEvent(
+            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
+
+        String networkPartitionId = secondDerivativeOfLoadAverageEvent.getNetworkPartitionId();
+        String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+        float value = secondDerivativeOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
+                                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setLoadAverageSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleAverageMemoryConsumptionEvent(
+            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+        String networkPartitionId = averageMemoryConsumptionEvent.getNetworkPartitionId();
+        String clusterId = averageMemoryConsumptionEvent.getClusterId();
+        float value = averageMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s "
+                                    + "[value] %s", clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setAverageMemoryConsumption(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String
+                                  .format("Network partition context is not available for :"
+                                          + " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfMemoryConsumptionEvent(
+            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
+
+        String networkPartitionId = gradientOfMemoryConsumptionEvent.getNetworkPartitionId();
+        String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+        float value = gradientOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
+                                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setMemoryConsumptionGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfMemoryConsumptionEvent(
+            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
+
+        String networkPartitionId = secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId();
+        String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+        float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
+                                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setMemoryConsumptionSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleAverageRequestsInFlightEvent(
+            AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+        String networkPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId();
+        String clusterId = averageRequestsInFlightEvent.getClusterId();
+        float value = averageRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
+                                    clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setAverageRequestsInFlight(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfRequestsInFlightEvent(
+            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+        String networkPartitionId = gradientOfRequestsInFlightEvent.getNetworkPartitionId();
+        String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
+        float value = gradientOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
+                                    clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setRequestsInFlightGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfRequestsInFlightEvent(
+            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
+
+        String networkPartitionId = secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId();
+        String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
+        float value = secondDerivativeOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second derivative of Rif event: [cluster] %s "
+                                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setRequestsInFlightSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleMemberAverageMemoryConsumptionEvent(
+            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
+
+        String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberAverageMemoryConsumptionEvent.getValue();
+        memberStatsContext.setAverageMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfMemoryConsumptionEvent(
+            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
+
+        String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfMemoryConsumptionEvent.getValue();
+        memberStatsContext.setGradientOfMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
+
+    }
+
+    @Override
+    public void handleMemberAverageLoadAverageEvent(
+            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+        String memberId = memberAverageLoadAverageEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberAverageLoadAverageEvent.getValue();
+        memberStatsContext.setAverageLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfLoadAverageEvent(
+            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
+
+        String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfLoadAverageEvent.getValue();
+        memberStatsContext.setGradientOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfLoadAverageEvent(
+            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
+
+        String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+        memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+
+        String memberId = memberFaultEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        if (null == member) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+            }
+            return;
+        }
+        if (!member.isActive()) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member activated event has not received for the member %s. "
+                                        + "Therefore ignoring" + " the member fault health stat", memberId));
+            }
+            return;
+        }
+
+        NetworkPartitionContext nwPartitionCtxt;
+        nwPartitionCtxt = getNetworkPartitionCtxt(member);
+        String partitionId = getPartitionOfMember(memberId);
+        PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+        if (!partitionCtxt.activeMemberExist(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Could not find the active member in partition context, "
+                                        + "[member] %s ", memberId));
+            }
+            return;
+        }
+        // terminate the faulty member
+        CloudControllerClient ccClient = CloudControllerClient.getInstance();
+        try {
+            ccClient.terminate(memberId);
+        } catch (TerminationException e) {
+            String msg = "TerminationException " + e.getLocalizedMessage();
+            log.error(msg, e);
+        }
+        // remove from active member list
+        partitionCtxt.removeActiveMemberById(memberId);
+        if (log.isInfoEnabled()) {
+            String clusterId = memberFaultEvent.getClusterId();
+            log.info(String.format("Faulty member is terminated and removed from the active members list: "
+                                   + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+        }
+    }
+
+    @Override
+    public void handleMemberStartedEvent(
+            MemberStartedEvent memberStartedEvent) {
+
+    }
+
+    @Override
+    public void handleMemberActivatedEvent(
+            MemberActivatedEvent memberActivatedEvent) {
+
+        String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
+        String partitionId = memberActivatedEvent.getPartitionId();
+        String memberId = memberActivatedEvent.getMemberId();
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionContext;
+        partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
+        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member stat context has been added successfully: "
+                                   + "[member] %s", memberId));
+        }
+        partitionContext.movePendingMemberToActiveMembers(memberId);
+    }
+
+    @Override
+    public void handleMemberMaintenanceModeEvent(
+            MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+        String networkPartitionId = maintenanceModeEvent.getNetworkPartitionId();
+        String partitionId = maintenanceModeEvent.getPartitionId();
+        String memberId = maintenanceModeEvent.getMemberId();
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
+        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Member has been moved as pending termination: "
+                                    + "[member] %s", memberId));
+        }
+        partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+    }
+
+    @Override
+    public void handleMemberReadyToShutdownEvent(
+            MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+        NetworkPartitionContext nwPartitionCtxt;
+        String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
+        nwPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+
+        // start a new member in the same Partition
+        String memberId = memberReadyToShutdownEvent.getMemberId();
+        String partitionId = getPartitionOfMember(memberId);
+        PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+        // terminate the shutdown ready member
+        CloudControllerClient ccClient = CloudControllerClient.getInstance();
+        try {
+            ccClient.terminate(memberId);
+            // remove from active member list
+            partitionCtxt.removeActiveMemberById(memberId);
+
+            String clusterId = memberReadyToShutdownEvent.getClusterId();
+            log.info(String.format("Member is terminated and removed from the active members list: "
+                                   + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+        } catch (TerminationException e) {
+            String msg = "TerminationException" + e.getLocalizedMessage();
+            log.error(msg, e);
+        }
+    }
+
+    @Override
+    public void handleMemberTerminatedEvent(
+            MemberTerminatedEvent memberTerminatedEvent) {
+
+        String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
+        String memberId = memberTerminatedEvent.getMemberId();
+        String partitionId = memberTerminatedEvent.getPartitionId();
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
+        partitionContext.removeMemberStatsContext(memberId);
+
+        if (partitionContext.removeTerminationPendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from termination pending members list: "
+                                        + "[member] %s", memberId));
+            }
+        } else if (partitionContext.removePendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from pending members list: "
+                                        + "[member] %s", memberId));
+            }
+        } else if (partitionContext.removeActiveMemberById(memberId)) {
+            log.warn(String.format("Member is in the wrong list and it is removed from "
+                                   + "active members list", memberId));
+        } else if (partitionContext.removeObsoleteMember(memberId)) {
+            log.warn(String.format("Member's obsolated timeout has been expired and "
+                                   + "it is removed from obsolated members list", memberId));
+        } else {
+            log.warn(String.format("Member is not available in any of the list active, "
+                                   + "pending and termination pending", memberId));
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member stat context has been removed successfully: "
+                                   + "[member] %s", memberId));
+        }
+    }
+
+    @Override
+    public void handleClusterRemovedEvent(
+            ClusterRemovedEvent clusterRemovedEvent) {
+
+    }
+
+    private String getNetworkPartitionIdByMemberId(String memberId) {
+        for (Service service : TopologyManager.getTopology().getServices()) {
+            for (Cluster cluster : service.getClusters()) {
+                if (cluster.memberExists(memberId)) {
+                    return cluster.getMember(memberId).getNetworkPartitionId();
+                }
+            }
+        }
+        return null;
+    }
+
+    private Member getMemberByMemberId(String memberId) {
+        try {
+            TopologyManager.acquireReadLock();
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.memberExists(memberId)) {
+                        return cluster.getMember(memberId);
+                    }
+                }
+            }
+            return null;
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    public NetworkPartitionContext getNetworkPartitionCtxt(Member member) {
+        log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId());
+        String networkPartitionId = member.getNetworkPartitionId();
+        if (networkPartitionCtxts.containsKey(networkPartitionId)) {
+            log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId));
+            return networkPartitionCtxts.get(networkPartitionId);
+        }
+        log.info("returning null getNetworkPartitionCtxt");
+        return null;
+    }
+
+    public String getPartitionOfMember(String memberId) {
+        for (Service service : TopologyManager.getTopology().getServices()) {
+            for (Cluster cluster : service.getClusters()) {
+                if (cluster.memberExists(memberId)) {
+                    return cluster.getMember(memberId).getPartitionId();
+                }
+            }
+        }
+        return null;
+    }
+
+    public DeploymentPolicy getDeploymentPolicy() {
+        return deploymentPolicy;
+    }
+
+    public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) {
+        this.deploymentPolicy = deploymentPolicy;
+    }
+
+    public AutoscalePolicy getAutoscalePolicy() {
+        return autoscalePolicy;
+    }
+
+    public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
+        this.autoscalePolicy = autoscalePolicy;
+    }
+
+    public Map<String, NetworkPartitionContext> getNetworkPartitionCtxts() {
+        return networkPartitionCtxts;
+    }
+
+    public NetworkPartitionContext getNetworkPartitionCtxt(String networkPartitionId) {
+        return networkPartitionCtxts.get(networkPartitionId);
+    }
+
+    public void setPartitionCtxt(Map<String, NetworkPartitionContext> partitionCtxt) {
+        this.networkPartitionCtxts = partitionCtxt;
+    }
+
+    public boolean partitionCtxtAvailable(String partitionId) {
+        return networkPartitionCtxts.containsKey(partitionId);
+    }
+
+    public void addNetworkPartitionCtxt(NetworkPartitionContext ctxt) {
+        this.networkPartitionCtxts.put(ctxt.getId(), ctxt);
+    }
+
+    public NetworkPartitionContext getPartitionCtxt(String id) {
+        return this.networkPartitionCtxts.get(id);
+    }
+
+    @Override
+    public void terminateAllMembers() {
+
+        Thread memberTerminator = new Thread(new Runnable(){
+            public void run(){
+
+                for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
+                    for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) {
+                        //if (log.isDebugEnabled()) {
+                        log.info("Starting to terminate all members in Network Partition [ " +
+                                networkPartitionContext.getId() + " ], Partition [ " +
+                                partitionContext.getPartitionId() + " ]");
+                        // }
+                        // need to terminate active, pending and obsolete members
+
+                        // active members
+                        for (MemberContext activeMemberCtxt : partitionContext.getActiveMembers()) {
+                            log.info("Terminating active member [member id] " + activeMemberCtxt.getMemberId());
+                            terminateMember(activeMemberCtxt.getMemberId());
+                        }
+
+                        // pending members
+                        for  (MemberContext pendingMemberCtxt : partitionContext.getPendingMembers()) {
+                            log.info("Terminating pending member [member id] " + pendingMemberCtxt.getMemberId());
+                            terminateMember(pendingMemberCtxt.getMemberId());
+                        }
+
+                        // obsolete members
+                        for (String obsoleteMemberId : partitionContext.getObsoletedMembers().keySet()) {
+                            log.info("Terminating obsolete member [member id] " + obsoleteMemberId);
+                            terminateMember(obsoleteMemberId);
+                        }
+
+//                terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll
+//                        (terminateAllKnowledgeSession, terminateAllFactHandle, partitionContext);
+                    }
+                }
+            }
+        }, "Member Terminator - [cluster id] " + getClusterId());
+
+        memberTerminator.start();
+    }
+
+    private static void terminateMember (String memberId) {
+        try {
+            CloudControllerClient.getInstance().terminate(memberId);
+
+        } catch (TerminationException e) {
+            log.error("Unable to terminate member [member id ] " + memberId, e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java
new file mode 100644
index 0000000..8a0959c
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+
+/**
+ * Is responsible for monitoring a service cluster. This runs periodically
+ * and perform minimum instance check and scaling check using the underlying
+ * rules engine.
+ */
+public class VMLbClusterMonitor extends VMClusterMonitor {
+
+    private static final Log log = LogFactory.getLog(VMLbClusterMonitor.class);
+
+    public VMLbClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
+                              AutoscalePolicy autoscalePolicy) {
+        super(clusterId, serviceId,
+              new AutoscalerRuleEvaluator(
+                      StratosConstants.VM_MIN_CHECK_DROOL_FILE,
+                      StratosConstants.VM_SCALE_CHECK_DROOL_FILE),
+              deploymentPolicy, autoscalePolicy,
+              new ConcurrentHashMap<String, NetworkPartitionContext>());
+        readConfigurations();
+    }
+
+    @Override
+    public void run() {
+
+        while (!isDestroyed()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Cluster monitor is running.. " + this.toString());
+            }
+            try {
+                if (!ClusterStatus.Inactive.equals(status)) {
+                    monitor();
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("LB Cluster monitor is suspended as the cluster is in " +
+                                ClusterStatus.Inactive + " mode......");
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Cluster monitor: Monitor failed. " + this.toString(), e);
+            }
+            try {
+                Thread.sleep(getMonitorIntervalMilliseconds());
+            } catch (InterruptedException ignore) {
+            }
+        }
+    }
+
+    @Override
+    protected void monitor() {
+        // TODO make this concurrent
+        for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
+
+            // minimum check per partition
+            for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts()
+                    .values()) {
+
+                if (partitionContext != null) {
+                    getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                    getMinCheckKnowledgeSession().setGlobal("isPrimary", false);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Running minimum check for partition %s ",
+                                                partitionContext.getPartitionId()));
+                    }
+
+                    minCheckFactHandle =
+                            AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession(),
+                                                                     minCheckFactHandle,
+                                                                     partitionContext);
+                    // start only in the first partition context
+                    break;
+                }
+
+            }
+
+        }
+    }
+
+    @Override
+    public void destroy() {
+        getMinCheckKnowledgeSession().dispose();
+        getMinCheckKnowledgeSession().dispose();
+        setDestroyed(true);
+        stopScheduler();
+        if (log.isDebugEnabled()) {
+            log.debug("VMLbClusterMonitor Drools session has been disposed. " + this.toString());
+        }
+    }
+
+    @Override
+    protected void readConfigurations() {
+        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+        int monitorInterval = conf.getInt(AutoScalerConstants.VMLb_Cluster_MONITOR_INTERVAL, 90000);
+        setMonitorIntervalMilliseconds(monitorInterval);
+        if (log.isDebugEnabled()) {
+            log.debug("VMLbClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
+        }
+    }
+
+    @Override
+    public void handleClusterRemovedEvent(
+            ClusterRemovedEvent clusterRemovedEvent) {
+
+        String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy();
+        String clusterId = clusterRemovedEvent.getClusterId();
+        DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
+        if (depPolicy != null) {
+            List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
+                    .getNetworkPartitionLbHolders(depPolicy);
+
+            for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
+                // removes lb cluster ids
+                boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
+                if (isRemoved) {
+                    log.info("Removed the lb cluster [id]:"
+                             + clusterId
+                             + " reference from Network Partition [id]: "
+                             + networkPartitionLbHolder
+                            .getNetworkPartitionId());
+
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug(networkPartitionLbHolder);
+                }
+
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "VMLbClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() + "]";
+    }
+
+    @Override
+    public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
+        // TODO 
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java
new file mode 100644
index 0000000..9d0f134
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+
+/**
+ * Is responsible for monitoring a service cluster. This runs periodically
+ * and perform minimum instance check and scaling check using the underlying
+ * rules engine.
+ */
+public class VMServiceClusterMonitor extends VMClusterMonitor {
+
+    private static final Log log = LogFactory.getLog(VMServiceClusterMonitor.class);
+    private String lbReferenceType;
+    private boolean hasPrimary;
+
+    public VMServiceClusterMonitor(String clusterId, String serviceId,
+                                   DeploymentPolicy deploymentPolicy,
+                                   AutoscalePolicy autoscalePolicy) {
+        super(clusterId, serviceId,
+              new AutoscalerRuleEvaluator(StratosConstants.VM_MIN_CHECK_DROOL_FILE,
+                                          StratosConstants.VM_SCALE_CHECK_DROOL_FILE),
+              deploymentPolicy, autoscalePolicy,
+              new ConcurrentHashMap<String, NetworkPartitionContext>());
+        readConfigurations();
+    }
+
+    @Override
+    public void run() {
+        while (!isDestroyed()) {
+            try {
+                if ((this.status.getCode() <= ClusterStatus.Active.getCode()) ||
+                        (this.status == ClusterStatus.Inactive && !hasDependent) ||
+                        !this.hasFaultyMember) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Cluster monitor is running.. " + this.toString());
+                    }
+                    monitor();
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Cluster monitor is suspended as the cluster is in " +
+                                ClusterStatus.Inactive + " mode......");
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Cluster monitor: Monitor failed." + this.toString(), e);
+            }
+            try {
+                Thread.sleep(getMonitorIntervalMilliseconds());
+            } catch (InterruptedException ignore) {
+            }
+        }
+    }
+
+    @Override
+    protected void monitor() {
+
+        //TODO make this concurrent
+        for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
+            // store primary members in the network partition context
+            List<String> primaryMemberListInNetworkPartition = new ArrayList<String>();
+
+            //minimum check per partition
+            for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) {
+                // store primary members in the partition context
+                List<String> primaryMemberListInPartition = new ArrayList<String>();
+                // get active primary members in this partition context
+                for (MemberContext memberContext : partitionContext.getActiveMembers()) {
+                    if (isPrimaryMember(memberContext)) {
+                        primaryMemberListInPartition.add(memberContext.getMemberId());
+                    }
+                }
+                // get pending primary members in this partition context
+                for (MemberContext memberContext : partitionContext.getPendingMembers()) {
+                    if (isPrimaryMember(memberContext)) {
+                        primaryMemberListInPartition.add(memberContext.getMemberId());
+                    }
+                }
+                primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition);
+                getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                getMinCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
+                getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+                getMinCheckKnowledgeSession().setGlobal("primaryMemberCount", primaryMemberListInPartition.size());
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Running minimum check for partition %s ", partitionContext.getPartitionId()));
+                }
+
+                minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession()
+                        , minCheckFactHandle, partitionContext);
+
+            }
+
+            boolean rifReset = networkPartitionContext.isRifReset();
+            boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset();
+            boolean loadAverageReset = networkPartitionContext.isLoadAverageReset();
+            if (log.isDebugEnabled()) {
+                log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset
+                          + " flag of loadAverageReset" + loadAverageReset);
+            }
+            if (rifReset || memoryConsumptionReset || loadAverageReset) {
+                getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy);
+                getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", autoscalePolicy);
+                getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+                getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+                getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+                getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
+                getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
+                getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition);
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId()));
+                    log.debug(" Primary members : " + primaryMemberListInNetworkPartition);
+                }
+
+                scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(getScaleCheckKnowledgeSession()
+                        , scaleCheckFactHandle, networkPartitionContext);
+
+                networkPartitionContext.setRifReset(false);
+                networkPartitionContext.setMemoryConsumptionReset(false);
+                networkPartitionContext.setLoadAverageReset(false);
+            } else if (log.isDebugEnabled()) {
+                log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " +
+                                        "cycle for network partition %s", networkPartitionContext.getId()));
+            }
+        }
+    }
+
+    private boolean isPrimaryMember(MemberContext memberContext) {
+        Properties props = memberContext.getProperties();
+        if (log.isDebugEnabled()) {
+            log.debug(" Properties [" + props + "] ");
+        }
+        if (props != null && props.getProperties() != null) {
+            for (Property prop : props.getProperties()) {
+                if (prop.getName().equals("PRIMARY")) {
+                    if (Boolean.parseBoolean(prop.getValue())) {
+                        log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
+                                  "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+    @Override
+    protected void readConfigurations() {
+        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+        int monitorInterval = conf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000);
+        setMonitorIntervalMilliseconds(monitorInterval);
+        if (log.isDebugEnabled()) {
+            log.debug("VMServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
+        }
+    }
+
+    @Override
+    public void destroy() {
+        getMinCheckKnowledgeSession().dispose();
+        getScaleCheckKnowledgeSession().dispose();
+        setDestroyed(true);
+        stopScheduler();
+        if (log.isDebugEnabled()) {
+            log.debug("VMServiceClusterMonitor Drools session has been disposed. " + this.toString());
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "VMServiceClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() +
+               ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy +
+               ", lbReferenceType=" + lbReferenceType +
+               ", hasPrimary=" + hasPrimary + " ]";
+    }
+
+    public String getLbReferenceType() {
+        return lbReferenceType;
+    }
+
+    public void setLbReferenceType(String lbReferenceType) {
+        this.lbReferenceType = lbReferenceType;
+    }
+
+    public boolean isHasPrimary() {
+        return hasPrimary;
+    }
+
+    public void setHasPrimary(boolean hasPrimary) {
+        this.hasPrimary = hasPrimary;
+    }
+
+    @Override
+    public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
+        // TODO 
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
index 5f3b590..73d85f0 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
@@ -24,7 +24,7 @@ import org.apache.stratos.autoscaler.AutoscalerContext;
 import org.apache.stratos.autoscaler.NetworkPartitionContext;
 import org.apache.stratos.autoscaler.PartitionContext;
 import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 


[5/5] git commit: Adding autoscaler topology event listeners introduced by service grouping

Posted by im...@apache.org.
Adding autoscaler topology event listeners introduced by service grouping


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/e1f37d63
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/e1f37d63
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/e1f37d63

Branch: refs/heads/docker-grouping-merge
Commit: e1f37d63f291d091f6bf9daf216a66a171162ec1
Parents: a5dcbaa
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Oct 31 09:33:56 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Oct 31 09:33:56 2014 +0530

----------------------------------------------------------------------
 .../stratos/autoscaler/AutoscalerContext.java   |  47 +-
 .../autoscaler/api/AutoScalerServiceImpl.java   |   2 +-
 .../AutoscalerHealthStatEventReceiver.java      |   2 +-
 .../AutoscalerTopologyEventReceiver.java        | 803 ++++++++++++++++---
 .../monitor/AbstractClusterMonitor.java         | 287 -------
 .../monitor/ApplicationMonitorFactory.java      | 235 ------
 .../monitor/ClusterMonitorFactory.java          | 444 ----------
 .../monitor/KubernetesClusterMonitor.java       | 510 ------------
 .../KubernetesServiceClusterMonitor.java        | 202 -----
 .../monitor/ParentComponentMonitor.java         |   5 +-
 .../autoscaler/monitor/VMClusterMonitor.java    | 639 ---------------
 .../autoscaler/monitor/VMLbClusterMonitor.java  | 181 -----
 .../monitor/VMServiceClusterMonitor.java        | 235 ------
 .../monitor/application/ApplicationMonitor.java |   2 +-
 .../application/ApplicationMonitorFactory.java  | 225 ++++++
 .../monitor/cluster/AbstractClusterMonitor.java | 290 +++++++
 .../monitor/cluster/ClusterMonitorFactory.java  | 442 ++++++++++
 .../cluster/KubernetesClusterMonitor.java       | 520 ++++++++++++
 .../KubernetesServiceClusterMonitor.java        | 197 +++++
 .../monitor/cluster/VMClusterMonitor.java       | 692 ++++++++++++++++
 .../monitor/cluster/VMLbClusterMonitor.java     | 181 +++++
 .../cluster/VMServiceClusterMonitor.java        | 235 ++++++
 .../status/checker/StatusChecker.java           |   2 +-
 23 files changed, 3498 insertions(+), 2880 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
index 2d10954..e8553bc 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
@@ -25,7 +25,8 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor;
 
 /**
  * It holds all cluster monitors which are active in stratos.
@@ -35,16 +36,15 @@ public class AutoscalerContext {
     private static final Log log = LogFactory.getLog(AutoscalerContext.class);
     private static final AutoscalerContext INSTANCE = new AutoscalerContext();
 
-    private AutoscalerContext() {
-        try {
-            setClusterMonitors(new HashMap<String, AbstractClusterMonitor>());
-        } catch (Exception e) {
-            log.error("Rule evaluateMinCheck error", e);
-        }
-    }
-
     // Map<ClusterId, AbstractClusterMonitor>
     private Map<String, AbstractClusterMonitor> clusterMonitors;
+    // Map<ApplicationId, ApplicationMonitor>
+    private Map<String, ApplicationMonitor> applicationMonitors;
+
+    private AutoscalerContext() {
+        clusterMonitors = new HashMap<String, AbstractClusterMonitor>();
+        applicationMonitors = new HashMap<String, ApplicationMonitor>();
+    }
 
     public static AutoscalerContext getInstance() {
         return INSTANCE;
@@ -58,22 +58,27 @@ public class AutoscalerContext {
         return clusterMonitors.get(clusterId);
     }
 
-    public Map<String, AbstractClusterMonitor> getClusterMonitors() {
-        return clusterMonitors;
+    public AbstractClusterMonitor removeClusterMonitor(String clusterId) {
+        return clusterMonitors.remove(clusterId);
     }
 
-    public void setClusterMonitors(Map<String, AbstractClusterMonitor> clusterMonitors) {
-        this.clusterMonitors = clusterMonitors;
+    public void addAppMonitor(ApplicationMonitor applicationMonitor) {
+        applicationMonitors.put(applicationMonitor.getId(), applicationMonitor);
     }
 
-    public AbstractClusterMonitor removeClusterMonitor(String clusterId) {
+    public ApplicationMonitor getAppMonitor(String applicationId) {
+        return applicationMonitors.get(applicationId);
+    }
+
+    public void removeAppMonitor(String applicationId) {
+        applicationMonitors.remove(applicationId);
+    }
+
+    public boolean appMonitorExist(String applicationId) {
+        return applicationMonitors.containsKey(applicationId);
+    }
 
-        AbstractClusterMonitor monitor = clusterMonitors.remove(clusterId);
-        if (monitor == null) {
-            log.fatal("ClusterMonitor not found for cluster id: " + clusterId);
-        } else {
-            log.info("Removed ClusterMonitor [cluster id]: " + clusterId);
-        }
-        return monitor;
+    public boolean clusterMonitorExist(String clusterId) {
+        return clusterMonitors.containsKey(clusterId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java
index 7748c09..3066034 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java
@@ -27,7 +27,7 @@ import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
 import org.apache.stratos.autoscaler.exception.*;
 import org.apache.stratos.autoscaler.interfaces.AutoScalerServiceInterface;
 import org.apache.stratos.autoscaler.kubernetes.KubernetesManager;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.partition.PartitionGroup;
 import org.apache.stratos.autoscaler.partition.PartitionManager;
 import org.apache.stratos.autoscaler.policy.PolicyManager;

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
index cd9aa9d..a5c6577 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -21,7 +21,7 @@ package org.apache.stratos.autoscaler.message.receiver.health;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Service;

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index e03ff52..d6a140a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -21,37 +21,35 @@ package org.apache.stratos.autoscaler.message.receiver.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.exception.PartitionValidationException;
-import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.ClusterMonitorFactory;
-import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
+import org.apache.stratos.autoscaler.*;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.*;
+import org.apache.stratos.autoscaler.grouping.topic.InstanceNotificationPublisher;
+import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
+import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor;
+import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitorFactory;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitorFactory;
+import org.apache.stratos.autoscaler.monitor.cluster.KubernetesClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.group.GroupMonitor;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.autoscaler.status.checker.StatusChecker;
+import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListener;
-import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
-import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberMaintenanceListener;
-import org.apache.stratos.messaging.listener.topology.MemberReadyToShutdownEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberStartedEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
-import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.listener.topology.*;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.drools.runtime.StatefulKnowledgeSession;
 import org.drools.runtime.rule.FactHandle;
 
+import java.util.List;
+import java.util.Set;
+
 /**
  * Autoscaler topology receiver.
  */
@@ -61,6 +59,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
 
     private TopologyEventReceiver topologyEventReceiver;
     private boolean terminated;
+    private boolean topologyInitialized;
 
     public AutoscalerTopologyEventReceiver() {
         this.topologyEventReceiver = new TopologyEventReceiver();
@@ -97,146 +96,614 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
             @Override
             protected void onEvent(Event event) {
-                try {
+                if (!topologyInitialized) {
+                    log.info("[CompleteTopologyEvent] Received: " + event.getClass());
+
                     TopologyManager.acquireReadLock();
-                    for (Service service : TopologyManager.getTopology().getServices()) {
-                        for (Cluster cluster : service.getClusters()) {
-                            startClusterMonitor(cluster);
+                    try {
+                        for (Application application : TopologyManager.getTopology().getApplications()) {
+                            startApplicationMonitor(application.getUniqueIdentifier());
                         }
+
+                        topologyInitialized = true;
+                    } catch (Exception e) {
+                        log.error("Error processing event", e);
+                    } finally {
+                        TopologyManager.releaseReadLock();
                     }
-                } catch (Exception e) {
-                    String msg = "Error processing event " + e.getLocalizedMessage();
-                    log.error(msg, e);
-                } finally {
-                    TopologyManager.releaseReadLock();
                 }
             }
         });
 
-        topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
+        topologyEventReceiver.addEventListener(new ApplicationCreatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
-                    MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
-                    String clusterId = memberReadyToShutdownEvent.getClusterId();
-                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                    AbstractClusterMonitor monitor;
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                    if (null == monitor) {
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                    + "[cluster] %s", clusterId));
-                        }
-                        return;
+                    log.info("[ApplicationCreatedEvent] Received: " + event.getClass());
+                    ApplicationCreatedEvent applicationCreatedEvent = (ApplicationCreatedEvent) event;
+                    try {
+
+                        //acquire read lock
+                        TopologyManager.acquireReadLockForApplication(
+                                applicationCreatedEvent.getApplication().getUniqueIdentifier());
+                        //start the application monitor
+                        startApplicationMonitor(applicationCreatedEvent.getApplication().getUniqueIdentifier());
+                    } catch (Exception e) {
+                        String msg = "Error processing event " + e.getLocalizedMessage();
+                        log.error(msg, e);
+                    } finally {
+                        //release read lock
+                        TopologyManager.releaseReadLockForApplication(
+                                applicationCreatedEvent.getApplication().getUniqueIdentifier());
                     }
-                    monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
-                } catch (Exception e) {
-                    String msg = "Error processing event " + e.getLocalizedMessage();
+                } catch (ClassCastException e) {
+                    String msg = "Error while casting the event " + e.getLocalizedMessage();
                     log.error(msg, e);
                 }
+
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new ClusterActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                log.info("[ClusterActivatedEvent] Received: " + event.getClass());
+
+                ClusterActivatedEvent clusterActivatedEvent = (ClusterActivatedEvent) event;
+                String appId = clusterActivatedEvent.getAppId();
+                String clusterId = clusterActivatedEvent.getClusterId();
+                AbstractClusterMonitor clusterMonitor =
+                        AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+
+                //changing the status in the monitor, will notify its parent monitor
+                if(clusterMonitor!= null) {
+                    clusterMonitor.setStatus(ClusterStatus.Active);
+                }
+
             }
         });
 
         topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                log.info("[ClusterCreatedEvent] Received: " + event.getClass());
+
+                ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event;
+                String clusterId = clusterCreatedEvent.getClusterId();
+                AbstractClusterMonitor clusterMonitor =
+                        AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+
+                //changing the status in the monitor, will notify its parent monitor
+                clusterMonitor.setStatus(ClusterStatus.Created);
+
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new ClusterInActivateEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[ClusterInActivateEvent] Received: " + event.getClass());
+
+                ClusterInactivateEvent clusterInactivateEvent = (ClusterInactivateEvent) event;
+                String appId = clusterInactivateEvent.getAppId();
+                String clusterId = clusterInactivateEvent.getClusterId();
+                AbstractClusterMonitor clusterMonitor =
+                        AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+
+                //changing the status in the monitor, will notify its parent monitor
+                if(clusterMonitor!= null) {
+                    clusterMonitor.setStatus(ClusterStatus.Inactive);
+                }
+
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new ClusterTerminatingEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[ClusterTerminatingEvent] Received: " + event.getClass());
+
+                ClusterTerminatingEvent clusterTerminatingEvent = (ClusterTerminatingEvent) event;
+                String clusterId = clusterTerminatingEvent.getClusterId();
+                AbstractClusterMonitor clusterMonitor =
+                        AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+
+                //changing the status in the monitor, will notify its parent monitor
+                if (clusterMonitor != null) {
+                    if (clusterMonitor.getStatus() == ClusterStatus.Active) {
+                        // terminated gracefully
+                        clusterMonitor.setStatus(ClusterStatus.Terminating);
+                        InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
+                    } else {
+                        clusterMonitor.setStatus(ClusterStatus.Terminating);
+                        clusterMonitor.terminateAllMembers();
+                    }
+
+                } else {
+                    log.warn("No Cluster Monitor found for cluster id " + clusterId);
+                }
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new ClusterTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[ClusterTerminatedEvent] Received: " + event.getClass());
+
+                ClusterTerminatedEvent clusterTerminatedEvent = (ClusterTerminatedEvent) event;
+                String appId = clusterTerminatedEvent.getAppId();
+                String clusterId = clusterTerminatedEvent.getClusterId();
+                AbstractClusterMonitor clusterMonitor =
+                        AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+
+                //changing the status in the monitor, will notify its parent monitor
+                if (clusterMonitor != null) {
+                    clusterMonitor.setStatus(ClusterStatus.Terminated);
+                }
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new GroupActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[GroupActivatedEvent] Received: " + event.getClass());
+
+                GroupActivatedEvent groupActivatedEvent = (GroupActivatedEvent) event;
+                String appId = groupActivatedEvent.getAppId();
+                String groupId = groupActivatedEvent.getGroupId();
+
+                ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId);
+                GroupMonitor monitor = (GroupMonitor) appMonitor.findGroupMonitorWithId(groupId);
+
+                //changing the status in the monitor, will notify its parent monitor
+                if (monitor != null) {
+                    monitor.setStatus(GroupStatus.Active);
+                }
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new GroupInActivateEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[GroupInActivateEvent] Received: " + event.getClass());
+
+                GroupInactivateEvent groupInactivateEvent = (GroupInactivateEvent) event;
+                String appId = groupInactivateEvent.getAppId();
+                String groupId = groupInactivateEvent.getGroupId();
+
+                ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId);
+                GroupMonitor monitor = (GroupMonitor) appMonitor.findGroupMonitorWithId(groupId);
+
+                //changing the status in the monitor, will notify its parent monitor
+                if (monitor != null) {
+                    monitor.setStatus(GroupStatus.Inactive);
+                }
+
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new GroupTerminatingEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[GroupTerminatingEvent] Received: " + event.getClass());
+
+                GroupTerminatingEvent groupTerminatingEvent = (GroupTerminatingEvent) event;
+                String appId = groupTerminatingEvent.getAppId();
+                String groupId = groupTerminatingEvent.getGroupId();
+
+                ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId);
+                GroupMonitor monitor = (GroupMonitor) appMonitor.findGroupMonitorWithId(groupId);
+
+                //changing the status in the monitor, will notify its parent monitor
+                if (monitor != null) {
+                    monitor.setStatus(GroupStatus.Terminating);
+                }
+
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new GroupTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[GroupTerminatedEvent] Received: " + event.getClass());
+
+                GroupTerminatedEvent groupTerminatedEvent = (GroupTerminatedEvent) event;
+                String appId = groupTerminatedEvent.getAppId();
+                String groupId = groupTerminatedEvent.getGroupId();
+
+                ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId);
+                GroupMonitor monitor = (GroupMonitor) appMonitor.findGroupMonitorWithId(groupId);
+
+                //changing the status in the monitor, will notify its parent monitor
+                if (monitor != null) {
+                    monitor.setStatus(GroupStatus.Terminated);
+                }
+
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new ApplicationActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[ApplicationActivatedEvent] Received: " + event.getClass());
+
+                ApplicationActivatedEvent applicationActivatedEvent = (ApplicationActivatedEvent) event;
+                String appId = applicationActivatedEvent.getAppId();
+
+                ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId);
+                if(appMonitor != null) {
+                    appMonitor.setStatus(ApplicationStatus.Active);
+                }
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new ApplicationUndeployedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[ApplicationUndeployedEvent] Received: " + event.getClass());
+
+                ApplicationUndeployedEvent applicationUndeployedEvent = (ApplicationUndeployedEvent) event;
+
+                ApplicationMonitor appMonitor = AutoscalerContext.getInstance().
+                        getAppMonitor(applicationUndeployedEvent.getApplicationId());
+
+                // if any of Cluster Monitors are not added yet, should send the
+                // Cluster Terminated event for those clusters
+                Set<ClusterDataHolder> clusterDataHolders = applicationUndeployedEvent.getClusterData();
+                if (clusterDataHolders != null) {
+                    for (ClusterDataHolder clusterDataHolder : clusterDataHolders) {
+                        AbstractClusterMonitor clusterMonitor =
+                                AutoscalerContext.getInstance().getClusterMonitor(clusterDataHolder.getClusterId());
+                        if (clusterMonitor == null) {
+                            // Cluster Monitor not found; send Cluster Terminated event to cleanup
+                            StatusEventPublisher.sendClusterTerminatedEvent(
+                                    applicationUndeployedEvent.getApplicationId(),
+                                    clusterDataHolder.getServiceType(),
+                                    clusterDataHolder.getClusterId());
+                        } else {
+                            // if the Cluster Monitor exists, mark it as destroyed to stop it from spawning
+                            // more instances
+                            clusterMonitor.setDestroyed(true);
+                        }
+                    }
+                }
+
+                if (appMonitor != null) {
+                    // set Application Monitor state to 'Terminating'
+                    appMonitor.setStatus(ApplicationStatus.Terminating);
+
+                } else {
+                    // ApplicationMonitor is not found, send Terminating event to clean up
+                    StatusEventPublisher.sendApplicationTerminatedEvent(
+                            applicationUndeployedEvent.getApplicationId(), applicationUndeployedEvent.getClusterData());
+                }
+            }
+        });
+
+
+        topologyEventReceiver.addEventListener(new ApplicationTerminatingEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[ApplicationTerminatingEvent] Received: " + event.getClass());
+
+                ApplicationTerminatingEvent appTerminatingEvent = (ApplicationTerminatingEvent) event;
+
+                // acquire read locks for application and relevant clusters
+                TopologyManager.acquireReadLockForApplication(appTerminatingEvent.getAppId());
+
                 try {
-                    log.info("Event received: " + event);
-                    ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event;
-                    TopologyManager.acquireReadLock();
-                    Service service = TopologyManager.getTopology().getService(clusterCreatedEvent.getServiceName());
-                    Cluster cluster = service.getCluster(clusterCreatedEvent.getClusterId());
-                    startClusterMonitor(cluster);
+                    ApplicationMonitor appMonitor = AutoscalerContext.getInstance().
+                            getAppMonitor(appTerminatingEvent.getAppId());
+
+                    if (appMonitor != null) {
+                        // update the status as Terminating
+                        appMonitor.setStatus(ApplicationStatus.Terminating);
+
+                    } else {
+                        log.warn("Application Monitor cannot be found for the undeployed [application] "
+                                + appTerminatingEvent.getAppId());
+                    }
+
+                } finally {
+                    TopologyManager.
+                            releaseReadLockForApplication(appTerminatingEvent.getAppId());
+                }
+            }
+        });
+
+        topologyEventReceiver.addEventListener(new ApplicationTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[ApplicationTerminatedEvent] Received: " + event.getClass());
+
+                ApplicationTerminatedEvent applicationRemovedEvent = (ApplicationTerminatedEvent) event;
+                Set<ClusterDataHolder> clusterDataHolders = applicationRemovedEvent.getClusterData();
+
+                try {
+                    //TODO remove monitors as well as any starting or pending threads
+                    ApplicationMonitor monitor = AutoscalerContext.getInstance().
+                            getAppMonitor(applicationRemovedEvent.getAppId());
+                    if (monitor != null) {
+                        for (ClusterDataHolder clusterData : clusterDataHolders) {
+                            //stopping the cluster monitor and remove it from the AS
+                            VMClusterMonitor clusterMonitor = ((VMClusterMonitor)
+                                    AutoscalerContext.getInstance().getClusterMonitor(clusterData.getClusterId()));
+                            if (clusterMonitor != null) {
+                                clusterMonitor.setDestroyed(true);
+                                AutoscalerContext.getInstance().removeClusterMonitor(clusterData.getClusterId());
+                            } else {
+                                log.warn("Cluster Monitor not found for [ cluster id ] " +
+                                        clusterData.getClusterId() + ", unable to remove");
+                            }
+                        }
+                        //removing the application monitor
+                        AutoscalerContext.getInstance().
+                                removeAppMonitor(applicationRemovedEvent.getAppId());
+                    } else {
+                        log.warn("Application Monitor cannot be found for the terminated [application] "
+                                + applicationRemovedEvent.getAppId() + ", unable to remove");
+                    }
+
+
                 } catch (Exception e) {
-                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    String msg = "Error processing event " + e.getMessage();
                     log.error(msg, e);
-                } finally {
-                    TopologyManager.releaseReadLock();
                 }
             }
         });
 
-        topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
+        topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
-                    ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
-                    String clusterId = clusterRemovedEvent.getClusterId();
+                    MemberReadyToShutdownEvent memberReadyToShutdownEvent =
+                            (MemberReadyToShutdownEvent) event;
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
                     AbstractClusterMonitor monitor;
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                    if (null == monitor) {
+                    String clusterId = memberReadyToShutdownEvent.getClusterId();
+                    String memberId = memberReadyToShutdownEvent.getMemberId();
+
+                    if (asCtx.clusterMonitorExist(clusterId)) {
+                        monitor = asCtx.getClusterMonitor(clusterId);
+                    } else {
                         if (log.isDebugEnabled()) {
-                            log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                    + "[cluster] %s", clusterId));
+                            log.debug(String.format("A cluster monitor is not found " +
+                                    "in autoscaler context [cluster] %s", clusterId));
                         }
                         return;
                     }
-                    monitor.handleClusterRemovedEvent(clusterRemovedEvent);
-                    asCtx.removeClusterMonitor(clusterId);
-                    monitor.destroy();
-                    log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
-                                           clusterId));
-                } catch (Exception e) {
-                    String msg = "Error processing event " + e.getLocalizedMessage();
-                    log.error(msg, e);
+
+                    if(monitor instanceof VMClusterMonitor) {
+                        VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) monitor;
+                        NetworkPartitionContext nwPartitionCtxt;
+                        nwPartitionCtxt = vmClusterMonitor.getNetworkPartitionCtxt(
+                                memberReadyToShutdownEvent.getNetworkPartitionId());
+
+                        String partitionId = vmClusterMonitor.getPartitionOfMember(memberId);
+                        PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+
+                        // terminate the member
+                        CloudControllerClient ccClient = CloudControllerClient.getInstance();
+                        ccClient.terminate(memberId);
+
+                        // remove from active member list
+                        partitionCtxt.removeActiveMemberById(memberId);
+
+
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member is terminated and removed from the active " +
+                                            "members list: [member] %s [partition] %s [cluster] %s ",
+                                    memberId, partitionId, clusterId));
+                        }
+                    } else if(monitor instanceof KubernetesClusterMonitor) {
+                        KubernetesClusterMonitor kubernetesClusterMonitor = (KubernetesClusterMonitor) monitor;
+                        kubernetesClusterMonitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+                    }
+                } catch (TerminationException e) {
+                    log.error(e);
                 }
             }
+
         });
 
-        topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
+        topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
 
-            }
+                ClusterRemovedEvent clusterRemovedEvent = null;
+                try {
+                    clusterRemovedEvent = (ClusterRemovedEvent) event;
+                    //TopologyManager.acquireReadLock();
+                    TopologyManager.acquireReadLockForCluster(clusterRemovedEvent.getServiceName(),
+                            clusterRemovedEvent.getClusterId());
+
+                    String clusterId = clusterRemovedEvent.getClusterId();
+                    String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy();
+
+                    AbstractClusterMonitor monitor;
+
+                    if (clusterRemovedEvent.isLbCluster()) {
+                        DeploymentPolicy depPolicy = PolicyManager.getInstance().
+                                getDeploymentPolicy(deploymentPolicy);
+                        if (depPolicy != null) {
+                            List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
+                                    .getNetworkPartitionLbHolders(depPolicy);
+
+                            for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
+                                // removes lb cluster ids
+                                boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
+                                if (isRemoved) {
+                                    log.info("Removed the lb cluster [id]:"
+                                            + clusterId
+                                            + " reference from Network Partition [id]: "
+                                            + networkPartitionLbHolder
+                                            .getNetworkPartitionId());
+
+                                }
+                                if (log.isDebugEnabled()) {
+                                    log.debug(networkPartitionLbHolder);
+                                }
+
+                            }
+                        }
+                        monitor = AutoscalerContext.getInstance()
+                                .removeClusterMonitor(clusterId);
 
+                    } else {
+                        monitor = (AbstractClusterMonitor) AutoscalerContext.getInstance()
+                                .removeClusterMonitor(clusterId);
+                    }
+
+                    // runTerminateAllRule(monitor);
+                    if (monitor != null) {
+                        monitor.destroy();
+                        log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
+                                clusterId));
+                    }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(clusterRemovedEvent.getServiceName(),
+                            clusterRemovedEvent.getClusterId());
+                }
+            }
         });
 
         topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                MemberTerminatedEvent memberTerminatedEvent = null;
                 try {
-                    MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+                    //TopologyManager.acquireReadLock();
+
+                    memberTerminatedEvent = (MemberTerminatedEvent) event;
+                    String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
                     String clusterId = memberTerminatedEvent.getClusterId();
-                    AbstractClusterMonitor monitor;
-                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                    if (null == monitor) {
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                    + "[cluster] %s", clusterId));
-                        }
+                    String partitionId = memberTerminatedEvent.getPartitionId();
+
+                    TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                            memberTerminatedEvent.getClusterId());
+
+                    AbstractClusterMonitor monitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+                    if(monitor == null) {
+                        log.error(String.format("Cluster monitor not found in autoscaler context: [clusterId] %s ", clusterId));
                         return;
                     }
-                    monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
+
+                    if(monitor instanceof VMClusterMonitor) {
+                        VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) monitor;
+                        NetworkPartitionContext networkPartitionContext = vmClusterMonitor.
+                                getNetworkPartitionCtxt(networkPartitionId);
+
+                        PartitionContext partitionContext = networkPartitionContext.
+                                getPartitionCtxt(partitionId);
+                        String memberId = memberTerminatedEvent.getMemberId();
+                        partitionContext.removeMemberStatsContext(memberId);
+
+
+                        if (partitionContext.removeTerminationPendingMember(memberId)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("Member is removed from termination pending " +
+                                        "members list: [member] %s", memberId));
+                            }
+                        } else if (partitionContext.removePendingMember(memberId)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("Member is removed from pending members list: " +
+                                        "[member] %s", memberId));
+                            }
+                        } else if (partitionContext.removeActiveMemberById(memberId)) {
+                            log.warn(String.format("Member is in the wrong list and it is removed " +
+                                    "from active members list", memberId));
+                        } else {
+                            log.warn(String.format("Member is not available in any of the list " +
+                                    "active, pending and termination pending", memberId));
+                        }
+
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member stat context has been removed " +
+                                    "               successfully: [member] %s", memberId));
+                        }
+                        //Checking whether the cluster state can be changed either from in_active to created/terminating to terminated
+                        StatusChecker.getInstance().onMemberTermination(clusterId);
+                    } else if(monitor instanceof KubernetesClusterMonitor) {
+                        KubernetesClusterMonitor kubernetesClusterMonitor = (KubernetesClusterMonitor) monitor;
+                        kubernetesClusterMonitor.handleMemberTerminatedEvent(memberTerminatedEvent);
+                    }
+
                 } catch (Exception e) {
-                    String msg = "Error processing event " + e.getLocalizedMessage();
-                    log.error(msg, e);
+                    log.error("Error processing event", e);
+                } finally {
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(),
+                            memberTerminatedEvent.getClusterId());
                 }
             }
-
         });
 
         topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
+
+                MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(),
+                        memberActivatedEvent.getClusterId());
+
                 try {
-                    MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+                    String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
                     String clusterId = memberActivatedEvent.getClusterId();
-                    AbstractClusterMonitor monitor;
-                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                    if (null == monitor) {
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                    + "[cluster] %s", clusterId));
-                        }
+                    String partitionId = memberActivatedEvent.getPartitionId();
+                    String memberId = memberActivatedEvent.getMemberId();
+
+                    AbstractClusterMonitor monitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+                    if(monitor == null) {
+                        log.error(String.format("Cluster monitor not found in autoscaler context: [clusterId] %s ", clusterId));
                         return;
                     }
-                    monitor.handleMemberActivatedEvent(memberActivatedEvent);
+
+                    if(monitor instanceof VMClusterMonitor) {
+                        VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) monitor;
+                        NetworkPartitionContext networkPartitionContext = vmClusterMonitor.
+                                getNetworkPartitionCtxt(networkPartitionId);
+                        PartitionContext partitionContext = networkPartitionContext.
+                                getPartitionCtxt(partitionId);
+
+                        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+                        // TODO starting the pending clusters which are waiting for this member activation in a cluster
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member stat context has been added " +
+                                    "successfully: [member] %s", memberId));
+                        }
+                        partitionContext.movePendingMemberToActiveMembers(memberId);
+                        //triggering the status checker
+                        StatusChecker.getInstance().onMemberStatusChange(memberActivatedEvent.getClusterId());
+                    } else if(monitor instanceof KubernetesClusterMonitor) {
+                        KubernetesClusterMonitor kubernetesClusterMonitor = (KubernetesClusterMonitor) monitor;
+                        kubernetesClusterMonitor.handleMemberActivatedEvent(memberActivatedEvent);
+                    }
+
                 } catch (Exception e) {
-                    String msg = "Error processing event " + e.getLocalizedMessage();
-                    log.error(msg, e);
+                    log.error("Error processing event", e);
+                } finally {
+                    //TopologyManager.releaseReadLock();
+                    TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(),
+                            memberActivatedEvent.getClusterId());
                 }
             }
         });
@@ -244,34 +711,51 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
             @Override
             protected void onEvent(Event event) {
+
+                MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+
+                //TopologyManager.acquireReadLock();
+                TopologyManager.acquireReadLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+                        memberMaintenanceModeEvent.getClusterId());
+
                 try {
-                    MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
-                    String clusterId = maintenanceModeEvent.getClusterId();
-                    AbstractClusterMonitor monitor;
-                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                    if (null == monitor) {
+
+                    String memberId = memberMaintenanceModeEvent.getMemberId();
+                    String partitionId = memberMaintenanceModeEvent.getPartitionId();
+                    String networkPartitionId = memberMaintenanceModeEvent.getNetworkPartitionId();
+
+                    PartitionContext partitionContext;
+                    String clusterId = memberMaintenanceModeEvent.getClusterId();
+
+                    AbstractClusterMonitor monitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+                    if(monitor == null) {
+                        log.error(String.format("Cluster monitor not found in autoscaler context: [clusterId] %s ", clusterId));
+                        return;
+                    }
+
+                    if(monitor instanceof VMClusterMonitor) {
+                        VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) monitor;
+                            partitionContext = vmClusterMonitor.getNetworkPartitionCtxt(networkPartitionId).
+                                    getPartitionCtxt(partitionId);
+                        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
                         if (log.isDebugEnabled()) {
-                            log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                    + "[cluster] %s", clusterId));
+                            log.debug(String.format("Member has been moved as pending termination: " +
+                                    "[member] %s", memberId));
                         }
-                        return;
+                        partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+                    } else if(monitor instanceof KubernetesClusterMonitor) {
+                        KubernetesClusterMonitor kubernetesClusterMonitor = (KubernetesClusterMonitor) monitor;
+                        kubernetesClusterMonitor.handleMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
                     }
-                    monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
+
                 } catch (Exception e) {
-                    String msg = "Error processing event " + e.getLocalizedMessage();
-                    log.error(msg, e);
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+                            memberMaintenanceModeEvent.getClusterId());
                 }
             }
         });
-
-
-        topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-
-            }
-        });
     }
 
     private class ClusterMonitorAdder implements Runnable {
@@ -371,4 +855,85 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
             }
         }
     }
+
+    protected synchronized void startApplicationMonitor(String applicationId) {
+        Thread th = null;
+        if (!AutoscalerContext.getInstance().appMonitorExist(applicationId)) {
+            th = new Thread(
+                    new ApplicationMonitorAdder(applicationId));
+        }
+
+        if (th != null) {
+            th.start();
+            //    try {
+            //        th.join();
+            //    } catch (InterruptedException ignore) {
+
+            if (log.isDebugEnabled()) {
+                log.debug(String
+                        .format("Application monitor thread has been started successfully: " +
+                                "[application] %s ", applicationId));
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String
+                        .format("Application monitor thread already exists: " +
+                                "[application] %s ", applicationId));
+            }
+        }
+    }
+
+    private class ApplicationMonitorAdder implements Runnable {
+        private String appId;
+
+        public ApplicationMonitorAdder(String appId) {
+            this.appId = appId;
+        }
+
+        public void run() {
+            ApplicationMonitor applicationMonitor = null;
+            int retries = 5;
+            boolean success = false;
+            do {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e1) {
+                }
+                try {
+                    long start = System.currentTimeMillis();
+                    if (log.isDebugEnabled()) {
+                        log.debug("application monitor is going to be started for [application] " +
+                                appId);
+                    }
+                    applicationMonitor = ApplicationMonitorFactory.getApplicationMonitor(appId);
+
+                    long end = System.currentTimeMillis();
+                    log.info("Time taken to start app monitor: " + (end - start) / 1000);
+                    success = true;
+                } catch (DependencyBuilderException e) {
+                    String msg = "Application monitor creation failed for Application: ";
+                    log.warn(msg, e);
+                    retries--;
+                } catch (TopologyInConsistentException e) {
+                    String msg = "Application monitor creation failed for Application: ";
+                    log.warn(msg, e);
+                    retries--;
+                }
+            } while (!success && retries != 0);
+
+            if (applicationMonitor == null) {
+                String msg = "Application monitor creation failed, even after retrying for 5 times, "
+                        + "for Application: " + appId;
+                log.error(msg);
+                throw new RuntimeException(msg);
+            }
+
+            AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Application monitor has been added successfully: " +
+                        "[application] %s", applicationMonitor.getId()));
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
deleted file mode 100644
index 030bc53..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
-import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent;
-import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
-import org.apache.stratos.autoscaler.monitor.events.MonitorTerminateAllEvent;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.GroupStatus;
-import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.drools.runtime.StatefulKnowledgeSession;
-import org.drools.runtime.rule.FactHandle;
-
-/*
- * Every cluster monitor, which are monitoring a cluster, should extend this class.
- */
-public abstract class AbstractClusterMonitor extends Monitor implements Runnable {
-
-    private String clusterId;
-    private String serviceId;
-    protected ClusterStatus status;
-    private int monitoringIntervalMilliseconds;
-
-    protected FactHandle minCheckFactHandle;
-    protected FactHandle scaleCheckFactHandle;
-    private StatefulKnowledgeSession minCheckKnowledgeSession;
-    private StatefulKnowledgeSession scaleCheckKnowledgeSession;
-    private boolean isDestroyed;
-
-    private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
-    protected boolean hasFaultyMember = false;
-
-    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
-    protected AbstractClusterMonitor(String clusterId, String serviceId,
-                                     AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
-
-        super();
-        this.clusterId = clusterId;
-        this.serviceId = serviceId;
-        this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
-        this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
-        this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
-    }
-
-    protected abstract void readConfigurations();
-
-    public void startScheduler() {
-        scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
-    }
-
-    protected void stopScheduler() {
-        scheduler.shutdownNow();
-    }
-
-    protected abstract void monitor();
-
-    public abstract void destroy();
-
-    //handle health events
-    public abstract void handleAverageLoadAverageEvent(
-            AverageLoadAverageEvent averageLoadAverageEvent);
-
-    public abstract void handleGradientOfLoadAverageEvent(
-            GradientOfLoadAverageEvent gradientOfLoadAverageEvent);
-
-    public abstract void handleSecondDerivativeOfLoadAverageEvent(
-            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent);
-
-    public abstract void handleAverageMemoryConsumptionEvent(
-            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent);
-
-    public abstract void handleGradientOfMemoryConsumptionEvent(
-            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent);
-
-    public abstract void handleSecondDerivativeOfMemoryConsumptionEvent(
-            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent);
-
-    public abstract void handleAverageRequestsInFlightEvent(
-            AverageRequestsInFlightEvent averageRequestsInFlightEvent);
-
-    public abstract void handleGradientOfRequestsInFlightEvent(
-            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent);
-
-    public abstract void handleSecondDerivativeOfRequestsInFlightEvent(
-            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent);
-
-    public abstract void handleMemberAverageMemoryConsumptionEvent(
-            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent);
-
-    public abstract void handleMemberGradientOfMemoryConsumptionEvent(
-            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent);
-
-    public abstract void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
-            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent);
-
-
-    public abstract void handleMemberAverageLoadAverageEvent(
-            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent);
-
-    public abstract void handleMemberGradientOfLoadAverageEvent(
-            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent);
-
-    public abstract void handleMemberSecondDerivativeOfLoadAverageEvent(
-            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent);
-
-    public abstract void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent);
-
-    //handle topology events
-    public abstract void handleMemberStartedEvent(MemberStartedEvent memberStartedEvent);
-
-    public abstract void handleMemberActivatedEvent(MemberActivatedEvent memberActivatedEvent);
-
-    public abstract void handleMemberMaintenanceModeEvent(
-            MemberMaintenanceModeEvent maintenanceModeEvent);
-
-    public abstract void handleMemberReadyToShutdownEvent(
-            MemberReadyToShutdownEvent memberReadyToShutdownEvent);
-
-    public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent memberTerminatedEvent);
-
-    public abstract void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent);
-
-    public abstract void handleDynamicUpdates(Properties properties) throws InvalidArgumentException;
-
-    public String getClusterId() {
-        return clusterId;
-    }
-
-    public void setClusterId(String clusterId) {
-        this.clusterId = clusterId;
-    }
-
-    public void setStatus(ClusterStatus status) {
-        this.status = status;
-    }
-
-    public ClusterStatus getStatus() {
-        return status;
-    }
-
-    public String getServiceId() {
-        return serviceId;
-    }
-
-    public void setServiceId(String serviceId) {
-        this.serviceId = serviceId;
-    }
-
-    public int getMonitorIntervalMilliseconds() {
-        return monitoringIntervalMilliseconds;
-    }
-
-    public void setMonitorIntervalMilliseconds(int monitorIntervalMilliseconds) {
-        this.monitoringIntervalMilliseconds = monitorIntervalMilliseconds;
-    }
-
-    public FactHandle getMinCheckFactHandle() {
-        return minCheckFactHandle;
-    }
-
-    public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
-        this.minCheckFactHandle = minCheckFactHandle;
-    }
-
-    public FactHandle getScaleCheckFactHandle() {
-        return scaleCheckFactHandle;
-    }
-
-    public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
-        this.scaleCheckFactHandle = scaleCheckFactHandle;
-    }
-
-    public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
-        return minCheckKnowledgeSession;
-    }
-
-    public void setMinCheckKnowledgeSession(
-            StatefulKnowledgeSession minCheckKnowledgeSession) {
-        this.minCheckKnowledgeSession = minCheckKnowledgeSession;
-    }
-
-    public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
-        return scaleCheckKnowledgeSession;
-    }
-
-    public void setScaleCheckKnowledgeSession(
-            StatefulKnowledgeSession scaleCheckKnowledgeSession) {
-        this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
-    }
-
-    public boolean isDestroyed() {
-        return isDestroyed;
-    }
-
-    public void setDestroyed(boolean isDestroyed) {
-        this.isDestroyed = isDestroyed;
-    }
-
-    public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
-        return autoscalerRuleEvaluator;
-    }
-
-    public void setAutoscalerRuleEvaluator(
-            AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
-        this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
-    }
-
-
-    @Override
-    public void onParentEvent(MonitorStatusEvent statusEvent) {
-        // send the ClusterTerminating event
-        if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() ==
-                ApplicationStatus.Terminating) {
-            StatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId);
-        }
-    }
-
-    @Override
-    public void onChildEvent(MonitorStatusEvent statusEvent) {
-
-    }
-
-    @Override
-    public void onEvent(MonitorTerminateAllEvent terminateAllEvent) {
-
-    }
-
-    @Override
-    public void onEvent(MonitorScalingEvent scalingEvent) {
-
-    }
-
-    public void setHasFaultyMember(boolean hasFaultyMember) {
-        this.hasFaultyMember = hasFaultyMember;
-    }
-
-    public boolean isHasFaultyMember() {
-        return hasFaultyMember;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
deleted file mode 100644
index 9cf3709..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.DependencyBuilderException;
-import org.apache.stratos.autoscaler.exception.PartitionValidationException;
-import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.exception.TopologyInConsistentException;
-import org.apache.stratos.autoscaler.grouping.dependency.context.ApplicationContext;
-import org.apache.stratos.autoscaler.grouping.dependency.context.ClusterContext;
-import org.apache.stratos.autoscaler.grouping.dependency.context.GroupContext;
-import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor;
-import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.group.GroupMonitor;
-import org.apache.stratos.autoscaler.partition.PartitionGroup;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.status.checker.StatusChecker;
-import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.util.Constants;
-
-import java.util.Map;
-
-/**
- * Factory class to get the Monitors.
- */
-public class ApplicationMonitorFactory {
-    private static final Log log = LogFactory.getLog(ApplicationMonitorFactory.class);
-
-    /**
-     * Factor method used to create relevant monitors based on the given context
-     *
-     * @param context       Application/Group/Cluster context
-     * @param appId         appId of the application which requires to create app monitor
-     * @param parentMonitor parent of the monitor
-     * @return Monitor which can be ApplicationMonitor/GroupMonitor/ClusterMonitor
-     * @throws TopologyInConsistentException throws while traversing thr topology
-     * @throws DependencyBuilderException    throws while building dependency for app monitor
-     * @throws PolicyValidationException     throws while validating the policy associated with cluster
-     * @throws PartitionValidationException  throws while validating the partition used in a cluster
-     */
-    public static Monitor getMonitor(ParentComponentMonitor parentMonitor, ApplicationContext context, String appId)
-            throws TopologyInConsistentException,
-            DependencyBuilderException, PolicyValidationException, PartitionValidationException {
-        Monitor monitor;
-
-        if (context instanceof GroupContext) {
-            monitor = getGroupMonitor(parentMonitor, context, appId);
-        } else if (context instanceof ClusterContext) {
-            monitor = getClusterMonitor(parentMonitor, (ClusterContext) context, appId);
-            //Start the thread
-            Thread th = new Thread((AbstractClusterMonitor) monitor);
-            th.start();
-        } else {
-            monitor = getApplicationMonitor(appId);
-        }
-        return monitor;
-    }
-
-    /**
-     * This will create the GroupMonitor based on given groupId by going thr Topology
-     *
-     * @param parentMonitor parent of the monitor
-     * @param context       groupId of the group
-     * @param appId         appId of the relevant application
-     * @return Group monitor
-     * @throws DependencyBuilderException    throws while building dependency for app monitor
-     * @throws TopologyInConsistentException throws while traversing thr topology
-     */
-    public static Monitor getGroupMonitor(ParentComponentMonitor parentMonitor, ApplicationContext context, String appId)
-            throws DependencyBuilderException,
-            TopologyInConsistentException {
-        GroupMonitor groupMonitor;
-        TopologyManager.acquireReadLockForApplication(appId);
-
-        try {
-            Group group = TopologyManager.getTopology().getApplication(appId).getGroupRecursively(context.getId());
-            groupMonitor = new GroupMonitor(group, appId);
-            groupMonitor.setAppId(appId);
-            if(parentMonitor != null) {
-                groupMonitor.setParent(parentMonitor);
-                //Setting the dependent behaviour of the monitor
-                if(parentMonitor.isDependent() || (context.isDependent() && context.hasChild())) {
-                    groupMonitor.setHasDependent(true);
-                } else {
-                    groupMonitor.setHasDependent(false);
-                }
-                //TODO make sure when it is async
-
-                if (group.getStatus() != groupMonitor.getStatus()) {
-                    //updating the status, if the group is not in created state when creating group Monitor
-                    //so that groupMonitor will notify the parent (useful when restarting stratos)
-                    groupMonitor.setStatus(group.getStatus());
-                }
-            }
-
-        } finally {
-            TopologyManager.releaseReadLockForApplication(appId);
-
-        }
-        return groupMonitor;
-
-    }
-
-    /**
-     * This will create a new app monitor based on the give appId by getting the
-     * application from Topology
-     *
-     * @param appId appId of the application which requires to create app monitor
-     * @return ApplicationMonitor
-     * @throws DependencyBuilderException    throws while building dependency for app monitor
-     * @throws TopologyInConsistentException throws while traversing thr topology
-     */
-    public static ApplicationMonitor getApplicationMonitor(String appId)
-            throws DependencyBuilderException,
-            TopologyInConsistentException {
-        ApplicationMonitor applicationMonitor;
-        TopologyManager.acquireReadLockForApplication(appId);
-        try {
-            Application application = TopologyManager.getTopology().getApplication(appId);
-            if (application != null) {
-                applicationMonitor = new ApplicationMonitor(application);
-                applicationMonitor.setHasDependent(false);
-
-            } else {
-                String msg = "[Application] " + appId + " cannot be found in the Topology";
-                throw new TopologyInConsistentException(msg);
-            }
-        } finally {
-            TopologyManager.releaseReadLockForApplication(appId);
-        }
-
-        return applicationMonitor;
-
-    }
-
-    /**
-     * Updates ClusterContext for given cluster
-     *
-     * @param parentMonitor parent of the monitor
-     * @param context
-     * @return ClusterMonitor - Updated ClusterContext
-     * @throws org.apache.stratos.autoscaler.exception.PolicyValidationException
-     * @throws org.apache.stratos.autoscaler.exception.PartitionValidationException
-     */
-    public static VMClusterMonitor getClusterMonitor(ParentComponentMonitor parentMonitor,
-                                                   ClusterContext context, String appId)
-            throws PolicyValidationException,
-            PartitionValidationException,
-            TopologyInConsistentException {
-        //Retrieving the Cluster from Topology
-        String clusterId = context.getId();
-        String serviceName = context.getServiceName();
-
-        Cluster cluster;
-        AbstractClusterMonitor clusterMonitor;
-        //acquire read lock for the service and cluster
-        TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
-        try {
-            Topology topology = TopologyManager.getTopology();
-            if (topology.serviceExists(serviceName)) {
-                Service service = topology.getService(serviceName);
-                if (service.clusterExists(clusterId)) {
-                    cluster = service.getCluster(clusterId);
-                    if (log.isDebugEnabled()) {
-                        log.debug("Dependency check starting the [cluster]" + clusterId);
-                    }
-                    // startClusterMonitor(this, cluster);
-                    //context.setCurrentStatus(Status.Created);
-                } else {
-                    String msg = "[Cluster] " + clusterId + " cannot be found in the " +
-                            "Topology for [service] " + serviceName;
-                    throw new TopologyInConsistentException(msg);
-                }
-            } else {
-                String msg = "[Service] " + serviceName + " cannot be found in the Topology";
-                throw new TopologyInConsistentException(msg);
-
-            }
-
-
-            clusterMonitor = ClusterMonitorFactory.getMonitor(cluster);
-            if (clusterMonitor instanceof VMClusterMonitor) {
-                return (VMClusterMonitor) clusterMonitor;
-            } else if (clusterMonitor != null) {
-                log.warn("Unknown cluster monitor found: " + clusterMonitor.getClass().toString());
-            }
-            return null;
-        } finally {
-            TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
-        }
-    }
-
-
-    private static Properties convertMemberPropsToMemberContextProps(
-            java.util.Properties properties) {
-        Properties props = new Properties();
-        for (Map.Entry<Object, Object> e : properties.entrySet()) {
-            Property prop = new Property();
-            prop.setName((String) e.getKey());
-            prop.setValue((String) e.getValue());
-            props.addProperties(prop);
-        }
-        return props;
-    }
-}


[3/5] Adding autoscaler topology event listeners introduced by service grouping

Posted by im...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
deleted file mode 100644
index 38ed1a6..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
+++ /dev/null
@@ -1,639 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-abstract public class VMClusterMonitor extends AbstractClusterMonitor {
-
-    private static final Log log = LogFactory.getLog(VMClusterMonitor.class);
-    // Map<NetworkpartitionId, Network Partition Context>
-    protected Map<String, NetworkPartitionContext> networkPartitionCtxts;
-    protected DeploymentPolicy deploymentPolicy;
-    protected AutoscalePolicy autoscalePolicy;
-
-    protected VMClusterMonitor(String clusterId, String serviceId,
-                               AutoscalerRuleEvaluator autoscalerRuleEvaluator,
-                               DeploymentPolicy deploymentPolicy, AutoscalePolicy autoscalePolicy,
-                               Map<String, NetworkPartitionContext> networkPartitionCtxts) {
-        super(clusterId, serviceId, autoscalerRuleEvaluator);
-        this.deploymentPolicy = deploymentPolicy;
-        this.autoscalePolicy = autoscalePolicy;
-        this.networkPartitionCtxts = networkPartitionCtxts;
-    }
-
-    @Override
-    public void handleAverageLoadAverageEvent(
-            AverageLoadAverageEvent averageLoadAverageEvent) {
-
-        String networkPartitionId = averageLoadAverageEvent.getNetworkPartitionId();
-        String clusterId = averageLoadAverageEvent.getClusterId();
-        float value = averageLoadAverageEvent.getValue();
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
-                                    clusterId, networkPartitionId, value));
-        }
-
-        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
-        if (null != networkPartitionContext) {
-            networkPartitionContext.setAverageLoadAverage(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                                        " [network partition] %s", networkPartitionId));
-            }
-        }
-
-    }
-
-    @Override
-    public void handleGradientOfLoadAverageEvent(
-            GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
-
-        String networkPartitionId = gradientOfLoadAverageEvent.getNetworkPartitionId();
-        String clusterId = gradientOfLoadAverageEvent.getClusterId();
-        float value = gradientOfLoadAverageEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
-                                    clusterId, networkPartitionId, value));
-        }
-        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
-        if (null != networkPartitionContext) {
-            networkPartitionContext.setLoadAverageGradient(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                                        " [network partition] %s", networkPartitionId));
-            }
-        }
-    }
-
-    @Override
-    public void handleSecondDerivativeOfLoadAverageEvent(
-            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
-
-        String networkPartitionId = secondDerivativeOfLoadAverageEvent.getNetworkPartitionId();
-        String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
-        float value = secondDerivativeOfLoadAverageEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
-                                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
-        }
-        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
-        if (null != networkPartitionContext) {
-            networkPartitionContext.setLoadAverageSecondDerivative(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                                        " [network partition] %s", networkPartitionId));
-            }
-        }
-    }
-
-    @Override
-    public void handleAverageMemoryConsumptionEvent(
-            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
-
-        String networkPartitionId = averageMemoryConsumptionEvent.getNetworkPartitionId();
-        String clusterId = averageMemoryConsumptionEvent.getClusterId();
-        float value = averageMemoryConsumptionEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s "
-                                    + "[value] %s", clusterId, networkPartitionId, value));
-        }
-        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
-        if (null != networkPartitionContext) {
-            networkPartitionContext.setAverageMemoryConsumption(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String
-                                  .format("Network partition context is not available for :"
-                                          + " [network partition] %s", networkPartitionId));
-            }
-        }
-    }
-
-    @Override
-    public void handleGradientOfMemoryConsumptionEvent(
-            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
-
-        String networkPartitionId = gradientOfMemoryConsumptionEvent.getNetworkPartitionId();
-        String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
-        float value = gradientOfMemoryConsumptionEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
-                                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
-        }
-        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
-        if (null != networkPartitionContext) {
-            networkPartitionContext.setMemoryConsumptionGradient(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                                        " [network partition] %s", networkPartitionId));
-            }
-        }
-    }
-
-    @Override
-    public void handleSecondDerivativeOfMemoryConsumptionEvent(
-            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
-
-        String networkPartitionId = secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId();
-        String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
-        float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
-                                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
-        }
-        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
-        if (null != networkPartitionContext) {
-            networkPartitionContext.setMemoryConsumptionSecondDerivative(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                                        " [network partition] %s", networkPartitionId));
-            }
-        }
-    }
-
-    @Override
-    public void handleAverageRequestsInFlightEvent(
-            AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
-
-        String networkPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId();
-        String clusterId = averageRequestsInFlightEvent.getClusterId();
-        float value = averageRequestsInFlightEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
-                                    clusterId, networkPartitionId, value));
-        }
-        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
-        if (null != networkPartitionContext) {
-            networkPartitionContext.setAverageRequestsInFlight(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                                        " [network partition] %s", networkPartitionId));
-            }
-        }
-    }
-
-    @Override
-    public void handleGradientOfRequestsInFlightEvent(
-            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
-
-        String networkPartitionId = gradientOfRequestsInFlightEvent.getNetworkPartitionId();
-        String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
-        float value = gradientOfRequestsInFlightEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
-                                    clusterId, networkPartitionId, value));
-        }
-        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
-        if (null != networkPartitionContext) {
-            networkPartitionContext.setRequestsInFlightGradient(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                                        " [network partition] %s", networkPartitionId));
-            }
-        }
-    }
-
-    @Override
-    public void handleSecondDerivativeOfRequestsInFlightEvent(
-            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
-
-        String networkPartitionId = secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId();
-        String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
-        float value = secondDerivativeOfRequestsInFlightEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Second derivative of Rif event: [cluster] %s "
-                                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
-        }
-        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
-        if (null != networkPartitionContext) {
-            networkPartitionContext.setRequestsInFlightSecondDerivative(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                                        " [network partition] %s", networkPartitionId));
-            }
-        }
-    }
-
-    @Override
-    public void handleMemberAverageMemoryConsumptionEvent(
-            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
-
-        String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
-        Member member = getMemberByMemberId(memberId);
-        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
-        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
-        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberAverageMemoryConsumptionEvent.getValue();
-        memberStatsContext.setAverageMemoryConsumption(value);
-    }
-
-    @Override
-    public void handleMemberGradientOfMemoryConsumptionEvent(
-            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
-
-        String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
-        Member member = getMemberByMemberId(memberId);
-        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
-        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
-        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberGradientOfMemoryConsumptionEvent.getValue();
-        memberStatsContext.setGradientOfMemoryConsumption(value);
-    }
-
-    @Override
-    public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
-            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
-
-    }
-
-    @Override
-    public void handleMemberAverageLoadAverageEvent(
-            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
-
-        String memberId = memberAverageLoadAverageEvent.getMemberId();
-        Member member = getMemberByMemberId(memberId);
-        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
-        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
-        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberAverageLoadAverageEvent.getValue();
-        memberStatsContext.setAverageLoadAverage(value);
-    }
-
-    @Override
-    public void handleMemberGradientOfLoadAverageEvent(
-            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
-
-        String memberId = memberGradientOfLoadAverageEvent.getMemberId();
-        Member member = getMemberByMemberId(memberId);
-        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
-        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
-        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberGradientOfLoadAverageEvent.getValue();
-        memberStatsContext.setGradientOfLoadAverage(value);
-    }
-
-    @Override
-    public void handleMemberSecondDerivativeOfLoadAverageEvent(
-            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
-
-        String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
-        Member member = getMemberByMemberId(memberId);
-        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
-        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
-        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
-        memberStatsContext.setSecondDerivativeOfLoadAverage(value);
-    }
-
-    @Override
-    public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
-
-        String memberId = memberFaultEvent.getMemberId();
-        Member member = getMemberByMemberId(memberId);
-        if (null == member) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
-            }
-            return;
-        }
-        if (!member.isActive()) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member activated event has not received for the member %s. "
-                                        + "Therefore ignoring" + " the member fault health stat", memberId));
-            }
-            return;
-        }
-
-        NetworkPartitionContext nwPartitionCtxt;
-        nwPartitionCtxt = getNetworkPartitionCtxt(member);
-        String partitionId = getPartitionOfMember(memberId);
-        PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-        if (!partitionCtxt.activeMemberExist(memberId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Could not find the active member in partition context, "
-                                        + "[member] %s ", memberId));
-            }
-            return;
-        }
-        // terminate the faulty member
-        CloudControllerClient ccClient = CloudControllerClient.getInstance();
-        try {
-            ccClient.terminate(memberId);
-        } catch (TerminationException e) {
-            String msg = "TerminationException " + e.getLocalizedMessage();
-            log.error(msg, e);
-        }
-        // remove from active member list
-        partitionCtxt.removeActiveMemberById(memberId);
-        if (log.isInfoEnabled()) {
-            String clusterId = memberFaultEvent.getClusterId();
-            log.info(String.format("Faulty member is terminated and removed from the active members list: "
-                                   + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
-        }
-    }
-
-    @Override
-    public void handleMemberStartedEvent(
-            MemberStartedEvent memberStartedEvent) {
-
-    }
-
-    @Override
-    public void handleMemberActivatedEvent(
-            MemberActivatedEvent memberActivatedEvent) {
-
-        String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
-        String partitionId = memberActivatedEvent.getPartitionId();
-        String memberId = memberActivatedEvent.getMemberId();
-        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        PartitionContext partitionContext;
-        partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
-        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Member stat context has been added successfully: "
-                                   + "[member] %s", memberId));
-        }
-        partitionContext.movePendingMemberToActiveMembers(memberId);
-    }
-
-    @Override
-    public void handleMemberMaintenanceModeEvent(
-            MemberMaintenanceModeEvent maintenanceModeEvent) {
-
-        String networkPartitionId = maintenanceModeEvent.getNetworkPartitionId();
-        String partitionId = maintenanceModeEvent.getPartitionId();
-        String memberId = maintenanceModeEvent.getMemberId();
-        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        PartitionContext partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
-        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Member has been moved as pending termination: "
-                                    + "[member] %s", memberId));
-        }
-        partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
-    }
-
-    @Override
-    public void handleMemberReadyToShutdownEvent(
-            MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
-
-        NetworkPartitionContext nwPartitionCtxt;
-        String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
-        nwPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-
-        // start a new member in the same Partition
-        String memberId = memberReadyToShutdownEvent.getMemberId();
-        String partitionId = getPartitionOfMember(memberId);
-        PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-        // terminate the shutdown ready member
-        CloudControllerClient ccClient = CloudControllerClient.getInstance();
-        try {
-            ccClient.terminate(memberId);
-            // remove from active member list
-            partitionCtxt.removeActiveMemberById(memberId);
-
-            String clusterId = memberReadyToShutdownEvent.getClusterId();
-            log.info(String.format("Member is terminated and removed from the active members list: "
-                                   + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
-        } catch (TerminationException e) {
-            String msg = "TerminationException" + e.getLocalizedMessage();
-            log.error(msg, e);
-        }
-    }
-
-    @Override
-    public void handleMemberTerminatedEvent(
-            MemberTerminatedEvent memberTerminatedEvent) {
-
-        String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
-        String memberId = memberTerminatedEvent.getMemberId();
-        String partitionId = memberTerminatedEvent.getPartitionId();
-        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
-        PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
-        partitionContext.removeMemberStatsContext(memberId);
-
-        if (partitionContext.removeTerminationPendingMember(memberId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member is removed from termination pending members list: "
-                                        + "[member] %s", memberId));
-            }
-        } else if (partitionContext.removePendingMember(memberId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member is removed from pending members list: "
-                                        + "[member] %s", memberId));
-            }
-        } else if (partitionContext.removeActiveMemberById(memberId)) {
-            log.warn(String.format("Member is in the wrong list and it is removed from "
-                                   + "active members list", memberId));
-        } else if (partitionContext.removeObsoleteMember(memberId)) {
-            log.warn(String.format("Member's obsolated timeout has been expired and "
-                                   + "it is removed from obsolated members list", memberId));
-        } else {
-            log.warn(String.format("Member is not available in any of the list active, "
-                                   + "pending and termination pending", memberId));
-        }
-
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Member stat context has been removed successfully: "
-                                   + "[member] %s", memberId));
-        }
-    }
-
-    @Override
-    public void handleClusterRemovedEvent(
-            ClusterRemovedEvent clusterRemovedEvent) {
-
-    }
-
-    private String getNetworkPartitionIdByMemberId(String memberId) {
-        for (Service service : TopologyManager.getTopology().getServices()) {
-            for (Cluster cluster : service.getClusters()) {
-                if (cluster.memberExists(memberId)) {
-                    return cluster.getMember(memberId).getNetworkPartitionId();
-                }
-            }
-        }
-        return null;
-    }
-
-    private Member getMemberByMemberId(String memberId) {
-        try {
-            TopologyManager.acquireReadLock();
-            for (Service service : TopologyManager.getTopology().getServices()) {
-                for (Cluster cluster : service.getClusters()) {
-                    if (cluster.memberExists(memberId)) {
-                        return cluster.getMember(memberId);
-                    }
-                }
-            }
-            return null;
-        } finally {
-            TopologyManager.releaseReadLock();
-        }
-    }
-
-    public NetworkPartitionContext getNetworkPartitionCtxt(Member member) {
-        log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId());
-        String networkPartitionId = member.getNetworkPartitionId();
-        if (networkPartitionCtxts.containsKey(networkPartitionId)) {
-            log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId));
-            return networkPartitionCtxts.get(networkPartitionId);
-        }
-        log.info("returning null getNetworkPartitionCtxt");
-        return null;
-    }
-
-    public String getPartitionOfMember(String memberId) {
-        for (Service service : TopologyManager.getTopology().getServices()) {
-            for (Cluster cluster : service.getClusters()) {
-                if (cluster.memberExists(memberId)) {
-                    return cluster.getMember(memberId).getPartitionId();
-                }
-            }
-        }
-        return null;
-    }
-
-    public DeploymentPolicy getDeploymentPolicy() {
-        return deploymentPolicy;
-    }
-
-    public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) {
-        this.deploymentPolicy = deploymentPolicy;
-    }
-
-    public AutoscalePolicy getAutoscalePolicy() {
-        return autoscalePolicy;
-    }
-
-    public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
-        this.autoscalePolicy = autoscalePolicy;
-    }
-
-    public Map<String, NetworkPartitionContext> getNetworkPartitionCtxts() {
-        return networkPartitionCtxts;
-    }
-
-    public NetworkPartitionContext getNetworkPartitionCtxt(String networkPartitionId) {
-        return networkPartitionCtxts.get(networkPartitionId);
-    }
-
-    public void setPartitionCtxt(Map<String, NetworkPartitionContext> partitionCtxt) {
-        this.networkPartitionCtxts = partitionCtxt;
-    }
-
-    public boolean partitionCtxtAvailable(String partitionId) {
-        return networkPartitionCtxts.containsKey(partitionId);
-    }
-
-    public void addNetworkPartitionCtxt(NetworkPartitionContext ctxt) {
-        this.networkPartitionCtxts.put(ctxt.getId(), ctxt);
-    }
-
-    public NetworkPartitionContext getPartitionCtxt(String id) {
-        return this.networkPartitionCtxts.get(id);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
deleted file mode 100644
index 3e6cddc..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-public class VMLbClusterMonitor extends VMClusterMonitor {
-
-    private static final Log log = LogFactory.getLog(VMLbClusterMonitor.class);
-
-    public VMLbClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
-                              AutoscalePolicy autoscalePolicy) {
-        super(clusterId, serviceId,
-              new AutoscalerRuleEvaluator(
-                      StratosConstants.VM_MIN_CHECK_DROOL_FILE,
-                      StratosConstants.VM_SCALE_CHECK_DROOL_FILE),
-              deploymentPolicy, autoscalePolicy,
-              new ConcurrentHashMap<String, NetworkPartitionContext>());
-        readConfigurations();
-    }
-
-    @Override
-    public void run() {
-
-        while (!isDestroyed()) {
-            if (log.isDebugEnabled()) {
-                log.debug("Cluster monitor is running.. " + this.toString());
-            }
-            try {
-                if (!ClusterStatus.Inactive.equals(status)) {
-                    monitor();
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("LB Cluster monitor is suspended as the cluster is in " +
-                                ClusterStatus.Inactive + " mode......");
-                    }
-                }
-            } catch (Exception e) {
-                log.error("Cluster monitor: Monitor failed. " + this.toString(), e);
-            }
-            try {
-                Thread.sleep(getMonitorIntervalMilliseconds());
-            } catch (InterruptedException ignore) {
-            }
-        }
-    }
-
-    @Override
-    protected void monitor() {
-        // TODO make this concurrent
-        for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
-
-            // minimum check per partition
-            for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts()
-                    .values()) {
-
-                if (partitionContext != null) {
-                    getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-                    getMinCheckKnowledgeSession().setGlobal("isPrimary", false);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Running minimum check for partition %s ",
-                                                partitionContext.getPartitionId()));
-                    }
-
-                    minCheckFactHandle =
-                            AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession(),
-                                                                     minCheckFactHandle,
-                                                                     partitionContext);
-                    // start only in the first partition context
-                    break;
-                }
-
-            }
-
-        }
-    }
-
-    @Override
-    public void destroy() {
-        getMinCheckKnowledgeSession().dispose();
-        getMinCheckKnowledgeSession().dispose();
-        setDestroyed(true);
-        stopScheduler();
-        if (log.isDebugEnabled()) {
-            log.debug("VMLbClusterMonitor Drools session has been disposed. " + this.toString());
-        }
-    }
-
-    @Override
-    protected void readConfigurations() {
-        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
-        int monitorInterval = conf.getInt(AutoScalerConstants.VMLb_Cluster_MONITOR_INTERVAL, 90000);
-        setMonitorIntervalMilliseconds(monitorInterval);
-        if (log.isDebugEnabled()) {
-            log.debug("VMLbClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
-        }
-    }
-
-    @Override
-    public void handleClusterRemovedEvent(
-            ClusterRemovedEvent clusterRemovedEvent) {
-
-        String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy();
-        String clusterId = clusterRemovedEvent.getClusterId();
-        DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
-        if (depPolicy != null) {
-            List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
-                    .getNetworkPartitionLbHolders(depPolicy);
-
-            for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
-                // removes lb cluster ids
-                boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
-                if (isRemoved) {
-                    log.info("Removed the lb cluster [id]:"
-                             + clusterId
-                             + " reference from Network Partition [id]: "
-                             + networkPartitionLbHolder
-                            .getNetworkPartitionId());
-
-                }
-                if (log.isDebugEnabled()) {
-                    log.debug(networkPartitionLbHolder);
-                }
-
-            }
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "VMLbClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() + "]";
-    }
-
-    @Override
-    public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
-        // TODO 
-        
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
deleted file mode 100644
index 9a26b42..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-public class VMServiceClusterMonitor extends VMClusterMonitor {
-
-    private static final Log log = LogFactory.getLog(VMServiceClusterMonitor.class);
-    private String lbReferenceType;
-    private boolean hasPrimary;
-
-    public VMServiceClusterMonitor(String clusterId, String serviceId,
-                                   DeploymentPolicy deploymentPolicy,
-                                   AutoscalePolicy autoscalePolicy) {
-        super(clusterId, serviceId,
-              new AutoscalerRuleEvaluator(StratosConstants.VM_MIN_CHECK_DROOL_FILE,
-                                          StratosConstants.VM_SCALE_CHECK_DROOL_FILE),
-              deploymentPolicy, autoscalePolicy,
-              new ConcurrentHashMap<String, NetworkPartitionContext>());
-        readConfigurations();
-    }
-
-    @Override
-    public void run() {
-        while (!isDestroyed()) {
-            try {
-                if ((this.status.getCode() <= ClusterStatus.Active.getCode()) ||
-                        (this.status == ClusterStatus.Inactive && !hasDependent) ||
-                        !this.hasFaultyMember) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Cluster monitor is running.. " + this.toString());
-                    }
-                    monitor();
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Cluster monitor is suspended as the cluster is in " +
-                                ClusterStatus.Inactive + " mode......");
-                    }
-                }
-            } catch (Exception e) {
-                log.error("Cluster monitor: Monitor failed." + this.toString(), e);
-            }
-            try {
-                Thread.sleep(getMonitorIntervalMilliseconds());
-            } catch (InterruptedException ignore) {
-            }
-        }
-    }
-
-    @Override
-    protected void monitor() {
-
-        //TODO make this concurrent
-        for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
-            // store primary members in the network partition context
-            List<String> primaryMemberListInNetworkPartition = new ArrayList<String>();
-
-            //minimum check per partition
-            for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) {
-                // store primary members in the partition context
-                List<String> primaryMemberListInPartition = new ArrayList<String>();
-                // get active primary members in this partition context
-                for (MemberContext memberContext : partitionContext.getActiveMembers()) {
-                    if (isPrimaryMember(memberContext)) {
-                        primaryMemberListInPartition.add(memberContext.getMemberId());
-                    }
-                }
-                // get pending primary members in this partition context
-                for (MemberContext memberContext : partitionContext.getPendingMembers()) {
-                    if (isPrimaryMember(memberContext)) {
-                        primaryMemberListInPartition.add(memberContext.getMemberId());
-                    }
-                }
-                primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition);
-                getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-                getMinCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
-                getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
-                getMinCheckKnowledgeSession().setGlobal("primaryMemberCount", primaryMemberListInPartition.size());
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Running minimum check for partition %s ", partitionContext.getPartitionId()));
-                }
-
-                minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession()
-                        , minCheckFactHandle, partitionContext);
-
-            }
-
-            boolean rifReset = networkPartitionContext.isRifReset();
-            boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset();
-            boolean loadAverageReset = networkPartitionContext.isLoadAverageReset();
-            if (log.isDebugEnabled()) {
-                log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset
-                          + " flag of loadAverageReset" + loadAverageReset);
-            }
-            if (rifReset || memoryConsumptionReset || loadAverageReset) {
-                getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-                //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy);
-                getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", autoscalePolicy);
-                getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
-                getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
-                getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
-                getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
-                getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
-                getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition);
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId()));
-                    log.debug(" Primary members : " + primaryMemberListInNetworkPartition);
-                }
-
-                scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(getScaleCheckKnowledgeSession()
-                        , scaleCheckFactHandle, networkPartitionContext);
-
-                networkPartitionContext.setRifReset(false);
-                networkPartitionContext.setMemoryConsumptionReset(false);
-                networkPartitionContext.setLoadAverageReset(false);
-            } else if (log.isDebugEnabled()) {
-                log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " +
-                                        "cycle for network partition %s", networkPartitionContext.getId()));
-            }
-        }
-    }
-
-    private boolean isPrimaryMember(MemberContext memberContext) {
-        Properties props = memberContext.getProperties();
-        if (log.isDebugEnabled()) {
-            log.debug(" Properties [" + props + "] ");
-        }
-        if (props != null && props.getProperties() != null) {
-            for (Property prop : props.getProperties()) {
-                if (prop.getName().equals("PRIMARY")) {
-                    if (Boolean.parseBoolean(prop.getValue())) {
-                        log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
-                                  "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
-                        return true;
-                    }
-                }
-            }
-        }
-        return false;
-    }
-
-    @Override
-    protected void readConfigurations() {
-        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
-        int monitorInterval = conf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000);
-        setMonitorIntervalMilliseconds(monitorInterval);
-        if (log.isDebugEnabled()) {
-            log.debug("VMServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
-        }
-    }
-
-    @Override
-    public void destroy() {
-        getMinCheckKnowledgeSession().dispose();
-        getScaleCheckKnowledgeSession().dispose();
-        setDestroyed(true);
-        stopScheduler();
-        if (log.isDebugEnabled()) {
-            log.debug("VMServiceClusterMonitor Drools session has been disposed. " + this.toString());
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "VMServiceClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() +
-               ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy +
-               ", lbReferenceType=" + lbReferenceType +
-               ", hasPrimary=" + hasPrimary + " ]";
-    }
-
-    public String getLbReferenceType() {
-        return lbReferenceType;
-    }
-
-    public void setLbReferenceType(String lbReferenceType) {
-        this.lbReferenceType = lbReferenceType;
-    }
-
-    public boolean isHasPrimary() {
-        return hasPrimary;
-    }
-
-    public void setHasPrimary(boolean hasPrimary) {
-        this.hasPrimary = hasPrimary;
-    }
-
-    @Override
-    public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
-        // TODO 
-        
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java
index 2bf85a6..cf92a18 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.exception.DependencyBuilderException;
 import org.apache.stratos.autoscaler.exception.TopologyInConsistentException;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.Monitor;
 import org.apache.stratos.autoscaler.monitor.MonitorStatusEventBuilder;
 import org.apache.stratos.autoscaler.monitor.ParentComponentMonitor;

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java
new file mode 100644
index 0000000..bc23dd4
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.application;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.exception.DependencyBuilderException;
+import org.apache.stratos.autoscaler.exception.PartitionValidationException;
+import org.apache.stratos.autoscaler.exception.PolicyValidationException;
+import org.apache.stratos.autoscaler.exception.TopologyInConsistentException;
+import org.apache.stratos.autoscaler.grouping.dependency.context.ApplicationContext;
+import org.apache.stratos.autoscaler.grouping.dependency.context.ClusterContext;
+import org.apache.stratos.autoscaler.grouping.dependency.context.GroupContext;
+import org.apache.stratos.autoscaler.monitor.Monitor;
+import org.apache.stratos.autoscaler.monitor.ParentComponentMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitorFactory;
+import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.group.GroupMonitor;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+import java.util.Map;
+
+/**
+ * Factory class to get the Monitors.
+ */
+public class ApplicationMonitorFactory {
+    private static final Log log = LogFactory.getLog(ApplicationMonitorFactory.class);
+
+    /**
+     * Factor method used to create relevant monitors based on the given context
+     *
+     * @param context       Application/Group/Cluster context
+     * @param appId         appId of the application which requires to create app monitor
+     * @param parentMonitor parent of the monitor
+     * @return Monitor which can be ApplicationMonitor/GroupMonitor/ClusterMonitor
+     * @throws TopologyInConsistentException throws while traversing thr topology
+     * @throws DependencyBuilderException    throws while building dependency for app monitor
+     * @throws PolicyValidationException     throws while validating the policy associated with cluster
+     * @throws PartitionValidationException  throws while validating the partition used in a cluster
+     */
+    public static Monitor getMonitor(ParentComponentMonitor parentMonitor, ApplicationContext context, String appId)
+            throws TopologyInConsistentException,
+            DependencyBuilderException, PolicyValidationException, PartitionValidationException {
+        Monitor monitor;
+
+        if (context instanceof GroupContext) {
+            monitor = getGroupMonitor(parentMonitor, context, appId);
+        } else if (context instanceof ClusterContext) {
+            monitor = getClusterMonitor(parentMonitor, (ClusterContext) context, appId);
+            //Start the thread
+            Thread th = new Thread((AbstractClusterMonitor) monitor);
+            th.start();
+        } else {
+            monitor = getApplicationMonitor(appId);
+        }
+        return monitor;
+    }
+
+    /**
+     * This will create the GroupMonitor based on given groupId by going thr Topology
+     *
+     * @param parentMonitor parent of the monitor
+     * @param context       groupId of the group
+     * @param appId         appId of the relevant application
+     * @return Group monitor
+     * @throws DependencyBuilderException    throws while building dependency for app monitor
+     * @throws TopologyInConsistentException throws while traversing thr topology
+     */
+    public static Monitor getGroupMonitor(ParentComponentMonitor parentMonitor, ApplicationContext context, String appId)
+            throws DependencyBuilderException,
+            TopologyInConsistentException {
+        GroupMonitor groupMonitor;
+        TopologyManager.acquireReadLockForApplication(appId);
+
+        try {
+            Group group = TopologyManager.getTopology().getApplication(appId).getGroupRecursively(context.getId());
+            groupMonitor = new GroupMonitor(group, appId);
+            groupMonitor.setAppId(appId);
+            if(parentMonitor != null) {
+                groupMonitor.setParent(parentMonitor);
+                //Setting the dependent behaviour of the monitor
+                if(parentMonitor.isDependent() || (context.isDependent() && context.hasChild())) {
+                    groupMonitor.setHasDependent(true);
+                } else {
+                    groupMonitor.setHasDependent(false);
+                }
+                //TODO make sure when it is async
+
+                if (group.getStatus() != groupMonitor.getStatus()) {
+                    //updating the status, if the group is not in created state when creating group Monitor
+                    //so that groupMonitor will notify the parent (useful when restarting stratos)
+                    groupMonitor.setStatus(group.getStatus());
+                }
+            }
+
+        } finally {
+            TopologyManager.releaseReadLockForApplication(appId);
+
+        }
+        return groupMonitor;
+
+    }
+
+    /**
+     * This will create a new app monitor based on the give appId by getting the
+     * application from Topology
+     *
+     * @param appId appId of the application which requires to create app monitor
+     * @return ApplicationMonitor
+     * @throws DependencyBuilderException    throws while building dependency for app monitor
+     * @throws TopologyInConsistentException throws while traversing thr topology
+     */
+    public static ApplicationMonitor getApplicationMonitor(String appId)
+            throws DependencyBuilderException,
+            TopologyInConsistentException {
+        ApplicationMonitor applicationMonitor;
+        TopologyManager.acquireReadLockForApplication(appId);
+        try {
+            Application application = TopologyManager.getTopology().getApplication(appId);
+            if (application != null) {
+                applicationMonitor = new ApplicationMonitor(application);
+                applicationMonitor.setHasDependent(false);
+
+            } else {
+                String msg = "[Application] " + appId + " cannot be found in the Topology";
+                throw new TopologyInConsistentException(msg);
+            }
+        } finally {
+            TopologyManager.releaseReadLockForApplication(appId);
+        }
+
+        return applicationMonitor;
+
+    }
+
+    /**
+     * Updates ClusterContext for given cluster
+     *
+     * @param parentMonitor parent of the monitor
+     * @param context
+     * @return ClusterMonitor - Updated ClusterContext
+     * @throws org.apache.stratos.autoscaler.exception.PolicyValidationException
+     * @throws org.apache.stratos.autoscaler.exception.PartitionValidationException
+     */
+    public static VMClusterMonitor getClusterMonitor(ParentComponentMonitor parentMonitor,
+                                                   ClusterContext context, String appId)
+            throws PolicyValidationException,
+            PartitionValidationException,
+            TopologyInConsistentException {
+        //Retrieving the Cluster from Topology
+        String clusterId = context.getId();
+        String serviceName = context.getServiceName();
+
+        Cluster cluster;
+        AbstractClusterMonitor clusterMonitor;
+        //acquire read lock for the service and cluster
+        TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
+        try {
+            Topology topology = TopologyManager.getTopology();
+            if (topology.serviceExists(serviceName)) {
+                Service service = topology.getService(serviceName);
+                if (service.clusterExists(clusterId)) {
+                    cluster = service.getCluster(clusterId);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Dependency check starting the [cluster]" + clusterId);
+                    }
+                    // startClusterMonitor(this, cluster);
+                    //context.setCurrentStatus(Status.Created);
+                } else {
+                    String msg = "[Cluster] " + clusterId + " cannot be found in the " +
+                            "Topology for [service] " + serviceName;
+                    throw new TopologyInConsistentException(msg);
+                }
+            } else {
+                String msg = "[Service] " + serviceName + " cannot be found in the Topology";
+                throw new TopologyInConsistentException(msg);
+
+            }
+
+
+            clusterMonitor = ClusterMonitorFactory.getMonitor(cluster);
+            if (clusterMonitor instanceof VMClusterMonitor) {
+                return (VMClusterMonitor) clusterMonitor;
+            } else if (clusterMonitor != null) {
+                log.warn("Unknown cluster monitor found: " + clusterMonitor.getClass().toString());
+            }
+            return null;
+        } finally {
+            TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
+        }
+    }
+
+
+    private static Properties convertMemberPropsToMemberContextProps(
+            java.util.Properties properties) {
+        Properties props = new Properties();
+        for (Map.Entry<Object, Object> e : properties.entrySet()) {
+            Property prop = new Property();
+            prop.setName((String) e.getKey());
+            prop.setValue((String) e.getValue());
+            props.addProperties(prop);
+        }
+        return props;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
new file mode 100644
index 0000000..f043f51
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
+import org.apache.stratos.autoscaler.monitor.Monitor;
+import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent;
+import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import org.apache.stratos.autoscaler.monitor.events.MonitorTerminateAllEvent;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.GroupStatus;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.FactHandle;
+
+/*
+ * Every cluster monitor, which are monitoring a cluster, should extend this class.
+ */
+public abstract class AbstractClusterMonitor extends Monitor implements Runnable {
+
+    private String clusterId;
+    private String serviceId;
+    protected ClusterStatus status;
+    private int monitoringIntervalMilliseconds;
+
+    protected FactHandle minCheckFactHandle;
+    protected FactHandle scaleCheckFactHandle;
+    private StatefulKnowledgeSession minCheckKnowledgeSession;
+    private StatefulKnowledgeSession scaleCheckKnowledgeSession;
+    private boolean isDestroyed;
+
+    private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+    protected boolean hasFaultyMember = false;
+
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+    protected AbstractClusterMonitor(String clusterId, String serviceId,
+                                     AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+
+        super();
+        this.clusterId = clusterId;
+        this.serviceId = serviceId;
+        this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+        this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
+        this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
+    }
+
+    protected abstract void readConfigurations();
+
+    public void startScheduler() {
+        scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
+    }
+
+    protected void stopScheduler() {
+        scheduler.shutdownNow();
+    }
+
+    protected abstract void monitor();
+
+    public abstract void destroy();
+
+    //handle health events
+    public abstract void handleAverageLoadAverageEvent(
+            AverageLoadAverageEvent averageLoadAverageEvent);
+
+    public abstract void handleGradientOfLoadAverageEvent(
+            GradientOfLoadAverageEvent gradientOfLoadAverageEvent);
+
+    public abstract void handleSecondDerivativeOfLoadAverageEvent(
+            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent);
+
+    public abstract void handleAverageMemoryConsumptionEvent(
+            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent);
+
+    public abstract void handleGradientOfMemoryConsumptionEvent(
+            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent);
+
+    public abstract void handleSecondDerivativeOfMemoryConsumptionEvent(
+            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent);
+
+    public abstract void handleAverageRequestsInFlightEvent(
+            AverageRequestsInFlightEvent averageRequestsInFlightEvent);
+
+    public abstract void handleGradientOfRequestsInFlightEvent(
+            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent);
+
+    public abstract void handleSecondDerivativeOfRequestsInFlightEvent(
+            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent);
+
+    public abstract void handleMemberAverageMemoryConsumptionEvent(
+            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent);
+
+    public abstract void handleMemberGradientOfMemoryConsumptionEvent(
+            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent);
+
+    public abstract void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent);
+
+
+    public abstract void handleMemberAverageLoadAverageEvent(
+            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent);
+
+    public abstract void handleMemberGradientOfLoadAverageEvent(
+            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent);
+
+    public abstract void handleMemberSecondDerivativeOfLoadAverageEvent(
+            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent);
+
+    public abstract void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent);
+
+    //handle topology events
+    public abstract void handleMemberStartedEvent(MemberStartedEvent memberStartedEvent);
+
+    public abstract void handleMemberActivatedEvent(MemberActivatedEvent memberActivatedEvent);
+
+    public abstract void handleMemberMaintenanceModeEvent(
+            MemberMaintenanceModeEvent maintenanceModeEvent);
+
+    public abstract void handleMemberReadyToShutdownEvent(
+            MemberReadyToShutdownEvent memberReadyToShutdownEvent);
+
+    public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent memberTerminatedEvent);
+
+    public abstract void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent);
+
+    public abstract void handleDynamicUpdates(Properties properties) throws InvalidArgumentException;
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public void setStatus(ClusterStatus status) {
+        this.status = status;
+    }
+
+    public ClusterStatus getStatus() {
+        return status;
+    }
+
+    public String getServiceId() {
+        return serviceId;
+    }
+
+    public void setServiceId(String serviceId) {
+        this.serviceId = serviceId;
+    }
+
+    public int getMonitorIntervalMilliseconds() {
+        return monitoringIntervalMilliseconds;
+    }
+
+    public void setMonitorIntervalMilliseconds(int monitorIntervalMilliseconds) {
+        this.monitoringIntervalMilliseconds = monitorIntervalMilliseconds;
+    }
+
+    public FactHandle getMinCheckFactHandle() {
+        return minCheckFactHandle;
+    }
+
+    public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
+        this.minCheckFactHandle = minCheckFactHandle;
+    }
+
+    public FactHandle getScaleCheckFactHandle() {
+        return scaleCheckFactHandle;
+    }
+
+    public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
+        this.scaleCheckFactHandle = scaleCheckFactHandle;
+    }
+
+    public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
+        return minCheckKnowledgeSession;
+    }
+
+    public void setMinCheckKnowledgeSession(
+            StatefulKnowledgeSession minCheckKnowledgeSession) {
+        this.minCheckKnowledgeSession = minCheckKnowledgeSession;
+    }
+
+    public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
+        return scaleCheckKnowledgeSession;
+    }
+
+    public void setScaleCheckKnowledgeSession(
+            StatefulKnowledgeSession scaleCheckKnowledgeSession) {
+        this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
+    }
+
+    public boolean isDestroyed() {
+        return isDestroyed;
+    }
+
+    public void setDestroyed(boolean isDestroyed) {
+        this.isDestroyed = isDestroyed;
+    }
+
+    public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
+        return autoscalerRuleEvaluator;
+    }
+
+    public void setAutoscalerRuleEvaluator(
+            AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+        this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+    }
+
+
+    @Override
+    public void onParentEvent(MonitorStatusEvent statusEvent) {
+        // send the ClusterTerminating event
+        if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() ==
+                ApplicationStatus.Terminating) {
+            StatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId);
+        }
+    }
+
+    @Override
+    public void onChildEvent(MonitorStatusEvent statusEvent) {
+
+    }
+
+    @Override
+    public void onEvent(MonitorTerminateAllEvent terminateAllEvent) {
+
+    }
+
+    @Override
+    public void onEvent(MonitorScalingEvent scalingEvent) {
+
+    }
+
+    public void setHasFaultyMember(boolean hasFaultyMember) {
+        this.hasFaultyMember = hasFaultyMember;
+    }
+
+    public boolean isHasFaultyMember() {
+        return hasFaultyMember;
+    }
+
+    public abstract void terminateAllMembers();
+}


[2/5] Adding autoscaler topology event listeners introduced by service grouping

Posted by im...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
new file mode 100644
index 0000000..e286187
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.PartitionValidationException;
+import org.apache.stratos.autoscaler.exception.PolicyValidationException;
+import org.apache.stratos.autoscaler.partition.PartitionGroup;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.util.Constants;
+
+/*
+ * Factory class for creating cluster monitors.
+ */
+public class ClusterMonitorFactory {
+
+    private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
+
+    /**
+     * @param cluster the cluster to be monitored
+     * @return the created cluster monitor
+     * @throws PolicyValidationException    when deployment policy is not valid
+     * @throws PartitionValidationException when partition is not valid
+     */
+    public static AbstractClusterMonitor getMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+
+        AbstractClusterMonitor clusterMonitor;
+        if (cluster.isKubernetesCluster()) {
+            clusterMonitor = getDockerServiceClusterMonitor(cluster);
+        } else if (cluster.isLbCluster()) {
+            clusterMonitor = getVMLbClusterMonitor(cluster);
+        } else {
+            clusterMonitor = getVMServiceClusterMonitor(cluster);
+        }
+
+        return clusterMonitor;
+    }
+
+    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+        // FIXME fix the following code to correctly update
+        // AutoscalerContext context = AutoscalerContext.getInstance();
+        if (null == cluster) {
+            return null;
+        }
+
+        String autoscalePolicyName = cluster.getAutoscalePolicyName();
+        String deploymentPolicyName = cluster.getDeploymentPolicyName();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Deployment policy name: " + deploymentPolicyName);
+            log.debug("Autoscaler policy name: " + autoscalePolicyName);
+        }
+
+        AutoscalePolicy policy =
+                PolicyManager.getInstance()
+                        .getAutoscalePolicy(autoscalePolicyName);
+        DeploymentPolicy deploymentPolicy =
+                PolicyManager.getInstance()
+                        .getDeploymentPolicy(deploymentPolicyName);
+
+        if (deploymentPolicy == null) {
+            String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+
+        Partition[] allPartitions = deploymentPolicy.getAllPartitions();
+        if (allPartitions == null) {
+            String msg =
+                    "Deployment Policy's Partitions are null. Policy name: " +
+                    deploymentPolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+
+        CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
+
+        VMServiceClusterMonitor clusterMonitor =
+                new VMServiceClusterMonitor(cluster.getClusterId(),
+                                            cluster.getServiceName(),
+                                            deploymentPolicy, policy);
+        clusterMonitor.setStatus(ClusterStatus.Created);
+
+        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
+
+            NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
+                                                                                          partitionGroup.getPartitionAlgo(),
+                                                                                          partitionGroup.getPartitions());
+
+            for (Partition partition : partitionGroup.getPartitions()) {
+                PartitionContext partitionContext = new PartitionContext(partition);
+                partitionContext.setServiceName(cluster.getServiceName());
+                partitionContext.setProperties(cluster.getProperties());
+                partitionContext.setNetworkPartitionId(partitionGroup.getId());
+
+                for (Member member : cluster.getMembers()) {
+                    String memberId = member.getMemberId();
+                    if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
+                        MemberContext memberContext = new MemberContext();
+                        memberContext.setClusterId(member.getClusterId());
+                        memberContext.setMemberId(memberId);
+                        memberContext.setInitTime(member.getInitTime());
+                        memberContext.setPartition(partition);
+                        memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
+
+                        if (MemberStatus.Activated.equals(member.getStatus())) {
+                        	if (log.isDebugEnabled()) {
+                        		String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
+            					log.debug(msg);
+            				}
+                            partitionContext.addActiveMember(memberContext);
+//                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
+//                            partitionContext.incrementCurrentActiveMemberCount(1);
+
+                        } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
+                        	if (log.isDebugEnabled()) {
+                        		String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
+            					log.debug(msg);
+            				}
+                            partitionContext.addPendingMember(memberContext);
+
+//                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
+                        } else if (MemberStatus.Suspended.equals(member.getStatus())) {
+//                            partitionContext.addFaultyMember(memberId);
+                        }
+                        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member stat context has been added: [member] %s", memberId));
+                        }
+                    }
+
+                }
+                networkPartitionContext.addPartitionContext(partitionContext);
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Partition context has been added: [partition] %s",
+                                           partitionContext.getPartitionId()));
+                }
+            }
+
+            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Network partition context has been added: [network partition] %s",
+                                       networkPartitionContext.getId()));
+            }
+        }
+
+
+        // find lb reference type
+        java.util.Properties props = cluster.getProperties();
+
+        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
+            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+            clusterMonitor.setLbReferenceType(value);
+            if (log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: " + value);
+            }
+        }
+
+        // set hasPrimary property
+        // hasPrimary is true if there are primary members available in that cluster
+        clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
+
+        log.info("VMServiceClusterMonitor created: " + clusterMonitor.toString());
+        return clusterMonitor;
+    }
+
+    private static Properties convertMemberPropsToMemberContextProps(
+            java.util.Properties properties) {
+        Properties props = new Properties();
+        for (Map.Entry<Object, Object> e : properties.entrySet()) {
+            Property prop = new Property();
+            prop.setName((String) e.getKey());
+            prop.setValue((String) e.getValue());
+            props.addProperties(prop);
+        }
+        return props;
+    }
+
+
+    private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+        // FIXME fix the following code to correctly update
+        // AutoscalerContext context = AutoscalerContext.getInstance();
+        if (null == cluster) {
+            return null;
+        }
+
+        String autoscalePolicyName = cluster.getAutoscalePolicyName();
+        String deploymentPolicyName = cluster.getDeploymentPolicyName();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Deployment policy name: " + deploymentPolicyName);
+            log.debug("Autoscaler policy name: " + autoscalePolicyName);
+        }
+
+        AutoscalePolicy policy =
+                PolicyManager.getInstance()
+                        .getAutoscalePolicy(autoscalePolicyName);
+        DeploymentPolicy deploymentPolicy =
+                PolicyManager.getInstance()
+                        .getDeploymentPolicy(deploymentPolicyName);
+
+        if (deploymentPolicy == null) {
+            String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+
+        String clusterId = cluster.getClusterId();
+        VMLbClusterMonitor clusterMonitor =
+                new VMLbClusterMonitor(clusterId,
+                                       cluster.getServiceName(),
+                                       deploymentPolicy, policy);
+        clusterMonitor.setStatus(ClusterStatus.Created);
+        // partition group = network partition context
+        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
+
+            NetworkPartitionLbHolder networkPartitionLbHolder =
+                    PartitionManager.getInstance()
+                            .getNetworkPartitionLbHolder(partitionGroup.getId());
+//                                                              PartitionManager.getInstance()
+//                                                                              .getNetworkPartitionLbHolder(partitionGroup.getId());
+            // FIXME pick a random partition
+            Partition partition =
+                    partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
+            PartitionContext partitionContext = new PartitionContext(partition);
+            partitionContext.setServiceName(cluster.getServiceName());
+            partitionContext.setProperties(cluster.getProperties());
+            partitionContext.setNetworkPartitionId(partitionGroup.getId());
+            partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
+
+            NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
+                                                                                          partitionGroup.getPartitionAlgo(),
+                                                                                          partitionGroup.getPartitions());
+            for (Member member : cluster.getMembers()) {
+                String memberId = member.getMemberId();
+                if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) {
+                    MemberContext memberContext = new MemberContext();
+                    memberContext.setClusterId(member.getClusterId());
+                    memberContext.setMemberId(memberId);
+                    memberContext.setPartition(partition);
+                    memberContext.setInitTime(member.getInitTime());
+
+                    if (MemberStatus.Activated.equals(member.getStatus())) {
+                    	if (log.isDebugEnabled()) {
+                    		String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
+        					log.debug(msg);
+        				}
+                        partitionContext.addActiveMember(memberContext);
+//                        networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
+//                        partitionContext.incrementCurrentActiveMemberCount(1);
+                    } else if (MemberStatus.Created.equals(member.getStatus()) ||
+                               MemberStatus.Starting.equals(member.getStatus())) {
+                    	if (log.isDebugEnabled()) {
+                    		String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
+        					log.debug(msg);
+        				}
+                        partitionContext.addPendingMember(memberContext);
+//                        networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
+                    } else if (MemberStatus.Suspended.equals(member.getStatus())) {
+//                        partitionContext.addFaultyMember(memberId);
+                    }
+
+                    partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+                    if (log.isInfoEnabled()) {
+                        log.info(String.format("Member stat context has been added: [member] %s", memberId));
+                    }
+                }
+
+            }
+            networkPartitionContext.addPartitionContext(partitionContext);
+
+            // populate lb cluster id in network partition context.
+            java.util.Properties props = cluster.getProperties();
+
+            // get service type of load balanced cluster
+            String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
+
+            if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
+                String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+
+                if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
+                    networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
+
+                } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) {
+                    String serviceName = cluster.getServiceName();
+                    // TODO: check if this is correct
+                    networkPartitionLbHolder.addServiceLB(serviceName, clusterId);
+
+                    if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) {
+                        networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType);
+                        }
+                    }
+                }
+            }
+
+            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
+        }
+
+        log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
+        return clusterMonitor;
+    }
+
+    /**
+     * @param cluster - the cluster which needs to be monitored
+     * @return - the cluster monitor
+     */
+    private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster)
+            throws PolicyValidationException {
+
+        if (null == cluster) {
+            return null;
+        }
+
+        String autoscalePolicyName = cluster.getAutoscalePolicyName();
+        if (log.isDebugEnabled()) {
+            log.debug("Autoscaler policy name: " + autoscalePolicyName);
+        }
+
+        AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
+        
+        if (policy == null) {
+            String msg = "Autoscale Policy is null. Policy name: " + autoscalePolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+        
+        java.util.Properties props = cluster.getProperties();
+        String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
+        KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
+                                                                                      cluster.getClusterId());
+
+        String minReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS);
+        if (minReplicasProperty != null && !minReplicasProperty.isEmpty()) {
+            int minReplicas = Integer.parseInt(minReplicasProperty);
+            kubernetesClusterCtxt.setMinReplicas(minReplicas);
+        }
+
+        String maxReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MAX_REPLICAS);
+        if (maxReplicasProperty != null && !maxReplicasProperty.isEmpty()) {
+            int maxReplicas = Integer.parseInt(maxReplicasProperty);
+            kubernetesClusterCtxt.setMaxReplicas(maxReplicas);
+        }
+
+        KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor(
+                kubernetesClusterCtxt,
+                cluster.getClusterId(),
+                cluster.getServiceName(),
+                policy.getId());
+
+        dockerClusterMonitor.setStatus(ClusterStatus.Created);
+
+        //populate the members after restarting        
+        for (Member member : cluster.getMembers()) {
+            String memberId = member.getMemberId();
+            String clusterId = member.getClusterId();
+            MemberContext memberContext = new MemberContext();
+            memberContext.setMemberId(memberId);
+            memberContext.setClusterId(clusterId);
+            memberContext.setInitTime(member.getInitTime());
+            
+            // if there is at least one member in the topology, that means service has been created already
+            // this is to avoid calling startContainer() method again
+            kubernetesClusterCtxt.setServiceClusterCreated(true);
+            
+            if (MemberStatus.Activated.equals(member.getStatus())) {
+            	if (log.isDebugEnabled()) {
+            		String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
+					log.debug(msg);
+				}
+                dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
+            } else if (MemberStatus.Created.equals(member.getStatus())
+                       || MemberStatus.Starting.equals(member.getStatus())) {
+            	if (log.isDebugEnabled()) {
+            		String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
+					log.debug(msg);
+				}
+                dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
+            }
+            
+            kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member stat context has been added: [member] %s", memberId));
+            }
+        }
+
+        // find lb reference type
+        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
+            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+            dockerClusterMonitor.setLbReferenceType(value);
+            if (log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: " + value);
+            }
+        }
+
+        log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
+        return dockerClusterMonitor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
new file mode 100644
index 0000000..39fbd46
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/*
+ * Every kubernetes cluster monitor should extend this class
+ */
+public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
+
+    private static final Log log = LogFactory.getLog(KubernetesClusterMonitor.class);
+
+    private KubernetesClusterContext kubernetesClusterCtxt;
+    protected String autoscalePolicyId;
+
+    protected KubernetesClusterMonitor(String clusterId, String serviceId,
+                                       KubernetesClusterContext kubernetesClusterContext,
+                                       AutoscalerRuleEvaluator autoscalerRuleEvaluator,
+                                       String autoscalePolicyId) {
+
+        super(clusterId, serviceId, autoscalerRuleEvaluator);
+        this.kubernetesClusterCtxt = kubernetesClusterContext;
+        this.autoscalePolicyId = autoscalePolicyId;
+    }
+
+    @Override
+    public void handleAverageLoadAverageEvent(
+            AverageLoadAverageEvent averageLoadAverageEvent) {
+
+        String clusterId = averageLoadAverageEvent.getClusterId();
+        float value = averageLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg load avg event: [cluster] %s [value] %s",
+                                    clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setAverageLoadAverage(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+
+    }
+
+    @Override
+    public void handleGradientOfLoadAverageEvent(
+            GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+        String clusterId = gradientOfLoadAverageEvent.getClusterId();
+        float value = gradientOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s",
+                                    clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setLoadAverageGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfLoadAverageEvent(
+            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
+
+        String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+        float value = secondDerivativeOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
+                                    + "[value] %s", clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setLoadAverageSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleAverageMemoryConsumptionEvent(
+            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+        String clusterId = averageMemoryConsumptionEvent.getClusterId();
+        float value = averageMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg Memory Consumption event: [cluster] %s "
+                                    + "[value] %s", averageMemoryConsumptionEvent.getClusterId(), value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setAverageMemoryConsumption(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfMemoryConsumptionEvent(
+            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
+
+        String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+        float value = gradientOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
+                                    + "[value] %s", clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setMemoryConsumptionGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfMemoryConsumptionEvent(
+            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
+
+        String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+        float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
+                                    + "[value] %s", clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setMemoryConsumptionSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleAverageRequestsInFlightEvent(
+            AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+        float value = averageRequestsInFlightEvent.getValue();
+        String clusterId = averageRequestsInFlightEvent.getClusterId();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Average Rif event: [cluster] %s [value] %s",
+                                    clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setAverageRequestsInFlight(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfRequestsInFlightEvent(
+            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+        String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
+        float value = gradientOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s",
+                                    clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setRequestsInFlightGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfRequestsInFlightEvent(
+            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
+
+        String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
+        float value = secondDerivativeOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second derivative of Rif event: [cluster] %s "
+                                    + "[value] %s", clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setRequestsInFlightSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleMemberAverageMemoryConsumptionEvent(
+            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
+
+        String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberAverageMemoryConsumptionEvent.getValue();
+        memberStatsContext.setAverageMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfMemoryConsumptionEvent(
+            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
+
+        String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfMemoryConsumptionEvent.getValue();
+        memberStatsContext.setGradientOfMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
+
+    }
+
+    @Override
+    public void handleMemberAverageLoadAverageEvent(
+            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        String memberId = memberAverageLoadAverageEvent.getMemberId();
+        float value = memberAverageLoadAverageEvent.getValue();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        memberStatsContext.setAverageLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfLoadAverageEvent(
+            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
+
+        String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfLoadAverageEvent.getValue();
+        memberStatsContext.setGradientOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfLoadAverageEvent(
+            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
+
+        String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+        memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+    	// kill the container
+        String memberId = memberFaultEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        if (null == member) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+            }
+            return;
+        }
+        if (!member.isActive()) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member activated event has not received for the member %s. "
+                                        + "Therefore ignoring" + " the member fault health stat", memberId));
+            }
+            return;
+        }
+        
+        if (!getKubernetesClusterCtxt().activeMemberExist(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Could not find the active member in kubernetes cluster context, "
+                                        + "[member] %s ", memberId));
+            }
+            return;
+        }
+        // terminate the faulty member
+        CloudControllerClient ccClient = CloudControllerClient.getInstance();
+        try {
+            ccClient.terminateContainer(memberId);
+            // remove from active member list
+            getKubernetesClusterCtxt().removeActiveMemberById(memberId);
+            if (log.isInfoEnabled()) {
+                String clusterId = memberFaultEvent.getClusterId();
+                String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
+				log.info(String.format("Faulty member is terminated and removed from the active members list: "
+                                       + "[member] %s [kub-cluster] %s [cluster] %s ", memberId, kubernetesClusterID, clusterId));
+            }
+        } catch (TerminationException e) {
+            String msg = "Cannot delete a container " + e.getLocalizedMessage();
+            log.error(msg, e);
+        }
+    }
+
+    @Override
+    public void handleMemberStartedEvent(
+            MemberStartedEvent memberStartedEvent) {
+
+    }
+
+    @Override
+    public void handleMemberActivatedEvent(
+            MemberActivatedEvent memberActivatedEvent) {
+
+        KubernetesClusterContext kubernetesClusterContext;
+        kubernetesClusterContext = getKubernetesClusterCtxt();
+        String memberId = memberActivatedEvent.getMemberId();
+        kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId));
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member stat context has been added successfully: "
+                                   + "[member] %s", memberId));
+        }
+        kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
+    }
+
+    @Override
+    public void handleMemberMaintenanceModeEvent(
+            MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+        // no need to do anything here
+        // we will not be receiving this event for containers
+        // we will only receive member terminated event
+    }
+
+    @Override
+    public void handleMemberReadyToShutdownEvent(
+            MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+        // no need to do anything here
+        // we will not be receiving this event for containers
+    	// we will only receive member terminated event
+    }
+
+    @Override
+    public void handleMemberTerminatedEvent(
+            MemberTerminatedEvent memberTerminatedEvent) {
+
+        String memberId = memberTerminatedEvent.getMemberId();
+        if (getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from termination pending members list: "
+                                        + "[member] %s", memberId));
+            }
+        } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from pending members list: "
+                                        + "[member] %s", memberId));
+            }
+        } else if (getKubernetesClusterCtxt().removeActiveMemberById(memberId)) {
+            log.warn(String.format("Member is in the wrong list and it is removed from "
+                                   + "active members list", memberId));
+        } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) {
+            log.warn(String.format("Member's obsolated timeout has been expired and "
+                                   + "it is removed from obsolated members list", memberId));
+        } else {
+            log.warn(String.format("Member is not available in any of the list active, "
+                                   + "pending and termination pending", memberId));
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member stat context has been removed successfully: "
+                                   + "[member] %s", memberId));
+        }
+    }
+
+    @Override
+    public void handleClusterRemovedEvent(
+            ClusterRemovedEvent clusterRemovedEvent) {
+    	getKubernetesClusterCtxt().getPendingMembers().clear();
+    	getKubernetesClusterCtxt().getActiveMembers().clear();
+    	getKubernetesClusterCtxt().getTerminationPendingMembers().clear();
+    	getKubernetesClusterCtxt().getObsoletedMembers().clear();
+    }
+
+    public KubernetesClusterContext getKubernetesClusterCtxt() {
+        return kubernetesClusterCtxt;
+    }
+
+    public void setKubernetesClusterCtxt(
+            KubernetesClusterContext kubernetesClusterCtxt) {
+        this.kubernetesClusterCtxt = kubernetesClusterCtxt;
+    }
+
+    public AutoscalePolicy getAutoscalePolicy() {
+        return PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyId);
+    }
+
+    private Member getMemberByMemberId(String memberId) {
+        try {
+            TopologyManager.acquireReadLock();
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.memberExists(memberId)) {
+                        return cluster.getMember(memberId);
+                    }
+                }
+            }
+            return null;
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    @Override
+    public void terminateAllMembers() {
+        try {
+            CloudControllerClient.getInstance().terminateAllContainers(getKubernetesClusterCtxt().getClusterId());
+        } catch (TerminationException e) {
+            log.error(String.format("Could not terminate containers: [cluster-id] %s",
+                    getKubernetesClusterCtxt().getClusterId()), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
new file mode 100644
index 0000000..2615651
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.List;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+
+/*
+ * It is monitoring a kubernetes service cluster periodically.
+ */
+public final class KubernetesServiceClusterMonitor extends KubernetesClusterMonitor {
+
+    private static final Log log = LogFactory.getLog(KubernetesServiceClusterMonitor.class);
+
+    private String lbReferenceType;
+
+    public KubernetesServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt,
+                                           String serviceClusterID, String serviceId,
+                                           String autoscalePolicyId) {
+        super(serviceClusterID, serviceId, kubernetesClusterCtxt,
+              new AutoscalerRuleEvaluator(
+                      StratosConstants.CONTAINER_MIN_CHECK_DROOL_FILE,
+                      StratosConstants.CONTAINER_SCALE_CHECK_DROOL_FILE),
+              autoscalePolicyId);
+        readConfigurations();
+    }
+
+    @Override
+    public void run() {
+
+        if (log.isDebugEnabled()) {
+            log.debug("KubernetesServiceClusterMonitor is running..." + this.toString());
+        }
+        try {
+            if (!ClusterStatus.Active.getNextStates().contains(getStatus())) {
+                monitor();
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
+                            + getStatus() + "state");
+                }
+            }
+        } catch (Exception e) {
+            log.error("KubernetesServiceClusterMonitor: Monitor failed." + this.toString(),
+                      e);
+        }
+    }
+
+    @Override
+    protected void monitor() {
+        minCheck();
+        scaleCheck();
+    }
+
+    private void scaleCheck() {
+        boolean rifReset = getKubernetesClusterCtxt().isRifReset();
+        boolean memoryConsumptionReset = getKubernetesClusterCtxt().isMemoryConsumptionReset();
+        boolean loadAverageReset = getKubernetesClusterCtxt().isLoadAverageReset();
+        if (log.isDebugEnabled()) {
+            log.debug("flag of rifReset : " + rifReset
+                      + " flag of memoryConsumptionReset : "
+                      + memoryConsumptionReset + " flag of loadAverageReset : "
+                      + loadAverageReset);
+        }
+        String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
+        String clusterId = getClusterId();
+        if (rifReset || memoryConsumptionReset || loadAverageReset) {
+            getScaleCheckKnowledgeSession().setGlobal("clusterId", clusterId);
+            getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", getAutoscalePolicy());
+            getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+            getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+            getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+            if (log.isDebugEnabled()) {
+                log.debug(String.format(
+                        "Running scale check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
+            }
+            scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(
+                    getScaleCheckKnowledgeSession(), scaleCheckFactHandle, getKubernetesClusterCtxt());
+            getKubernetesClusterCtxt().setRifReset(false);
+            getKubernetesClusterCtxt().setMemoryConsumptionReset(false);
+            getKubernetesClusterCtxt().setLoadAverageReset(false);
+        } else if (log.isDebugEnabled()) {
+            log.debug(String.format("Scale check will not run since none of the statistics have not received yet for "
+                                    + "[kub-cluster] : %s [cluster] : %s", kubernetesClusterID, clusterId));
+        }
+    }
+
+	private void minCheck() {
+		getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+		String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format(
+                    "Running min check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
+        }
+		minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(
+				getMinCheckKnowledgeSession(), minCheckFactHandle,
+				getKubernetesClusterCtxt());
+	}
+
+	@Override
+    public void destroy() {
+        getMinCheckKnowledgeSession().dispose();
+        getScaleCheckKnowledgeSession().dispose();
+        setDestroyed(true);
+        stopScheduler();
+        if (log.isDebugEnabled()) {
+            log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
+        }
+    }
+
+    @Override
+    protected void readConfigurations() {
+        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+        int monitorInterval = conf.getInt(AutoScalerConstants.KubernetesService_Cluster_MONITOR_INTERVAL, 60000);
+        setMonitorIntervalMilliseconds(monitorInterval);
+        if (log.isDebugEnabled()) {
+            log.debug("KubernetesServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "KubernetesServiceClusterMonitor "
+               + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
+               + ", clusterId=" + getClusterId()
+               + ", serviceId=" + getServiceId() + "]";
+    }
+
+    public String getLbReferenceType() {
+        return lbReferenceType;
+    }
+
+    public void setLbReferenceType(String lbReferenceType) {
+        this.lbReferenceType = lbReferenceType;
+    }
+
+    @Override
+    public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
+        
+        if (properties != null) {
+            Property[] propertyArray = properties.getProperties();
+            if (propertyArray == null) {
+                return;
+            }
+            List<Property> propertyList = Arrays.asList(propertyArray);
+            
+            for (Property property : propertyList) {
+                String key = property.getName();
+                String value = property.getValue();
+                
+                if (StratosConstants.KUBERNETES_MIN_REPLICAS.equals(key)) {
+                    int min = Integer.parseInt(value);
+                    int max = getKubernetesClusterCtxt().getMaxReplicas();
+                    if (min > max) {
+                        String msg = String.format("%s should be less than %s . But %s is not less than %s.", 
+                                StratosConstants.KUBERNETES_MIN_REPLICAS, StratosConstants.KUBERNETES_MAX_REPLICAS, min, max);
+                        log.error(msg);
+                        throw new InvalidArgumentException(msg);
+                    }
+                    getKubernetesClusterCtxt().setMinReplicas(min);
+                    break;
+                }
+            }
+            
+        }
+    }
+}
\ No newline at end of file


[4/5] Adding autoscaler topology event listeners introduced by service grouping

Posted by im...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
deleted file mode 100644
index d7238bf..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.PartitionValidationException;
-import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.partition.PartitionGroup;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.util.Constants;
-
-/*
- * Factory class for creating cluster monitors.
- */
-public class ClusterMonitorFactory {
-
-    private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
-
-    /**
-     * @param cluster the cluster to be monitored
-     * @return the created cluster monitor
-     * @throws PolicyValidationException    when deployment policy is not valid
-     * @throws PartitionValidationException when partition is not valid
-     */
-    public static AbstractClusterMonitor getMonitor(Cluster cluster)
-            throws PolicyValidationException, PartitionValidationException {
-
-        AbstractClusterMonitor clusterMonitor;
-        if (cluster.isKubernetesCluster()) {
-            clusterMonitor = getDockerServiceClusterMonitor(cluster);
-        } else if (cluster.isLbCluster()) {
-            clusterMonitor = getVMLbClusterMonitor(cluster);
-        } else {
-            clusterMonitor = getVMServiceClusterMonitor(cluster);
-        }
-
-        return clusterMonitor;
-    }
-
-    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster)
-            throws PolicyValidationException, PartitionValidationException {
-        // FIXME fix the following code to correctly update
-        // AutoscalerContext context = AutoscalerContext.getInstance();
-        if (null == cluster) {
-            return null;
-        }
-
-        String autoscalePolicyName = cluster.getAutoscalePolicyName();
-        String deploymentPolicyName = cluster.getDeploymentPolicyName();
-
-        if (log.isDebugEnabled()) {
-            log.debug("Deployment policy name: " + deploymentPolicyName);
-            log.debug("Autoscaler policy name: " + autoscalePolicyName);
-        }
-
-        AutoscalePolicy policy =
-                PolicyManager.getInstance()
-                        .getAutoscalePolicy(autoscalePolicyName);
-        DeploymentPolicy deploymentPolicy =
-                PolicyManager.getInstance()
-                        .getDeploymentPolicy(deploymentPolicyName);
-
-        if (deploymentPolicy == null) {
-            String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
-            log.error(msg);
-            throw new PolicyValidationException(msg);
-        }
-
-        Partition[] allPartitions = deploymentPolicy.getAllPartitions();
-        if (allPartitions == null) {
-            String msg =
-                    "Deployment Policy's Partitions are null. Policy name: " +
-                    deploymentPolicyName;
-            log.error(msg);
-            throw new PolicyValidationException(msg);
-        }
-
-        CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
-
-        VMServiceClusterMonitor clusterMonitor =
-                new VMServiceClusterMonitor(cluster.getClusterId(),
-                                            cluster.getServiceName(),
-                                            deploymentPolicy, policy);
-        clusterMonitor.setStatus(ClusterStatus.Created);
-
-        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
-
-            NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-                                                                                          partitionGroup.getPartitionAlgo(),
-                                                                                          partitionGroup.getPartitions());
-
-            for (Partition partition : partitionGroup.getPartitions()) {
-                PartitionContext partitionContext = new PartitionContext(partition);
-                partitionContext.setServiceName(cluster.getServiceName());
-                partitionContext.setProperties(cluster.getProperties());
-                partitionContext.setNetworkPartitionId(partitionGroup.getId());
-
-                for (Member member : cluster.getMembers()) {
-                    String memberId = member.getMemberId();
-                    if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
-                        MemberContext memberContext = new MemberContext();
-                        memberContext.setClusterId(member.getClusterId());
-                        memberContext.setMemberId(memberId);
-                        memberContext.setInitTime(member.getInitTime());
-                        memberContext.setPartition(partition);
-                        memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
-
-                        if (MemberStatus.Activated.equals(member.getStatus())) {
-                        	if (log.isDebugEnabled()) {
-                        		String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
-            					log.debug(msg);
-            				}
-                            partitionContext.addActiveMember(memberContext);
-//                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-//                            partitionContext.incrementCurrentActiveMemberCount(1);
-
-                        } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
-                        	if (log.isDebugEnabled()) {
-                        		String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
-            					log.debug(msg);
-            				}
-                            partitionContext.addPendingMember(memberContext);
-
-//                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-                        } else if (MemberStatus.Suspended.equals(member.getStatus())) {
-//                            partitionContext.addFaultyMember(memberId);
-                        }
-                        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member stat context has been added: [member] %s", memberId));
-                        }
-                    }
-
-                }
-                networkPartitionContext.addPartitionContext(partitionContext);
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Partition context has been added: [partition] %s",
-                                           partitionContext.getPartitionId()));
-                }
-            }
-
-            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Network partition context has been added: [network partition] %s",
-                                       networkPartitionContext.getId()));
-            }
-        }
-
-
-        // find lb reference type
-        java.util.Properties props = cluster.getProperties();
-
-        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
-            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-            clusterMonitor.setLbReferenceType(value);
-            if (log.isDebugEnabled()) {
-                log.debug("Set the lb reference type: " + value);
-            }
-        }
-
-        // set hasPrimary property
-        // hasPrimary is true if there are primary members available in that cluster
-        clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
-
-        log.info("VMServiceClusterMonitor created: " + clusterMonitor.toString());
-        return clusterMonitor;
-    }
-
-    private static Properties convertMemberPropsToMemberContextProps(
-            java.util.Properties properties) {
-        Properties props = new Properties();
-        for (Map.Entry<Object, Object> e : properties.entrySet()) {
-            Property prop = new Property();
-            prop.setName((String) e.getKey());
-            prop.setValue((String) e.getValue());
-            props.addProperties(prop);
-        }
-        return props;
-    }
-
-
-    private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
-            throws PolicyValidationException, PartitionValidationException {
-        // FIXME fix the following code to correctly update
-        // AutoscalerContext context = AutoscalerContext.getInstance();
-        if (null == cluster) {
-            return null;
-        }
-
-        String autoscalePolicyName = cluster.getAutoscalePolicyName();
-        String deploymentPolicyName = cluster.getDeploymentPolicyName();
-
-        if (log.isDebugEnabled()) {
-            log.debug("Deployment policy name: " + deploymentPolicyName);
-            log.debug("Autoscaler policy name: " + autoscalePolicyName);
-        }
-
-        AutoscalePolicy policy =
-                PolicyManager.getInstance()
-                        .getAutoscalePolicy(autoscalePolicyName);
-        DeploymentPolicy deploymentPolicy =
-                PolicyManager.getInstance()
-                        .getDeploymentPolicy(deploymentPolicyName);
-
-        if (deploymentPolicy == null) {
-            String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
-            log.error(msg);
-            throw new PolicyValidationException(msg);
-        }
-
-        String clusterId = cluster.getClusterId();
-        VMLbClusterMonitor clusterMonitor =
-                new VMLbClusterMonitor(clusterId,
-                                       cluster.getServiceName(),
-                                       deploymentPolicy, policy);
-        clusterMonitor.setStatus(ClusterStatus.Created);
-        // partition group = network partition context
-        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
-
-            NetworkPartitionLbHolder networkPartitionLbHolder =
-                    PartitionManager.getInstance()
-                            .getNetworkPartitionLbHolder(partitionGroup.getId());
-//                                                              PartitionManager.getInstance()
-//                                                                              .getNetworkPartitionLbHolder(partitionGroup.getId());
-            // FIXME pick a random partition
-            Partition partition =
-                    partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
-            PartitionContext partitionContext = new PartitionContext(partition);
-            partitionContext.setServiceName(cluster.getServiceName());
-            partitionContext.setProperties(cluster.getProperties());
-            partitionContext.setNetworkPartitionId(partitionGroup.getId());
-            partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
-
-            NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-                                                                                          partitionGroup.getPartitionAlgo(),
-                                                                                          partitionGroup.getPartitions());
-            for (Member member : cluster.getMembers()) {
-                String memberId = member.getMemberId();
-                if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) {
-                    MemberContext memberContext = new MemberContext();
-                    memberContext.setClusterId(member.getClusterId());
-                    memberContext.setMemberId(memberId);
-                    memberContext.setPartition(partition);
-                    memberContext.setInitTime(member.getInitTime());
-
-                    if (MemberStatus.Activated.equals(member.getStatus())) {
-                    	if (log.isDebugEnabled()) {
-                    		String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
-        					log.debug(msg);
-        				}
-                        partitionContext.addActiveMember(memberContext);
-//                        networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-//                        partitionContext.incrementCurrentActiveMemberCount(1);
-                    } else if (MemberStatus.Created.equals(member.getStatus()) ||
-                               MemberStatus.Starting.equals(member.getStatus())) {
-                    	if (log.isDebugEnabled()) {
-                    		String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
-        					log.debug(msg);
-        				}
-                        partitionContext.addPendingMember(memberContext);
-//                        networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-                    } else if (MemberStatus.Suspended.equals(member.getStatus())) {
-//                        partitionContext.addFaultyMember(memberId);
-                    }
-
-                    partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                    if (log.isInfoEnabled()) {
-                        log.info(String.format("Member stat context has been added: [member] %s", memberId));
-                    }
-                }
-
-            }
-            networkPartitionContext.addPartitionContext(partitionContext);
-
-            // populate lb cluster id in network partition context.
-            java.util.Properties props = cluster.getProperties();
-
-            // get service type of load balanced cluster
-            String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
-
-            if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
-                String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-
-                if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
-                    networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
-
-                } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) {
-                    String serviceName = cluster.getServiceName();
-                    // TODO: check if this is correct
-                    networkPartitionLbHolder.addServiceLB(serviceName, clusterId);
-
-                    if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) {
-                        networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
-                        if (log.isDebugEnabled()) {
-                            log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType);
-                        }
-                    }
-                }
-            }
-
-            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-        }
-
-        log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
-        return clusterMonitor;
-    }
-
-    /**
-     * @param cluster - the cluster which needs to be monitored
-     * @return - the cluster monitor
-     */
-    private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster)
-            throws PolicyValidationException {
-
-        if (null == cluster) {
-            return null;
-        }
-
-        String autoscalePolicyName = cluster.getAutoscalePolicyName();
-        if (log.isDebugEnabled()) {
-            log.debug("Autoscaler policy name: " + autoscalePolicyName);
-        }
-
-        AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
-        
-        if (policy == null) {
-            String msg = "Autoscale Policy is null. Policy name: " + autoscalePolicyName;
-            log.error(msg);
-            throw new PolicyValidationException(msg);
-        }
-        
-        java.util.Properties props = cluster.getProperties();
-        String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
-        KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
-                                                                                      cluster.getClusterId());
-
-        String minReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS);
-        if (minReplicasProperty != null && !minReplicasProperty.isEmpty()) {
-            int minReplicas = Integer.parseInt(minReplicasProperty);
-            kubernetesClusterCtxt.setMinReplicas(minReplicas);
-        }
-
-        String maxReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MAX_REPLICAS);
-        if (maxReplicasProperty != null && !maxReplicasProperty.isEmpty()) {
-            int maxReplicas = Integer.parseInt(maxReplicasProperty);
-            kubernetesClusterCtxt.setMaxReplicas(maxReplicas);
-        }
-
-        KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor(
-                kubernetesClusterCtxt,
-                cluster.getClusterId(),
-                cluster.getServiceName(),
-                policy.getId());
-
-        dockerClusterMonitor.setStatus(ClusterStatus.Created);
-
-        //populate the members after restarting        
-        for (Member member : cluster.getMembers()) {
-            String memberId = member.getMemberId();
-            String clusterId = member.getClusterId();
-            MemberContext memberContext = new MemberContext();
-            memberContext.setMemberId(memberId);
-            memberContext.setClusterId(clusterId);
-            memberContext.setInitTime(member.getInitTime());
-            
-            // if there is at least one member in the topology, that means service has been created already
-            // this is to avoid calling startContainer() method again
-            kubernetesClusterCtxt.setServiceClusterCreated(true);
-            
-            if (MemberStatus.Activated.equals(member.getStatus())) {
-            	if (log.isDebugEnabled()) {
-            		String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
-					log.debug(msg);
-				}
-                dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
-            } else if (MemberStatus.Created.equals(member.getStatus())
-                       || MemberStatus.Starting.equals(member.getStatus())) {
-            	if (log.isDebugEnabled()) {
-            		String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
-					log.debug(msg);
-				}
-                dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
-            }
-            
-            kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId));
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Member stat context has been added: [member] %s", memberId));
-            }
-        }
-
-        // find lb reference type
-        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
-            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-            dockerClusterMonitor.setLbReferenceType(value);
-            if (log.isDebugEnabled()) {
-                log.debug("Set the lb reference type: " + value);
-            }
-        }
-
-        log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
-        return dockerClusterMonitor;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
deleted file mode 100644
index 0254030..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-/*
- * Every kubernetes cluster monitor should extend this class
- */
-public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
-
-    private static final Log log = LogFactory.getLog(KubernetesClusterMonitor.class);
-
-    private KubernetesClusterContext kubernetesClusterCtxt;
-    protected String autoscalePolicyId;
-
-    protected KubernetesClusterMonitor(String clusterId, String serviceId,
-                                       KubernetesClusterContext kubernetesClusterContext,
-                                       AutoscalerRuleEvaluator autoscalerRuleEvaluator,
-                                       String autoscalePolicyId) {
-
-        super(clusterId, serviceId, autoscalerRuleEvaluator);
-        this.kubernetesClusterCtxt = kubernetesClusterContext;
-        this.autoscalePolicyId = autoscalePolicyId;
-    }
-
-    @Override
-    public void handleAverageLoadAverageEvent(
-            AverageLoadAverageEvent averageLoadAverageEvent) {
-
-        String clusterId = averageLoadAverageEvent.getClusterId();
-        float value = averageLoadAverageEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Avg load avg event: [cluster] %s [value] %s",
-                                    clusterId, value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setAverageLoadAverage(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-
-    }
-
-    @Override
-    public void handleGradientOfLoadAverageEvent(
-            GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
-
-        String clusterId = gradientOfLoadAverageEvent.getClusterId();
-        float value = gradientOfLoadAverageEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s",
-                                    clusterId, value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setLoadAverageGradient(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-    }
-
-    @Override
-    public void handleSecondDerivativeOfLoadAverageEvent(
-            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
-
-        String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
-        float value = secondDerivativeOfLoadAverageEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
-                                    + "[value] %s", clusterId, value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setLoadAverageSecondDerivative(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-    }
-
-    @Override
-    public void handleAverageMemoryConsumptionEvent(
-            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
-
-        String clusterId = averageMemoryConsumptionEvent.getClusterId();
-        float value = averageMemoryConsumptionEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Avg Memory Consumption event: [cluster] %s "
-                                    + "[value] %s", averageMemoryConsumptionEvent.getClusterId(), value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setAverageMemoryConsumption(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-    }
-
-    @Override
-    public void handleGradientOfMemoryConsumptionEvent(
-            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
-
-        String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
-        float value = gradientOfMemoryConsumptionEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
-                                    + "[value] %s", clusterId, value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setMemoryConsumptionGradient(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-    }
-
-    @Override
-    public void handleSecondDerivativeOfMemoryConsumptionEvent(
-            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
-
-        String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
-        float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
-                                    + "[value] %s", clusterId, value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setMemoryConsumptionSecondDerivative(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-    }
-
-    @Override
-    public void handleAverageRequestsInFlightEvent(
-            AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
-
-        float value = averageRequestsInFlightEvent.getValue();
-        String clusterId = averageRequestsInFlightEvent.getClusterId();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Average Rif event: [cluster] %s [value] %s",
-                                    clusterId, value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setAverageRequestsInFlight(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-    }
-
-    @Override
-    public void handleGradientOfRequestsInFlightEvent(
-            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
-
-        String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
-        float value = gradientOfRequestsInFlightEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s",
-                                    clusterId, value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setRequestsInFlightGradient(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-    }
-
-    @Override
-    public void handleSecondDerivativeOfRequestsInFlightEvent(
-            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
-
-        String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
-        float value = secondDerivativeOfRequestsInFlightEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Second derivative of Rif event: [cluster] %s "
-                                    + "[value] %s", clusterId, value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setRequestsInFlightSecondDerivative(value);
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-    }
-
-    @Override
-    public void handleMemberAverageMemoryConsumptionEvent(
-            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
-
-        String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
-        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
-        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberAverageMemoryConsumptionEvent.getValue();
-        memberStatsContext.setAverageMemoryConsumption(value);
-    }
-
-    @Override
-    public void handleMemberGradientOfMemoryConsumptionEvent(
-            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
-
-        String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
-        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
-        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberGradientOfMemoryConsumptionEvent.getValue();
-        memberStatsContext.setGradientOfMemoryConsumption(value);
-    }
-
-    @Override
-    public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
-            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
-
-    }
-
-    @Override
-    public void handleMemberAverageLoadAverageEvent(
-            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
-
-        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
-        String memberId = memberAverageLoadAverageEvent.getMemberId();
-        float value = memberAverageLoadAverageEvent.getValue();
-        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        memberStatsContext.setAverageLoadAverage(value);
-    }
-
-    @Override
-    public void handleMemberGradientOfLoadAverageEvent(
-            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
-
-        String memberId = memberGradientOfLoadAverageEvent.getMemberId();
-        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
-        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberGradientOfLoadAverageEvent.getValue();
-        memberStatsContext.setGradientOfLoadAverage(value);
-    }
-
-    @Override
-    public void handleMemberSecondDerivativeOfLoadAverageEvent(
-            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
-
-        String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
-        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
-        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
-        memberStatsContext.setSecondDerivativeOfLoadAverage(value);
-    }
-
-    @Override
-    public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
-    	// kill the container
-        String memberId = memberFaultEvent.getMemberId();
-        Member member = getMemberByMemberId(memberId);
-        if (null == member) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
-            }
-            return;
-        }
-        if (!member.isActive()) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member activated event has not received for the member %s. "
-                                        + "Therefore ignoring" + " the member fault health stat", memberId));
-            }
-            return;
-        }
-        
-        if (!getKubernetesClusterCtxt().activeMemberExist(memberId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Could not find the active member in kubernetes cluster context, "
-                                        + "[member] %s ", memberId));
-            }
-            return;
-        }
-        // terminate the faulty member
-        CloudControllerClient ccClient = CloudControllerClient.getInstance();
-        try {
-            ccClient.terminateContainer(memberId);
-            // remove from active member list
-            getKubernetesClusterCtxt().removeActiveMemberById(memberId);
-            if (log.isInfoEnabled()) {
-                String clusterId = memberFaultEvent.getClusterId();
-                String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
-				log.info(String.format("Faulty member is terminated and removed from the active members list: "
-                                       + "[member] %s [kub-cluster] %s [cluster] %s ", memberId, kubernetesClusterID, clusterId));
-            }
-        } catch (TerminationException e) {
-            String msg = "Cannot delete a container " + e.getLocalizedMessage();
-            log.error(msg, e);
-        }
-    }
-
-    @Override
-    public void handleMemberStartedEvent(
-            MemberStartedEvent memberStartedEvent) {
-
-    }
-
-    @Override
-    public void handleMemberActivatedEvent(
-            MemberActivatedEvent memberActivatedEvent) {
-
-        KubernetesClusterContext kubernetesClusterContext;
-        kubernetesClusterContext = getKubernetesClusterCtxt();
-        String memberId = memberActivatedEvent.getMemberId();
-        kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId));
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Member stat context has been added successfully: "
-                                   + "[member] %s", memberId));
-        }
-        kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
-    }
-
-    @Override
-    public void handleMemberMaintenanceModeEvent(
-            MemberMaintenanceModeEvent maintenanceModeEvent) {
-
-        // no need to do anything here
-        // we will not be receiving this event for containers
-        // we will only receive member terminated event
-    }
-
-    @Override
-    public void handleMemberReadyToShutdownEvent(
-            MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
-
-        // no need to do anything here
-        // we will not be receiving this event for containers
-    	// we will only receive member terminated event
-    }
-
-    @Override
-    public void handleMemberTerminatedEvent(
-            MemberTerminatedEvent memberTerminatedEvent) {
-
-        String memberId = memberTerminatedEvent.getMemberId();
-        if (getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member is removed from termination pending members list: "
-                                        + "[member] %s", memberId));
-            }
-        } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member is removed from pending members list: "
-                                        + "[member] %s", memberId));
-            }
-        } else if (getKubernetesClusterCtxt().removeActiveMemberById(memberId)) {
-            log.warn(String.format("Member is in the wrong list and it is removed from "
-                                   + "active members list", memberId));
-        } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) {
-            log.warn(String.format("Member's obsolated timeout has been expired and "
-                                   + "it is removed from obsolated members list", memberId));
-        } else {
-            log.warn(String.format("Member is not available in any of the list active, "
-                                   + "pending and termination pending", memberId));
-        }
-
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Member stat context has been removed successfully: "
-                                   + "[member] %s", memberId));
-        }
-    }
-
-    @Override
-    public void handleClusterRemovedEvent(
-            ClusterRemovedEvent clusterRemovedEvent) {
-    	getKubernetesClusterCtxt().getPendingMembers().clear();
-    	getKubernetesClusterCtxt().getActiveMembers().clear();
-    	getKubernetesClusterCtxt().getTerminationPendingMembers().clear();
-    	getKubernetesClusterCtxt().getObsoletedMembers().clear();
-    }
-
-    public KubernetesClusterContext getKubernetesClusterCtxt() {
-        return kubernetesClusterCtxt;
-    }
-
-    public void setKubernetesClusterCtxt(
-            KubernetesClusterContext kubernetesClusterCtxt) {
-        this.kubernetesClusterCtxt = kubernetesClusterCtxt;
-    }
-
-    public AutoscalePolicy getAutoscalePolicy() {
-        return PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyId);
-    }
-
-    private Member getMemberByMemberId(String memberId) {
-        try {
-            TopologyManager.acquireReadLock();
-            for (Service service : TopologyManager.getTopology().getServices()) {
-                for (Cluster cluster : service.getClusters()) {
-                    if (cluster.memberExists(memberId)) {
-                        return cluster.getMember(memberId);
-                    }
-                }
-            }
-            return null;
-        } finally {
-            TopologyManager.releaseReadLock();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
deleted file mode 100644
index 15b14b6..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.collections.ListUtils;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-
-import edu.emory.mathcs.backport.java.util.Arrays;
-
-/*
- * It is monitoring a kubernetes service cluster periodically.
- */
-public final class KubernetesServiceClusterMonitor extends KubernetesClusterMonitor {
-
-    private static final Log log = LogFactory.getLog(KubernetesServiceClusterMonitor.class);
-
-    private String lbReferenceType;
-
-    public KubernetesServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt,
-                                           String serviceClusterID, String serviceId,
-                                           String autoscalePolicyId) {
-        super(serviceClusterID, serviceId, kubernetesClusterCtxt,
-              new AutoscalerRuleEvaluator(
-                      StratosConstants.CONTAINER_MIN_CHECK_DROOL_FILE,
-                      StratosConstants.CONTAINER_SCALE_CHECK_DROOL_FILE),
-              autoscalePolicyId);
-        readConfigurations();
-    }
-
-    @Override
-    public void run() {
-
-        if (log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor is running..." + this.toString());
-        }
-        try {
-            if (!ClusterStatus.Active.getNextStates().contains(getStatus())) {
-                monitor();
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
-                            + getStatus() + "state");
-                }
-            }
-        } catch (Exception e) {
-            log.error("KubernetesServiceClusterMonitor: Monitor failed." + this.toString(),
-                      e);
-        }
-    }
-
-    @Override
-    protected void monitor() {
-        minCheck();
-        scaleCheck();
-    }
-
-    private void scaleCheck() {
-        boolean rifReset = getKubernetesClusterCtxt().isRifReset();
-        boolean memoryConsumptionReset = getKubernetesClusterCtxt().isMemoryConsumptionReset();
-        boolean loadAverageReset = getKubernetesClusterCtxt().isLoadAverageReset();
-        if (log.isDebugEnabled()) {
-            log.debug("flag of rifReset : " + rifReset
-                      + " flag of memoryConsumptionReset : "
-                      + memoryConsumptionReset + " flag of loadAverageReset : "
-                      + loadAverageReset);
-        }
-        String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
-        String clusterId = getClusterId();
-        if (rifReset || memoryConsumptionReset || loadAverageReset) {
-            getScaleCheckKnowledgeSession().setGlobal("clusterId", clusterId);
-            getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", getAutoscalePolicy());
-            getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
-            getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
-            getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
-            if (log.isDebugEnabled()) {
-                log.debug(String.format(
-                        "Running scale check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
-            }
-            scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(
-                    getScaleCheckKnowledgeSession(), scaleCheckFactHandle, getKubernetesClusterCtxt());
-            getKubernetesClusterCtxt().setRifReset(false);
-            getKubernetesClusterCtxt().setMemoryConsumptionReset(false);
-            getKubernetesClusterCtxt().setLoadAverageReset(false);
-        } else if (log.isDebugEnabled()) {
-            log.debug(String.format("Scale check will not run since none of the statistics have not received yet for "
-                                    + "[kub-cluster] : %s [cluster] : %s", kubernetesClusterID, clusterId));
-        }
-    }
-
-	private void minCheck() {
-		getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-		String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format(
-                    "Running min check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
-        }
-		minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(
-				getMinCheckKnowledgeSession(), minCheckFactHandle,
-				getKubernetesClusterCtxt());
-	}
-
-	@Override
-    public void destroy() {
-        getMinCheckKnowledgeSession().dispose();
-        getScaleCheckKnowledgeSession().dispose();
-        setDestroyed(true);
-        stopScheduler();
-        if (log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
-        }
-    }
-
-    @Override
-    protected void readConfigurations() {
-        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
-        int monitorInterval = conf.getInt(AutoScalerConstants.KubernetesService_Cluster_MONITOR_INTERVAL, 60000);
-        setMonitorIntervalMilliseconds(monitorInterval);
-        if (log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "KubernetesServiceClusterMonitor "
-               + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
-               + ", clusterId=" + getClusterId()
-               + ", serviceId=" + getServiceId() + "]";
-    }
-
-    public String getLbReferenceType() {
-        return lbReferenceType;
-    }
-
-    public void setLbReferenceType(String lbReferenceType) {
-        this.lbReferenceType = lbReferenceType;
-    }
-
-    @Override
-    public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
-        
-        if (properties != null) {
-            Property[] propertyArray = properties.getProperties();
-            if (propertyArray == null) {
-                return;
-            }
-            List<Property> propertyList = Arrays.asList(propertyArray);
-            
-            for (Property property : propertyList) {
-                String key = property.getName();
-                String value = property.getValue();
-                
-                if (StratosConstants.KUBERNETES_MIN_REPLICAS.equals(key)) {
-                    int min = Integer.parseInt(value);
-                    int max = getKubernetesClusterCtxt().getMaxReplicas();
-                    if (min > max) {
-                        String msg = String.format("%s should be less than %s . But %s is not less than %s.", 
-                                StratosConstants.KUBERNETES_MIN_REPLICAS, StratosConstants.KUBERNETES_MAX_REPLICAS, min, max);
-                        log.error(msg);
-                        throw new InvalidArgumentException(msg);
-                    }
-                    getKubernetesClusterCtxt().setMinReplicas(min);
-                    break;
-                }
-            }
-            
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
index fa8f425..8aeae94 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
@@ -28,10 +28,9 @@ import org.apache.stratos.autoscaler.grouping.dependency.DependencyBuilder;
 import org.apache.stratos.autoscaler.grouping.dependency.DependencyTree;
 import org.apache.stratos.autoscaler.grouping.dependency.context.ApplicationContext;
 import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitorFactory;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.status.checker.StatusChecker;
-import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.GroupStatus;
 import org.apache.stratos.messaging.domain.topology.ParentComponent;
 
 import java.util.HashMap;