You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/10/31 05:04:07 UTC

[4/5] Adding autoscaler topology event listeners introduced by service grouping

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
deleted file mode 100644
index d7238bf..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.PartitionValidationException;
-import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.partition.PartitionGroup;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.util.Constants;
-
-/*
- * Factory class that creates the cluster monitor matching a cluster's type.
- */
-public class ClusterMonitorFactory {
-
-    private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
-
-    /**
-     * Creates the monitor that matches the type of the given cluster: a Kubernetes
-     * cluster gets a docker service cluster monitor, an LB cluster gets a VM LB
-     * cluster monitor, and every other cluster gets a VM service cluster monitor.
-     * The cluster is dereferenced here, so a null cluster fails with a
-     * NullPointerException before the helpers' own null checks are reached.
-     *
-     * @param cluster the cluster to be monitored
-     * @return the created cluster monitor
-     * @throws PolicyValidationException    when the deployment policy (VM clusters) or the
-     *                                      autoscale policy (Kubernetes clusters) is missing or invalid
-     * @throws PartitionValidationException when partition is not valid
-     */
-    public static AbstractClusterMonitor getMonitor(Cluster cluster)
-            throws PolicyValidationException, PartitionValidationException {
-
-        AbstractClusterMonitor clusterMonitor;
-        if (cluster.isKubernetesCluster()) {
-            clusterMonitor = getDockerServiceClusterMonitor(cluster);
-        } else if (cluster.isLbCluster()) {
-            clusterMonitor = getVMLbClusterMonitor(cluster);
-        } else {
-            clusterMonitor = getVMServiceClusterMonitor(cluster);
-        }
-
-        return clusterMonitor;
-    }
-
-    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster)
-            throws PolicyValidationException, PartitionValidationException { // builds the monitor for a VM service cluster from its policies and topology members
-        // FIXME fix the following code to correctly update
-        // AutoscalerContext context = AutoscalerContext.getInstance();
-        if (null == cluster) { // a missing cluster yields null instead of an exception
-            return null;
-        }
-
-        String autoscalePolicyName = cluster.getAutoscalePolicyName();
-        String deploymentPolicyName = cluster.getDeploymentPolicyName();
-
-        if (log.isDebugEnabled()) {
-            log.debug("Deployment policy name: " + deploymentPolicyName);
-            log.debug("Autoscaler policy name: " + autoscalePolicyName);
-        }
-
-        AutoscalePolicy policy =
-                PolicyManager.getInstance()
-                        .getAutoscalePolicy(autoscalePolicyName); // NOTE(review): not null-checked, unlike the deployment policy below
-        DeploymentPolicy deploymentPolicy =
-                PolicyManager.getInstance()
-                        .getDeploymentPolicy(deploymentPolicyName);
-
-        if (deploymentPolicy == null) {
-            String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
-            log.error(msg);
-            throw new PolicyValidationException(msg);
-        }
-
-        Partition[] allPartitions = deploymentPolicy.getAllPartitions();
-        if (allPartitions == null) {
-            String msg =
-                    "Deployment Policy's Partitions are null. Policy name: " +
-                    deploymentPolicyName;
-            log.error(msg);
-            throw new PolicyValidationException(msg);
-        }
-
-        CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy); // presumably the source of PartitionValidationException - confirm against CloudControllerClient
-
-        VMServiceClusterMonitor clusterMonitor =
-                new VMServiceClusterMonitor(cluster.getClusterId(),
-                                            cluster.getServiceName(),
-                                            deploymentPolicy, policy);
-        clusterMonitor.setStatus(ClusterStatus.Created);
-
-        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) { // one NetworkPartitionContext per partition group
-
-            NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-                                                                                          partitionGroup.getPartitionAlgo(),
-                                                                                          partitionGroup.getPartitions());
-
-            for (Partition partition : partitionGroup.getPartitions()) { // one PartitionContext per partition in the group
-                PartitionContext partitionContext = new PartitionContext(partition);
-                partitionContext.setServiceName(cluster.getServiceName());
-                partitionContext.setProperties(cluster.getProperties());
-                partitionContext.setNetworkPartitionId(partitionGroup.getId());
-
-                for (Member member : cluster.getMembers()) {
-                    String memberId = member.getMemberId();
-                    if (member.getPartitionId().equalsIgnoreCase(partition.getId())) { // only members of this partition (case-insensitive id match)
-                        MemberContext memberContext = new MemberContext();
-                        memberContext.setClusterId(member.getClusterId());
-                        memberContext.setMemberId(memberId);
-                        memberContext.setInitTime(member.getInitTime());
-                        memberContext.setPartition(partition);
-                        memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
-
-                        if (MemberStatus.Activated.equals(member.getStatus())) {
-                        	if (log.isDebugEnabled()) {
-                        		String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
-            					log.debug(msg);
-            				}
-                            partitionContext.addActiveMember(memberContext);
-//                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-//                            partitionContext.incrementCurrentActiveMemberCount(1);
-
-                        } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
-                        	if (log.isDebugEnabled()) {
-                        		String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
-            					log.debug(msg);
-            				}
-                            partitionContext.addPendingMember(memberContext);
-
-//                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-                        } else if (MemberStatus.Suspended.equals(member.getStatus())) { // suspended members are ignored here; the faulty-member call is commented out
-//                            partitionContext.addFaultyMember(memberId);
-                        }
-                        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); // added for every matching member, whatever its status
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member stat context has been added: [member] %s", memberId));
-                        }
-                    }
-
-                }
-                networkPartitionContext.addPartitionContext(partitionContext);
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Partition context has been added: [partition] %s",
-                                           partitionContext.getPartitionId()));
-                }
-            }
-
-            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Network partition context has been added: [network partition] %s",
-                                       networkPartitionContext.getId()));
-            }
-        }
-
-
-        // find lb reference type
-        java.util.Properties props = cluster.getProperties();
-
-        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
-            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-            clusterMonitor.setLbReferenceType(value);
-            if (log.isDebugEnabled()) {
-                log.debug("Set the lb reference type: " + value);
-            }
-        }
-
-        // set hasPrimary property
-        // hasPrimary is true if there are primary members available in that cluster
-        clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY))); // parseBoolean yields false when the property is absent
-
-        log.info("VMServiceClusterMonitor created: " + clusterMonitor.toString());
-        return clusterMonitor;
-    }
-
-    private static Properties convertMemberPropsToMemberContextProps(
-            java.util.Properties properties) { // copies every entry of the java.util.Properties into a new stub Properties object
-        Properties props = new Properties();
-        for (Map.Entry<Object, Object> e : properties.entrySet()) {
-            Property prop = new Property();
-            prop.setName((String) e.getKey()); // the casts assume String keys and values; anything else raises ClassCastException
-            prop.setValue((String) e.getValue());
-            props.addProperties(prop);
-        }
-        return props;
-    }
-
-
-    private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
-            throws PolicyValidationException, PartitionValidationException { // builds the monitor for a VM load balancer cluster from its policies and topology members
-        // FIXME fix the following code to correctly update
-        // AutoscalerContext context = AutoscalerContext.getInstance();
-        if (null == cluster) { // a missing cluster yields null instead of an exception
-            return null;
-        }
-
-        String autoscalePolicyName = cluster.getAutoscalePolicyName();
-        String deploymentPolicyName = cluster.getDeploymentPolicyName();
-
-        if (log.isDebugEnabled()) {
-            log.debug("Deployment policy name: " + deploymentPolicyName);
-            log.debug("Autoscaler policy name: " + autoscalePolicyName);
-        }
-
-        AutoscalePolicy policy =
-                PolicyManager.getInstance()
-                        .getAutoscalePolicy(autoscalePolicyName); // NOTE(review): not null-checked, unlike the deployment policy below
-        DeploymentPolicy deploymentPolicy =
-                PolicyManager.getInstance()
-                        .getDeploymentPolicy(deploymentPolicyName);
-
-        if (deploymentPolicy == null) {
-            String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
-            log.error(msg);
-            throw new PolicyValidationException(msg);
-        }
-
-        String clusterId = cluster.getClusterId();
-        VMLbClusterMonitor clusterMonitor =
-                new VMLbClusterMonitor(clusterId,
-                                       cluster.getServiceName(),
-                                       deploymentPolicy, policy);
-        clusterMonitor.setStatus(ClusterStatus.Created);
-        // partition group = network partition context
-        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
-
-            NetworkPartitionLbHolder networkPartitionLbHolder =
-                    PartitionManager.getInstance()
-                            .getNetworkPartitionLbHolder(partitionGroup.getId());
-//                                                              PartitionManager.getInstance()
-//                                                                              .getNetworkPartitionLbHolder(partitionGroup.getId());
-            // FIXME pick a random partition
-            Partition partition =
-                    partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)]; // nextInt throws IllegalArgumentException when the group has no partitions
-            PartitionContext partitionContext = new PartitionContext(partition);
-            partitionContext.setServiceName(cluster.getServiceName());
-            partitionContext.setProperties(cluster.getProperties());
-            partitionContext.setNetworkPartitionId(partitionGroup.getId());
-            partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
-
-            NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-                                                                                          partitionGroup.getPartitionAlgo(),
-                                                                                          partitionGroup.getPartitions());
-            for (Member member : cluster.getMembers()) {
-                String memberId = member.getMemberId();
-                if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) { // matched by network partition id; every match is attached to the single partition picked above
-                    MemberContext memberContext = new MemberContext();
-                    memberContext.setClusterId(member.getClusterId());
-                    memberContext.setMemberId(memberId);
-                    memberContext.setPartition(partition);
-                    memberContext.setInitTime(member.getInitTime());
-
-                    if (MemberStatus.Activated.equals(member.getStatus())) {
-                    	if (log.isDebugEnabled()) {
-                    		String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
-        					log.debug(msg);
-        				}
-                        partitionContext.addActiveMember(memberContext);
-//                        networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-//                        partitionContext.incrementCurrentActiveMemberCount(1);
-                    } else if (MemberStatus.Created.equals(member.getStatus()) ||
-                               MemberStatus.Starting.equals(member.getStatus())) {
-                    	if (log.isDebugEnabled()) {
-                    		String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
-        					log.debug(msg);
-        				}
-                        partitionContext.addPendingMember(memberContext);
-//                        networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-                    } else if (MemberStatus.Suspended.equals(member.getStatus())) { // suspended members are ignored here; the faulty-member call is commented out
-//                        partitionContext.addFaultyMember(memberId);
-                    }
-
-                    partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); // added for every matching member, whatever its status
-                    if (log.isInfoEnabled()) {
-                        log.info(String.format("Member stat context has been added: [member] %s", memberId));
-                    }
-                }
-
-            }
-            networkPartitionContext.addPartitionContext(partitionContext);
-
-            // populate lb cluster id in network partition context.
-            java.util.Properties props = cluster.getProperties();
-
-            // get service type of load balanced cluster
-            String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
-
-            if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
-                String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-
-                if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
-                    networkPartitionLbHolder.setDefaultLbClusterId(clusterId); // this cluster becomes the default LB of the network partition
-
-                } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) {
-                    String serviceName = cluster.getServiceName();
-                    // TODO: check if this is correct
-                    networkPartitionLbHolder.addServiceLB(serviceName, clusterId);
-
-                    if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) {
-                        networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
-                        if (log.isDebugEnabled()) {
-                            log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType);
-                        }
-                    }
-                }
-            }
-
-            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-        }
-
-        log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
-        return clusterMonitor;
-    }
-
-    /**
-     * Builds the monitor for a Kubernetes (docker) service cluster: resolves the
-     * autoscale policy, creates the KubernetesClusterContext with the optional
-     * min/max replica properties, and registers the members already present in
-     * the topology.
-     *
-     * @param cluster - the cluster which needs to be monitored
-     * @return - the cluster monitor, or null when the cluster is null
-     * @throws PolicyValidationException when the cluster's autoscale policy cannot be found
-     */
-    private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster)
-            throws PolicyValidationException {
-
-        if (null == cluster) {
-            return null;
-        }
-
-        String autoscalePolicyName = cluster.getAutoscalePolicyName();
-        if (log.isDebugEnabled()) {
-            log.debug("Autoscaler policy name: " + autoscalePolicyName);
-        }
-
-        AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
-        
-        if (policy == null) {
-            String msg = "Autoscale Policy is null. Policy name: " + autoscalePolicyName;
-            log.error(msg);
-            throw new PolicyValidationException(msg);
-        }
-        
-        java.util.Properties props = cluster.getProperties();
-        String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
-        KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
-                                                                                      cluster.getClusterId());
-
-        String minReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS);
-        if (minReplicasProperty != null && !minReplicasProperty.isEmpty()) {
-            int minReplicas = Integer.parseInt(minReplicasProperty); // a non-numeric value raises NumberFormatException, which is not handled here
-            kubernetesClusterCtxt.setMinReplicas(minReplicas);
-        }
-
-        String maxReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MAX_REPLICAS);
-        if (maxReplicasProperty != null && !maxReplicasProperty.isEmpty()) {
-            int maxReplicas = Integer.parseInt(maxReplicasProperty); // same unhandled NumberFormatException risk as above
-            kubernetesClusterCtxt.setMaxReplicas(maxReplicas);
-        }
-
-        KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor(
-                kubernetesClusterCtxt,
-                cluster.getClusterId(),
-                cluster.getServiceName(),
-                policy.getId());
-
-        dockerClusterMonitor.setStatus(ClusterStatus.Created);
-
-        //populate the members after restarting        
-        for (Member member : cluster.getMembers()) {
-            String memberId = member.getMemberId();
-            String clusterId = member.getClusterId();
-            MemberContext memberContext = new MemberContext();
-            memberContext.setMemberId(memberId);
-            memberContext.setClusterId(clusterId);
-            memberContext.setInitTime(member.getInitTime());
-            
-            // if there is at least one member in the topology, that means service has been created already
-            // this is to avoid calling startContainer() method again
-            kubernetesClusterCtxt.setServiceClusterCreated(true);
-            
-            if (MemberStatus.Activated.equals(member.getStatus())) {
-            	if (log.isDebugEnabled()) {
-            		String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
-					log.debug(msg);
-				}
-                dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
-            } else if (MemberStatus.Created.equals(member.getStatus())
-                       || MemberStatus.Starting.equals(member.getStatus())) {
-            	if (log.isDebugEnabled()) {
-            		String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
-					log.debug(msg);
-				}
-                dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
-            }
-            
-            kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId)); // added for every member, whatever its status
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Member stat context has been added: [member] %s", memberId));
-            }
-        }
-
-        // find lb reference type
-        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
-            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-            dockerClusterMonitor.setLbReferenceType(value);
-            if (log.isDebugEnabled()) {
-                log.debug("Set the lb reference type: " + value);
-            }
-        }
-
-        log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
-        return dockerClusterMonitor;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
deleted file mode 100644
index 0254030..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-/*
- * Base class that every Kubernetes cluster monitor should extend.
- */
-public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
-
-    private static final Log log = LogFactory.getLog(KubernetesClusterMonitor.class);
-
-    private KubernetesClusterContext kubernetesClusterCtxt;
-    protected String autoscalePolicyId;
-
-    protected KubernetesClusterMonitor(String clusterId, String serviceId,
-                                       KubernetesClusterContext kubernetesClusterContext,
-                                       AutoscalerRuleEvaluator autoscalerRuleEvaluator,
-                                       String autoscalePolicyId) {
-
-        super(clusterId, serviceId, autoscalerRuleEvaluator); // cluster id, service id and rule evaluator are passed up to AbstractClusterMonitor
-        this.kubernetesClusterCtxt = kubernetesClusterContext; // Kubernetes cluster context held by this monitor
-        this.autoscalePolicyId = autoscalePolicyId;
-    }
-
-    @Override
-    public void handleAverageLoadAverageEvent(
-            AverageLoadAverageEvent averageLoadAverageEvent) { // stores the event's value as the context's average load average
-
-        String clusterId = averageLoadAverageEvent.getClusterId(); // used only in the log messages below
-        float value = averageLoadAverageEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Avg load avg event: [cluster] %s [value] %s",
-                                    clusterId, value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setAverageLoadAverage(value);
-        } else { // no context: the value is dropped and only a debug message is written
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-
-    }
-
-    @Override
-    public void handleGradientOfLoadAverageEvent(
-            GradientOfLoadAverageEvent gradientOfLoadAverageEvent) { // stores the event's value as the context's load average gradient
-
-        String clusterId = gradientOfLoadAverageEvent.getClusterId(); // used only in the log messages below
-        float value = gradientOfLoadAverageEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s",
-                                    clusterId, value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setLoadAverageGradient(value);
-        } else { // no context: the value is dropped and only a debug message is written
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-    }
-
-    @Override
-    public void handleSecondDerivativeOfLoadAverageEvent(
-            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) { // stores the event's value as the context's load average second derivative
-
-        String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId(); // used only in the log messages below
-        float value = secondDerivativeOfLoadAverageEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
-                                    + "[value] %s", clusterId, value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setLoadAverageSecondDerivative(value);
-        } else { // no context: the value is dropped and only a debug message is written
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-    }
-
-    @Override
-    public void handleAverageMemoryConsumptionEvent(
-            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) { // stores the event's value as the context's average memory consumption
-
-        String clusterId = averageMemoryConsumptionEvent.getClusterId(); // used only in the not-available log message below
-        float value = averageMemoryConsumptionEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Avg Memory Consumption event: [cluster] %s "
-                                    + "[value] %s", averageMemoryConsumptionEvent.getClusterId(), value));
-        }
-        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-        if (null != kubernetesClusterContext) {
-            kubernetesClusterContext.setAverageMemoryConsumption(value);
-        } else { // no context: the value is dropped and only a debug message is written
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                        " [cluster] %s", clusterId));
-            }
-        }
-    }
-
-    /**
-     * Stores the gradient of the cluster memory consumption on the
-     * Kubernetes cluster context.
-     *
-     * @param gradientOfMemoryConsumptionEvent event carrying the cluster id and the value
-     */
-    @Override
-    public void handleGradientOfMemoryConsumptionEvent(
-            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
-
-        final String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
-        final float gradient = gradientOfMemoryConsumptionEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Grad of Memory Consumption event: [cluster] %s [value] %s",
-                                    clusterId, gradient));
-        }
-
-        final KubernetesClusterContext context = getKubernetesClusterCtxt();
-        if (context != null) {
-            context.setMemoryConsumptionGradient(gradient);
-            return;
-        }
-        // Without a context the value is dropped; only a debug trace is left.
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
-                                    clusterId));
-        }
-    }
-
-    /**
-     * Stores the second derivative of the cluster memory consumption on the
-     * Kubernetes cluster context.
-     *
-     * @param secondDerivativeOfMemoryConsumptionEvent event carrying the cluster id and the value
-     */
-    @Override
-    public void handleSecondDerivativeOfMemoryConsumptionEvent(
-            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
-
-        final String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
-        final float secondDerivative = secondDerivativeOfMemoryConsumptionEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format(
-                    "Second Derivation of Memory Consumption event: [cluster] %s [value] %s",
-                    clusterId, secondDerivative));
-        }
-
-        final KubernetesClusterContext context = getKubernetesClusterCtxt();
-        if (context != null) {
-            context.setMemoryConsumptionSecondDerivative(secondDerivative);
-            return;
-        }
-        // Without a context the value is dropped; only a debug trace is left.
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
-                                    clusterId));
-        }
-    }
-
-    /**
-     * Stores the average number of requests in flight reported for the
-     * cluster on the Kubernetes cluster context.
-     *
-     * @param averageRequestsInFlightEvent event carrying the cluster id and the value
-     */
-    @Override
-    public void handleAverageRequestsInFlightEvent(
-            AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
-
-        final float averageRif = averageRequestsInFlightEvent.getValue();
-        final String clusterId = averageRequestsInFlightEvent.getClusterId();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Average Rif event: [cluster] %s [value] %s", clusterId, averageRif));
-        }
-
-        final KubernetesClusterContext context = getKubernetesClusterCtxt();
-        if (context != null) {
-            context.setAverageRequestsInFlight(averageRif);
-            return;
-        }
-        // Without a context the value is dropped; only a debug trace is left.
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
-                                    clusterId));
-        }
-    }
-
-    /**
-     * Stores the gradient of the cluster's requests in flight on the
-     * Kubernetes cluster context.
-     *
-     * @param gradientOfRequestsInFlightEvent event carrying the cluster id and the value
-     */
-    @Override
-    public void handleGradientOfRequestsInFlightEvent(
-            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
-
-        final String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
-        final float gradient = gradientOfRequestsInFlightEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s", clusterId, gradient));
-        }
-
-        final KubernetesClusterContext context = getKubernetesClusterCtxt();
-        if (context != null) {
-            context.setRequestsInFlightGradient(gradient);
-            return;
-        }
-        // Without a context the value is dropped; only a debug trace is left.
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
-                                    clusterId));
-        }
-    }
-
-    /**
-     * Stores the second derivative of the cluster's requests in flight on
-     * the Kubernetes cluster context.
-     *
-     * @param secondDerivativeOfRequestsInFlightEvent event carrying the cluster id and the value
-     */
-    @Override
-    public void handleSecondDerivativeOfRequestsInFlightEvent(
-            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
-
-        final String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
-        final float secondDerivative = secondDerivativeOfRequestsInFlightEvent.getValue();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Second derivative of Rif event: [cluster] %s [value] %s",
-                                    clusterId, secondDerivative));
-        }
-
-        final KubernetesClusterContext context = getKubernetesClusterCtxt();
-        if (context != null) {
-            context.setRequestsInFlightSecondDerivative(secondDerivative);
-            return;
-        }
-        // Without a context the value is dropped; only a debug trace is left.
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
-                                    clusterId));
-        }
-    }
-
-    /**
-     * Stores a member's average memory consumption on that member's stats
-     * context. The event is ignored (with a debug trace) when either the
-     * Kubernetes cluster context or the member's stats context is missing.
-     *
-     * @param memberAverageMemoryConsumptionEvent event carrying the member id and the value
-     */
-    @Override
-    public void handleMemberAverageMemoryConsumptionEvent(
-            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
-
-        String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
-        KubernetesClusterContext clusterContext = getKubernetesClusterCtxt();
-        // Guard against a missing context, as the cluster-level handlers do,
-        // instead of failing with a NullPointerException.
-        if (null == clusterContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :"
-                                        + " [member] %s", memberId));
-            }
-            return;
-        }
-        MemberStatsContext memberStatsContext = clusterContext.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberAverageMemoryConsumptionEvent.getValue();
-        memberStatsContext.setAverageMemoryConsumption(value);
-    }
-
-    /**
-     * Stores the gradient of a member's memory consumption on that member's
-     * stats context. The event is ignored (with a debug trace) when either
-     * the Kubernetes cluster context or the member's stats context is missing.
-     *
-     * @param memberGradientOfMemoryConsumptionEvent event carrying the member id and the value
-     */
-    @Override
-    public void handleMemberGradientOfMemoryConsumptionEvent(
-            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
-
-        String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
-        KubernetesClusterContext clusterContext = getKubernetesClusterCtxt();
-        // Guard against a missing context, as the cluster-level handlers do,
-        // instead of failing with a NullPointerException.
-        if (null == clusterContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :"
-                                        + " [member] %s", memberId));
-            }
-            return;
-        }
-        MemberStatsContext memberStatsContext = clusterContext.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberGradientOfMemoryConsumptionEvent.getValue();
-        memberStatsContext.setGradientOfMemoryConsumption(value);
-    }
-
-    /**
-     * Receives the second derivative of a member's memory consumption.
-     * No action is taken for this event.
-     *
-     * NOTE(review): the load-average counterpart of this handler stores the
-     * value on the member stats context; confirm that doing nothing here is
-     * deliberate.
-     *
-     * @param memberSecondDerivativeOfMemoryConsumptionEvent the received event (unused)
-     */
-    @Override
-    public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
-            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
-
-    }
-
-    /**
-     * Stores a member's average load average on that member's stats context.
-     * The event is ignored (with a debug trace) when either the Kubernetes
-     * cluster context or the member's stats context is missing.
-     *
-     * @param memberAverageLoadAverageEvent event carrying the member id and the value
-     */
-    @Override
-    public void handleMemberAverageLoadAverageEvent(
-            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
-
-        KubernetesClusterContext clusterContext = getKubernetesClusterCtxt();
-        String memberId = memberAverageLoadAverageEvent.getMemberId();
-        float value = memberAverageLoadAverageEvent.getValue();
-        // Guard against a missing context, as the cluster-level handlers do,
-        // instead of failing with a NullPointerException.
-        if (null == clusterContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :"
-                                        + " [member] %s", memberId));
-            }
-            return;
-        }
-        MemberStatsContext memberStatsContext = clusterContext.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        memberStatsContext.setAverageLoadAverage(value);
-    }
-
-    /**
-     * Stores the gradient of a member's load average on that member's stats
-     * context. The event is ignored (with a debug trace) when either the
-     * Kubernetes cluster context or the member's stats context is missing.
-     *
-     * @param memberGradientOfLoadAverageEvent event carrying the member id and the value
-     */
-    @Override
-    public void handleMemberGradientOfLoadAverageEvent(
-            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
-
-        String memberId = memberGradientOfLoadAverageEvent.getMemberId();
-        KubernetesClusterContext clusterContext = getKubernetesClusterCtxt();
-        // Guard against a missing context, as the cluster-level handlers do,
-        // instead of failing with a NullPointerException.
-        if (null == clusterContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :"
-                                        + " [member] %s", memberId));
-            }
-            return;
-        }
-        MemberStatsContext memberStatsContext = clusterContext.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberGradientOfLoadAverageEvent.getValue();
-        memberStatsContext.setGradientOfLoadAverage(value);
-    }
-
-    /**
-     * Stores the second derivative of a member's load average on that
-     * member's stats context. The event is ignored (with a debug trace) when
-     * either the Kubernetes cluster context or the member's stats context is
-     * missing.
-     *
-     * @param memberSecondDerivativeOfLoadAverageEvent event carrying the member id and the value
-     */
-    @Override
-    public void handleMemberSecondDerivativeOfLoadAverageEvent(
-            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
-
-        String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
-        KubernetesClusterContext clusterContext = getKubernetesClusterCtxt();
-        // Guard against a missing context, as the cluster-level handlers do,
-        // instead of failing with a NullPointerException.
-        if (null == clusterContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Kubernetes cluster context is not available for :"
-                                        + " [member] %s", memberId));
-            }
-            return;
-        }
-        MemberStatsContext memberStatsContext = clusterContext.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
-        memberStatsContext.setSecondDerivativeOfLoadAverage(value);
-    }
-
-    @Override
-    public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
-    	// kill the container
-        String memberId = memberFaultEvent.getMemberId();
-        Member member = getMemberByMemberId(memberId);
-        if (null == member) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
-            }
-            return;
-        }
-        if (!member.isActive()) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member activated event has not received for the member %s. "
-                                        + "Therefore ignoring" + " the member fault health stat", memberId));
-            }
-            return;
-        }
-        
-        if (!getKubernetesClusterCtxt().activeMemberExist(memberId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Could not find the active member in kubernetes cluster context, "
-                                        + "[member] %s ", memberId));
-            }
-            return;
-        }
-        // terminate the faulty member
-        CloudControllerClient ccClient = CloudControllerClient.getInstance();
-        try {
-            ccClient.terminateContainer(memberId);
-            // remove from active member list
-            getKubernetesClusterCtxt().removeActiveMemberById(memberId);
-            if (log.isInfoEnabled()) {
-                String clusterId = memberFaultEvent.getClusterId();
-                String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
-				log.info(String.format("Faulty member is terminated and removed from the active members list: "
-                                       + "[member] %s [kub-cluster] %s [cluster] %s ", memberId, kubernetesClusterID, clusterId));
-            }
-        } catch (TerminationException e) {
-            String msg = "Cannot delete a container " + e.getLocalizedMessage();
-            log.error(msg, e);
-        }
-    }
-
-    /**
-     * Receives a member started event. No action is taken for this event.
-     *
-     * @param memberStartedEvent the received event (unused)
-     */
-    @Override
-    public void handleMemberStartedEvent(
-            MemberStartedEvent memberStartedEvent) {
-
-    }
-
-    /**
-     * Registers a fresh stats context for the activated member and then moves
-     * the member from the pending list to the active list of the Kubernetes
-     * cluster context.
-     *
-     * @param memberActivatedEvent event identifying the activated member
-     */
-    @Override
-    public void handleMemberActivatedEvent(
-            MemberActivatedEvent memberActivatedEvent) {
-
-        final KubernetesClusterContext context = getKubernetesClusterCtxt();
-        final String activatedMemberId = memberActivatedEvent.getMemberId();
-
-        final MemberStatsContext statsContext = new MemberStatsContext(activatedMemberId);
-        context.addMemberStatsContext(statsContext);
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Member stat context has been added successfully: [member] %s",
-                                   activatedMemberId));
-        }
-
-        context.movePendingMemberToActiveMembers(activatedMemberId);
-    }
-
-    /**
-     * Receives a member maintenance mode event. No action is taken.
-     *
-     * @param maintenanceModeEvent the received event (unused)
-     */
-    @Override
-    public void handleMemberMaintenanceModeEvent(
-            MemberMaintenanceModeEvent maintenanceModeEvent) {
-
-        // no need to do anything here
-        // we will not be receiving this event for containers
-        // we will only receive member terminated event
-    }
-
-    /**
-     * Receives a member ready-to-shutdown event. No action is taken.
-     *
-     * @param memberReadyToShutdownEvent the received event (unused)
-     */
-    @Override
-    public void handleMemberReadyToShutdownEvent(
-            MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
-
-        // no need to do anything here
-        // we will not be receiving this event for containers
-        // we will only receive member terminated event
-    }
-
-    /**
-     * Removes a terminated member from the Kubernetes cluster context. The
-     * member lists are tried in order: termination pending, pending, active,
-     * obsolete; the first list that reports a removal wins. A warning is
-     * logged when the member was in the active or obsolete list, or in none.
-     *
-     * @param memberTerminatedEvent event identifying the terminated member
-     */
-    @Override
-    public void handleMemberTerminatedEvent(
-            MemberTerminatedEvent memberTerminatedEvent) {
-
-        String memberId = memberTerminatedEvent.getMemberId();
-        if (getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member is removed from termination pending members list: "
-                                        + "[member] %s", memberId));
-            }
-        } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member is removed from pending members list: "
-                                        + "[member] %s", memberId));
-            }
-        } else if (getKubernetesClusterCtxt().removeActiveMemberById(memberId)) {
-            // The warnings below previously passed memberId to a format string
-            // without a placeholder, so the id was silently dropped from the log.
-            log.warn(String.format("Member is in the wrong list and it is removed from "
-                                   + "active members list: [member] %s", memberId));
-        } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) {
-            log.warn(String.format("Member's obsolated timeout has been expired and "
-                                   + "it is removed from obsolated members list: [member] %s", memberId));
-        } else {
-            log.warn(String.format("Member is not available in any of the list active, "
-                                   + "pending and termination pending: [member] %s", memberId));
-        }
-
-        // NOTE(review): this message is logged unconditionally, yet no member
-        // stats context is removed in this method; confirm where that happens.
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Member stat context has been removed successfully: "
-                                   + "[member] %s", memberId));
-        }
-    }
-
-    /**
-     * Clears the pending, active, termination-pending and obsoleted member
-     * collections of the Kubernetes cluster context.
-     *
-     * @param clusterRemovedEvent the received event (its contents are not read)
-     */
-    @Override
-    public void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent) {
-        final KubernetesClusterContext context = getKubernetesClusterCtxt();
-        context.getPendingMembers().clear();
-        context.getActiveMembers().clear();
-        context.getTerminationPendingMembers().clear();
-        context.getObsoletedMembers().clear();
-    }
-
-    /**
-     * @return the Kubernetes cluster context currently held by this monitor
-     */
-    public KubernetesClusterContext getKubernetesClusterCtxt() {
-        return this.kubernetesClusterCtxt;
-    }
-
-    /**
-     * Replaces the Kubernetes cluster context held by this monitor.
-     *
-     * @param kubernetesClusterCtxt the context to hold from now on
-     */
-    public void setKubernetesClusterCtxt(KubernetesClusterContext kubernetesClusterCtxt) {
-        this.kubernetesClusterCtxt = kubernetesClusterCtxt;
-    }
-
-    /**
-     * Looks up the autoscale policy for this monitor's policy id via the
-     * {@code PolicyManager} singleton.
-     *
-     * @return whatever the policy manager returns for {@code autoscalePolicyId}
-     */
-    public AutoscalePolicy getAutoscalePolicy() {
-        final PolicyManager policyManager = PolicyManager.getInstance();
-        return policyManager.getAutoscalePolicy(autoscalePolicyId);
-    }
-
-    /**
-     * Searches every cluster of every service in the topology for a member
-     * with the given id, holding the topology read lock during the search.
-     *
-     * @param memberId id of the member to look up
-     * @return the matching member, or {@code null} when no cluster contains it
-     */
-    private Member getMemberByMemberId(String memberId) {
-        // Acquire before entering the try block so that the finally clause
-        // never releases a lock whose acquisition failed.
-        TopologyManager.acquireReadLock();
-        try {
-            for (Service service : TopologyManager.getTopology().getServices()) {
-                for (Cluster cluster : service.getClusters()) {
-                    if (cluster.memberExists(memberId)) {
-                        return cluster.getMember(memberId);
-                    }
-                }
-            }
-            return null;
-        } finally {
-            TopologyManager.releaseReadLock();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
deleted file mode 100644
index 15b14b6..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.collections.ListUtils;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-
-import edu.emory.mathcs.backport.java.util.Arrays;
-
-/*
- * It is monitoring a kubernetes service cluster periodically.
- */
-public final class KubernetesServiceClusterMonitor extends KubernetesClusterMonitor {
-
-    private static final Log log = LogFactory.getLog(KubernetesServiceClusterMonitor.class);
-
-    private String lbReferenceType;
-
-    public KubernetesServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt,
-                                           String serviceClusterID, String serviceId,
-                                           String autoscalePolicyId) {
-        super(serviceClusterID, serviceId, kubernetesClusterCtxt,
-              new AutoscalerRuleEvaluator(
-                      StratosConstants.CONTAINER_MIN_CHECK_DROOL_FILE,
-                      StratosConstants.CONTAINER_SCALE_CHECK_DROOL_FILE),
-              autoscalePolicyId);
-        readConfigurations();
-    }
-
-    @Override
-    public void run() {
-
-        if (log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor is running..." + this.toString());
-        }
-        try {
-            if (!ClusterStatus.Active.getNextStates().contains(getStatus())) {
-                monitor();
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
-                            + getStatus() + "state");
-                }
-            }
-        } catch (Exception e) {
-            log.error("KubernetesServiceClusterMonitor: Monitor failed." + this.toString(),
-                      e);
-        }
-    }
-
-    @Override
-    protected void monitor() {
-        minCheck();
-        scaleCheck();
-    }
-
-    private void scaleCheck() {
-        boolean rifReset = getKubernetesClusterCtxt().isRifReset();
-        boolean memoryConsumptionReset = getKubernetesClusterCtxt().isMemoryConsumptionReset();
-        boolean loadAverageReset = getKubernetesClusterCtxt().isLoadAverageReset();
-        if (log.isDebugEnabled()) {
-            log.debug("flag of rifReset : " + rifReset
-                      + " flag of memoryConsumptionReset : "
-                      + memoryConsumptionReset + " flag of loadAverageReset : "
-                      + loadAverageReset);
-        }
-        String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
-        String clusterId = getClusterId();
-        if (rifReset || memoryConsumptionReset || loadAverageReset) {
-            getScaleCheckKnowledgeSession().setGlobal("clusterId", clusterId);
-            getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", getAutoscalePolicy());
-            getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
-            getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
-            getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
-            if (log.isDebugEnabled()) {
-                log.debug(String.format(
-                        "Running scale check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
-            }
-            scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(
-                    getScaleCheckKnowledgeSession(), scaleCheckFactHandle, getKubernetesClusterCtxt());
-            getKubernetesClusterCtxt().setRifReset(false);
-            getKubernetesClusterCtxt().setMemoryConsumptionReset(false);
-            getKubernetesClusterCtxt().setLoadAverageReset(false);
-        } else if (log.isDebugEnabled()) {
-            log.debug(String.format("Scale check will not run since none of the statistics have not received yet for "
-                                    + "[kub-cluster] : %s [cluster] : %s", kubernetesClusterID, clusterId));
-        }
-    }
-
-	private void minCheck() {
-		getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-		String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format(
-                    "Running min check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
-        }
-		minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(
-				getMinCheckKnowledgeSession(), minCheckFactHandle,
-				getKubernetesClusterCtxt());
-	}
-
-	@Override
-    public void destroy() {
-        getMinCheckKnowledgeSession().dispose();
-        getScaleCheckKnowledgeSession().dispose();
-        setDestroyed(true);
-        stopScheduler();
-        if (log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
-        }
-    }
-
-    @Override
-    protected void readConfigurations() {
-        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
-        int monitorInterval = conf.getInt(AutoScalerConstants.KubernetesService_Cluster_MONITOR_INTERVAL, 60000);
-        setMonitorIntervalMilliseconds(monitorInterval);
-        if (log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "KubernetesServiceClusterMonitor "
-               + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
-               + ", clusterId=" + getClusterId()
-               + ", serviceId=" + getServiceId() + "]";
-    }
-
-    public String getLbReferenceType() {
-        return lbReferenceType;
-    }
-
-    public void setLbReferenceType(String lbReferenceType) {
-        this.lbReferenceType = lbReferenceType;
-    }
-
-    @Override
-    public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
-        
-        if (properties != null) {
-            Property[] propertyArray = properties.getProperties();
-            if (propertyArray == null) {
-                return;
-            }
-            List<Property> propertyList = Arrays.asList(propertyArray);
-            
-            for (Property property : propertyList) {
-                String key = property.getName();
-                String value = property.getValue();
-                
-                if (StratosConstants.KUBERNETES_MIN_REPLICAS.equals(key)) {
-                    int min = Integer.parseInt(value);
-                    int max = getKubernetesClusterCtxt().getMaxReplicas();
-                    if (min > max) {
-                        String msg = String.format("%s should be less than %s . But %s is not less than %s.", 
-                                StratosConstants.KUBERNETES_MIN_REPLICAS, StratosConstants.KUBERNETES_MAX_REPLICAS, min, max);
-                        log.error(msg);
-                        throw new InvalidArgumentException(msg);
-                    }
-                    getKubernetesClusterCtxt().setMinReplicas(min);
-                    break;
-                }
-            }
-            
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
index fa8f425..8aeae94 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
@@ -28,10 +28,9 @@ import org.apache.stratos.autoscaler.grouping.dependency.DependencyBuilder;
 import org.apache.stratos.autoscaler.grouping.dependency.DependencyTree;
 import org.apache.stratos.autoscaler.grouping.dependency.context.ApplicationContext;
 import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitorFactory;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.status.checker.StatusChecker;
-import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.GroupStatus;
 import org.apache.stratos.messaging.domain.topology.ParentComponent;
 
 import java.util.HashMap;