You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/10/31 05:04:05 UTC

[2/5] Adding autoscaler topology event listeners introduced by service grouping

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
new file mode 100644
index 0000000..e286187
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.PartitionValidationException;
+import org.apache.stratos.autoscaler.exception.PolicyValidationException;
+import org.apache.stratos.autoscaler.partition.PartitionGroup;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.util.Constants;
+
+/*
+ * Factory class for creating cluster monitors.
+ */
+public class ClusterMonitorFactory {
+
+    private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
+
+    /**
+     * @param cluster the cluster to be monitored
+     * @return the created cluster monitor
+     * @throws PolicyValidationException    when deployment policy is not valid
+     * @throws PartitionValidationException when partition is not valid
+     */
+    public static AbstractClusterMonitor getMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+
+        AbstractClusterMonitor clusterMonitor;
+        if (cluster.isKubernetesCluster()) {
+            clusterMonitor = getDockerServiceClusterMonitor(cluster);
+        } else if (cluster.isLbCluster()) {
+            clusterMonitor = getVMLbClusterMonitor(cluster);
+        } else {
+            clusterMonitor = getVMServiceClusterMonitor(cluster);
+        }
+
+        return clusterMonitor;
+    }
+
+    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+        // FIXME fix the following code to correctly update
+        // AutoscalerContext context = AutoscalerContext.getInstance();
+        if (null == cluster) {
+            return null;
+        }
+
+        String autoscalePolicyName = cluster.getAutoscalePolicyName();
+        String deploymentPolicyName = cluster.getDeploymentPolicyName();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Deployment policy name: " + deploymentPolicyName);
+            log.debug("Autoscaler policy name: " + autoscalePolicyName);
+        }
+
+        AutoscalePolicy policy =
+                PolicyManager.getInstance()
+                        .getAutoscalePolicy(autoscalePolicyName);
+        DeploymentPolicy deploymentPolicy =
+                PolicyManager.getInstance()
+                        .getDeploymentPolicy(deploymentPolicyName);
+
+        if (deploymentPolicy == null) {
+            String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+
+        Partition[] allPartitions = deploymentPolicy.getAllPartitions();
+        if (allPartitions == null) {
+            String msg =
+                    "Deployment Policy's Partitions are null. Policy name: " +
+                    deploymentPolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+
+        CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
+
+        VMServiceClusterMonitor clusterMonitor =
+                new VMServiceClusterMonitor(cluster.getClusterId(),
+                                            cluster.getServiceName(),
+                                            deploymentPolicy, policy);
+        clusterMonitor.setStatus(ClusterStatus.Created);
+
+        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
+
+            NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
+                                                                                          partitionGroup.getPartitionAlgo(),
+                                                                                          partitionGroup.getPartitions());
+
+            for (Partition partition : partitionGroup.getPartitions()) {
+                PartitionContext partitionContext = new PartitionContext(partition);
+                partitionContext.setServiceName(cluster.getServiceName());
+                partitionContext.setProperties(cluster.getProperties());
+                partitionContext.setNetworkPartitionId(partitionGroup.getId());
+
+                for (Member member : cluster.getMembers()) {
+                    String memberId = member.getMemberId();
+                    if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
+                        MemberContext memberContext = new MemberContext();
+                        memberContext.setClusterId(member.getClusterId());
+                        memberContext.setMemberId(memberId);
+                        memberContext.setInitTime(member.getInitTime());
+                        memberContext.setPartition(partition);
+                        memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
+
+                        if (MemberStatus.Activated.equals(member.getStatus())) {
+                        	if (log.isDebugEnabled()) {
+                        		String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
+            					log.debug(msg);
+            				}
+                            partitionContext.addActiveMember(memberContext);
+//                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
+//                            partitionContext.incrementCurrentActiveMemberCount(1);
+
+                        } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
+                        	if (log.isDebugEnabled()) {
+                        		String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
+            					log.debug(msg);
+            				}
+                            partitionContext.addPendingMember(memberContext);
+
+//                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
+                        } else if (MemberStatus.Suspended.equals(member.getStatus())) {
+//                            partitionContext.addFaultyMember(memberId);
+                        }
+                        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member stat context has been added: [member] %s", memberId));
+                        }
+                    }
+
+                }
+                networkPartitionContext.addPartitionContext(partitionContext);
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Partition context has been added: [partition] %s",
+                                           partitionContext.getPartitionId()));
+                }
+            }
+
+            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Network partition context has been added: [network partition] %s",
+                                       networkPartitionContext.getId()));
+            }
+        }
+
+
+        // find lb reference type
+        java.util.Properties props = cluster.getProperties();
+
+        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
+            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+            clusterMonitor.setLbReferenceType(value);
+            if (log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: " + value);
+            }
+        }
+
+        // set hasPrimary property
+        // hasPrimary is true if there are primary members available in that cluster
+        clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
+
+        log.info("VMServiceClusterMonitor created: " + clusterMonitor.toString());
+        return clusterMonitor;
+    }
+
+    private static Properties convertMemberPropsToMemberContextProps(
+            java.util.Properties properties) {
+        Properties props = new Properties();
+        for (Map.Entry<Object, Object> e : properties.entrySet()) {
+            Property prop = new Property();
+            prop.setName((String) e.getKey());
+            prop.setValue((String) e.getValue());
+            props.addProperties(prop);
+        }
+        return props;
+    }
+
+
+    private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+        // FIXME fix the following code to correctly update
+        // AutoscalerContext context = AutoscalerContext.getInstance();
+        if (null == cluster) {
+            return null;
+        }
+
+        String autoscalePolicyName = cluster.getAutoscalePolicyName();
+        String deploymentPolicyName = cluster.getDeploymentPolicyName();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Deployment policy name: " + deploymentPolicyName);
+            log.debug("Autoscaler policy name: " + autoscalePolicyName);
+        }
+
+        AutoscalePolicy policy =
+                PolicyManager.getInstance()
+                        .getAutoscalePolicy(autoscalePolicyName);
+        DeploymentPolicy deploymentPolicy =
+                PolicyManager.getInstance()
+                        .getDeploymentPolicy(deploymentPolicyName);
+
+        if (deploymentPolicy == null) {
+            String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+
+        String clusterId = cluster.getClusterId();
+        VMLbClusterMonitor clusterMonitor =
+                new VMLbClusterMonitor(clusterId,
+                                       cluster.getServiceName(),
+                                       deploymentPolicy, policy);
+        clusterMonitor.setStatus(ClusterStatus.Created);
+        // partition group = network partition context
+        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
+
+            NetworkPartitionLbHolder networkPartitionLbHolder =
+                    PartitionManager.getInstance()
+                            .getNetworkPartitionLbHolder(partitionGroup.getId());
+//                                                              PartitionManager.getInstance()
+//                                                                              .getNetworkPartitionLbHolder(partitionGroup.getId());
+            // FIXME pick a random partition
+            Partition partition =
+                    partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
+            PartitionContext partitionContext = new PartitionContext(partition);
+            partitionContext.setServiceName(cluster.getServiceName());
+            partitionContext.setProperties(cluster.getProperties());
+            partitionContext.setNetworkPartitionId(partitionGroup.getId());
+            partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
+
+            NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
+                                                                                          partitionGroup.getPartitionAlgo(),
+                                                                                          partitionGroup.getPartitions());
+            for (Member member : cluster.getMembers()) {
+                String memberId = member.getMemberId();
+                if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) {
+                    MemberContext memberContext = new MemberContext();
+                    memberContext.setClusterId(member.getClusterId());
+                    memberContext.setMemberId(memberId);
+                    memberContext.setPartition(partition);
+                    memberContext.setInitTime(member.getInitTime());
+
+                    if (MemberStatus.Activated.equals(member.getStatus())) {
+                    	if (log.isDebugEnabled()) {
+                    		String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
+        					log.debug(msg);
+        				}
+                        partitionContext.addActiveMember(memberContext);
+//                        networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
+//                        partitionContext.incrementCurrentActiveMemberCount(1);
+                    } else if (MemberStatus.Created.equals(member.getStatus()) ||
+                               MemberStatus.Starting.equals(member.getStatus())) {
+                    	if (log.isDebugEnabled()) {
+                    		String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
+        					log.debug(msg);
+        				}
+                        partitionContext.addPendingMember(memberContext);
+//                        networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
+                    } else if (MemberStatus.Suspended.equals(member.getStatus())) {
+//                        partitionContext.addFaultyMember(memberId);
+                    }
+
+                    partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+                    if (log.isInfoEnabled()) {
+                        log.info(String.format("Member stat context has been added: [member] %s", memberId));
+                    }
+                }
+
+            }
+            networkPartitionContext.addPartitionContext(partitionContext);
+
+            // populate lb cluster id in network partition context.
+            java.util.Properties props = cluster.getProperties();
+
+            // get service type of load balanced cluster
+            String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
+
+            if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
+                String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+
+                if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
+                    networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
+
+                } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) {
+                    String serviceName = cluster.getServiceName();
+                    // TODO: check if this is correct
+                    networkPartitionLbHolder.addServiceLB(serviceName, clusterId);
+
+                    if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) {
+                        networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType);
+                        }
+                    }
+                }
+            }
+
+            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
+        }
+
+        log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
+        return clusterMonitor;
+    }
+
+    /**
+     * @param cluster - the cluster which needs to be monitored
+     * @return - the cluster monitor
+     */
+    private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster)
+            throws PolicyValidationException {
+
+        if (null == cluster) {
+            return null;
+        }
+
+        String autoscalePolicyName = cluster.getAutoscalePolicyName();
+        if (log.isDebugEnabled()) {
+            log.debug("Autoscaler policy name: " + autoscalePolicyName);
+        }
+
+        AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
+        
+        if (policy == null) {
+            String msg = "Autoscale Policy is null. Policy name: " + autoscalePolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+        
+        java.util.Properties props = cluster.getProperties();
+        String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
+        KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
+                                                                                      cluster.getClusterId());
+
+        String minReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS);
+        if (minReplicasProperty != null && !minReplicasProperty.isEmpty()) {
+            int minReplicas = Integer.parseInt(minReplicasProperty);
+            kubernetesClusterCtxt.setMinReplicas(minReplicas);
+        }
+
+        String maxReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MAX_REPLICAS);
+        if (maxReplicasProperty != null && !maxReplicasProperty.isEmpty()) {
+            int maxReplicas = Integer.parseInt(maxReplicasProperty);
+            kubernetesClusterCtxt.setMaxReplicas(maxReplicas);
+        }
+
+        KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor(
+                kubernetesClusterCtxt,
+                cluster.getClusterId(),
+                cluster.getServiceName(),
+                policy.getId());
+
+        dockerClusterMonitor.setStatus(ClusterStatus.Created);
+
+        //populate the members after restarting        
+        for (Member member : cluster.getMembers()) {
+            String memberId = member.getMemberId();
+            String clusterId = member.getClusterId();
+            MemberContext memberContext = new MemberContext();
+            memberContext.setMemberId(memberId);
+            memberContext.setClusterId(clusterId);
+            memberContext.setInitTime(member.getInitTime());
+            
+            // if there is at least one member in the topology, that means service has been created already
+            // this is to avoid calling startContainer() method again
+            kubernetesClusterCtxt.setServiceClusterCreated(true);
+            
+            if (MemberStatus.Activated.equals(member.getStatus())) {
+            	if (log.isDebugEnabled()) {
+            		String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
+					log.debug(msg);
+				}
+                dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
+            } else if (MemberStatus.Created.equals(member.getStatus())
+                       || MemberStatus.Starting.equals(member.getStatus())) {
+            	if (log.isDebugEnabled()) {
+            		String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
+					log.debug(msg);
+				}
+                dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
+            }
+            
+            kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId));
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Member stat context has been added: [member] %s", memberId));
+            }
+        }
+
+        // find lb reference type
+        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
+            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+            dockerClusterMonitor.setLbReferenceType(value);
+            if (log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: " + value);
+            }
+        }
+
+        log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
+        return dockerClusterMonitor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
new file mode 100644
index 0000000..39fbd46
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/*
+ * Every kubernetes cluster monitor should extend this class
+ */
+public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
+
+    private static final Log log = LogFactory.getLog(KubernetesClusterMonitor.class);
+
+    private KubernetesClusterContext kubernetesClusterCtxt;
+    protected String autoscalePolicyId;
+
+    protected KubernetesClusterMonitor(String clusterId, String serviceId,
+                                       KubernetesClusterContext kubernetesClusterContext,
+                                       AutoscalerRuleEvaluator autoscalerRuleEvaluator,
+                                       String autoscalePolicyId) {
+
+        super(clusterId, serviceId, autoscalerRuleEvaluator);
+        this.kubernetesClusterCtxt = kubernetesClusterContext;
+        this.autoscalePolicyId = autoscalePolicyId;
+    }
+
+    @Override
+    public void handleAverageLoadAverageEvent(
+            AverageLoadAverageEvent averageLoadAverageEvent) {
+
+        String clusterId = averageLoadAverageEvent.getClusterId();
+        float value = averageLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg load avg event: [cluster] %s [value] %s",
+                                    clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setAverageLoadAverage(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+
+    }
+
+    @Override
+    public void handleGradientOfLoadAverageEvent(
+            GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+        String clusterId = gradientOfLoadAverageEvent.getClusterId();
+        float value = gradientOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s",
+                                    clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setLoadAverageGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfLoadAverageEvent(
+            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
+
+        String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+        float value = secondDerivativeOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
+                                    + "[value] %s", clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setLoadAverageSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleAverageMemoryConsumptionEvent(
+            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+        String clusterId = averageMemoryConsumptionEvent.getClusterId();
+        float value = averageMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg Memory Consumption event: [cluster] %s "
+                                    + "[value] %s", averageMemoryConsumptionEvent.getClusterId(), value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setAverageMemoryConsumption(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfMemoryConsumptionEvent(
+            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
+
+        String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+        float value = gradientOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
+                                    + "[value] %s", clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setMemoryConsumptionGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfMemoryConsumptionEvent(
+            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
+
+        String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+        float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
+                                    + "[value] %s", clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setMemoryConsumptionSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleAverageRequestsInFlightEvent(
+            AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+        float value = averageRequestsInFlightEvent.getValue();
+        String clusterId = averageRequestsInFlightEvent.getClusterId();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Average Rif event: [cluster] %s [value] %s",
+                                    clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setAverageRequestsInFlight(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfRequestsInFlightEvent(
+            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+        String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
+        float value = gradientOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s",
+                                    clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setRequestsInFlightGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfRequestsInFlightEvent(
+            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
+
+        String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
+        float value = secondDerivativeOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second derivative of Rif event: [cluster] %s "
+                                    + "[value] %s", clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setRequestsInFlightSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleMemberAverageMemoryConsumptionEvent(
+            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
+
+        String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberAverageMemoryConsumptionEvent.getValue();
+        memberStatsContext.setAverageMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfMemoryConsumptionEvent(
+            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
+
+        String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfMemoryConsumptionEvent.getValue();
+        memberStatsContext.setGradientOfMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
+
+    }
+
+    @Override
+    public void handleMemberAverageLoadAverageEvent(
+            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        String memberId = memberAverageLoadAverageEvent.getMemberId();
+        float value = memberAverageLoadAverageEvent.getValue();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        memberStatsContext.setAverageLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfLoadAverageEvent(
+            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
+
+        String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfLoadAverageEvent.getValue();
+        memberStatsContext.setGradientOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfLoadAverageEvent(
+            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
+
+        String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+        memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+    	// kill the container
+        String memberId = memberFaultEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        if (null == member) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+            }
+            return;
+        }
+        if (!member.isActive()) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member activated event has not received for the member %s. "
+                                        + "Therefore ignoring" + " the member fault health stat", memberId));
+            }
+            return;
+        }
+        
+        if (!getKubernetesClusterCtxt().activeMemberExist(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Could not find the active member in kubernetes cluster context, "
+                                        + "[member] %s ", memberId));
+            }
+            return;
+        }
+        // terminate the faulty member
+        CloudControllerClient ccClient = CloudControllerClient.getInstance();
+        try {
+            ccClient.terminateContainer(memberId);
+            // remove from active member list
+            getKubernetesClusterCtxt().removeActiveMemberById(memberId);
+            if (log.isInfoEnabled()) {
+                String clusterId = memberFaultEvent.getClusterId();
+                String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
+				log.info(String.format("Faulty member is terminated and removed from the active members list: "
+                                       + "[member] %s [kub-cluster] %s [cluster] %s ", memberId, kubernetesClusterID, clusterId));
+            }
+        } catch (TerminationException e) {
+            String msg = "Cannot delete a container " + e.getLocalizedMessage();
+            log.error(msg, e);
+        }
+    }
+
+    @Override
+    public void handleMemberStartedEvent(
+            MemberStartedEvent memberStartedEvent) {
+
+    }
+
+    @Override
+    public void handleMemberActivatedEvent(
+            MemberActivatedEvent memberActivatedEvent) {
+
+        KubernetesClusterContext kubernetesClusterContext;
+        kubernetesClusterContext = getKubernetesClusterCtxt();
+        String memberId = memberActivatedEvent.getMemberId();
+        kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId));
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member stat context has been added successfully: "
+                                   + "[member] %s", memberId));
+        }
+        kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
+    }
+
+    @Override
+    public void handleMemberMaintenanceModeEvent(
+            MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+        // no need to do anything here
+        // we will not be receiving this event for containers
+        // we will only receive member terminated event
+    }
+
+    @Override
+    public void handleMemberReadyToShutdownEvent(
+            MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+        // no need to do anything here
+        // we will not be receiving this event for containers
+    	// we will only receive member terminated event
+    }
+
+    @Override
+    public void handleMemberTerminatedEvent(
+            MemberTerminatedEvent memberTerminatedEvent) {
+
+        String memberId = memberTerminatedEvent.getMemberId();
+        if (getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from termination pending members list: "
+                                        + "[member] %s", memberId));
+            }
+        } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from pending members list: "
+                                        + "[member] %s", memberId));
+            }
+        } else if (getKubernetesClusterCtxt().removeActiveMemberById(memberId)) {
+            log.warn(String.format("Member is in the wrong list and it is removed from "
+                                   + "active members list", memberId));
+        } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) {
+            log.warn(String.format("Member's obsolated timeout has been expired and "
+                                   + "it is removed from obsolated members list", memberId));
+        } else {
+            log.warn(String.format("Member is not available in any of the list active, "
+                                   + "pending and termination pending", memberId));
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member stat context has been removed successfully: "
+                                   + "[member] %s", memberId));
+        }
+    }
+
+    @Override
+    public void handleClusterRemovedEvent(
+            ClusterRemovedEvent clusterRemovedEvent) {
+    	getKubernetesClusterCtxt().getPendingMembers().clear();
+    	getKubernetesClusterCtxt().getActiveMembers().clear();
+    	getKubernetesClusterCtxt().getTerminationPendingMembers().clear();
+    	getKubernetesClusterCtxt().getObsoletedMembers().clear();
+    }
+
+    public KubernetesClusterContext getKubernetesClusterCtxt() {
+        return kubernetesClusterCtxt;
+    }
+
+    public void setKubernetesClusterCtxt(
+            KubernetesClusterContext kubernetesClusterCtxt) {
+        this.kubernetesClusterCtxt = kubernetesClusterCtxt;
+    }
+
+    public AutoscalePolicy getAutoscalePolicy() {
+        return PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyId);
+    }
+
+    private Member getMemberByMemberId(String memberId) {
+        try {
+            TopologyManager.acquireReadLock();
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.memberExists(memberId)) {
+                        return cluster.getMember(memberId);
+                    }
+                }
+            }
+            return null;
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    @Override
+    public void terminateAllMembers() {
+        try {
+            CloudControllerClient.getInstance().terminateAllContainers(getKubernetesClusterCtxt().getClusterId());
+        } catch (TerminationException e) {
+            log.error(String.format("Could not terminate containers: [cluster-id] %s",
+                    getKubernetesClusterCtxt().getClusterId()), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
new file mode 100644
index 0000000..2615651
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.List;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+
+/*
+ * It is monitoring a kubernetes service cluster periodically.
+ */
+public final class KubernetesServiceClusterMonitor extends KubernetesClusterMonitor {
+
+    private static final Log log = LogFactory.getLog(KubernetesServiceClusterMonitor.class);
+
+    private String lbReferenceType;
+
+    public KubernetesServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt,
+                                           String serviceClusterID, String serviceId,
+                                           String autoscalePolicyId) {
+        super(serviceClusterID, serviceId, kubernetesClusterCtxt,
+              new AutoscalerRuleEvaluator(
+                      StratosConstants.CONTAINER_MIN_CHECK_DROOL_FILE,
+                      StratosConstants.CONTAINER_SCALE_CHECK_DROOL_FILE),
+              autoscalePolicyId);
+        readConfigurations();
+    }
+
+    @Override
+    public void run() {
+
+        if (log.isDebugEnabled()) {
+            log.debug("KubernetesServiceClusterMonitor is running..." + this.toString());
+        }
+        try {
+            if (!ClusterStatus.Active.getNextStates().contains(getStatus())) {
+                monitor();
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
+                            + getStatus() + "state");
+                }
+            }
+        } catch (Exception e) {
+            log.error("KubernetesServiceClusterMonitor: Monitor failed." + this.toString(),
+                      e);
+        }
+    }
+
+    @Override
+    protected void monitor() {
+        minCheck();
+        scaleCheck();
+    }
+
+    private void scaleCheck() {
+        boolean rifReset = getKubernetesClusterCtxt().isRifReset();
+        boolean memoryConsumptionReset = getKubernetesClusterCtxt().isMemoryConsumptionReset();
+        boolean loadAverageReset = getKubernetesClusterCtxt().isLoadAverageReset();
+        if (log.isDebugEnabled()) {
+            log.debug("flag of rifReset : " + rifReset
+                      + " flag of memoryConsumptionReset : "
+                      + memoryConsumptionReset + " flag of loadAverageReset : "
+                      + loadAverageReset);
+        }
+        String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
+        String clusterId = getClusterId();
+        if (rifReset || memoryConsumptionReset || loadAverageReset) {
+            getScaleCheckKnowledgeSession().setGlobal("clusterId", clusterId);
+            getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", getAutoscalePolicy());
+            getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+            getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+            getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+            if (log.isDebugEnabled()) {
+                log.debug(String.format(
+                        "Running scale check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
+            }
+            scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(
+                    getScaleCheckKnowledgeSession(), scaleCheckFactHandle, getKubernetesClusterCtxt());
+            getKubernetesClusterCtxt().setRifReset(false);
+            getKubernetesClusterCtxt().setMemoryConsumptionReset(false);
+            getKubernetesClusterCtxt().setLoadAverageReset(false);
+        } else if (log.isDebugEnabled()) {
+            log.debug(String.format("Scale check will not run since none of the statistics have not received yet for "
+                                    + "[kub-cluster] : %s [cluster] : %s", kubernetesClusterID, clusterId));
+        }
+    }
+
+	private void minCheck() {
+		getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+		String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format(
+                    "Running min check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
+        }
+		minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(
+				getMinCheckKnowledgeSession(), minCheckFactHandle,
+				getKubernetesClusterCtxt());
+	}
+
+	@Override
+    public void destroy() {
+        getMinCheckKnowledgeSession().dispose();
+        getScaleCheckKnowledgeSession().dispose();
+        setDestroyed(true);
+        stopScheduler();
+        if (log.isDebugEnabled()) {
+            log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
+        }
+    }
+
+    @Override
+    protected void readConfigurations() {
+        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+        int monitorInterval = conf.getInt(AutoScalerConstants.KubernetesService_Cluster_MONITOR_INTERVAL, 60000);
+        setMonitorIntervalMilliseconds(monitorInterval);
+        if (log.isDebugEnabled()) {
+            log.debug("KubernetesServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "KubernetesServiceClusterMonitor "
+               + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
+               + ", clusterId=" + getClusterId()
+               + ", serviceId=" + getServiceId() + "]";
+    }
+
+    public String getLbReferenceType() {
+        return lbReferenceType;
+    }
+
+    public void setLbReferenceType(String lbReferenceType) {
+        this.lbReferenceType = lbReferenceType;
+    }
+
+    @Override
+    public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
+        
+        if (properties != null) {
+            Property[] propertyArray = properties.getProperties();
+            if (propertyArray == null) {
+                return;
+            }
+            List<Property> propertyList = Arrays.asList(propertyArray);
+            
+            for (Property property : propertyList) {
+                String key = property.getName();
+                String value = property.getValue();
+                
+                if (StratosConstants.KUBERNETES_MIN_REPLICAS.equals(key)) {
+                    int min = Integer.parseInt(value);
+                    int max = getKubernetesClusterCtxt().getMaxReplicas();
+                    if (min > max) {
+                        String msg = String.format("%s should be less than %s . But %s is not less than %s.", 
+                                StratosConstants.KUBERNETES_MIN_REPLICAS, StratosConstants.KUBERNETES_MAX_REPLICAS, min, max);
+                        log.error(msg);
+                        throw new InvalidArgumentException(msg);
+                    }
+                    getKubernetesClusterCtxt().setMinReplicas(min);
+                    break;
+                }
+            }
+            
+        }
+    }
+}
\ No newline at end of file