You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/12/19 12:23:30 UTC

[4/5] stratos git commit: Removed kubernetes cluster monitors/contexts and renamed vm cluster monitor to cluster monitor

http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
index 75f7a2d..a8c90b3 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.context.AutoscalerContext;
 import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Service;
@@ -172,8 +172,8 @@ public class AutoscalerHealthStatEventReceiver {
                     }
                     return;
                 }
-                if(monitor instanceof VMClusterMonitor) {
-                    VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) monitor;
+                if(monitor instanceof ClusterMonitor) {
+                    ClusterMonitor vmClusterMonitor = (ClusterMonitor) monitor;
                     vmClusterMonitor.handleAverageRequestsServingCapabilityEvent(averageRequestsServingCapabilityEvent);
                 }
             }

http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 2a6f945..f5875ce 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.applications.ApplicationHolder;
 import org.apache.stratos.autoscaler.context.AutoscalerContext;
 import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
-import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
+import org.apache.stratos.autoscaler.context.cluster.ClusterContext;
 import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
 import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
 import org.apache.stratos.autoscaler.exception.application.DependencyBuilderException;
@@ -33,7 +33,7 @@ import org.apache.stratos.autoscaler.exception.partition.PartitionValidationExce
 import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
 import org.apache.stratos.autoscaler.monitor.MonitorFactory;
 import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
 import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent;
 import org.apache.stratos.autoscaler.pojo.policy.PolicyManager;
@@ -304,7 +304,7 @@ public class AutoscalerTopologyEventReceiver {
                 monitor.notifyParentMonitor(ClusterStatus.Terminated, instanceId);
                 //Removing the instance and instanceContext
                 ClusterInstance instance = (ClusterInstance) monitor.getInstance(instanceId);
-                ((VMClusterContext)monitor.getClusterContext()).
+                ((ClusterContext)monitor.getClusterContext()).
                         getNetworkPartitionCtxt(instance.getNetworkPartitionId()).
                         removeInstanceContext(instanceId);
                 monitor.removeInstance(instanceId);
@@ -446,8 +446,8 @@ public class AutoscalerTopologyEventReceiver {
                            Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
                             if (cluster != null) {
                                 try {
-                                    VMClusterContext clusterContext =
-                                            (VMClusterContext) clusterMonitor.getClusterContext();
+                                    ClusterContext clusterContext =
+                                            (ClusterContext) clusterMonitor.getClusterContext();
                                     if (clusterContext == null) {
                                         clusterContext = ClusterContextFactory.getVMClusterContext(instanceId, cluster,
                                                 clusterMonitor.hasScalingDependents());
@@ -470,7 +470,7 @@ public class AutoscalerTopologyEventReceiver {
                                                 + clusterInstanceCreatedEvent.getClusterId() + " started successfully");
                                     } else {
                                         //monitor already started. Invoking it directly to speed up the process
-                                        ((VMClusterMonitor)clusterMonitor).monitor();
+                                        ((ClusterMonitor)clusterMonitor).monitor();
                                     }
                                 } catch (PolicyValidationException e) {
                                     log.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
index a8721a6..b5b1614 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
@@ -31,7 +31,7 @@ import org.apache.stratos.autoscaler.exception.partition.PartitionValidationExce
 import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
 import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitorFactory;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
 import org.apache.stratos.autoscaler.monitor.component.GroupMonitor;
 import org.apache.stratos.autoscaler.monitor.component.ParentComponentMonitor;
@@ -270,7 +270,7 @@ public class MonitorFactory {
             }
 
             //Creating the instance of the cluster
-            ((VMClusterMonitor) clusterMonitor).createClusterInstance(parentInstanceIds, cluster);
+            ((ClusterMonitor) clusterMonitor).createClusterInstance(parentInstanceIds, cluster);
             //add it to autoscaler context
             AutoscalerContext.getInstance().addClusterMonitor(clusterMonitor);
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
new file mode 100644
index 0000000..24b8f3a
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
@@ -0,0 +1,1244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.client.CloudControllerClient;
+import org.apache.stratos.autoscaler.context.InstanceContext;
+import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
+import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
+import org.apache.stratos.autoscaler.context.cluster.ClusterContext;
+import org.apache.stratos.autoscaler.context.member.MemberStatsContext;
+import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext;
+import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
+import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
+import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.exception.cartridge.TerminationException;
+import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
+import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
+import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import org.apache.stratos.autoscaler.monitor.events.ScalingEvent;
+import org.apache.stratos.autoscaler.monitor.events.ScalingOverMaxEvent;
+import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiveProcessor;
+import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInactiveProcessor;
+import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.AutoscalerUtil;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
+import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
+import org.apache.stratos.common.Properties;
+import org.apache.stratos.common.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.ClusterInstance;
+import org.apache.stratos.messaging.domain.instance.GroupInstance;
+import org.apache.stratos.messaging.domain.instance.Instance;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.*;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+import java.util.*;
+
+/**
+ * Is responsible for monitoring a service cluster. This runs periodically
+ * and perform minimum instance check and scaling check using the underlying
+ * rules engine.
+ */
+public class ClusterMonitor extends AbstractClusterMonitor {
+
+    private static final Log log = LogFactory.getLog(ClusterMonitor.class);
+    private Map<String, ClusterLevelNetworkPartitionContext> networkPartitionIdToClusterLevelNetworkPartitionCtxts;
+    private boolean hasPrimary;
+    private float scalingFactorBasedOnDependencies = 1.0f;
+
+
+    protected ClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree) {
+        super(cluster, hasScalingDependents, groupScalingEnabledSubtree);
+        this.networkPartitionIdToClusterLevelNetworkPartitionCtxts = new HashMap<String, ClusterLevelNetworkPartitionContext>();
+        readConfigurations();
+        autoscalerRuleEvaluator = new AutoscalerRuleEvaluator();
+        autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE);
+        autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.VM_SCALE_CHECK_DROOL_FILE);
+        autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.VM_MIN_CHECK_DROOL_FILE);
+        autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.DEPENDENT_SCALE_CHECK_DROOL_FILE);
+
+        this.obsoleteCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
+                StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE);
+        this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
+                StratosConstants.VM_SCALE_CHECK_DROOL_FILE);
+        this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
+                StratosConstants.VM_MIN_CHECK_DROOL_FILE);
+        this.dependentScaleCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
+                StratosConstants.DEPENDENT_SCALE_CHECK_DROOL_FILE);
+    }
+
+    private static void terminateMember(String memberId) {
+        try {
+            CloudControllerClient.getInstance().terminate(memberId);
+
+        } catch (TerminationException e) {
+            log.error("Unable to terminate member [member id ] " + memberId, e);
+        }
+    }
+
+    private static void createClusterInstance(String serviceType, String clusterId, String alias,
+                                              String instanceId, String partitionId, String networkPartitionId) {
+        CloudControllerClient.getInstance().createClusterInstance(serviceType, clusterId, alias,
+                instanceId, partitionId, networkPartitionId);
+    }
+
+    public void addClusterLevelNWPartitionContext(ClusterLevelNetworkPartitionContext clusterLevelNWPartitionCtxt) {
+        networkPartitionIdToClusterLevelNetworkPartitionCtxts.put(clusterLevelNWPartitionCtxt.getId(), clusterLevelNWPartitionCtxt);
+    }
+
+    public ClusterLevelNetworkPartitionContext getClusterLevelNWPartitionContext(String nwPartitionId) {
+        return networkPartitionIdToClusterLevelNetworkPartitionCtxts.get(nwPartitionId);
+    }
+
+    @Override
+    public void handleAverageLoadAverageEvent(
+            AverageLoadAverageEvent averageLoadAverageEvent) {
+
+        String networkPartitionId = averageLoadAverageEvent.getNetworkPartitionId();
+        String clusterId = averageLoadAverageEvent.getClusterId();
+        String instanceId = averageLoadAverageEvent.getInstanceId();
+        float value = averageLoadAverageEvent.getValue();
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
+                    clusterId, networkPartitionId, value));
+        }
+
+        ClusterInstanceContext clusterInstanceContext = getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        if (null != clusterInstanceContext) {
+            clusterInstanceContext.setAverageLoadAverage(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        while (!isDestroyed()) {
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("Cluster monitor is running.. " + this.toString());
+                }
+                monitor();
+            } catch (Exception e) {
+                log.error("Cluster monitor: Monitor failed." + this.toString(), e);
+            }
+            try {
+                Thread.sleep(getMonitorIntervalMilliseconds());
+            } catch (InterruptedException ignore) {
+            }
+        }
+
+
+    }
+
+    private boolean isPrimaryMember(MemberContext memberContext) {
+        Properties props = AutoscalerUtil.toCommonProperties(memberContext.getProperties());
+        if (log.isDebugEnabled()) {
+            log.debug(" Properties [" + props + "] ");
+        }
+        if (props != null && props.getProperties() != null) {
+            for (Property prop : props.getProperties()) {
+                if (prop.getName().equals("PRIMARY")) {
+                    if (Boolean.parseBoolean(prop.getValue())) {
+                        log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
+                                "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+    public synchronized void monitor() {
+
+        for (ClusterLevelNetworkPartitionContext networkPartitionContext : getNetworkPartitionCtxts()) {
+
+            final Collection<InstanceContext> clusterInstanceContexts = networkPartitionContext.
+                    getInstanceIdToInstanceContextMap().values();
+
+            for (final InstanceContext pInstanceContext : clusterInstanceContexts) {
+                final ClusterInstanceContext instanceContext = (ClusterInstanceContext) pInstanceContext;
+                ClusterInstance instance = (ClusterInstance) this.instanceIdToInstanceMap.
+                        get(instanceContext.getId());
+
+                if ((instance.getStatus().getCode() <= ClusterStatus.Active.getCode()) ||
+                        (instance.getStatus() == ClusterStatus.Inactive && !hasStartupDependents)
+                                && !this.hasFaultyMember) {
+
+                    Runnable monitoringRunnable = new Runnable() {
+                        @Override
+                        public void run() {
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Monitor is running for [cluster] : " + getClusterId());
+                            }
+                            // store primary members in the cluster instance context
+                            List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
+
+                            for (ClusterLevelPartitionContext partitionContext :
+                                                            instanceContext.getPartitionCtxts()) {
+
+                                // get active primary members in this cluster instance context
+                                for (MemberContext memberContext : partitionContext.getActiveMembers()) {
+                                    if (isPrimaryMember(memberContext)) {
+                                        primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+                                    }
+                                }
+
+                                // get pending primary members in this cluster instance context
+                                for (MemberContext memberContext : partitionContext.getPendingMembers()) {
+                                    if (isPrimaryMember(memberContext)) {
+                                        primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+                                    }
+                                }
+
+                                obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
+                                        getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, partitionContext);
+
+                            }
+
+                            getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInClusterInstance);
+                            getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                            getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+                            //FIXME when parent chosen the partition
+                            String paritionAlgo = instanceContext.getPartitionAlgorithm();
+
+                            getMinCheckKnowledgeSession().setGlobal("algorithmName",
+                                    paritionAlgo);
+
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("Running minimum check for cluster instance %s ",
+                                        instanceContext.getId() + " for the cluster: " + clusterId));
+                            }
+
+                            minCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getMinCheckKnowledgeSession(),
+                                    minCheckFactHandle, instanceContext);
+
+
+                            //checking the status of the cluster
+                            boolean rifReset = instanceContext.isRifReset();
+                            boolean memoryConsumptionReset = instanceContext.isMemoryConsumptionReset();
+                            boolean loadAverageReset = instanceContext.isLoadAverageReset();
+                            boolean averageRequestServedPerInstanceReset
+                                    = instanceContext.isAverageRequestServedPerInstanceReset();
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Execution point of scaling Rule, [Is rif Reset] : " + rifReset
+                                        + " [Is memoryConsumption Reset] : " + memoryConsumptionReset
+                                        + " [Is loadAverage Reset] : " + loadAverageReset);
+                            }
+
+                            if (rifReset || memoryConsumptionReset || loadAverageReset) {
+
+                                log.info("Executing scaling rule as statistics have been reset");
+                                ClusterContext vmClusterContext = (ClusterContext) clusterContext;
+
+                                getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                                getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+                                getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+                                getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+                                getScaleCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+                                getScaleCheckKnowledgeSession().setGlobal("algorithmName", paritionAlgo);
+                                getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy",
+                                        vmClusterContext.getAutoscalePolicy());
+                                getScaleCheckKnowledgeSession().setGlobal("arspiReset",
+                                        averageRequestServedPerInstanceReset);
+                                getScaleCheckKnowledgeSession().setGlobal("primaryMembers",
+                                        primaryMemberListInClusterInstance);
+
+                                if (log.isDebugEnabled()) {
+                                    log.debug(String.format("Running scale check for [cluster instance context] %s ",
+                                            instanceContext.getId()));
+                                    log.debug(" Primary members : " + primaryMemberListInClusterInstance);
+                                }
+
+                                scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getScaleCheckKnowledgeSession()
+                                        , scaleCheckFactHandle, instanceContext);
+
+                                instanceContext.setRifReset(false);
+                                instanceContext.setMemoryConsumptionReset(false);
+                                instanceContext.setLoadAverageReset(false);
+                            } else if (log.isDebugEnabled()) {
+                                log.debug(String.format("Scale rule will not run since the LB statistics have not " +
+                                                "received before this cycle for [cluster instance context] %s [cluster] %s",
+                                        instanceContext.getId(), clusterId));
+                            }
+
+                        }
+                    };
+                    monitoringRunnable.run();
+                }
+
+                for (final ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
+                    Runnable monitoringRunnable = new Runnable() {
+                        @Override
+                        public void run() {
+                            obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
+                                    getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, partitionContext);
+                        }
+                    };
+
+                    monitoringRunnable.run();
+
+                }
+
+            }
+        }
+    }
+
+    @Override
+    protected void readConfigurations() {
+        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+        int monitorInterval = conf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000);
+        setMonitorIntervalMilliseconds(monitorInterval);
+        if (log.isDebugEnabled()) {
+            log.debug("ClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
+        }
+    }
+
+    @Override
+    public void destroy() {
+        getMinCheckKnowledgeSession().dispose();
+        getObsoleteCheckKnowledgeSession().dispose();
+        getScaleCheckKnowledgeSession().dispose();
+        setDestroyed(true);
+        stopScheduler();
+        if (log.isDebugEnabled()) {
+            log.debug("ClusterMonitor Drools session has been disposed. " + this.toString());
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterMonitor [clusterId=" + getClusterId() +
+                ", hasPrimary=" + hasPrimary + " ]";
+    }
+
+    public boolean isHasPrimary() {
+        return hasPrimary;
+    }
+
+    public void setHasPrimary(boolean hasPrimary) {
+        this.hasPrimary = hasPrimary;
+    }
+
+    @Override
+    public void onChildStatusEvent(MonitorStatusEvent statusEvent) {
+
+    }
+
+    @Override
+    public void onParentStatusEvent(MonitorStatusEvent statusEvent) {
+        String instanceId = statusEvent.getInstanceId();
+        // send the ClusterTerminating event
+        if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() ==
+                ApplicationStatus.Terminating) {
+            if (log.isInfoEnabled()) {
+                log.info("Publishing Cluster terminating event for [application] " + appId +
+                        " [cluster] " + this.getClusterId() + " [instance] " + instanceId);
+            }
+            ClusterStatusEventPublisher.sendClusterTerminatingEvent(getAppId(), getServiceId(), getClusterId(), instanceId);
+        }
+    }
+
+    @Override
+    public void onChildScalingEvent(ScalingEvent scalingEvent) {
+
+    }
+
+    @Override
+    public void onChildScalingOverMaxEvent(ScalingOverMaxEvent scalingOverMaxEvent) {
+
+    }
+
+    @Override
+    public void onParentScalingEvent(ScalingEvent scalingEvent) {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Parent scaling event received to [cluster]: " + this.getClusterId()
+                    + ", [network partition]: " + scalingEvent.getNetworkPartitionId()
+                    + ", [event] " + scalingEvent.getId() + ", [group instance] " + scalingEvent.getInstanceId());
+        }
+
+        this.scalingFactorBasedOnDependencies = scalingEvent.getFactor();
+        ClusterContext vmClusterContext = (ClusterContext) clusterContext;
+        String instanceId = scalingEvent.getInstanceId();
+
+        ClusterInstanceContext clusterInstanceContext =
+                getClusterInstanceContext(scalingEvent.getNetworkPartitionId(), instanceId);
+
+
+        // store primary members in the cluster instance context
+        List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
+
+        for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) {
+
+            // get active primary members in this cluster instance context
+            for (MemberContext memberContext : partitionContext.getActiveMembers()) {
+                if (isPrimaryMember(memberContext)) {
+                    primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+                }
+            }
+
+            // get pending primary members in this cluster instance context
+            for (MemberContext memberContext : partitionContext.getPendingMembers()) {
+                if (isPrimaryMember(memberContext)) {
+                    primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+                }
+            }
+        }
+
+
+        //TODO get min instance count from instance context
+        float requiredInstanceCount = clusterInstanceContext.getMinInstanceCount() * scalingFactorBasedOnDependencies;
+        int roundedRequiredInstanceCount = getRoundedInstanceCount(requiredInstanceCount,
+                vmClusterContext.getAutoscalePolicy().getInstanceRoundingFactor());
+        clusterInstanceContext.setRequiredInstanceCountBasedOnDependencies(roundedRequiredInstanceCount);
+
+        getDependentScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+        getDependentScaleCheckKnowledgeSession().setGlobal("roundedRequiredInstanceCount", roundedRequiredInstanceCount);
+        getDependentScaleCheckKnowledgeSession().setGlobal("algorithmName", clusterInstanceContext.getPartitionAlgorithm());
+        getDependentScaleCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+
+        dependentScaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getDependentScaleCheckKnowledgeSession()
+                , dependentScaleCheckFactHandle, clusterInstanceContext);
+
+    }
+
+    public void sendClusterScalingEvent(String networkPartitionId, String instanceId, float factor) {
+
+        MonitorStatusEventBuilder.handleClusterScalingEvent(this.parent, networkPartitionId, instanceId, factor, this.id);
+    }
+
+    public void sendScalingOverMaxEvent(String networkPartitionId, String instanceId) {
+
+        MonitorStatusEventBuilder.handleScalingOverMaxEvent(this.parent, networkPartitionId, instanceId,
+                this.id);
+    }
+
+    @Override
+    public void handleGradientOfLoadAverageEvent(
+            GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+        String networkPartitionId = gradientOfLoadAverageEvent.getNetworkPartitionId();
+        String clusterId = gradientOfLoadAverageEvent.getClusterId();
+        String instanceId = gradientOfLoadAverageEvent.getInstanceId();
+        float value = gradientOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
+                    clusterId, networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            clusterLevelNetworkPartitionContext.setLoadAverageGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfLoadAverageEvent(
+            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
+
+        String networkPartitionId = secondDerivativeOfLoadAverageEvent.getNetworkPartitionId();
+        String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+        String instanceId = secondDerivativeOfLoadAverageEvent.getInstanceId();
+        float value = secondDerivativeOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
+                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            clusterLevelNetworkPartitionContext.setLoadAverageSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleAverageMemoryConsumptionEvent(
+            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+        String networkPartitionId = averageMemoryConsumptionEvent.getNetworkPartitionId();
+        String clusterId = averageMemoryConsumptionEvent.getClusterId();
+        String instanceId = averageMemoryConsumptionEvent.getInstanceId();
+        float value = averageMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s "
+                    + "[value] %s", clusterId, networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            clusterLevelNetworkPartitionContext.setAverageMemoryConsumption(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String
+                        .format("Network partition context is not available for :"
+                                + " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfMemoryConsumptionEvent(
+            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
+
+        String networkPartitionId = gradientOfMemoryConsumptionEvent.getNetworkPartitionId();
+        String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+        String instanceId = gradientOfMemoryConsumptionEvent.getInstanceId();
+        float value = gradientOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
+                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            clusterLevelNetworkPartitionContext.setMemoryConsumptionGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfMemoryConsumptionEvent(
+            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
+
+        String networkPartitionId = secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId();
+        String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+        String instanceId = secondDerivativeOfMemoryConsumptionEvent.getInstanceId();
+        float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
+                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            clusterLevelNetworkPartitionContext.setMemoryConsumptionSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    public void handleAverageRequestsServingCapabilityEvent(
+            AverageRequestsServingCapabilityEvent averageRequestsServingCapabilityEvent) {
+
+        String clusterId = averageRequestsServingCapabilityEvent.getClusterId();
+        String instanceId = averageRequestsServingCapabilityEvent.getInstanceId();
+        String networkPartitionId = averageRequestsServingCapabilityEvent.getNetworkPartitionId();
+        Float floatValue = averageRequestsServingCapabilityEvent.getValue();
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s [value] %s",
+                    clusterId, networkPartitionId, floatValue));
+        }
+
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            clusterLevelNetworkPartitionContext.setAverageRequestsServedPerInstance(floatValue);
+
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+
+    }
+
+    @Override
+    public void handleAverageRequestsInFlightEvent(
+            AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+        String networkPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId();
+        String clusterId = averageRequestsInFlightEvent.getClusterId();
+        String instanceId = averageRequestsInFlightEvent.getInstanceId();
+        Float servedCount = averageRequestsInFlightEvent.getServedCount();
+        Float activeInstances = averageRequestsInFlightEvent.getActiveInstances();
+        Float requestsServedPerInstance = servedCount / activeInstances;
+        if (requestsServedPerInstance.isInfinite()) {
+            requestsServedPerInstance = 0f;
+        }
+        float value = averageRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
+                    clusterId, networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            clusterLevelNetworkPartitionContext.setAverageRequestsInFlight(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfRequestsInFlightEvent(
+            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+        String networkPartitionId = gradientOfRequestsInFlightEvent.getNetworkPartitionId();
+        String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
+        String instanceId = gradientOfRequestsInFlightEvent.getInstanceId();
+        float value = gradientOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
+                    clusterId, networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            clusterLevelNetworkPartitionContext.setRequestsInFlightGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfRequestsInFlightEvent(
+            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
+
+        String networkPartitionId = secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId();
+        String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
+        String instanceId = secondDerivativeOfRequestsInFlightEvent.getInstanceId();
+        float value = secondDerivativeOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second derivative of Rif event: [cluster] %s "
+                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+        }
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        if (null != clusterLevelNetworkPartitionContext) {
+            clusterLevelNetworkPartitionContext.setRequestsInFlightSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleMemberAverageMemoryConsumptionEvent(
+            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
+
+        String instanceId = memberAverageMemoryConsumptionEvent.getInstanceId();
+        String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
+                member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberAverageMemoryConsumptionEvent.getValue();
+        memberStatsContext.setAverageMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfMemoryConsumptionEvent(
+            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
+
+        String instanceId = memberGradientOfMemoryConsumptionEvent.getInstanceId();
+        String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
+                member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfMemoryConsumptionEvent.getValue();
+        memberStatsContext.setGradientOfMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
+
+    }
+
+    @Override
+    public void handleMemberAverageLoadAverageEvent(
+            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+        String instanceId = memberAverageLoadAverageEvent.getInstanceId();
+        String memberId = memberAverageLoadAverageEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
+                member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberAverageLoadAverageEvent.getValue();
+        memberStatsContext.setAverageLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfLoadAverageEvent(
+            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
+
+        String instanceId = memberGradientOfLoadAverageEvent.getInstanceId();
+        String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
+                member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfLoadAverageEvent.getValue();
+        memberStatsContext.setGradientOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfLoadAverageEvent(
+            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
+
+        String instanceId = memberSecondDerivativeOfLoadAverageEvent.getInstanceId();
+        String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+
+        ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
+                member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+        memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+
+        String memberId = memberFaultEvent.getMemberId();
+        String clusterId = memberFaultEvent.getClusterId();
+        Member member = getMemberByMemberId(memberId);
+        String instanceId = memberFaultEvent.getInstanceId();
+        String networkPartitionId = memberFaultEvent.getNetworkPartitionId();
+        if (null == member) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+            }
+            return;
+        }
+        if (!member.isActive()) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member activated event has not received for the member %s. "
+                        + "Therefore ignoring" + " the member fault health stat", memberId));
+            }
+            return;
+        }
+
+        ClusterInstanceContext nwPartitionCtxt;
+        nwPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId);
+        String partitionId = getPartitionOfMember(memberId);
+        ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+        if (!partitionCtxt.activeMemberExist(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Could not find the active member in partition context, "
+                        + "[member] %s ", memberId));
+            }
+            return;
+        }
+
+        // move member to obsolete list
+        synchronized (this) {
+            partitionCtxt.moveMemberToObsoleteList(memberId);
+        }
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Faulty member is added to obsolete list and removed from the active members list: "
+                    + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+        }
+
+        ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process(
+                ClusterStatusInactiveProcessor.class.getName(), clusterId, instanceId);
+    }
+
+    @Override
+    public void handleMemberStartedEvent(
+            MemberStartedEvent memberStartedEvent) {
+
+    }
+
+    @Override
+    public void handleMemberActivatedEvent(
+            MemberActivatedEvent memberActivatedEvent) {
+
+        String instanceId = memberActivatedEvent.getInstanceId();
+        String clusterId = memberActivatedEvent.getClusterId();
+        String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
+        String partitionId = memberActivatedEvent.getPartitionId();
+        String memberId = memberActivatedEvent.getMemberId();
+        ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId);
+        ClusterLevelPartitionContext clusterLevelPartitionContext;
+        clusterLevelPartitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
+        clusterLevelPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Member stat context has been added successfully: "
+                    + "[member] %s", memberId));
+        }
+        clusterLevelPartitionContext.movePendingMemberToActiveMembers(memberId);
+        ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process(
+                ClusterStatusActiveProcessor.class.getName(), clusterId, instanceId);
+    }
+
+    @Override
+    public void handleMemberMaintenanceModeEvent(
+            MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+        String networkPartitionId = maintenanceModeEvent.getNetworkPartitionId();
+        String partitionId = maintenanceModeEvent.getPartitionId();
+        String memberId = maintenanceModeEvent.getMemberId();
+        String instanceId = maintenanceModeEvent.getInstanceId();
+        ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
+                instanceId);
+        ClusterLevelPartitionContext clusterMonitorPartitionContext = networkPartitionCtxt.
+                getPartitionCtxt(partitionId);
+        clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Member has been moved as pending termination: "
+                    + "[member] %s", memberId));
+        }
+        clusterMonitorPartitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+    }
+
+    @Override
+    public void handleMemberReadyToShutdownEvent(MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+        ClusterInstanceContext nwPartitionCtxt;
+        String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
+        String instanceId = memberReadyToShutdownEvent.getInstanceId();
+        nwPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId);
+
+        // start a new member in the same Partition
+        String memberId = memberReadyToShutdownEvent.getMemberId();
+        String partitionId = getPartitionOfMember(memberId);
+        ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+
+        try {
+            String clusterId = memberReadyToShutdownEvent.getClusterId();
+            //move member to pending termination list
+            if (partitionCtxt.getPendingTerminationMember(memberId) != null) {
+                partitionCtxt.movePendingTerminationMemberToObsoleteMembers(memberId);
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is removed from the pending termination members " +
+                            "and moved to obsolete list: [member] %s " +
+                            "[partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+                }
+            } else if (partitionCtxt.getObsoleteMember(memberId) != null) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member is  in obsolete list: [member] %s " +
+                            "[partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+                }
+            } //TODO else part
+
+            //when no more members are there to terminate Invoking it monitor directly
+            // to speed up the termination process
+            if (partitionCtxt.getTotalMemberCount() == 0) {
+                this.monitor();
+            }
+
+
+        } catch (Exception e) {
+            String msg = "Error processing event " + e.getLocalizedMessage();
+            log.error(msg, e);
+        }
+    }
+
+    @Override
+    public void handleMemberTerminatedEvent(
+            MemberTerminatedEvent memberTerminatedEvent) {
+
+        String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
+        String memberId = memberTerminatedEvent.getMemberId();
+        String clusterId = memberTerminatedEvent.getClusterId();
+        String instanceId = memberTerminatedEvent.getInstanceId();
+        String partitionId = memberTerminatedEvent.getPartitionId();
+        ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+                networkPartitionId, instanceId);
+        ClusterLevelPartitionContext clusterMonitorPartitionContext =
+                clusterLevelNetworkPartitionContext.getPartitionCtxt(partitionId);
+        clusterMonitorPartitionContext.removeMemberStatsContext(memberId);
+
+        if (clusterMonitorPartitionContext.removeTerminationPendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from termination pending members list: "
+                        + "[member] %s", memberId));
+            }
+        } else if (clusterMonitorPartitionContext.removePendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from pending members list: "
+                        + "[member] %s", memberId));
+            }
+        } else if (clusterMonitorPartitionContext.removeActiveMemberById(memberId)) {
+            log.warn(String.format("Member is in the wrong list and it is removed from "
+                    + "active members list: %s", memberId));
+        } else if (clusterMonitorPartitionContext.removeObsoleteMember(memberId)) {
+            log.warn(String.format("Obsolete member has either been terminated or its obsolete time out has expired and"
+                    + " it is removed from obsolete members list: %s", memberId));
+        } else {
+            log.warn(String.format("Member is not available in any of the list active, "
+                    + "pending and termination pending: %s", memberId));
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Member stat context has been removed successfully: "
+                    + "[member] %s", memberId));
+        }
+        //Checking whether the cluster state can be changed either from in_active to created/terminating to terminated
+        ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process(
+                ClusterStatusTerminatedProcessor.class.getName(), clusterId, instanceId);
+    }
+
+    @Override
+    public void handleClusterRemovedEvent(
+            ClusterRemovedEvent clusterRemovedEvent) {
+
+    }
+
+    @Override
+    public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
+
+    }
+
+    private String getNetworkPartitionIdByMemberId(String memberId) {
+        for (Service service : TopologyManager.getTopology().getServices()) {
+            for (Cluster cluster : service.getClusters()) {
+                if (cluster.memberExists(memberId)) {
+                    return cluster.getMember(memberId).getNetworkPartitionId();
+                }
+            }
+        }
+        return null;
+    }
+
+    private Member getMemberByMemberId(String memberId) {
+        try {
+            TopologyManager.acquireReadLock();
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.memberExists(memberId)) {
+                        return cluster.getMember(memberId);
+                    }
+                }
+            }
+            return null;
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    public String getPartitionOfMember(String memberId) {
+        for (Service service : TopologyManager.getTopology().getServices()) {
+            for (Cluster cluster : service.getClusters()) {
+                if (cluster.memberExists(memberId)) {
+                    return cluster.getMember(memberId).getPartitionId();
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void terminateAllMembers(final String instanceId, final String networkPartitionId) {
+        final ClusterMonitor monitor = this;
+        Thread memberTerminator = new Thread(new Runnable() {
+            public void run() {
+
+                ClusterInstanceContext instanceContext =
+                        (ClusterInstanceContext) getAllNetworkPartitionCtxts().get(networkPartitionId)
+                                                                    .getInstanceContext(instanceId);
+                boolean allMovedToObsolete = true;
+                for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
+                    if (log.isInfoEnabled()) {
+                        log.info("Starting to terminate all members in cluster [" + getClusterId() + "] " +
+                                "Network Partition [" + instanceContext.getNetworkPartitionId() + "], Partition [" +
+                                partitionContext.getPartitionId() + "]");
+                    }
+                    // need to terminate active, pending and obsolete members
+                    //FIXME to traverse concurrent
+                    // active members
+                    List<String> activeMembers = new ArrayList<String>();
+                    Iterator<MemberContext> iterator = partitionContext.getActiveMembers().listIterator();
+                    while (iterator.hasNext()) {
+                        MemberContext activeMemberCtxt = iterator.next();
+                        activeMembers.add(activeMemberCtxt.getMemberId());
+
+                    }
+                    for (String memberId : activeMembers) {
+                        log.info("Sending instance cleanup event for the active member: [member-id] " + memberId);
+                        partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+                        InstanceNotificationPublisher.getInstance().
+                                sendInstanceCleanupEventForMember(memberId);
+                    }
+                    Iterator<MemberContext> pendingIterator = partitionContext.getPendingMembers().listIterator();
+
+                    List<String> pendingMembers = new ArrayList<String>();
+                    while (pendingIterator.hasNext()) {
+                        MemberContext activeMemberCtxt = pendingIterator.next();
+                        pendingMembers.add(activeMemberCtxt.getMemberId());
+
+                    }
+                    for (String memberId : pendingMembers) {
+                        MemberContext pendingMemberCtxt = pendingIterator.next();
+                        // pending members
+                        String memeberId = pendingMemberCtxt.getMemberId();
+                        if (log.isDebugEnabled()) {
+                            log.debug("Moving pending member [member id] " + memeberId + " to obsolete list");
+                        }
+                        partitionContext.movePendingMemberToObsoleteMembers(memeberId);
+                    }
+                    if(partitionContext.getTotalMemberCount() == 0) {
+                        allMovedToObsolete = allMovedToObsolete && true;
+                    } else {
+                        allMovedToObsolete = false;
+                    }
+                }
+
+                if(allMovedToObsolete) {
+                    monitor.monitor();
+                }
+            }
+        }, "Member Terminator - [cluster id] " + getClusterId());
+
+        memberTerminator.start();
+    }
+
+    public Map<String, ClusterLevelNetworkPartitionContext> getAllNetworkPartitionCtxts() {
+        return ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts();
+    }
+
+    public ClusterInstanceContext getClusterInstanceContext(String networkPartitionId, String instanceId) {
+        Map<String, ClusterLevelNetworkPartitionContext> clusterLevelNetworkPartitionContextMap =
+                ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts();
+        ClusterLevelNetworkPartitionContext networkPartitionContext =
+                clusterLevelNetworkPartitionContextMap.get(networkPartitionId);
+        ClusterInstanceContext instanceContext = (ClusterInstanceContext) networkPartitionContext.
+                                                        getInstanceContext(instanceId);
+        return instanceContext;
+    }
+
+    public Collection<ClusterLevelNetworkPartitionContext> getNetworkPartitionCtxts() {
+        return ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts().values();
+    }
+
+    public void createClusterInstance(List<String> parentInstanceIds, Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+        for (String parentInstanceId : parentInstanceIds) {
+            createInstance(parentInstanceId, cluster);
+        }
+
+    }
+
+    public boolean createInstanceOnDemand(String instanceId) {
+        Cluster cluster = TopologyManager.getTopology().getService(this.serviceType).
+                getCluster(this.clusterId);
+        try {
+            return createInstance(instanceId, cluster);
+            //TODO exception
+        } catch (PolicyValidationException e) {
+            log.error("Error while creating the cluster instance", e);
+        } catch (PartitionValidationException e) {
+            log.error("Error while creating the cluster instance", e);
+
+        }
+        return false;
+
+    }
+
+    private boolean createInstance(String parentInstanceId, Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+        Instance parentMonitorInstance = this.parent.getInstance(parentInstanceId);
+        String partitionId = null;
+        if (parentMonitorInstance instanceof GroupInstance) {
+            partitionId = parentMonitorInstance.getPartitionId();
+        }
+        if (parentMonitorInstance != null) {
+
+            ClusterInstance clusterInstance = cluster.getInstanceContexts(parentInstanceId);
+            if (clusterInstance != null) {
+
+                // Cluster instance is already there. No need to create one.
+                ClusterContext clusterContext = (ClusterContext) this.getClusterContext();
+                if (clusterContext == null) {
+
+                    clusterContext = ClusterContextFactory.getVMClusterContext(clusterInstance.getInstanceId(), cluster,
+                            hasScalingDependents());
+                    this.setClusterContext(clusterContext);
+                }
+
+                // create VMClusterContext and then add all the instanceContexts
+                clusterContext.addInstanceContext(parentInstanceId, cluster, hasScalingDependents(),
+                        groupScalingEnabledSubtree());
+                if (this.getInstance(clusterInstance.getInstanceId()) == null) {
+                    this.addInstance(clusterInstance);
+                }
+                // Checking the current status of the cluster instance
+                boolean stateChanged =
+                        ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain()
+                                .process("", cluster.getClusterId(), clusterInstance.getInstanceId());
+                if (!stateChanged && clusterInstance.getStatus() != ClusterStatus.Created) {
+                    this.notifyParentMonitor(clusterInstance.getStatus(),
+                            clusterInstance.getInstanceId());
+
+                    if (this.hasMonitoringStarted().compareAndSet(false, true)) {
+                        this.startScheduler();
+                        log.info("Monitoring task for Cluster Monitor with cluster id " +
+                                cluster.getClusterId() + " started successfully");
+                    }
+                }
+            } else {
+                createClusterInstance(cluster.getServiceName(), cluster.getClusterId(), null, parentInstanceId, partitionId,
+                        parentMonitorInstance.getNetworkPartitionId());
+
+            }
+            return true;
+
+        } else {
+            return false;
+
+        }
+
+    }
+
+    /**
+     * Move all the members of the cluster instance to termiantion pending
+     *
+     * @param instanceId
+     */
+    public void moveMembersFromActiveToPendingTermination(String instanceId) {
+
+        //TODO take read lock for network partition context
+        //FIXME to iterate properly
+        for (ClusterLevelNetworkPartitionContext networkPartitionContext :
+                ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts().values()) {
+            ClusterInstanceContext clusterInstanceContext =
+                    (ClusterInstanceContext) networkPartitionContext.getInstanceContext(instanceId);
+            if (clusterInstanceContext != null) {
+                for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) {
+                    List<String> members = new ArrayList<String>();
+
+                    Iterator<MemberContext> iterator = partitionContext.getActiveMembers().listIterator();
+                    while (iterator.hasNext()) {
+                        MemberContext activeMember = iterator.next();
+                        members.add(activeMember.getMemberId());
+                    }
+
+                    for (String memberId : members) {
+                        partitionContext.moveActiveMemberToTerminationPendingMembers(
+                                memberId);
+                    }
+                    List<String> pendingMembers = new ArrayList<String>();
+
+                    Iterator<MemberContext> pendingIterator = partitionContext.getPendingMembers().listIterator();
+                    while (pendingIterator.hasNext()) {
+                        MemberContext activeMember = pendingIterator.next();
+                        pendingMembers.add(activeMember.getMemberId());
+                    }
+                    for (String memberId : members) {
+                        // pending members
+                        if (log.isDebugEnabled()) {
+                            log.debug("Moving pending member [member id] " + memberId + " the obsolete list");
+                        }
+                        partitionContext.movePendingMemberToObsoleteMembers(memberId);
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
index e5fa765..f870e11 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
@@ -21,14 +21,9 @@ package org.apache.stratos.autoscaler.monitor.cluster;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
 import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
 import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
-import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
-import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
 
 /*
  * Factory class for creating cluster monitors.
@@ -47,19 +42,12 @@ public class ClusterMonitorFactory {
                                                     boolean groupScalingEnabledSubtree)
             throws PolicyValidationException, PartitionValidationException {
 
-        AbstractClusterMonitor clusterMonitor;
-//        if (cluster.isKubernetesCluster()) {
-//            clusterMonitor = getDockerServiceClusterMonitor(cluster);
-//////        } else if (cluster.isLbCluster()) {
-//////            clusterMonitor = getVMLbClusterMonitor(cluster);
-//        } else {
-            clusterMonitor = getVMClusterMonitor(cluster, hasScalingDependents, groupScalingEnabledSubtree);
-//        }
-
+        AbstractClusterMonitor clusterMonitor =
+                getVMClusterMonitor(cluster, hasScalingDependents, groupScalingEnabledSubtree);
         return clusterMonitor;
     }
 
-    private static VMClusterMonitor getVMClusterMonitor(Cluster cluster, boolean hasScalingDependents,
+    private static ClusterMonitor getVMClusterMonitor(Cluster cluster, boolean hasScalingDependents,
                                                         boolean groupScalingEnabledSubtree)
             throws PolicyValidationException, PartitionValidationException {
 
@@ -67,7 +55,7 @@ public class ClusterMonitorFactory {
             return null;
         }
 
-        VMClusterMonitor clusterMonitor = new VMClusterMonitor(cluster, hasScalingDependents, groupScalingEnabledSubtree);
+        ClusterMonitor clusterMonitor = new ClusterMonitor(cluster, hasScalingDependents, groupScalingEnabledSubtree);
 
         // find lb reference type
         java.util.Properties props = cluster.getProperties();
@@ -90,121 +78,4 @@ public class ClusterMonitorFactory {
         log.info("VMClusterMonitor created: " + clusterMonitor.toString());
         return clusterMonitor;
     }
-//
-//    private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
-//            throws PolicyValidationException, PartitionValidationException {
-//
-//        if (null == cluster) {
-//            return null;
-//        }
-//
-//        VMLbClusterMonitor clusterMonitor =
-//                new VMLbClusterMonitor(cluster.getServiceName(), cluster.getClusterId());
-//        clusterMonitor.notifyParentMonitor(ClusterStatus.Created);
-//
-//        log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
-//        return clusterMonitor;
-//    }
-
-    /**
-     * @param cluster - the cluster which needs to be monitored
-     * @return - the cluster monitor
-     */
-    private static KubernetesClusterMonitor getDockerServiceClusterMonitor(Cluster cluster)
-            throws PolicyValidationException {
-
-        if (null == cluster) {
-            return null;
-        }
-
-//        String autoscalePolicyName = cluster.getAutoscalePolicyName();
-//
-//        AutoscalePolicy autoscalePolicy =
-//                PolicyManager.getInstance()
-//                        .getAutoscalePolicy(autoscalePolicyName);
-//        if (log.isDebugEnabled()) {
-//            log.debug("Autoscaling policy name: " + autoscalePolicyName);
-//        }
-//
-//        AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
-//
-//        if (policy == null) {
-//            String msg = String.format("Autoscaling policy is null: [policy-name] %s", autoscalePolicyName);
-//            log.error(msg);
-//            throw new PolicyValidationException(msg);
-//        }
-//
-        java.util.Properties properties = cluster.getProperties();
-        if (properties == null) {
-            String message = String.format("Properties not found in kubernetes cluster: [cluster-id] %s",
-                    cluster.getClusterId());
-            log.error(message);
-            throw new RuntimeException(message);
-        }
-//        String minReplicasProperty = properties.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS);
-//        int minReplicas = 0;
-//        if (minReplicasProperty != null && !minReplicasProperty.isEmpty()) {
-//            minReplicas = Integer.parseInt(minReplicasProperty);
-//        }
-//
-//        int maxReplicas = 0;
-//        String maxReplicasProperty = properties.getProperty(StratosConstants.KUBERNETES_MAX_REPLICAS);
-//        if (maxReplicasProperty != null && !maxReplicasProperty.isEmpty()) {
-//            maxReplicas = Integer.parseInt(maxReplicasProperty);
-//        }
-//
-//        String kubernetesHostClusterID = properties.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
-//        KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
-//                cluster.getClusterId(), cluster.getServiceName(),  autoscalePolicy, minReplicas, maxReplicas);
-
-
-//        KubernetesClusterMonitor dockerClusterMonitor = new KubernetesClusterMonitor(cluster);
-
-        //populate the members after restarting
-//        for (Member member : cluster.getMembers()) {
-//            String memberId = member.getMemberId();
-//            String clusterId = member.getClusterId();
-//            MemberContext memberContext = new MemberContext();
-//            memberContext.setMemberId(memberId);
-//            memberContext.setClusterId(clusterId);
-//            memberContext.setInitTime(member.getInitTime());
-//
-//            // if there is at least one member in the topology, that means service has been created already
-//            // this is to avoid calling startContainer() method again
-//            //kubernetesClusterCtxt.setServiceClusterCreated(true);
-//
-//            if (MemberStatus.Activated.equals(member.getStatus())) {
-//                if (log.isDebugEnabled()) {
-//                    String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
-//                    log.debug(msg);
-//                }
-//                ((VMClusterContext) dockerClusterMonitor.getClusterContext()).addActiveMember(memberContext);
-//            } else if (MemberStatus.Created.equals(member.getStatus())
-//                    || MemberStatus.Starting.equals(member.getStatus())) {
-//                if (log.isDebugEnabled()) {
-//                    String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
-//                    log.debug(msg);
-//                }
-//                dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
-//            }
-//
-//            //kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId));
-//            if (log.isInfoEnabled()) {
-//                log.info(String.format("Member stat context has been added: [member] %s", memberId));
-//            }
-//        }
-//
-//        // find lb reference type
-//        if (properties.containsKey(StratosConstants.LOAD_BALANCER_REF)) {
-//            String value = properties.getProperty(StratosConstants.LOAD_BALANCER_REF);
-//            dockerClusterMonitor.setLbReferenceType(value);
-//            if (log.isDebugEnabled()) {
-//                log.debug("Set the lb reference type: " + value);
-//            }
-//        }
-
-//        log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
-//        return dockerClusterMonitor;
-        return null;
-    }
 }