You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by sa...@apache.org on 2014/10/06 19:43:07 UTC

[1/7] git commit: executor service instead of thread sleep

Repository: stratos
Updated Branches:
  refs/heads/container-autoscaling 7d616494a -> fb68de94a


executor service instead of thread sleep

Signed-off-by: sajhak <sa...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/fb68de94
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/fb68de94
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/fb68de94

Branch: refs/heads/container-autoscaling
Commit: fb68de94a1df0a7c679d5881f680dba5671340eb
Parents: 244030b
Author: R-Rajkumar <rr...@gmail.com>
Authored: Mon Oct 6 11:25:54 2014 +0530
Committer: sajhak <sa...@gmail.com>
Committed: Mon Oct 6 23:11:29 2014 +0530

----------------------------------------------------------------------
 .../monitor/AbstractClusterMonitor.java         | 10 +++----
 .../KubernetesServiceClusterMonitor.java        | 28 ++++++++++----------
 .../autoscaler/monitor/VMLbClusterMonitor.java  | 26 +++++++++---------
 .../monitor/VMServiceClusterMonitor.java        | 26 +++++++++---------
 4 files changed, 45 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/fb68de94/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
index e44bd72..3238d46 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
@@ -66,7 +66,7 @@ public abstract class AbstractClusterMonitor implements Runnable {
     private boolean isDestroyed;
 
     private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
-    
+
     private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
 
     protected AbstractClusterMonitor(String clusterId, String serviceId,
@@ -81,13 +81,13 @@ public abstract class AbstractClusterMonitor implements Runnable {
     }
 
     protected abstract void readConfigurations();
-    
+
     public void startScheduler() {
-    	scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
+        scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
     }
-    
+
     protected void stopScheduler() {
-    	scheduler.shutdownNow();
+        scheduler.shutdownNow();
     }
 
     protected abstract void monitor();

http://git-wip-us.apache.org/repos/asf/stratos/blob/fb68de94/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
index 93580d9..6e14ce0 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
@@ -59,22 +59,22 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni
     @Override
     public void run() {
 
-            if (log.isDebugEnabled()) {
-                log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
-            }
-            try {
-                if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
-                    monitor();
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
-                                  + ClusterStatus.In_Maintenance + " mode......");
-                    }
+        if (log.isDebugEnabled()) {
+            log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
+        }
+        try {
+            if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
+                monitor();
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
+                              + ClusterStatus.In_Maintenance + " mode......");
                 }
-            } catch (Exception e) {
-                log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
-                          e);
             }
+        } catch (Exception e) {
+            log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
+                      e);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/stratos/blob/fb68de94/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
index f950f9d..1c27380 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
@@ -57,21 +57,21 @@ public class VMLbClusterMonitor extends VMClusterMonitor {
     @Override
     public void run() {
 
-            if (log.isDebugEnabled()) {
-                log.debug("VMLbClusterMonitor is running.. " + this.toString());
-            }
-            try {
-                if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
-                    monitor();
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("VMLbClusterMonitor is suspended as the cluster is in " +
-                                  ClusterStatus.In_Maintenance + " mode......");
-                    }
+        if (log.isDebugEnabled()) {
+            log.debug("VMLbClusterMonitor is running.. " + this.toString());
+        }
+        try {
+            if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
+                monitor();
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("VMLbClusterMonitor is suspended as the cluster is in " +
+                              ClusterStatus.In_Maintenance + " mode......");
                 }
-            } catch (Exception e) {
-                log.error("VMLbClusterMonitor : Monitor failed. " + this.toString(), e);
             }
+        } catch (Exception e) {
+            log.error("VMLbClusterMonitor : Monitor failed. " + this.toString(), e);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/stratos/blob/fb68de94/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
index d8c9e69..6f9fb26 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
@@ -67,21 +67,21 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
         } catch (InterruptedException ignore) {
         }
 
-            if (log.isDebugEnabled()) {
-                log.debug("VMServiceClusterMonitor is running.. " + this.toString());
-            }
-            try {
-                if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
-                    monitor();
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("VMServiceClusterMonitor is suspended as the cluster is in " +
-                                  ClusterStatus.In_Maintenance + " mode......");
-                    }
+        if (log.isDebugEnabled()) {
+            log.debug("VMServiceClusterMonitor is running.. " + this.toString());
+        }
+        try {
+            if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
+                monitor();
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("VMServiceClusterMonitor is suspended as the cluster is in " +
+                              ClusterStatus.In_Maintenance + " mode......");
                 }
-            } catch (Exception e) {
-                log.error("VMServiceClusterMonitor : Monitor failed." + this.toString(), e);
             }
+        } catch (Exception e) {
+            log.error("VMServiceClusterMonitor : Monitor failed." + this.toString(), e);
+        }
     }
 
     @Override


[6/7] code review changes to cluster monitors

Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
index e1d7cc5..88d8dee 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -21,24 +21,41 @@ package org.apache.stratos.autoscaler.message.receiver.health;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.exception.TerminationException;
 import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.DockerServiceClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
-import org.apache.stratos.autoscaler.policy.model.LoadAverage;
-import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
-import org.apache.stratos.common.enums.ClusterType;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.health.stat.*;
-import org.apache.stratos.messaging.listener.health.stat.*;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.listener.health.stat.AverageLoadAverageEventListener;
+import org.apache.stratos.messaging.listener.health.stat.AverageMemoryConsumptionEventListener;
+import org.apache.stratos.messaging.listener.health.stat.AverageRequestsInFlightEventListener;
+import org.apache.stratos.messaging.listener.health.stat.GradientOfLoadAverageEventListener;
+import org.apache.stratos.messaging.listener.health.stat.GradientOfMemoryConsumptionEventListener;
+import org.apache.stratos.messaging.listener.health.stat.GradientOfRequestsInFlightEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberAverageLoadAverageEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberAverageMemoryConsumptionEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberFaultEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberGradientOfLoadAverageEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberGradientOfMemoryConsumptionEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberSecondDerivativeOfLoadAverageEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberSecondDerivativeOfMemoryConsumptionEventListener;
+import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfLoadAverageEventListener;
+import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfMemoryConsumptionEventListener;
+import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfRequestsInFlightEventListener;
 import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
@@ -54,7 +71,7 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
     private HealthStatEventReceiver healthStatEventReceiver;
 
     public AutoscalerHealthStatEventReceiver() {
-		this.healthStatEventReceiver = new HealthStatEventReceiver();
+        this.healthStatEventReceiver = new HealthStatEventReceiver();
         addEventListeners();
     }
 
@@ -67,18 +84,18 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
         }
         Thread thread = new Thread(healthStatEventReceiver);
         thread.start();
-        if(log.isInfoEnabled()) {
+        if (log.isInfoEnabled()) {
             log.info("Autoscaler health stat event receiver thread started");
         }
 
         // Keep the thread live until terminated
-        while (!terminated){
-        	try {
+        while (!terminated) {
+            try {
                 Thread.sleep(1000);
             } catch (InterruptedException ignore) {
             }
         }
-        if(log.isInfoEnabled()) {
+        if (log.isInfoEnabled()) {
             log.info("Autoscaler health stat event receiver thread terminated");
         }
     }
@@ -88,876 +105,396 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
         healthStatEventReceiver.addEventListener(new AverageLoadAverageEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                AverageLoadAverageEvent e = (AverageLoadAverageEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
+                AverageLoadAverageEvent averageLoadAverageEvent = (AverageLoadAverageEvent) event;
+                String clusterId = averageLoadAverageEvent.getClusterId();
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractClusterMonitor monitor;
-
-                if(asCtx.clusterMonitorExist(clusterId)){
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                } else {
-                    if(log.isDebugEnabled()){
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                        		+ "[cluster] %s", clusterId));
+                                                + "[cluster] %s", clusterId));
                     }
                     return;
                 }
-                
-                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                		|| monitor.getClusterType() == ClusterType.VMLbCluster) {
-                	
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
-                        		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            networkPartitionContext.setAverageLoadAverage(floatValue);
-                        } else {
-                            if(log.isDebugEnabled()) {
-                               log.debug(String.format("Network partition context is not available for :" +
-                                       " [network partition] %s", networkPartitionId));
-                            }
-                        }
-                    }
-                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                	KubernetesClusterContext kubernetesClusterContext = 
-                			((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
-                	if (null != kubernetesClusterContext) {
-						kubernetesClusterContext.setAverageLoadAverage(floatValue);
-					} else {
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                    " [cluster] %s", clusterId));
-                         }
-					}
-                }
+                monitor.handleAverageLoadAverageEvent(averageLoadAverageEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new AverageMemoryConsumptionEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
-                AverageMemoryConsumptionEvent e = (AverageMemoryConsumptionEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s "
-                    		+ "[value] %s", clusterId, networkPartitionId, floatValue));
-                }
+                AverageMemoryConsumptionEvent averageMemoryConsumptionEvent = (AverageMemoryConsumptionEvent) event;
+                String clusterId = averageMemoryConsumptionEvent.getClusterId();
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractClusterMonitor monitor;
-
-                if(asCtx.clusterMonitorExist(clusterId)){
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                } else {
-                    if(log.isDebugEnabled()){
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                        		+ "[cluster] %s", clusterId));
+                                                + "[cluster] %s", clusterId));
                     }
                     return;
                 }
-                
-                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                		|| monitor.getClusterType() == ClusterType.VMLbCluster) {
-                	
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
-                        		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            networkPartitionContext.setAverageMemoryConsumption(floatValue);
-                        } else {
-                            if(log.isDebugEnabled()) {
-                               log.debug(String.format("Network partition context is not available for :" +
-                                       " [network partition] %s", networkPartitionId));
-                            }
-                        }
-                    }
-                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                	KubernetesClusterContext kubernetesClusterContext = 
-                			((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
-                	if (null != kubernetesClusterContext) {
-						kubernetesClusterContext.setAverageMemoryConsumption(floatValue);
-					} else {
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                    " [cluster] %s", clusterId));
-                         }
-					}
-                }
+                monitor.handleAverageMemoryConsumptionEvent(averageMemoryConsumptionEvent);
             }
         });
+
         healthStatEventReceiver.addEventListener(new AverageRequestsInFlightEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
-                AverageRequestsInFlightEvent e = (AverageRequestsInFlightEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-                Float floatValue = e.getValue();
-
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
+                AverageRequestsInFlightEvent averageRequestsInFlightEvent = (AverageRequestsInFlightEvent) event;
+                String clusterId = averageRequestsInFlightEvent.getClusterId();
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractClusterMonitor monitor;
-
-                if(asCtx.clusterMonitorExist(clusterId)){
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                } else {
-                    if(log.isDebugEnabled()){
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                        		+ "[cluster] %s", clusterId));
+                                                + "[cluster] %s", clusterId));
                     }
                     return;
                 }
-                
-                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                		|| monitor.getClusterType() == ClusterType.VMLbCluster) {
-                	
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
-                        		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            networkPartitionContext.setAverageRequestsInFlight(floatValue);
-                        } else {
-                            if(log.isDebugEnabled()) {
-                               log.debug(String.format("Network partition context is not available for :" +
-                                       " [network partition] %s", networkPartitionId));
-                            }
-                        }
-                    }
-                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                	KubernetesClusterContext kubernetesClusterContext = 
-                			((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
-                	if (null != kubernetesClusterContext) {
-						kubernetesClusterContext.setAverageRequestsInFlight(floatValue);
-					} else {
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                    " [cluster] %s", clusterId));
-                         }
-					}
-                }
+                monitor.handleAverageRequestsInFlightEvent(averageRequestsInFlightEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new GradientOfLoadAverageEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                GradientOfLoadAverageEvent e = (GradientOfLoadAverageEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
+                GradientOfLoadAverageEvent gradientOfLoadAverageEvent = (GradientOfLoadAverageEvent) event;
+                String clusterId = gradientOfLoadAverageEvent.getClusterId();
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractClusterMonitor monitor;
-
-                if(asCtx.clusterMonitorExist(clusterId)){
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                } else {
-                    if(log.isDebugEnabled()){
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                        		+ "[cluster] %s", clusterId));
+                                                + "[cluster] %s", clusterId));
                     }
                     return;
                 }
-                
-                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                		|| monitor.getClusterType() == ClusterType.VMLbCluster){
-                	
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
-                        		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            networkPartitionContext.setLoadAverageGradient(floatValue);
-                        } else {
-                            if(log.isDebugEnabled()) {
-                               log.debug(String.format("Network partition context is not available for :" +
-                                       " [network partition] %s", networkPartitionId));
-                            }
-                        }
-                    }
-                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                	KubernetesClusterContext kubernetesClusterContext = 
-                			((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
-                	if (null != kubernetesClusterContext) {
-						kubernetesClusterContext.setLoadAverageGradient(floatValue);
-					} else {
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                    " [cluster] %s", clusterId));
-                         }
-					}
-                }
+                monitor.handleGradientOfLoadAverageEvent(gradientOfLoadAverageEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new GradientOfMemoryConsumptionEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
-                GradientOfMemoryConsumptionEvent e = (GradientOfMemoryConsumptionEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
-                    		+ "[network-partition] %s [value] %s", clusterId, networkPartitionId, floatValue));
-                }
+                GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent = (GradientOfMemoryConsumptionEvent) event;
+                String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractClusterMonitor monitor;
-
-                if(asCtx.clusterMonitorExist(clusterId)){
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                } else {
-                    if(log.isDebugEnabled()){
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                        		+ "[cluster] %s", clusterId));
+                                                + "[cluster] %s", clusterId));
                     }
                     return;
                 }
-                
-                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                		|| monitor.getClusterType() == ClusterType.VMLbCluster){
-                	
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
-                        		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            networkPartitionContext.setMemoryConsumptionGradient(floatValue);
-                        } else {
-                            if(log.isDebugEnabled()) {
-                               log.debug(String.format("Network partition context is not available for :" +
-                                       " [network partition] %s", networkPartitionId));
-                            }
-                        }
-                    }
-                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                	KubernetesClusterContext kubernetesClusterContext = 
-                			((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
-                	if (null != kubernetesClusterContext) {
-						kubernetesClusterContext.setMemoryConsumptionGradient(floatValue);
-					} else {
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                    " [cluster] %s", clusterId));
-                         }
-					}
-                }
+                monitor.handleGradientOfMemoryConsumptionEvent(gradientOfMemoryConsumptionEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new GradientOfRequestsInFlightEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                GradientOfRequestsInFlightEvent e = (GradientOfRequestsInFlightEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
+                GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent = (GradientOfRequestsInFlightEvent) event;
+                String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractClusterMonitor monitor;
-
-                if(asCtx.clusterMonitorExist(clusterId)){
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                } else {
-                    if(log.isDebugEnabled()){
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                        		+ "[cluster] %s", clusterId));
+                                                + "[cluster] %s", clusterId));
                     }
                     return;
                 }
-                
-                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                		|| monitor.getClusterType() == ClusterType.VMLbCluster){
-                	
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
-                        		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            networkPartitionContext.setRequestsInFlightGradient(floatValue);
-                        } else {
-                            if(log.isDebugEnabled()) {
-                               log.debug(String.format("Network partition context is not available for :" +
-                                       " [network partition] %s", networkPartitionId));
-                            }
-                        }
-                    }
-                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                	KubernetesClusterContext kubernetesClusterContext = 
-                			((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
-                	if (null != kubernetesClusterContext) {
-						kubernetesClusterContext.setRequestsInFlightGradient(floatValue);
-					} else {
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                    " [cluster] %s", clusterId));
-                         }
-					}
-                }
+                monitor.handleGradientOfRequestsInFlightEvent(gradientOfRequestsInFlightEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new MemberAverageLoadAverageEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                MemberAverageLoadAverageEvent e = (MemberAverageLoadAverageEvent) event;
-                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
-                if(loadAverage != null) {
-
-                    Float floatValue = e.getValue();
-                    loadAverage.setAverage(floatValue);
-
+                MemberAverageLoadAverageEvent memberAverageLoadAverageEvent = (MemberAverageLoadAverageEvent) event;
+                String memberId = memberAverageLoadAverageEvent.getMemberId();
+                Member member = getMemberByMemberId(memberId);
+                if (null == member) {
                     if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member avg of load avg event: [member] %s [value] %s", 
-                        		e.getMemberId(), floatValue));
+                        log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
                     }
+                    return;
                 }
-
+                if (!member.isActive()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member activated event has not received for the member %s. "
+                                                + "Therefore ignoring" + " the health stat", memberId));
+                    }
+                    return;
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractClusterMonitor monitor;
+                String clusterId = member.getClusterId();
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context "
+                                                + "[cluster] %s", clusterId));
+                    }
+                    return;
+                }
+                monitor.handleMemberAverageLoadAverageEvent(memberAverageLoadAverageEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new MemberAverageMemoryConsumptionEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                MemberAverageMemoryConsumptionEvent e = (MemberAverageMemoryConsumptionEvent) event;
-                MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
-                if(memoryConsumption != null) {
-
-                    Float floatValue = e.getValue();
-                    memoryConsumption.setAverage(floatValue);
-
+                MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent = (MemberAverageMemoryConsumptionEvent) event;
+                String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+                Member member = getMemberByMemberId(memberId);
+                if (null == member) {
                     if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member avg Memory Consumption event: [member] %s [value] %s", 
-                        		e.getMemberId(), floatValue));
+                        log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
                     }
+                    return;
                 }
-
+                if (!member.isActive()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member activated event has not received for the member %s. "
+                                                + "Therefore ignoring" + " the health stat", memberId));
+                    }
+                    return;
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractClusterMonitor monitor;
+                String clusterId = member.getClusterId();
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context "
+                                                + "[cluster] %s", clusterId));
+                    }
+                    return;
+                }
+                monitor.handleMemberAverageMemoryConsumptionEvent(memberAverageMemoryConsumptionEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new MemberFaultEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                MemberFaultEvent e = (MemberFaultEvent) event;
-                String clusterId = e.getClusterId();
-                String memberId = e.getMemberId();
-
+                MemberFaultEvent memberFaultEvent = (MemberFaultEvent) event;
+                String clusterId = memberFaultEvent.getClusterId();
+                String memberId = memberFaultEvent.getMemberId();
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Member fault event: [member] %s ", memberId));
+                }
                 if (memberId == null || memberId.isEmpty()) {
-                    if(log.isErrorEnabled()) {
-                        log.error("Member id not found in received message");
-                    }
-                } else {
-
+                    log.error("Member id not found in received message");
+                    return;
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractClusterMonitor monitor;
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
                     if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member fault event: [member] %s ", e.getMemberId()));
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context "
+                                                + "[cluster] %s", clusterId));
                     }
-                    handleMemberFaultEvent(clusterId, memberId);
+                    return;
                 }
+                monitor.handleMemberFaultEvent(memberFaultEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new MemberGradientOfLoadAverageEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                MemberGradientOfLoadAverageEvent e = (MemberGradientOfLoadAverageEvent) event;
-                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
-                if(loadAverage != null) {
-
-                    Float floatValue = e.getValue();
-                    loadAverage.setGradient(floatValue);
-
+                MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent = (MemberGradientOfLoadAverageEvent) event;
+                String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+                Member member = getMemberByMemberId(memberId);
+                if (null == member) {
                     if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member grad of load avg event: [member] %s "
-                        		+ "[value] %s", e.getMemberId(), floatValue));
+                        log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
                     }
+                    return;
                 }
-
+                if (!member.isActive()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member activated event has not received for the member %s. "
+                                                + "Therefore ignoring" + " the health stat", memberId));
+                    }
+                    return;
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractClusterMonitor monitor;
+                String clusterId = member.getClusterId();
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context "
+                                                + "[cluster] %s", clusterId));
+                    }
+                    return;
+                }
+                monitor.handleMemberGradientOfLoadAverageEvent(memberGradientOfLoadAverageEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new MemberGradientOfMemoryConsumptionEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                MemberGradientOfMemoryConsumptionEvent e = (MemberGradientOfMemoryConsumptionEvent) event;
-                MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
-                if(memoryConsumption != null) {
-
-                    Float floatValue = e.getValue();
-                    memoryConsumption.setGradient(floatValue);
-
+                MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent = (MemberGradientOfMemoryConsumptionEvent) event;
+                String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+                Member member = getMemberByMemberId(memberId);
+                if (null == member) {
                     if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member grad of Memory Consumption event: [member] %s "
-                        		+ "[value] %s", e.getMemberId(), floatValue));
+                        log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
                     }
+                    return;
                 }
-
+                if (!member.isActive()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member activated event has not received for the member %s. "
+                                                + "Therefore ignoring" + " the health stat", memberId));
+                    }
+                    return;
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractClusterMonitor monitor;
+                String clusterId = member.getClusterId();
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context "
+                                                + "[cluster] %s", clusterId));
+                    }
+                    return;
+                }
+                monitor.handleMemberGradientOfMemoryConsumptionEvent(memberGradientOfMemoryConsumptionEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfLoadAverageEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                MemberSecondDerivativeOfLoadAverageEvent e = (MemberSecondDerivativeOfLoadAverageEvent) event;
-                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
-                if(loadAverage != null) {
-
-                    Float floatValue = e.getValue();
-                    loadAverage.setSecondDerivative(floatValue);
-
+                MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent = (MemberSecondDerivativeOfLoadAverageEvent) event;
+                String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+                Member member = getMemberByMemberId(memberId);
+                if (null == member) {
                     if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member Second Derivation of load avg event: [member] %s "
-                        		+ "[value] %s", e.getMemberId(), floatValue));
+                        log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
                     }
+                    return;
+                }
+                if (!member.isActive()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member activated event has not received for the member %s. "
+                                                + "Therefore ignoring" + " the health stat", memberId));
+                    }
+                    return;
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractClusterMonitor monitor;
+                String clusterId = member.getClusterId();
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context "
+                                                + "[cluster] %s", clusterId));
+                    }
+                    return;
                 }
+                monitor.handleMemberSecondDerivativeOfLoadAverageEvent(memberSecondDerivativeOfLoadAverageEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfMemoryConsumptionEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-            }
 
+            }
         });
+
         healthStatEventReceiver.addEventListener(new SecondDerivativeOfLoadAverageEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
-                SecondDerivativeOfLoadAverageEvent e = (SecondDerivativeOfLoadAverageEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
-                    		+ "[network-partition] %s [value] %s", clusterId, networkPartitionId, floatValue));
-                }
+                SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent = (SecondDerivativeOfLoadAverageEvent) event;
+                String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractClusterMonitor monitor;
-
-                if(asCtx.clusterMonitorExist(clusterId)){
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                } else {
-                    if(log.isDebugEnabled()){
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                        		+ "[cluster] %s", clusterId));
+                                                + "[cluster] %s", clusterId));
                     }
                     return;
                 }
-                
-                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                		|| monitor.getClusterType() == ClusterType.VMLbCluster){
-                	
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
-                        		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
-                        } else {
-                            if(log.isDebugEnabled()) {
-                               log.debug(String.format("Network partition context is not available for :" +
-                                       " [network partition] %s", networkPartitionId));
-                            }
-                        }
-                    }
-                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                	KubernetesClusterContext kubernetesClusterContext = 
-                			((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
-                	if (null != kubernetesClusterContext) {
-						kubernetesClusterContext.setLoadAverageSecondDerivative(floatValue);
-					} else {
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                    " [cluster] %s", clusterId));
-                         }
-					}
-                }
+                monitor.handleSecondDerivativeOfLoadAverageEvent(secondDerivativeOfLoadAverageEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new SecondDerivativeOfMemoryConsumptionEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
-                SecondDerivativeOfMemoryConsumptionEvent e = (SecondDerivativeOfMemoryConsumptionEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
-                    		+ "[network-partition] %s [value] %s", clusterId, networkPartitionId, floatValue));
-                }
+                SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent = (SecondDerivativeOfMemoryConsumptionEvent) event;
+                String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractClusterMonitor monitor;
-
-                if(asCtx.clusterMonitorExist(clusterId)){
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                } else {
-                    if(log.isDebugEnabled()){
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                        		+ "[cluster] %s", clusterId));
+                                                + "[cluster] %s", clusterId));
                     }
                     return;
                 }
-                
-                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                		|| monitor.getClusterType() == ClusterType.VMLbCluster){
-                	
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
-                        		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
-                        } else {
-                            if(log.isDebugEnabled()) {
-                               log.debug(String.format("Network partition context is not available for :" +
-                                       " [network partition] %s", networkPartitionId));
-                            }
-                        }
-                    }
-                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                	KubernetesClusterContext kubernetesClusterContext = 
-                			((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
-                	if (null != kubernetesClusterContext) {
-						kubernetesClusterContext.setMemoryConsumptionSecondDerivative(floatValue);
-					} else {
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                    " [cluster] %s", clusterId));
-                         }
-					}
-                }
+                monitor.handleSecondDerivativeOfMemoryConsumptionEvent(secondDerivativeOfMemoryConsumptionEvent);
             }
-
         });
+
         healthStatEventReceiver.addEventListener(new SecondDerivativeOfRequestsInFlightEventListener() {
             @Override
             protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                SecondDerivativeOfRequestsInFlightEvent e = (SecondDerivativeOfRequestsInFlightEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Second derivative of Rif event: [cluster] %s "
-                    		+ "[network-partition] %s [value] %s", clusterId, networkPartitionId, floatValue));
-                }
+                SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent = (SecondDerivativeOfRequestsInFlightEvent) event;
+                String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
                 AutoscalerContext asCtx = AutoscalerContext.getInstance();
                 AbstractClusterMonitor monitor;
-
-                if(asCtx.clusterMonitorExist(clusterId)){
-                    monitor = asCtx.getClusterMonitor(clusterId);
-                } else {
-                    if(log.isDebugEnabled()){
+                monitor = asCtx.getClusterMonitor(clusterId);
+                if (null == monitor) {
+                    if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                        		+ "[cluster] %s", clusterId));
+                                                + "[cluster] %s", clusterId));
                     }
                     return;
                 }
-                
-                if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                		|| monitor.getClusterType() == ClusterType.VMLbCluster){
-                	
-                    if(null != monitor){
-                        NetworkPartitionContext networkPartitionContext = 
-                        		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-                        if(null != networkPartitionContext){
-                            networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
-                        } else {
-                            if(log.isDebugEnabled()) {
-                               log.debug(String.format("Network partition context is not available for :" +
-                                       " [network partition] %s", networkPartitionId));
-                            }
-                        }
-                    }
-                } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                	KubernetesClusterContext kubernetesClusterContext = 
-                			((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
-                	if (null != kubernetesClusterContext) {
-						kubernetesClusterContext.setRequestsInFlightSecondDerivative(floatValue);
-					} else {
-                        if(log.isDebugEnabled()) {
-                            log.debug(String.format("Kubernetes cluster context is not available for :" +
-                                    " [cluster] %s", clusterId));
-                         }
-					}
-                }
+                monitor.handleSecondDerivativeOfRequestsInFlightEvent(secondDerivativeOfRequestsInFlightEvent);
             }
         });
     }
 
-
-    private LoadAverage findLoadAverage(String memberId) {
-//        String memberId = event.getProperties().get("member_id");
-        Member member = findMember(memberId);
-        
-        if(null == member){
-        	if(log.isDebugEnabled()) {
-                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
-            }
-        	return null;
-        }
-        String clusterId = member.getClusterId();
-
-        AutoscalerContext asCtx = AutoscalerContext.getInstance();
-        AbstractClusterMonitor monitor;
-
-        if(asCtx.clusterMonitorExist(clusterId)){
-            monitor = asCtx.getClusterMonitor(clusterId);
-        } else {
-            if(log.isDebugEnabled()){
-                log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                		+ "[cluster] %s", clusterId));
-            }
-            return null;
-        }
-        
-        if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-        		|| monitor.getClusterType() == ClusterType.VMLbCluster){
-        	
-            String networkPartitionId = findNetworkPartitionId(memberId);
-            MemberStatsContext memberStatsContext = 
-            		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId)
-                            .getPartitionCtxt(member.getPartitionId())
-                            .getMemberStatsContext(memberId);
-            if(null == memberStatsContext){
-                if(log.isDebugEnabled()) {
-                   log.debug(String.format("Member context is not available for : [member] %s", memberId));
-                }
-                return null;
-            }
-            else if(!member.isActive()){
-                if(log.isDebugEnabled()){
-                    log.debug(String.format("Member activated event has not received for the member %s. "
-                    		+ "Therefore ignoring" + " the health stat", memberId));
-                }
-                return null;
-            }
-
-            LoadAverage loadAverage = memberStatsContext.getLoadAverage();
-            return loadAverage;
-        } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-        	MemberStatsContext memberStatsContext = 
-        			((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt().getMemberStatsContext(memberId);
-            if(null == memberStatsContext){
-                if(log.isDebugEnabled()) {
-                   log.debug(String.format("Member context is not available for : [member] %s", memberId));
-                }
-                return null;
-            }
-            else if(!member.isActive()){
-                if(log.isDebugEnabled()){
-                    log.debug(String.format("Member activated event has not received for the member %s. "
-                    		+ "Therefore ignoring" + " the health stat", memberId));
-                }
-                return null;
-            }
-
-            LoadAverage loadAverage = memberStatsContext.getLoadAverage();
-            return loadAverage;
-        }
-
-        return null;
-    }
-
-    private MemoryConsumption findMemoryConsumption(String memberId) {
-//        String memberId = event.getProperties().get("member_id");
-        Member member = findMember(memberId);
-        
-        if(null == member){
-        	if(log.isDebugEnabled()) {
-                log.debug(String.format("Member not found in the Topology : [member] %s", memberId));
-            }
-        	return null;
-        }
-        
-        AutoscalerContext asCtx = AutoscalerContext.getInstance();
-        AbstractClusterMonitor monitor;
-
-        if(asCtx.clusterMonitorExist(member.getClusterId())){
-            monitor = asCtx.getClusterMonitor(member.getClusterId());
-        } else {
-            if(log.isDebugEnabled()){
-                log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                		+ "[cluster] %s", member.getClusterId()));
-            }
-            return null;
-        }
-
-        if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-        		|| monitor.getClusterType() == ClusterType.VMLbCluster){
-        	
-            String networkPartitionId = findNetworkPartitionId(memberId);
-            NetworkPartitionContext networkPartitionCtxt = 
-            		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-			PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
-			MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
-            if(null == memberStatsContext){
-                if(log.isDebugEnabled()) {
-                   log.debug(String.format("Member context is not available for : [member] %s", memberId));
-                }
-                return null;
-            }else if(!member.isActive()){
-                if(log.isDebugEnabled()){
-                    log.debug(String.format("Member activated event has not received for the member %s. "
-                    		+ "Therefore ignoring" + " the health stat", memberId));
-                }
-                return null;
-            }
-            MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption();
-
-            return memoryConsumption;
-        } else if (monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-        	KubernetesClusterContext kubernetesClusterCtxt = 
-        			((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
-			MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
-            if(null == memberStatsContext){
-                if(log.isDebugEnabled()) {
-                   log.debug(String.format("Member context is not available for : [member] %s", memberId));
-                }
-                return null;
-            }else if(!member.isActive()){
-                if(log.isDebugEnabled()){
-                    log.debug(String.format("Member activated event has not received for the member %s. "
-                    		+ "Therefore ignoring" + " the health stat", memberId));
-                }
-                return null;
-            }
-            MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption();
-
-            return memoryConsumption;
-        }
-        
-        return null;
-    }
-
-    private String findNetworkPartitionId(String memberId) {
-        for(Service service: TopologyManager.getTopology().getServices()){
-            for(Cluster cluster: service.getClusters()){
-                if(cluster.memberExists(memberId)){
-                    return cluster.getMember(memberId).getNetworkPartitionId();
-                }
-            }
-        }
-        return null;
-    }
-
-    private Member findMember(String memberId) {
+    private Member getMemberByMemberId(String memberId) {
         try {
             TopologyManager.acquireReadLock();
-            for(Service service : TopologyManager.getTopology().getServices()) {
-                for(Cluster cluster : service.getClusters()) {
-                    if(cluster.memberExists(memberId)) {
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.memberExists(memberId)) {
                         return cluster.getMember(memberId);
                     }
                 }
             }
             return null;
-        }
-        finally {
+        } finally {
             TopologyManager.releaseReadLock();
         }
     }
 
-    private void handleMemberFaultEvent(String clusterId, String memberId) {
-        try {
-        	AutoscalerContext asCtx = AutoscalerContext.getInstance();
-        	AbstractClusterMonitor monitor;
-        	
-            if(asCtx.clusterMonitorExist(clusterId)){
-                monitor = asCtx.getClusterMonitor(clusterId);
-            } else {
-                if(log.isDebugEnabled()){
-                    log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                    		+ "[cluster] %s", clusterId));
-                }
-                return;
-            }
-
-            if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-            		|| monitor.getClusterType() == ClusterType.VMLbCluster){
-            	
-            	NetworkPartitionContext nwPartitionCtxt;
-                try{
-                	TopologyManager.acquireReadLock();
-                	Member member = findMember(memberId);
-                	
-                	if(null == member){
-                		return;
-                	}
-                    if(!member.isActive()){
-                        if(log.isDebugEnabled()){
-                            log.debug(String.format("Member activated event has not received for the member %s. "
-                            		+ "Therefore ignoring" + " the member fault health stat", memberId));
-                        }
-                        return;
-                    }
-    	            
-    	            nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(member);
-    	            
-                }finally{
-                	TopologyManager.releaseReadLock();
-                }
-                // start a new member in the same Partition
-                String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId);
-                PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-                if(!partitionCtxt.activeMemberExist(memberId)){
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("Could not find the active member in partition context, "
-                        		+ "[member] %s ", memberId));
-                    }
-                    return;
-                }
-                // terminate the faulty member
-                CloudControllerClient ccClient = CloudControllerClient.getInstance();
-                ccClient.terminate(memberId);
-
-                // remove from active member list
-                partitionCtxt.removeActiveMemberById(memberId);
-
-                if (log.isInfoEnabled()) {
-                    log.info(String.format("Faulty member is terminated and removed from the active members list: "
-                    		+ "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
-                }
-            } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-            	// no need to do anything
-            }
-            
-        } catch (TerminationException e) {
-            log.error(e);
-        }
-    }
-
-    public void terminate(){
-    	this.terminated = true;
+    public void terminate() {
+        this.terminated = true;
     }
 }


[7/7] git commit: code review changes to cluster monitors

Posted by sa...@apache.org.
code review changes to cluster monitors

Signed-off-by: sajhak <sa...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/31056109
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/31056109
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/31056109

Branch: refs/heads/container-autoscaling
Commit: 31056109c27256e0fc84319e428390eef09f28cf
Parents: 7d61649
Author: R-Rajkumar <rr...@gmail.com>
Authored: Sun Oct 5 15:04:57 2014 +0530
Committer: sajhak <sa...@gmail.com>
Committed: Mon Oct 6 23:11:29 2014 +0530

----------------------------------------------------------------------
 .../stratos/autoscaler/AutoscalerContext.java   |  36 +-
 .../autoscaler/KubernetesClusterContext.java    | 863 ++++++++--------
 .../stratos/autoscaler/MemberStatsContext.java  |  29 +-
 .../AutoscalerHealthStatEventReceiver.java      | 991 +++++--------------
 .../AutoscalerTopologyEventReceiver.java        | 458 ++-------
 .../monitor/AbstractClusterMonitor.java         | 307 +++---
 .../monitor/ClusterMonitorFactory.java          | 250 ++---
 .../monitor/ContainerClusterMonitor.java        |  59 --
 .../monitor/DockerServiceClusterMonitor.java    | 176 ----
 .../monitor/KubernetesClusterMonitor.java       | 427 ++++++++
 .../KubernetesServiceClusterMonitor.java        | 181 ++++
 .../autoscaler/monitor/VMClusterMonitor.java    | 597 ++++++++++-
 .../autoscaler/monitor/VMLbClusterMonitor.java  |  87 +-
 .../monitor/VMServiceClusterMonitor.java        |  73 +-
 .../stratos/autoscaler/util/AutoscalerUtil.java | 391 +-------
 .../stratos/common/enums/ClusterType.java       |   5 -
 16 files changed, 2440 insertions(+), 2490 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
index 581d633..2d10954 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
@@ -33,6 +33,8 @@ import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
 public class AutoscalerContext {
 
     private static final Log log = LogFactory.getLog(AutoscalerContext.class);
+    private static final AutoscalerContext INSTANCE = new AutoscalerContext();
+
     private AutoscalerContext() {
         try {
             setClusterMonitors(new HashMap<String, AbstractClusterMonitor>());
@@ -40,17 +42,13 @@ public class AutoscalerContext {
             log.error("Rule evaluateMinCheck error", e);
         }
     }
-    
+
     // Map<ClusterId, AbstractClusterMonitor>
     private Map<String, AbstractClusterMonitor> clusterMonitors;
 
-	private static class Holder {
-		private static final AutoscalerContext INSTANCE = new AutoscalerContext();
-	}
-
-	public static AutoscalerContext getInstance() {
-		return Holder.INSTANCE;
-	}
+    public static AutoscalerContext getInstance() {
+        return INSTANCE;
+    }
 
     public void addClusterMonitor(AbstractClusterMonitor clusterMonitor) {
         clusterMonitors.put(clusterMonitor.getClusterId(), clusterMonitor);
@@ -59,11 +57,7 @@ public class AutoscalerContext {
     public AbstractClusterMonitor getClusterMonitor(String clusterId) {
         return clusterMonitors.get(clusterId);
     }
-    
-    public boolean clusterMonitorExist(String clusterId) {
-        return clusterMonitors.containsKey(clusterId);
-    }
-    
+
     public Map<String, AbstractClusterMonitor> getClusterMonitors() {
         return clusterMonitors;
     }
@@ -71,13 +65,15 @@ public class AutoscalerContext {
     public void setClusterMonitors(Map<String, AbstractClusterMonitor> clusterMonitors) {
         this.clusterMonitors = clusterMonitors;
     }
-    
+
     public AbstractClusterMonitor removeClusterMonitor(String clusterId) {
-    	if(!clusterMonitorExist(clusterId)) {
-    		log.fatal("ClusterMonitor not found for cluster id: "+clusterId);
-    		return null;
-    	}
-    	log.info("Removed ClusterMonitor [cluster id]: " + clusterId);
-        return clusterMonitors.remove(clusterId);
+
+        AbstractClusterMonitor monitor = clusterMonitors.remove(clusterId);
+        if (monitor == null) {
+            log.fatal("ClusterMonitor not found for cluster id: " + clusterId);
+        } else {
+            log.info("Removed ClusterMonitor [cluster id]: " + clusterId);
+        }
+        return monitor;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
index 16bc653..c8b6e39 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
@@ -40,474 +40,475 @@ import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
 /*
  * It holds the runtime data of a kubernetes cluster
  */
-public class KubernetesClusterContext implements Serializable{
-	
-	private static final long serialVersionUID = 808741789615481596L;
-	private static final Log log = LogFactory.getLog(KubernetesClusterContext.class);
-	
-	private String kubernetesClusterId;
-	private String serviceName;
-	
+public class KubernetesClusterContext implements Serializable {
+
+    private static final long serialVersionUID = 808741789615481596L;
+    private static final Log log = LogFactory.getLog(KubernetesClusterContext.class);
+
+    private String kubernetesClusterId;
+    private String serviceName;
+
     private int minReplicas;
     private int maxReplicas;
     private int currentReplicas = 0;
-    
+
     // properties
     private Properties properties;
-    
+
     // 15 mints as the default
     private long expiryTime;
     // pending members
     private List<MemberContext> pendingMembers;
-    
+
     // active members
     private List<MemberContext> activeMembers;
 
     //Keep statistics come from CEP
     private Map<String, MemberStatsContext> memberStatsContexts;
-	
+
     //Following information will keep events details
     private RequestsInFlight requestsInFlight;
     private MemoryConsumption memoryConsumption;
     private LoadAverage loadAverage;
-    
+
     // cluster id
     private String clusterId;
-    
+
     //boolean values to keep whether the requests in flight parameters are reset or not
-    private boolean rifReset = false, averageRifReset = false, 
-    		gradientRifReset = false, secondDerivativeRifRest = false;
+    private boolean rifReset = false, averageRifReset = false,
+            gradientRifReset = false, secondDerivativeRifRest = false;
     //boolean values to keep whether the memory consumption parameters are reset or not
     private boolean memoryConsumptionReset = false, averageMemoryConsumptionReset = false,
             gradientMemoryConsumptionReset = false, secondDerivativeMemoryConsumptionRest = false;
     //boolean values to keep whether the load average parameters are reset or not
-    private boolean loadAverageReset = false, averageLoadAverageReset = false, 
-    		gradientLoadAverageReset = false, secondDerivativeLoadAverageRest = false;
-    
-	public KubernetesClusterContext(String kubernetesClusterId, String clusterId){
-		this.kubernetesClusterId = kubernetesClusterId;
-		this.clusterId = clusterId;
+    private boolean loadAverageReset = false, averageLoadAverageReset = false,
+            gradientLoadAverageReset = false, secondDerivativeLoadAverageRest = false;
+
+    public KubernetesClusterContext(String kubernetesClusterId, String clusterId) {
+        this.kubernetesClusterId = kubernetesClusterId;
+        this.clusterId = clusterId;
         this.pendingMembers = new ArrayList<MemberContext>();
         this.activeMembers = new ArrayList<MemberContext>();
         this.memberStatsContexts = new ConcurrentHashMap<String, MemberStatsContext>();
         this.requestsInFlight = new RequestsInFlight();
         this.loadAverage = new LoadAverage();
         this.memoryConsumption = new MemoryConsumption();
-        
+
         // check if a different value has been set for expiryTime
         XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
         expiryTime = conf.getLong("autoscaler.member.expiryTimeout", 300000);
         if (log.isDebugEnabled()) {
             log.debug("Member expiry time is set to: " + expiryTime);
         }
-        
+
         Thread th = new Thread(new PendingMemberWatcher(this));
         th.start();
-	}
-	
-	public String getKubernetesClusterID() {
-		return kubernetesClusterId;
-	}
-	public void setKubernetesClusterID(String kubernetesClusterId) {
-		this.kubernetesClusterId = kubernetesClusterId;
-	}
-	
-	public List<MemberContext> getPendingMembers() {
-		return pendingMembers;
-	}
-
-	public void setPendingMembers(List<MemberContext> pendingMembers) {
-		this.pendingMembers = pendingMembers;
-	}
-
-	public int getActiveMemberCount() {
-		return activeMembers.size();
-	}
-
-	public void setActiveMembers(List<MemberContext> activeMembers) {
-		this.activeMembers = activeMembers;
-	}
-	    
-	public int getMinReplicas() {
-		return minReplicas;
-	}
-
-	public void setMinReplicas(int minReplicas) {
-		this.minReplicas = minReplicas;
-	}
-
-	public int getMaxReplicas() {
-		return maxReplicas;
-	}
-
-	public void setMaxReplicas(int maxReplicas) {
-		this.maxReplicas = maxReplicas;
-	}
-
-	public int getCurrentReplicas() {
-		return currentReplicas;
-	}
-
-	public void setCurrentReplicas(int currentReplicas) {
-		this.currentReplicas = currentReplicas;
-	}
-
-	public void addPendingMember(MemberContext ctxt) {
-		this.pendingMembers.add(ctxt);
-	}
-	    
-	public boolean removePendingMember(String id) {
-		if (id == null) {
-			return false;
-		}
-		for (Iterator<MemberContext> iterator = pendingMembers.iterator(); iterator.hasNext();) {
-			MemberContext pendingMember = (MemberContext) iterator.next();
-			if (id.equals(pendingMember.getMemberId())) {
-				iterator.remove();
-				return true;
-			}
-
-		}
-
-		return false;
-	}
-
-	public void movePendingMemberToActiveMembers(String memberId) {
-		if (memberId == null) {
-			return;
-		}
-		Iterator<MemberContext> iterator = pendingMembers.listIterator();
-		while (iterator.hasNext()) {
-			MemberContext pendingMember = iterator.next();
-			if (pendingMember == null) {
-				iterator.remove();
-				continue;
-			}
-			if (memberId.equals(pendingMember.getMemberId())) {
-				// member is activated
-				// remove from pending list
-				iterator.remove();
-				// add to the activated list
-				this.activeMembers.add(pendingMember);
-				if (log.isDebugEnabled()) {
-					log.debug(String.format(
-							"Pending member is removed and added to the "
-									+ "activated member list. [Member Id] %s",
-							memberId));
-				}
-				break;
-			}
-		}
-	}
-
-	public void addActiveMember(MemberContext ctxt) {
-		this.activeMembers.add(ctxt);
-	}
-
-	public void removeActiveMember(MemberContext ctxt) {
-		this.activeMembers.remove(ctxt);
-	}
-
-	public long getExpiryTime() {
-		return expiryTime;
-	}
-
-	public void setExpiryTime(long expiryTime) {
-		this.expiryTime = expiryTime;
-	}
-	    
-	public Map<String, MemberStatsContext> getMemberStatsContexts() {
-		return memberStatsContexts;
-	}
-
-	public MemberStatsContext getMemberStatsContext(String memberId) {
-		return memberStatsContexts.get(memberId);
-	}
-
-	public void addMemberStatsContext(MemberStatsContext ctxt) {
-		this.memberStatsContexts.put(ctxt.getMemberId(), ctxt);
-	}
-
-	public void removeMemberStatsContext(String memberId) {
-		this.memberStatsContexts.remove(memberId);
-	}
-
-	public Properties getProperties() {
-		return properties;
-	}
-
-	public void setProperties(Properties properties) {
-		this.properties = properties;
-	}
-
-	public String getServiceName() {
-		return serviceName;
-	}
-
-	public void setServiceName(String serviceName) {
-		this.serviceName = serviceName;
-	}
-
-	public List<MemberContext> getActiveMembers() {
-		return activeMembers;
-	}
-
-	public boolean removeActiveMemberById(String memberId) {
-		boolean removeActiveMember = false;
-		synchronized (activeMembers) {
-			Iterator<MemberContext> iterator = activeMembers.listIterator();
-			while (iterator.hasNext()) {
-				MemberContext memberContext = iterator.next();
-				if (memberId.equals(memberContext.getMemberId())) {
-					iterator.remove();
-					removeActiveMember = true;
-
-					break;
-				}
-			}
-		}
-		return removeActiveMember;
-	}
-
-	public boolean activeMemberExist(String memberId) {
-
-		for (MemberContext memberContext : activeMembers) {
-			if (memberId.equals(memberContext.getMemberId())) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-	private class PendingMemberWatcher implements Runnable {
-		private KubernetesClusterContext ctxt;
-
-		public PendingMemberWatcher(KubernetesClusterContext ctxt) {
-			this.ctxt = ctxt;
-		}
-
-		@Override
-		public void run() {
-
-			while (true) {
-				long expiryTime = ctxt.getExpiryTime();
-				List<MemberContext> pendingMembers = ctxt.getPendingMembers();
-
-				synchronized (pendingMembers) {
-					Iterator<MemberContext> iterator = pendingMembers
-							.listIterator();
-					while (iterator.hasNext()) {
-						MemberContext pendingMember = iterator.next();
-
-						if (pendingMember == null) {
-							continue;
-						}
-						long pendingTime = System.currentTimeMillis()
-								- pendingMember.getInitTime();
-						if (pendingTime >= expiryTime) {
-
-							// terminate all containers of this cluster
-							try {
-								CloudControllerClient.getInstance().terminateAllContainers(clusterId);
-								iterator.remove();
-							} catch (TerminationException e) {
-								log.error(e.getMessage(), e);
-							}
-							
-						}
-					}
-				}
-
-				try {
-					// TODO find a constant
-					Thread.sleep(15000);
-				} catch (InterruptedException ignore) {
-				}
-			}
-		}
-
-	}
-
-	public float getAverageRequestsInFlight() {
-		return requestsInFlight.getAverage();
-	}
-
-	public void setAverageRequestsInFlight(float averageRequestsInFlight) {
-		requestsInFlight.setAverage(averageRequestsInFlight);
-		averageRifReset = true;
-		if (secondDerivativeRifRest && gradientRifReset) {
-			rifReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Requests in flights stats are reset, "
-						+ "ready to do scale check [kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public float getRequestsInFlightSecondDerivative() {
-		return requestsInFlight.getSecondDerivative();
-	}
-
-	public void setRequestsInFlightSecondDerivative(
-			float requestsInFlightSecondDerivative) {
-		requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative);
-		secondDerivativeRifRest = true;
-		if (averageRifReset && gradientRifReset) {
-			rifReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
-								+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public float getRequestsInFlightGradient() {
-		return requestsInFlight.getGradient();
-	}
-
-	public void setRequestsInFlightGradient(float requestsInFlightGradient) {
-		requestsInFlight.setGradient(requestsInFlightGradient);
-		gradientRifReset = true;
-		if (secondDerivativeRifRest && averageRifReset) {
-			rifReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
-								+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public boolean isRifReset() {
-		return rifReset;
-	}
-
-	public void setRifReset(boolean rifReset) {
-		this.rifReset = rifReset;
-		this.averageRifReset = rifReset;
-		this.gradientRifReset = rifReset;
-		this.secondDerivativeRifRest = rifReset;
-	}
-
-	public float getAverageMemoryConsumption() {
-		return memoryConsumption.getAverage();
-	}
-
-	public void setAverageMemoryConsumption(float averageMemoryConsumption) {
-		memoryConsumption.setAverage(averageMemoryConsumption);
-		averageMemoryConsumptionReset = true;
-		if (secondDerivativeMemoryConsumptionRest
-				&& gradientMemoryConsumptionReset) {
-			memoryConsumptionReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
-								+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public float getMemoryConsumptionSecondDerivative() {
-		return memoryConsumption.getSecondDerivative();
-	}
-
-	public void setMemoryConsumptionSecondDerivative(
-			float memoryConsumptionSecondDerivative) {
-		memoryConsumption
-				.setSecondDerivative(memoryConsumptionSecondDerivative);
-		secondDerivativeMemoryConsumptionRest = true;
-		if (averageMemoryConsumptionReset && gradientMemoryConsumptionReset) {
-			memoryConsumptionReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
-								+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public float getMemoryConsumptionGradient() {
-		return memoryConsumption.getGradient();
-	}
-
-	public void setMemoryConsumptionGradient(float memoryConsumptionGradient) {
-		memoryConsumption.setGradient(memoryConsumptionGradient);
-		gradientMemoryConsumptionReset = true;
-		if (secondDerivativeMemoryConsumptionRest
-				&& averageMemoryConsumptionReset) {
-			memoryConsumptionReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
-								+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public boolean isMemoryConsumptionReset() {
-		return memoryConsumptionReset;
-	}
-
-	public void setMemoryConsumptionReset(boolean memoryConsumptionReset) {
-		this.memoryConsumptionReset = memoryConsumptionReset;
-		this.averageMemoryConsumptionReset = memoryConsumptionReset;
-		this.gradientMemoryConsumptionReset = memoryConsumptionReset;
-		this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset;
-	}
-
-
-	public float getAverageLoadAverage() {
-		return loadAverage.getAverage();
-	}
-
-	public void setAverageLoadAverage(float averageLoadAverage) {
-		loadAverage.setAverage(averageLoadAverage);
-		averageLoadAverageReset = true;
-		if (secondDerivativeLoadAverageRest && gradientLoadAverageReset) {
-			loadAverageReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Load average stats are reset, ready to do scale check "
-						+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public float getLoadAverageSecondDerivative() {
-		return loadAverage.getSecondDerivative();
-	}
-
-	public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) {
-		loadAverage.setSecondDerivative(loadAverageSecondDerivative);
-		secondDerivativeLoadAverageRest = true;
-		if (averageLoadAverageReset && gradientLoadAverageReset) {
-			loadAverageReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Load average stats are reset, ready to do scale check "
-						+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public float getLoadAverageGradient() {
-		return loadAverage.getGradient();
-	}
-
-	public void setLoadAverageGradient(float loadAverageGradient) {
-		loadAverage.setGradient(loadAverageGradient);
-		gradientLoadAverageReset = true;
-		if (secondDerivativeLoadAverageRest && averageLoadAverageReset) {
-			loadAverageReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Load average stats are reset, ready to do scale check "
-						+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public boolean isLoadAverageReset() {
-		return loadAverageReset;
-	}
-
-	public void setLoadAverageReset(boolean loadAverageReset) {
-		this.loadAverageReset = loadAverageReset;
-		this.averageLoadAverageReset = loadAverageReset;
-		this.gradientLoadAverageReset = loadAverageReset;
-		this.secondDerivativeLoadAverageRest = loadAverageReset;
-	}
+    }
+
+    public String getKubernetesClusterID() {
+        return kubernetesClusterId;
+    }
+
+    public void setKubernetesClusterID(String kubernetesClusterId) {
+        this.kubernetesClusterId = kubernetesClusterId;
+    }
+
+    public List<MemberContext> getPendingMembers() {
+        return pendingMembers;
+    }
+
+    public void setPendingMembers(List<MemberContext> pendingMembers) {
+        this.pendingMembers = pendingMembers;
+    }
+
+    public int getActiveMemberCount() {
+        return activeMembers.size();
+    }
+
+    public void setActiveMembers(List<MemberContext> activeMembers) {
+        this.activeMembers = activeMembers;
+    }
+
+    public int getMinReplicas() {
+        return minReplicas;
+    }
+
+    public void setMinReplicas(int minReplicas) {
+        this.minReplicas = minReplicas;
+    }
+
+    public int getMaxReplicas() {
+        return maxReplicas;
+    }
+
+    public void setMaxReplicas(int maxReplicas) {
+        this.maxReplicas = maxReplicas;
+    }
+
+    public int getCurrentReplicas() {
+        return currentReplicas;
+    }
+
+    public void setCurrentReplicas(int currentReplicas) {
+        this.currentReplicas = currentReplicas;
+    }
+
+    public void addPendingMember(MemberContext ctxt) {
+        this.pendingMembers.add(ctxt);
+    }
+
+    public boolean removePendingMember(String id) {
+        if (id == null) {
+            return false;
+        }
+        for (Iterator<MemberContext> iterator = pendingMembers.iterator(); iterator.hasNext(); ) {
+            MemberContext pendingMember = (MemberContext) iterator.next();
+            if (id.equals(pendingMember.getMemberId())) {
+                iterator.remove();
+                return true;
+            }
+
+        }
+
+        return false;
+    }
+
+    public void movePendingMemberToActiveMembers(String memberId) {
+        if (memberId == null) {
+            return;
+        }
+        Iterator<MemberContext> iterator = pendingMembers.listIterator();
+        while (iterator.hasNext()) {
+            MemberContext pendingMember = iterator.next();
+            if (pendingMember == null) {
+                iterator.remove();
+                continue;
+            }
+            if (memberId.equals(pendingMember.getMemberId())) {
+                // member is activated
+                // remove from pending list
+                iterator.remove();
+                // add to the activated list
+                this.activeMembers.add(pendingMember);
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format(
+                            "Pending member is removed and added to the "
+                            + "activated member list. [Member Id] %s",
+                            memberId));
+                }
+                break;
+            }
+        }
+    }
+
+    public void addActiveMember(MemberContext ctxt) {
+        this.activeMembers.add(ctxt);
+    }
+
+    public void removeActiveMember(MemberContext ctxt) {
+        this.activeMembers.remove(ctxt);
+    }
+
+    public long getExpiryTime() {
+        return expiryTime;
+    }
+
+    public void setExpiryTime(long expiryTime) {
+        this.expiryTime = expiryTime;
+    }
+
+    public Map<String, MemberStatsContext> getMemberStatsContexts() {
+        return memberStatsContexts;
+    }
+
+    public MemberStatsContext getMemberStatsContext(String memberId) {
+        return memberStatsContexts.get(memberId);
+    }
+
+    public void addMemberStatsContext(MemberStatsContext ctxt) {
+        this.memberStatsContexts.put(ctxt.getMemberId(), ctxt);
+    }
+
+    public void removeMemberStatsContext(String memberId) {
+        this.memberStatsContexts.remove(memberId);
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    public void setServiceName(String serviceName) {
+        this.serviceName = serviceName;
+    }
+
+    public List<MemberContext> getActiveMembers() {
+        return activeMembers;
+    }
+
+    public boolean removeActiveMemberById(String memberId) {
+        boolean removeActiveMember = false;
+        synchronized (activeMembers) {
+            Iterator<MemberContext> iterator = activeMembers.listIterator();
+            while (iterator.hasNext()) {
+                MemberContext memberContext = iterator.next();
+                if (memberId.equals(memberContext.getMemberId())) {
+                    iterator.remove();
+                    removeActiveMember = true;
+
+                    break;
+                }
+            }
+        }
+        return removeActiveMember;
+    }
+
+    public boolean activeMemberExist(String memberId) {
+
+        for (MemberContext memberContext : activeMembers) {
+            if (memberId.equals(memberContext.getMemberId())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private class PendingMemberWatcher implements Runnable {
+        private KubernetesClusterContext ctxt;
+
+        public PendingMemberWatcher(KubernetesClusterContext ctxt) {
+            this.ctxt = ctxt;
+        }
+
+        @Override
+        public void run() {
+
+            while (true) {
+                long expiryTime = ctxt.getExpiryTime();
+                List<MemberContext> pendingMembers = ctxt.getPendingMembers();
+
+                synchronized (pendingMembers) {
+                    Iterator<MemberContext> iterator = pendingMembers
+                            .listIterator();
+                    while (iterator.hasNext()) {
+                        MemberContext pendingMember = iterator.next();
+
+                        if (pendingMember == null) {
+                            continue;
+                        }
+                        long pendingTime = System.currentTimeMillis()
+                                           - pendingMember.getInitTime();
+                        if (pendingTime >= expiryTime) {
+
+                            // terminate all containers of this cluster
+                            try {
+                                CloudControllerClient.getInstance().terminateAllContainers(clusterId);
+                                iterator.remove();
+                            } catch (TerminationException e) {
+                                log.error(e.getMessage(), e);
+                            }
+
+                        }
+                    }
+                }
+
+                try {
+                    // TODO find a constant
+                    Thread.sleep(15000);
+                } catch (InterruptedException ignore) {
+                }
+            }
+        }
+
+    }
+
+    public float getAverageRequestsInFlight() {
+        return requestsInFlight.getAverage();
+    }
+
+    public void setAverageRequestsInFlight(float averageRequestsInFlight) {
+        requestsInFlight.setAverage(averageRequestsInFlight);
+        averageRifReset = true;
+        if (secondDerivativeRifRest && gradientRifReset) {
+            rifReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Requests in flights stats are reset, "
+                                        + "ready to do scale check [kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public float getRequestsInFlightSecondDerivative() {
+        return requestsInFlight.getSecondDerivative();
+    }
+
+    public void setRequestsInFlightSecondDerivative(
+            float requestsInFlightSecondDerivative) {
+        requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative);
+        secondDerivativeRifRest = true;
+        if (averageRifReset && gradientRifReset) {
+            rifReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public float getRequestsInFlightGradient() {
+        return requestsInFlight.getGradient();
+    }
+
+    public void setRequestsInFlightGradient(float requestsInFlightGradient) {
+        requestsInFlight.setGradient(requestsInFlightGradient);
+        gradientRifReset = true;
+        if (secondDerivativeRifRest && averageRifReset) {
+            rifReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public boolean isRifReset() {
+        return rifReset;
+    }
+
+    public void setRifReset(boolean rifReset) {
+        this.rifReset = rifReset;
+        this.averageRifReset = rifReset;
+        this.gradientRifReset = rifReset;
+        this.secondDerivativeRifRest = rifReset;
+    }
+
+    public float getAverageMemoryConsumption() {
+        return memoryConsumption.getAverage();
+    }
+
+    public void setAverageMemoryConsumption(float averageMemoryConsumption) {
+        memoryConsumption.setAverage(averageMemoryConsumption);
+        averageMemoryConsumptionReset = true;
+        if (secondDerivativeMemoryConsumptionRest
+            && gradientMemoryConsumptionReset) {
+            memoryConsumptionReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public float getMemoryConsumptionSecondDerivative() {
+        return memoryConsumption.getSecondDerivative();
+    }
+
+    public void setMemoryConsumptionSecondDerivative(
+            float memoryConsumptionSecondDerivative) {
+        memoryConsumption
+                .setSecondDerivative(memoryConsumptionSecondDerivative);
+        secondDerivativeMemoryConsumptionRest = true;
+        if (averageMemoryConsumptionReset && gradientMemoryConsumptionReset) {
+            memoryConsumptionReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public float getMemoryConsumptionGradient() {
+        return memoryConsumption.getGradient();
+    }
+
+    public void setMemoryConsumptionGradient(float memoryConsumptionGradient) {
+        memoryConsumption.setGradient(memoryConsumptionGradient);
+        gradientMemoryConsumptionReset = true;
+        if (secondDerivativeMemoryConsumptionRest
+            && averageMemoryConsumptionReset) {
+            memoryConsumptionReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public boolean isMemoryConsumptionReset() {
+        return memoryConsumptionReset;
+    }
+
+    public void setMemoryConsumptionReset(boolean memoryConsumptionReset) {
+        this.memoryConsumptionReset = memoryConsumptionReset;
+        this.averageMemoryConsumptionReset = memoryConsumptionReset;
+        this.gradientMemoryConsumptionReset = memoryConsumptionReset;
+        this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset;
+    }
+
+
+    public float getAverageLoadAverage() {
+        return loadAverage.getAverage();
+    }
+
+    public void setAverageLoadAverage(float averageLoadAverage) {
+        loadAverage.setAverage(averageLoadAverage);
+        averageLoadAverageReset = true;
+        if (secondDerivativeLoadAverageRest && gradientLoadAverageReset) {
+            loadAverageReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Load average stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public float getLoadAverageSecondDerivative() {
+        return loadAverage.getSecondDerivative();
+    }
+
+    public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) {
+        loadAverage.setSecondDerivative(loadAverageSecondDerivative);
+        secondDerivativeLoadAverageRest = true;
+        if (averageLoadAverageReset && gradientLoadAverageReset) {
+            loadAverageReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Load average stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public float getLoadAverageGradient() {
+        return loadAverage.getGradient();
+    }
+
+    public void setLoadAverageGradient(float loadAverageGradient) {
+        loadAverage.setGradient(loadAverageGradient);
+        gradientLoadAverageReset = true;
+        if (secondDerivativeLoadAverageRest && averageLoadAverageReset) {
+            loadAverageReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Load average stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public boolean isLoadAverageReset() {
+        return loadAverageReset;
+    }
+
+    public void setLoadAverageReset(boolean loadAverageReset) {
+        this.loadAverageReset = loadAverageReset;
+        this.averageLoadAverageReset = loadAverageReset;
+        this.gradientLoadAverageReset = loadAverageReset;
+        this.secondDerivativeLoadAverageRest = loadAverageReset;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
index ac8b61a..bd3a6c3 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
@@ -31,10 +31,10 @@ public class MemberStatsContext {
     private MemoryConsumption memoryConsumption;
     private String memberId;
 
-    public MemberStatsContext(String memberId){
+    public MemberStatsContext(String memberId) {
         this.memberId = memberId;
         memoryConsumption = new MemoryConsumption();
-        loadAverage =  new LoadAverage();
+        loadAverage = new LoadAverage();
     }
 
     public String getMemberId() {
@@ -52,4 +52,29 @@ public class MemberStatsContext {
     public MemoryConsumption getMemoryConsumption() {
         return memoryConsumption;
     }
+
+    public void setAverageLoadAverage(float value) {
+        loadAverage.setAverage(value);
+    }
+
+    public void setAverageMemoryConsumption(float value) {
+        memoryConsumption.setAverage(value);
+    }
+
+    public void setGradientOfLoadAverage(float value) {
+        loadAverage.setGradient(value);
+    }
+
+    public void setGradientOfMemoryConsumption(float value) {
+        memoryConsumption.setGradient(value);
+    }
+
+    public void setSecondDerivativeOfLoadAverage(float value) {
+        loadAverage.setSecondDerivative(value);
+    }
+
+    public void setSecondDerivativeOfMemoryConsumption(float value) {
+        memoryConsumption.setSecondDerivative(value);
+    }
+
 }


[4/7] code review changes to cluster monitors

Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
new file mode 100644
index 0000000..d90e0b6
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+
+/*
+ * Every kubernetes cluster monitor should extend this class
+ */
+public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
+
+    private static final Log log = LogFactory.getLog(KubernetesClusterMonitor.class);
+
+    private KubernetesClusterContext kubernetesClusterCtxt;
+    protected AutoscalePolicy autoscalePolicy;
+
+    protected KubernetesClusterMonitor(String clusterId, String serviceId,
+                                       KubernetesClusterContext kubernetesClusterContext,
+                                       AutoscalerRuleEvaluator autoscalerRuleEvaluator,
+                                       AutoscalePolicy autoscalePolicy) {
+
+        super(clusterId, serviceId, autoscalerRuleEvaluator);
+        this.kubernetesClusterCtxt = kubernetesClusterContext;
+        this.autoscalePolicy = autoscalePolicy;
+    }
+
+    @Override
+    public void handleAverageLoadAverageEvent(
+            AverageLoadAverageEvent averageLoadAverageEvent) {
+
+        String clusterId = averageLoadAverageEvent.getClusterId();
+        float value = averageLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg load avg event: [cluster] %s [value] %s",
+                                    clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setAverageLoadAverage(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+
+    }
+
+    @Override
+    public void handleGradientOfLoadAverageEvent(
+            GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+        String clusterId = gradientOfLoadAverageEvent.getClusterId();
+        float value = gradientOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s",
+                                    clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setLoadAverageGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfLoadAverageEvent(
+            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
+
+        String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+        float value = secondDerivativeOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
+                                    + "[value] %s", clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setLoadAverageSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleAverageMemoryConsumptionEvent(
+            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+        String clusterId = averageMemoryConsumptionEvent.getClusterId();
+        float value = averageMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg Memory Consumption event: [cluster] %s "
+                                    + "[value] %s", averageMemoryConsumptionEvent.getClusterId(), value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setAverageMemoryConsumption(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfMemoryConsumptionEvent(
+            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
+
+        String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+        float value = gradientOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
+                                    + "[value] %s", clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setMemoryConsumptionGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfMemoryConsumptionEvent(
+            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
+
+        String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+        float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
+                                    + "[value] %s", clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setMemoryConsumptionSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleAverageRequestsInFlightEvent(
+            AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+        float value = averageRequestsInFlightEvent.getValue();
+        String clusterId = averageRequestsInFlightEvent.getClusterId();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Average Rif event: [cluster] %s [value] %s",
+                                    clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setAverageRequestsInFlight(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfRequestsInFlightEvent(
+            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+        String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
+        float value = gradientOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s",
+                                    clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setRequestsInFlightGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfRequestsInFlightEvent(
+            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
+
+        String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
+        float value = secondDerivativeOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second derivative of Rif event: [cluster] %s "
+                                    + "[value] %s", clusterId, value));
+        }
+        KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+        if (null != kubernetesClusterContext) {
+            kubernetesClusterContext.setRequestsInFlightSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for :" +
+                                        " [cluster] %s", clusterId));
+            }
+        }
+    }
+
+    @Override
+    public void handleMemberAverageMemoryConsumptionEvent(
+            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
+
+        String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberAverageMemoryConsumptionEvent.getValue();
+        memberStatsContext.setAverageMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfMemoryConsumptionEvent(
+            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
+
+        String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfMemoryConsumptionEvent.getValue();
+        memberStatsContext.setGradientOfMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
+
+    }
+
+    @Override
+    public void handleMemberAverageLoadAverageEvent(
+            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        String memberId = memberAverageLoadAverageEvent.getMemberId();
+        float value = memberAverageLoadAverageEvent.getValue();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        memberStatsContext.setAverageLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfLoadAverageEvent(
+            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
+
+        String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfLoadAverageEvent.getValue();
+        memberStatsContext.setGradientOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfLoadAverageEvent(
+            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
+
+        String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+        KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+        MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+        memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+
+    	// kill the container
+    }
+
+    @Override
+    public void handleMemberStartedEvent(
+            MemberStartedEvent memberStartedEvent) {
+
+    }
+
+    @Override
+    public void handleMemberActivatedEvent(
+            MemberActivatedEvent memberActivatedEvent) {
+
+        KubernetesClusterContext kubernetesClusterContext;
+        kubernetesClusterContext = getKubernetesClusterCtxt();
+        String memberId = memberActivatedEvent.getMemberId();
+        kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId));
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member stat context has been added successfully: "
+                                   + "[member] %s", memberId));
+        }
+        kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
+    }
+
+    @Override
+    public void handleMemberMaintenanceModeEvent(
+            MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+        // no need to do anything here
+        // we will not be receiving this event for containers
+        // because we just kill the containers
+    }
+
+    @Override
+    public void handleMemberReadyToShutdownEvent(
+            MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+        // no need to do anything here
+        // we will not be receiving this event for containers
+        // because we just kill the containers
+    }
+
+    @Override
+    public void handleMemberTerminatedEvent(
+            MemberTerminatedEvent memberTerminatedEvent) {
+
+        // no need to do anything here
+        // we will not be receiving this event for containers
+        // because we just kill the containers
+    }
+
+    @Override
+    public void handleClusterRemovedEvent(
+            ClusterRemovedEvent clusterRemovedEvent) {
+
+    }
+
+    public KubernetesClusterContext getKubernetesClusterCtxt() {
+        return kubernetesClusterCtxt;
+    }
+
+    public void setKubernetesClusterCtxt(
+            KubernetesClusterContext kubernetesClusterCtxt) {
+        this.kubernetesClusterCtxt = kubernetesClusterCtxt;
+    }
+
+    public AutoscalePolicy getAutoscalePolicy() {
+        return autoscalePolicy;
+    }
+
+    public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
+        this.autoscalePolicy = autoscalePolicy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
new file mode 100644
index 0000000..3c81ba3
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor;
+
+import java.util.Properties;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.exception.SpawningException;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/*
+ * It is monitoring a kubernetes service cluster periodically.
+ */
+public final class KubernetesServiceClusterMonitor extends KubernetesClusterMonitor {
+
+    private static final Log log = LogFactory.getLog(KubernetesServiceClusterMonitor.class);
+
+    private String lbReferenceType;
+    private int numberOfReplicasInServiceCluster = 0;
+    int retryInterval = 60000;
+
+    public KubernetesServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt,
+                                           String serviceClusterID, String serviceId,
+                                           AutoscalePolicy autoscalePolicy) {
+        super(serviceClusterID, serviceId, kubernetesClusterCtxt,
+              new AutoscalerRuleEvaluator(), autoscalePolicy);
+        readConfigurations();
+    }
+
+    @Override
+    public void run() {
+
+        while (!isDestroyed()) {
+            if (log.isDebugEnabled()) {
+                log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
+            }
+            try {
+                if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
+                    monitor();
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
+                                  + ClusterStatus.In_Maintenance + " mode......");
+                    }
+                }
+            } catch (Exception e) {
+                log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
+                          e);
+            }
+            try {
+                Thread.sleep(getMonitorIntervalMilliseconds());
+            } catch (InterruptedException ignore) {
+            }
+        }
+    }
+
+    @Override
+    protected void monitor() {
+
+        int minReplicas;
+        try {
+            TopologyManager.acquireReadLock();
+            Service service = TopologyManager.getTopology().getService(getServiceId());
+            Cluster cluster = service.getCluster(getClusterId());
+            Properties props = cluster.getProperties();
+            minReplicas = Integer.parseInt(props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS));
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+
+        // is container created successfully?
+        boolean success = false;
+        String kubernetesClusterId = getKubernetesClusterCtxt().getKubernetesClusterID();
+        int activeMembers = getKubernetesClusterCtxt().getActiveMembers().size();
+        int pendingMembers = getKubernetesClusterCtxt().getPendingMembers().size();
+        int nonTerminatedMembers = activeMembers + pendingMembers;
+
+        if (nonTerminatedMembers == 0) {
+            while (!success) {
+                try {
+                    CloudControllerClient ccClient = CloudControllerClient.getInstance();
+                    MemberContext memberContext = ccClient.createContainer(kubernetesClusterId, getClusterId());
+                    if (null != memberContext) {
+                        getKubernetesClusterCtxt().addPendingMember(memberContext);
+                        success = true;
+                        numberOfReplicasInServiceCluster = minReplicas;
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Pending member added, [member] %s [kub cluster] %s",
+                                                    memberContext.getMemberId(), kubernetesClusterId));
+                        }
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Returned member context is null, did not add to pending members");
+                        }
+                    }
+                } catch (SpawningException spawningException) {
+                    if (log.isDebugEnabled()) {
+                        String message = "Cannot create containers, will retry in "
+                                         + (retryInterval / 1000) + "s";
+                        log.debug(message, spawningException);
+                    }
+                } catch (Exception exception) {
+                    if (log.isDebugEnabled()) {
+                        String message = "Error while creating containers, will retry in "
+                                         + (retryInterval / 1000) + "s";
+                        log.debug(message, exception);
+                    }
+                }
+                try {
+                    Thread.sleep(retryInterval);
+                } catch (InterruptedException ignored) {
+                }
+            }
+        }
+    }
+
+    @Override
+    public void destroy() {
+        getMinCheckKnowledgeSession().dispose();
+        getScaleCheckKnowledgeSession().dispose();
+        setDestroyed(true);
+        if (log.isDebugEnabled()) {
+            log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
+        }
+    }
+
+    @Override
+    protected void readConfigurations() {
+        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+        int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
+        setMonitorIntervalMilliseconds(monitorInterval);
+        if (log.isDebugEnabled()) {
+            log.debug("KubernetesServiceClusterMonitor task interval: " + getMonitorIntervalMilliseconds());
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "KubernetesServiceClusterMonitor "
+               + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
+               + ", clusterId=" + getClusterId()
+               + ", serviceId=" + getServiceId() + "]";
+    }
+
+    public String getLbReferenceType() {
+        return lbReferenceType;
+    }
+
+    public void setLbReferenceType(String lbReferenceType) {
+        this.lbReferenceType = lbReferenceType;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
index ffd6713..38ed1a6 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
@@ -22,62 +22,581 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.MemberStatsContext;
 import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
 import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.TerminationException;
 import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
 /**
  * Is responsible for monitoring a service cluster. This runs periodically
  * and perform minimum instance check and scaling check using the underlying
  * rules engine.
- *
  */
-   abstract public class VMClusterMonitor extends AbstractClusterMonitor{
-
-	private static final Log log = LogFactory.getLog(VMClusterMonitor.class);
-	// Map<NetworkpartitionId, Network Partition Context>
-	protected Map<String, NetworkPartitionContext> networkPartitionCtxts;
-	protected DeploymentPolicy deploymentPolicy;
-	protected AutoscalePolicy autoscalePolicy;
-	
-    protected VMClusterMonitor(String clusterId, String serviceId, ClusterType clusterType, 
-    		AutoscalerRuleEvaluator autoscalerRuleEvaluator, 
-    		DeploymentPolicy deploymentPolicy, AutoscalePolicy autoscalePolicy, 
-    		Map<String, NetworkPartitionContext> networkPartitionCtxts) {
-    	super(clusterId, serviceId, clusterType, autoscalerRuleEvaluator);
-    	this.deploymentPolicy = deploymentPolicy;
-    	this.autoscalePolicy = autoscalePolicy;
-    	this.networkPartitionCtxts = networkPartitionCtxts;
-    }
-
-   	public NetworkPartitionContext getNetworkPartitionCtxt(Member member) {
-   		log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId());
-		String networkPartitionId = member.getNetworkPartitionId();
-    	if(networkPartitionCtxts.containsKey(networkPartitionId)) {
-    		log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId));
-    		return networkPartitionCtxts.get(networkPartitionId);
-    	}
-    	log.info("returning null getNetworkPartitionCtxt");
-   	    return null;
-   	}
-   	
-    public String getPartitionOfMember(String memberId){
-        for(Service service: TopologyManager.getTopology().getServices()){
-            for(Cluster cluster: service.getClusters()){
-                if(cluster.memberExists(memberId)){
+abstract public class VMClusterMonitor extends AbstractClusterMonitor {
+
+    private static final Log log = LogFactory.getLog(VMClusterMonitor.class);
+    // Map<NetworkpartitionId, Network Partition Context>
+    protected Map<String, NetworkPartitionContext> networkPartitionCtxts;
+    protected DeploymentPolicy deploymentPolicy;
+    protected AutoscalePolicy autoscalePolicy;
+
+    protected VMClusterMonitor(String clusterId, String serviceId,
+                               AutoscalerRuleEvaluator autoscalerRuleEvaluator,
+                               DeploymentPolicy deploymentPolicy, AutoscalePolicy autoscalePolicy,
+                               Map<String, NetworkPartitionContext> networkPartitionCtxts) {
+        super(clusterId, serviceId, autoscalerRuleEvaluator);
+        this.deploymentPolicy = deploymentPolicy;
+        this.autoscalePolicy = autoscalePolicy;
+        this.networkPartitionCtxts = networkPartitionCtxts;
+    }
+
+    @Override
+    public void handleAverageLoadAverageEvent(
+            AverageLoadAverageEvent averageLoadAverageEvent) {
+
+        String networkPartitionId = averageLoadAverageEvent.getNetworkPartitionId();
+        String clusterId = averageLoadAverageEvent.getClusterId();
+        float value = averageLoadAverageEvent.getValue();
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
+                                    clusterId, networkPartitionId, value));
+        }
+
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setAverageLoadAverage(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+
+    }
+
+    @Override
+    public void handleGradientOfLoadAverageEvent(
+            GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+        String networkPartitionId = gradientOfLoadAverageEvent.getNetworkPartitionId();
+        String clusterId = gradientOfLoadAverageEvent.getClusterId();
+        float value = gradientOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
+                                    clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setLoadAverageGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfLoadAverageEvent(
+            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
+
+        String networkPartitionId = secondDerivativeOfLoadAverageEvent.getNetworkPartitionId();
+        String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+        float value = secondDerivativeOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
+                                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setLoadAverageSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleAverageMemoryConsumptionEvent(
+            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+        String networkPartitionId = averageMemoryConsumptionEvent.getNetworkPartitionId();
+        String clusterId = averageMemoryConsumptionEvent.getClusterId();
+        float value = averageMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s "
+                                    + "[value] %s", clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setAverageMemoryConsumption(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String
+                                  .format("Network partition context is not available for :"
+                                          + " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfMemoryConsumptionEvent(
+            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
+
+        String networkPartitionId = gradientOfMemoryConsumptionEvent.getNetworkPartitionId();
+        String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+        float value = gradientOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
+                                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setMemoryConsumptionGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfMemoryConsumptionEvent(
+            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
+
+        String networkPartitionId = secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId();
+        String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+        float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
+                                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setMemoryConsumptionSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleAverageRequestsInFlightEvent(
+            AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+        String networkPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId();
+        String clusterId = averageRequestsInFlightEvent.getClusterId();
+        float value = averageRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
+                                    clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setAverageRequestsInFlight(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleGradientOfRequestsInFlightEvent(
+            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+        String networkPartitionId = gradientOfRequestsInFlightEvent.getNetworkPartitionId();
+        String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
+        float value = gradientOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
+                                    clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setRequestsInFlightGradient(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleSecondDerivativeOfRequestsInFlightEvent(
+            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
+
+        String networkPartitionId = secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId();
+        String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
+        float value = secondDerivativeOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second derivative of Rif event: [cluster] %s "
+                                    + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+        }
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        if (null != networkPartitionContext) {
+            networkPartitionContext.setRequestsInFlightSecondDerivative(value);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                                        " [network partition] %s", networkPartitionId));
+            }
+        }
+    }
+
+    @Override
+    public void handleMemberAverageMemoryConsumptionEvent(
+            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
+
+        String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberAverageMemoryConsumptionEvent.getValue();
+        memberStatsContext.setAverageMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfMemoryConsumptionEvent(
+            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
+
+        String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfMemoryConsumptionEvent.getValue();
+        memberStatsContext.setGradientOfMemoryConsumption(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
+
+    }
+
+    @Override
+    public void handleMemberAverageLoadAverageEvent(
+            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+        String memberId = memberAverageLoadAverageEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberAverageLoadAverageEvent.getValue();
+        memberStatsContext.setAverageLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberGradientOfLoadAverageEvent(
+            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
+
+        String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfLoadAverageEvent.getValue();
+        memberStatsContext.setGradientOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberSecondDerivativeOfLoadAverageEvent(
+            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
+
+        String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+        memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+    }
+
+    @Override
+    public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+
+        String memberId = memberFaultEvent.getMemberId();
+        Member member = getMemberByMemberId(memberId);
+        if (null == member) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+            }
+            return;
+        }
+        if (!member.isActive()) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member activated event has not received for the member %s. "
+                                        + "Therefore ignoring" + " the member fault health stat", memberId));
+            }
+            return;
+        }
+
+        NetworkPartitionContext nwPartitionCtxt;
+        nwPartitionCtxt = getNetworkPartitionCtxt(member);
+        String partitionId = getPartitionOfMember(memberId);
+        PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+        if (!partitionCtxt.activeMemberExist(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Could not find the active member in partition context, "
+                                        + "[member] %s ", memberId));
+            }
+            return;
+        }
+        // terminate the faulty member
+        CloudControllerClient ccClient = CloudControllerClient.getInstance();
+        try {
+            ccClient.terminate(memberId);
+        } catch (TerminationException e) {
+            String msg = "TerminationException " + e.getLocalizedMessage();
+            log.error(msg, e);
+        }
+        // remove from active member list
+        partitionCtxt.removeActiveMemberById(memberId);
+        if (log.isInfoEnabled()) {
+            String clusterId = memberFaultEvent.getClusterId();
+            log.info(String.format("Faulty member is terminated and removed from the active members list: "
+                                   + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+        }
+    }
+
+    @Override
+    public void handleMemberStartedEvent(
+            MemberStartedEvent memberStartedEvent) {
+
+    }
+
+    @Override
+    public void handleMemberActivatedEvent(
+            MemberActivatedEvent memberActivatedEvent) {
+
+        String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
+        String partitionId = memberActivatedEvent.getPartitionId();
+        String memberId = memberActivatedEvent.getMemberId();
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionContext;
+        partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
+        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member stat context has been added successfully: "
+                                   + "[member] %s", memberId));
+        }
+        partitionContext.movePendingMemberToActiveMembers(memberId);
+    }
+
+    @Override
+    public void handleMemberMaintenanceModeEvent(
+            MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+        String networkPartitionId = maintenanceModeEvent.getNetworkPartitionId();
+        String partitionId = maintenanceModeEvent.getPartitionId();
+        String memberId = maintenanceModeEvent.getMemberId();
+        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
+        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Member has been moved as pending termination: "
+                                    + "[member] %s", memberId));
+        }
+        partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+    }
+
+    @Override
+    public void handleMemberReadyToShutdownEvent(
+            MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+        NetworkPartitionContext nwPartitionCtxt;
+        String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
+        nwPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+
+        // start a new member in the same Partition
+        String memberId = memberReadyToShutdownEvent.getMemberId();
+        String partitionId = getPartitionOfMember(memberId);
+        PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+        // terminate the shutdown ready member
+        CloudControllerClient ccClient = CloudControllerClient.getInstance();
+        try {
+            ccClient.terminate(memberId);
+            // remove from active member list
+            partitionCtxt.removeActiveMemberById(memberId);
+
+            String clusterId = memberReadyToShutdownEvent.getClusterId();
+            log.info(String.format("Member is terminated and removed from the active members list: "
+                                   + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+        } catch (TerminationException e) {
+            String msg = "TerminationException" + e.getLocalizedMessage();
+            log.error(msg, e);
+        }
+    }
+
+    @Override
+    public void handleMemberTerminatedEvent(
+            MemberTerminatedEvent memberTerminatedEvent) {
+
+        String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
+        String memberId = memberTerminatedEvent.getMemberId();
+        String partitionId = memberTerminatedEvent.getPartitionId();
+        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+        PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
+        partitionContext.removeMemberStatsContext(memberId);
+
+        if (partitionContext.removeTerminationPendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from termination pending members list: "
+                                        + "[member] %s", memberId));
+            }
+        } else if (partitionContext.removePendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from pending members list: "
+                                        + "[member] %s", memberId));
+            }
+        } else if (partitionContext.removeActiveMemberById(memberId)) {
+            log.warn(String.format("Member is in the wrong list and it is removed from "
+                                   + "active members list", memberId));
+        } else if (partitionContext.removeObsoleteMember(memberId)) {
+            log.warn(String.format("Member's obsolated timeout has been expired and "
+                                   + "it is removed from obsolated members list", memberId));
+        } else {
+            log.warn(String.format("Member is not available in any of the list active, "
+                                   + "pending and termination pending", memberId));
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member stat context has been removed successfully: "
+                                   + "[member] %s", memberId));
+        }
+    }
+
+    @Override
+    public void handleClusterRemovedEvent(
+            ClusterRemovedEvent clusterRemovedEvent) {
+
+    }
+
+    private String getNetworkPartitionIdByMemberId(String memberId) {
+        for (Service service : TopologyManager.getTopology().getServices()) {
+            for (Cluster cluster : service.getClusters()) {
+                if (cluster.memberExists(memberId)) {
+                    return cluster.getMember(memberId).getNetworkPartitionId();
+                }
+            }
+        }
+        return null;
+    }
+
+    private Member getMemberByMemberId(String memberId) {
+        try {
+            TopologyManager.acquireReadLock();
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.memberExists(memberId)) {
+                        return cluster.getMember(memberId);
+                    }
+                }
+            }
+            return null;
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    public NetworkPartitionContext getNetworkPartitionCtxt(Member member) {
+        log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId());
+        String networkPartitionId = member.getNetworkPartitionId();
+        if (networkPartitionCtxts.containsKey(networkPartitionId)) {
+            log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId));
+            return networkPartitionCtxts.get(networkPartitionId);
+        }
+        log.info("returning null getNetworkPartitionCtxt");
+        return null;
+    }
+
+    public String getPartitionOfMember(String memberId) {
+        for (Service service : TopologyManager.getTopology().getServices()) {
+            for (Cluster cluster : service.getClusters()) {
+                if (cluster.memberExists(memberId)) {
                     return cluster.getMember(memberId).getPartitionId();
                 }
             }
         }
         return null;
-   	}
-    
+    }
+
     public DeploymentPolicy getDeploymentPolicy() {
         return deploymentPolicy;
     }
@@ -92,7 +611,7 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
     public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
         this.autoscalePolicy = autoscalePolicy;
-    }    
+    }
 
     public Map<String, NetworkPartitionContext> getNetworkPartitionCtxts() {
         return networkPartitionCtxts;
@@ -113,7 +632,7 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
     public void addNetworkPartitionCtxt(NetworkPartitionContext ctxt) {
         this.networkPartitionCtxts.put(ctxt.getId(), ctxt);
     }
-    
+
     public NetworkPartitionContext getPartitionCtxt(String id) {
         return this.networkPartitionCtxts.get(id);
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
index f547cb1..a0c66f0 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
@@ -18,36 +18,39 @@
  */
 package org.apache.stratos.autoscaler.monitor;
 
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
 import org.apache.stratos.autoscaler.PartitionContext;
 import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
 import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
 import org.apache.stratos.autoscaler.util.AutoScalerConstants;
 import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.common.enums.ClusterType;
 import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
 
 /**
  * Is responsible for monitoring a service cluster. This runs periodically
  * and perform minimum instance check and scaling check using the underlying
  * rules engine.
- *
  */
-public class VMLbClusterMonitor extends VMClusterMonitor{
+public class VMLbClusterMonitor extends VMClusterMonitor {
 
     private static final Log log = LogFactory.getLog(VMLbClusterMonitor.class);
 
     public VMLbClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
-                            AutoscalePolicy autoscalePolicy) {
-    	super(clusterId, serviceId, ClusterType.VMLbCluster, new AutoscalerRuleEvaluator(), 
-    			deploymentPolicy, autoscalePolicy, 
-    			new ConcurrentHashMap<String, NetworkPartitionContext>());
+                              AutoscalePolicy autoscalePolicy) {
+        super(clusterId, serviceId, new AutoscalerRuleEvaluator(),
+              deploymentPolicy, autoscalePolicy,
+              new ConcurrentHashMap<String, NetworkPartitionContext>());
         readConfigurations();
     }
 
@@ -56,27 +59,27 @@ public class VMLbClusterMonitor extends VMClusterMonitor{
 
         while (!isDestroyed()) {
             if (log.isDebugEnabled()) {
-                log.debug("VMLbClusterMonitor is running.. "+this.toString());
+                log.debug("VMLbClusterMonitor is running.. " + this.toString());
             }
             try {
-                if( !ClusterStatus.In_Maintenance.equals(getStatus())) {
+                if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
                     monitor();
                 } else {
                     if (log.isDebugEnabled()) {
                         log.debug("VMLbClusterMonitor is suspended as the cluster is in " +
-                                    ClusterStatus.In_Maintenance + " mode......");
+                                  ClusterStatus.In_Maintenance + " mode......");
                     }
                 }
             } catch (Exception e) {
-                log.error("VMLbClusterMonitor : Monitor failed. "+this.toString(), e);
+                log.error("VMLbClusterMonitor : Monitor failed. " + this.toString(), e);
             }
             try {
-                Thread.sleep(getMonitorInterval());
+                Thread.sleep(getMonitorIntervalMilliseconds());
             } catch (InterruptedException ignore) {
             }
         }
     }
-    
+
     @Override
     protected void monitor() {
         // TODO make this concurrent
@@ -84,21 +87,21 @@ public class VMLbClusterMonitor extends VMClusterMonitor{
 
             // minimum check per partition
             for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts()
-                                                                            .values()) {
+                    .values()) {
 
                 if (partitionContext != null) {
                     getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
                     getMinCheckKnowledgeSession().setGlobal("isPrimary", false);
-                    
+
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("Running minimum check for partition %s ",
                                                 partitionContext.getPartitionId()));
                     }
 
                     minCheckFactHandle =
-                                         AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession(),
-                                                                                  minCheckFactHandle,
-                                                                                  partitionContext);
+                            AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession(),
+                                                                     minCheckFactHandle,
+                                                                     partitionContext);
                     // start only in the first partition context
                     break;
                 }
@@ -106,25 +109,55 @@ public class VMLbClusterMonitor extends VMClusterMonitor{
             }
 
         }
-    }     
-    
-	@Override
+    }
+
+    @Override
     public void destroy() {
         getMinCheckKnowledgeSession().dispose();
         getMinCheckKnowledgeSession().dispose();
         setDestroyed(true);
-        if(log.isDebugEnabled()) {
-            log.debug("VMLbClusterMonitor Drools session has been disposed. "+this.toString());
+        if (log.isDebugEnabled()) {
+            log.debug("VMLbClusterMonitor Drools session has been disposed. " + this.toString());
         }
     }
-    
+
     @Override
-    protected void readConfigurations () {
+    protected void readConfigurations() {
         XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
         int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
-        setMonitorInterval(monitorInterval);
+        setMonitorIntervalMilliseconds(monitorInterval);
         if (log.isDebugEnabled()) {
-            log.debug("VMLbClusterMonitor task interval: " + getMonitorInterval());
+            log.debug("VMLbClusterMonitor task interval: " + getMonitorIntervalMilliseconds());
+        }
+    }
+
+    @Override
+    public void handleClusterRemovedEvent(
+            ClusterRemovedEvent clusterRemovedEvent) {
+
+        String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy();
+        String clusterId = clusterRemovedEvent.getClusterId();
+        DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
+        if (depPolicy != null) {
+            List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
+                    .getNetworkPartitionLbHolders(depPolicy);
+
+            for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
+                // removes lb cluster ids
+                boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
+                if (isRemoved) {
+                    log.info("Removed the lb cluster [id]:"
+                             + clusterId
+                             + " reference from Network Partition [id]: "
+                             + networkPartitionLbHolder
+                            .getNetworkPartitionId());
+
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug(networkPartitionLbHolder);
+                }
+
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
index 9e97e19..0452e32 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
@@ -35,14 +35,12 @@ import org.apache.stratos.autoscaler.util.ConfUtil;
 import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
 import org.apache.stratos.cloud.controller.stub.pojo.Properties;
 import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.common.enums.ClusterType;
 import org.apache.stratos.messaging.domain.topology.ClusterStatus;
 
 /**
  * Is responsible for monitoring a service cluster. This runs periodically
  * and perform minimum instance check and scaling check using the underlying
  * rules engine.
- *
  */
 public class VMServiceClusterMonitor extends VMClusterMonitor {
 
@@ -50,11 +48,12 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
     private String lbReferenceType;
     private boolean hasPrimary;
 
-    public VMServiceClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
-                          AutoscalePolicy autoscalePolicy) {
-    	super(clusterId, serviceId, ClusterType.VMServiceCluster, new AutoscalerRuleEvaluator(), 
-    			deploymentPolicy, autoscalePolicy, 
-    			new ConcurrentHashMap<String, NetworkPartitionContext>());
+    public VMServiceClusterMonitor(String clusterId, String serviceId,
+                                   DeploymentPolicy deploymentPolicy,
+                                   AutoscalePolicy autoscalePolicy) {
+        super(clusterId, serviceId, new AutoscalerRuleEvaluator(),
+              deploymentPolicy, autoscalePolicy,
+              new ConcurrentHashMap<String, NetworkPartitionContext>());
         readConfigurations();
     }
 
@@ -73,19 +72,19 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
                 log.debug("VMServiceClusterMonitor is running.. " + this.toString());
             }
             try {
-                if(!ClusterStatus.In_Maintenance.equals(getStatus())) {
+                if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
                     monitor();
                 } else {
                     if (log.isDebugEnabled()) {
                         log.debug("VMServiceClusterMonitor is suspended as the cluster is in " +
-                                    ClusterStatus.In_Maintenance + " mode......");
+                                  ClusterStatus.In_Maintenance + " mode......");
                     }
                 }
             } catch (Exception e) {
                 log.error("VMServiceClusterMonitor : Monitor failed." + this.toString(), e);
             }
             try {
-                Thread.sleep(getMonitorInterval());
+                Thread.sleep(getMonitorIntervalMilliseconds());
             } catch (InterruptedException ignore) {
             }
         }
@@ -105,13 +104,13 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
                 List<String> primaryMemberListInPartition = new ArrayList<String>();
                 // get active primary members in this partition context
                 for (MemberContext memberContext : partitionContext.getActiveMembers()) {
-                    if (isPrimaryMember(memberContext)){
+                    if (isPrimaryMember(memberContext)) {
                         primaryMemberListInPartition.add(memberContext.getMemberId());
                     }
                 }
                 // get pending primary members in this partition context
                 for (MemberContext memberContext : partitionContext.getPendingMembers()) {
-                    if (isPrimaryMember(memberContext)){
+                    if (isPrimaryMember(memberContext)) {
                         primaryMemberListInPartition.add(memberContext.getMemberId());
                     }
                 }
@@ -134,19 +133,19 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
             boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset();
             boolean loadAverageReset = networkPartitionContext.isLoadAverageReset();
             if (log.isDebugEnabled()) {
-                log.debug("flag of rifReset: "  + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset
-                        + " flag of loadAverageReset" + loadAverageReset);
+                log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset
+                          + " flag of loadAverageReset" + loadAverageReset);
             }
             if (rifReset || memoryConsumptionReset || loadAverageReset) {
-            	getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
                 //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy);
-            	getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", autoscalePolicy);
-            	getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
-            	getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
-            	getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
-            	getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
-            	getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
-            	getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition);
+                getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", autoscalePolicy);
+                getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+                getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+                getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+                getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
+                getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
+                getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition);
 
                 if (log.isDebugEnabled()) {
                     log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId()));
@@ -161,12 +160,12 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
                 networkPartitionContext.setLoadAverageReset(false);
             } else if (log.isDebugEnabled()) {
                 log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " +
-                        "cycle for network partition %s", networkPartitionContext.getId()));
+                                        "cycle for network partition %s", networkPartitionContext.getId()));
             }
         }
     }
-    
-    private boolean isPrimaryMember(MemberContext memberContext){
+
+    private boolean isPrimaryMember(MemberContext memberContext) {
         Properties props = memberContext.getProperties();
         if (log.isDebugEnabled()) {
             log.debug(" Properties [" + props + "] ");
@@ -176,7 +175,7 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
                 if (prop.getName().equals("PRIMARY")) {
                     if (Boolean.parseBoolean(prop.getValue())) {
                         log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
-                                "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
+                                  "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
                         return true;
                     }
                 }
@@ -184,33 +183,33 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
         }
         return false;
     }
-    
+
     @Override
-    protected void readConfigurations () {
+    protected void readConfigurations() {
         XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
         int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
-        setMonitorInterval(monitorInterval);
+        setMonitorIntervalMilliseconds(monitorInterval);
         if (log.isDebugEnabled()) {
-            log.debug("VMServiceClusterMonitor task interval: " + getMonitorInterval());
+            log.debug("VMServiceClusterMonitor task interval: " + getMonitorIntervalMilliseconds());
         }
     }
-    
-	@Override
+
+    @Override
     public void destroy() {
         getMinCheckKnowledgeSession().dispose();
         getScaleCheckKnowledgeSession().dispose();
         setDestroyed(true);
-        if(log.isDebugEnabled()) {
-            log.debug("VMServiceClusterMonitor Drools session has been disposed. "+this.toString());
+        if (log.isDebugEnabled()) {
+            log.debug("VMServiceClusterMonitor Drools session has been disposed. " + this.toString());
         }
     }
 
     @Override
     public String toString() {
         return "VMServiceClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() +
-                ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy +
-                ", lbReferenceType=" + lbReferenceType +
-                ", hasPrimary=" + hasPrimary + " ]";
+               ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy +
+               ", lbReferenceType=" + lbReferenceType +
+               ", hasPrimary=" + hasPrimary + " ]";
     }
 
     public String getLbReferenceType() {


[3/7] code review changes to cluster monitors

Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
index 4f58e8d..e7c16fe 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
@@ -19,39 +19,16 @@
 
 package org.apache.stratos.autoscaler.util;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
 import org.apache.axiom.om.OMElement;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.PartitionValidationException;
-import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.monitor.VMServiceClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.DockerServiceClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.VMLbClusterMonitor;
-import org.apache.stratos.autoscaler.partition.PartitionGroup;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
 import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.util.Constants;
-
-import javax.xml.namespace.QName;
-
-import java.util.*;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
 
 /**
  * This class contains utility methods used by Autoscaler.
@@ -64,302 +41,6 @@ public class AutoscalerUtil {
 
     }
 
-
-    /**
-     * Updates ClusterContext for given cluster
-     *
-     * @param cluster
-     * @return ClusterMonitor - Updated ClusterContext
-     * @throws PolicyValidationException
-     * @throws PartitionValidationException
-     */
-//    public static ClusterMonitor getClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
-//        // FIXME fix the following code to correctly update
-//        // AutoscalerContext context = AutoscalerContext.getInstance();
-//        if (null == cluster) {
-//            return null;
-//        }
-//
-//        String autoscalePolicyName = cluster.getAutoscalePolicyName();
-//        String deploymentPolicyName = cluster.getDeploymentPolicyName();
-//
-//        if (log.isDebugEnabled()) {
-//            log.debug("Deployment policy name: " + deploymentPolicyName);
-//            log.debug("Autoscaler policy name: " + autoscalePolicyName);
-//        }
-//
-//        AutoscalePolicy policy =
-//                                 PolicyManager.getInstance()
-//                                              .getAutoscalePolicy(autoscalePolicyName);
-//        DeploymentPolicy deploymentPolicy =
-//                                            PolicyManager.getInstance()
-//                                                         .getDeploymentPolicy(deploymentPolicyName);
-//
-//        if (deploymentPolicy == null) {
-//            String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
-//            log.error(msg);
-//            throw new PolicyValidationException(msg);
-//        }
-//
-//        Partition[] allPartitions = deploymentPolicy.getAllPartitions();
-//        if (allPartitions == null) {
-//            String msg =
-//                         "Deployment Policy's Partitions are null. Policy name: " +
-//                                 deploymentPolicyName;
-//            log.error(msg);
-//            throw new PolicyValidationException(msg);
-//        }
-//
-//        CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
-//
-//        ClusterMonitor clusterMonitor =
-//                                        new ClusterMonitor(cluster.getClusterId(),
-//                                                           cluster.getServiceName(),
-//                                                           deploymentPolicy, policy);
-//        clusterMonitor.setStatus(ClusterStatus.Created);
-//        
-//        for (PartitionGroup partitionGroup: deploymentPolicy.getPartitionGroups()){
-//
-//            NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-//                    partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions());
-//
-//            for(Partition partition: partitionGroup.getPartitions()){
-//                PartitionContext partitionContext = new PartitionContext(partition);
-//                partitionContext.setServiceName(cluster.getServiceName());
-//                partitionContext.setProperties(cluster.getProperties());
-//                partitionContext.setNetworkPartitionId(partitionGroup.getId());
-//                
-//                for (Member member: cluster.getMembers()){
-//                    String memberId = member.getMemberId();
-//                    if(member.getPartitionId().equalsIgnoreCase(partition.getId())){
-//                        MemberContext memberContext = new MemberContext();
-//                        memberContext.setClusterId(member.getClusterId());
-//                        memberContext.setMemberId(memberId);
-//                        memberContext.setPartition(partition);
-//                        memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
-//                        
-//                        if(MemberStatus.Activated.equals(member.getStatus())){
-//                            partitionContext.addActiveMember(memberContext);
-////                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-////                            partitionContext.incrementCurrentActiveMemberCount(1);
-//
-//                        } else if(MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())){
-//                            partitionContext.addPendingMember(memberContext);
-//
-////                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-//                        } else if(MemberStatus.Suspended.equals(member.getStatus())){
-////                            partitionContext.addFaultyMember(memberId);
-//                        }
-//                        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-//                        if(log.isInfoEnabled()){
-//                            log.info(String.format("Member stat context has been added: [member] %s", memberId));
-//                        }
-//                    }
-//
-//                }
-//                networkPartitionContext.addPartitionContext(partitionContext);
-//                if(log.isInfoEnabled()){
-//                    log.info(String.format("Partition context has been added: [partition] %s",
-//                            partitionContext.getPartitionId()));
-//                }
-//            }
-//
-//            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-//            if(log.isInfoEnabled()){
-//                log.info(String.format("Network partition context has been added: [network partition] %s",
-//                            networkPartitionContext.getId()));
-//            }
-//        }
-//        
-//        
-//        // find lb reference type
-//        java.util.Properties props = cluster.getProperties();
-//        
-//        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
-//            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-//            clusterMonitor.setLbReferenceType(value);
-//            if(log.isDebugEnabled()) {
-//                log.debug("Set the lb reference type: "+value);
-//            }
-//        }
-//        
-//        // set hasPrimary property
-//        // hasPrimary is true if there are primary members available in that cluster
-//        clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
-//
-//        log.info("Cluster monitor created: "+clusterMonitor.toString());
-//        return clusterMonitor;
-//    }
-//    
-//    private static Properties convertMemberPropsToMemberContextProps(
-//			java.util.Properties properties) {
-//    	Properties props = new Properties();
-//    	for (Map.Entry<Object, Object> e : properties.entrySet()	) {
-//			Property prop = new Property();
-//			prop.setName((String)e.getKey());
-//			prop.setValue((String)e.getValue());
-//			props.addProperties(prop);
-//		}    	
-//		return props;
-//	}
-//
-//
-//	public static LbClusterMonitor getLBClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
-//        // FIXME fix the following code to correctly update
-//        // AutoscalerContext context = AutoscalerContext.getInstance();
-//        if (null == cluster) {
-//            return null;
-//        }
-//
-//        String autoscalePolicyName = cluster.getAutoscalePolicyName();
-//        String deploymentPolicyName = cluster.getDeploymentPolicyName();
-//
-//        if (log.isDebugEnabled()) {
-//            log.debug("Deployment policy name: " + deploymentPolicyName);
-//            log.debug("Autoscaler policy name: " + autoscalePolicyName);
-//        }
-//
-//        AutoscalePolicy policy =
-//                                 PolicyManager.getInstance()
-//                                              .getAutoscalePolicy(autoscalePolicyName);
-//        DeploymentPolicy deploymentPolicy =
-//                                            PolicyManager.getInstance()
-//                                                         .getDeploymentPolicy(deploymentPolicyName);
-//
-//        if (deploymentPolicy == null) {
-//            String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
-//            log.error(msg);
-//            throw new PolicyValidationException(msg);
-//        }
-//
-//        String clusterId = cluster.getClusterId();
-//        LbClusterMonitor clusterMonitor =
-//                                        new LbClusterMonitor(clusterId,
-//                                                           cluster.getServiceName(),
-//                                                           deploymentPolicy, policy);
-//        clusterMonitor.setStatus(ClusterStatus.Created);
-//        // partition group = network partition context
-//        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
-//
-//            NetworkPartitionLbHolder networkPartitionLbHolder =
-//                                                              PartitionManager.getInstance()
-//                                                                              .getNetworkPartitionLbHolder(partitionGroup.getId());
-////                                                              PartitionManager.getInstance()
-////                                                                              .getNetworkPartitionLbHolder(partitionGroup.getId());
-//            // FIXME pick a random partition
-//            Partition partition =
-//                                  partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
-//            PartitionContext partitionContext = new PartitionContext(partition);
-//            partitionContext.setServiceName(cluster.getServiceName());
-//            partitionContext.setProperties(cluster.getProperties());
-//            partitionContext.setNetworkPartitionId(partitionGroup.getId());
-//            partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
-//
-//            NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-//                    partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions()) ;
-//            for (Member member : cluster.getMembers()) {
-//                String memberId = member.getMemberId();
-//                if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) {
-//                    MemberContext memberContext = new MemberContext();
-//                    memberContext.setClusterId(member.getClusterId());
-//                    memberContext.setMemberId(memberId);
-//                    memberContext.setPartition(partition);
-//
-//                    if (MemberStatus.Activated.equals(member.getStatus())) {
-//                        partitionContext.addActiveMember(memberContext);
-////                        networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-////                        partitionContext.incrementCurrentActiveMemberCount(1);
-//                    } else if (MemberStatus.Created.equals(member.getStatus()) ||
-//                               MemberStatus.Starting.equals(member.getStatus())) {
-//                        partitionContext.addPendingMember(memberContext);
-////                        networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-//                    } else if (MemberStatus.Suspended.equals(member.getStatus())) {
-////                        partitionContext.addFaultyMember(memberId);
-//                    }
-//
-//                    partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-//                    if(log.isInfoEnabled()){
-//                        log.info(String.format("Member stat context has been added: [member] %s", memberId));
-//                    }
-//                }
-//
-//            }
-//            networkPartitionContext.addPartitionContext(partitionContext);
-//            
-//            // populate lb cluster id in network partition context.
-//            java.util.Properties props = cluster.getProperties();
-//
-//            // get service type of load balanced cluster
-//            String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
-//            
-//            if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
-//                String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-//                
-//                if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
-//                    networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
-//
-//                } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) {
-//                    String serviceName = cluster.getServiceName();
-//                    // TODO: check if this is correct
-//                    networkPartitionLbHolder.addServiceLB(serviceName, clusterId);
-//
-//                    if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) {
-//                        networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
-//                        if (log.isDebugEnabled()) {
-//                            log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType);
-//                        }
-//                    }
-//                }
-//            }
-//
-//            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-//        }
-//
-//        log.info("LB Cluster monitor created: "+clusterMonitor.toString());
-//        return clusterMonitor;
-//    }
-//	
-//    public static DockerClusterMonitor getDockerClusterMonitor(Cluster cluster) {
-//
-//    	if (null == cluster) {
-//            return null;
-//        }
-//
-//        String autoscalePolicyName = cluster.getAutoscalePolicyName();
-//        if (log.isDebugEnabled()) {
-//            log.debug("Autoscaler policy name: " + autoscalePolicyName);
-//        }
-//
-//        AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
-//        java.util.Properties props = cluster.getProperties();
-//        String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
-//		KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID);
-//
-//        DockerClusterMonitor dockerClusterMonitor = new DockerClusterMonitor(
-//        		kubernetesClusterCtxt, 
-//        		cluster.getClusterId(), 
-//        		cluster.getServiceName(), 
-//        		policy);
-//                                        
-//        dockerClusterMonitor.setStatus(ClusterStatus.Created);
-//        
-//        // find lb reference type
-//        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
-//            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-//            dockerClusterMonitor.setLbReferenceType(value);
-//            if(log.isDebugEnabled()) {
-//                log.debug("Set the lb reference type: "+value);
-//            }
-//        }
-//        
-////        // set hasPrimary property
-////        // hasPrimary is true if there are primary members available in that cluster
-////        dockerClusterMonitor.setHasPrimary(Boolean.parseBoolean(props.getProperty(Constants.IS_PRIMARY)));
-//
-//        log.info("Docker cluster monitor created: "+ dockerClusterMonitor.toString());
-//        return dockerClusterMonitor;
-//    }
-    
     public static Properties getProperties(final OMElement elt) {
 
         Iterator<?> it = elt.getChildrenWithName(new QName(AutoScalerConstants.PROPERTY_ELEMENT));
@@ -400,64 +81,4 @@ public class AutoscalerUtil {
         properties.setProperties(propertyArray);
         return properties;
     }
-
-//    public static LbClusterMonitor getLbClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
-//        if (null == cluster) {
-//               return null;
-//           }
-//
-//           String autoscalePolicyName = cluster.getAutoscalePolicyName();
-//           String deploymentPolicyName = cluster.getDeploymentPolicyName();
-//
-//           if (log.isDebugEnabled()) {
-//               log.debug("Deployment policy name: " + deploymentPolicyName);
-//               log.debug("Autoscaler policy name: " + autoscalePolicyName);
-//           }
-//
-//           AutoscalePolicy policy =
-//                                    PolicyManager.getInstance()
-//                                                 .getAutoscalePolicy(autoscalePolicyName);
-//           DeploymentPolicy deploymentPolicy =
-//                                               PolicyManager.getInstance()
-//                                                            .getDeploymentPolicy(deploymentPolicyName);
-//
-//           if (deploymentPolicy == null) {
-//               String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
-//               log.error(msg);
-//               throw new PolicyValidationException(msg);
-//           }
-//
-//           Partition[] allPartitions = deploymentPolicy.getAllPartitions();
-//           if (allPartitions == null) {
-//               String msg =
-//                            "Deployment Policy's Partitions are null. Policy name: " +
-//                                    deploymentPolicyName;
-//               log.error(msg);
-//               throw new PolicyValidationException(msg);
-//           }
-//
-//           try {
-//               validateExistenceOfPartions(allPartitions);
-//           } catch (InvalidPartitionException e) {
-//               String msg = "Deployment Policy is invalid. Policy name: " + deploymentPolicyName;
-//               log.error(msg, e);
-//               throw new PolicyValidationException(msg, e);
-//           }
-//
-//           CloudControllerClient.getInstance()
-//                                .validateDeploymentPolicy(cluster.getServiceName(),
-//                                                            allPartitions);
-//
-//           LbClusterMonitor clusterMonitor =
-//                                           new LbClusterMonitor(cluster.getClusterId(),
-//                                                              cluster.getServiceName(),
-//                                                              deploymentPolicy, policy);
-//           for (PartitionGroup partitionGroup: deploymentPolicy.getPartitionGroups()){
-//
-//               NetworkPartitionContext networkPartitionContext
-//                       = PartitionManager.getInstance().getNetworkPartitionLbHolder(partitionGroup.getNetworkPartitionId());
-//               clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-//           }
-//        return null;
-//    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/enums/ClusterType.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/enums/ClusterType.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/enums/ClusterType.java
deleted file mode 100644
index 8842fb6..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/enums/ClusterType.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.stratos.common.enums;
-
-public enum ClusterType {
-	VMServiceCluster, VMLbCluster, DockerServiceCluster, DockerLbCluster;
-}


[2/7] git commit: using executor instead of nornal threads

Posted by sa...@apache.org.
using executor instead of nornal threads

Signed-off-by: sajhak <sa...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/244030bb
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/244030bb
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/244030bb

Branch: refs/heads/container-autoscaling
Commit: 244030bb906aadfc0058c84883fa96431d8d4a79
Parents: 3105610
Author: R-Rajkumar <rr...@gmail.com>
Authored: Mon Oct 6 11:11:08 2014 +0530
Committer: sajhak <sa...@gmail.com>
Committed: Mon Oct 6 23:11:29 2014 +0530

----------------------------------------------------------------------
 .../topology/AutoscalerTopologyEventReceiver.java     |  8 ++++----
 .../autoscaler/monitor/AbstractClusterMonitor.java    | 14 ++++++++++++++
 .../monitor/KubernetesServiceClusterMonitor.java      |  7 +------
 .../autoscaler/monitor/VMLbClusterMonitor.java        |  7 +------
 .../autoscaler/monitor/VMServiceClusterMonitor.java   |  7 +------
 5 files changed, 21 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/244030bb/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index e857eaf..de058ec 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -342,10 +342,10 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
                 log.error(msg);
                 throw new RuntimeException(msg);
             }
-            //TODO  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-            //		scheduler.scheduleAtFixedRate(monitor, 0, getMonitorInterval(), TimeUnit.MILLISECONDS);
-            Thread th = new Thread(monitor);
-            th.start();
+
+//            Thread th = new Thread(monitor);
+//            th.start();
+            monitor.startScheduler();
             AutoscalerContext.getInstance().addClusterMonitor(monitor);
             if (log.isInfoEnabled()) {
                 log.info(String.format("Cluster monitor has been added successfully: [cluster] %s",

http://git-wip-us.apache.org/repos/asf/stratos/blob/244030bb/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
index 6061c3b..e44bd72 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
@@ -18,6 +18,10 @@
  */
 package org.apache.stratos.autoscaler.monitor;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
 import org.apache.stratos.messaging.domain.topology.ClusterStatus;
 import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
@@ -62,6 +66,8 @@ public abstract class AbstractClusterMonitor implements Runnable {
     private boolean isDestroyed;
 
     private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+    
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
 
     protected AbstractClusterMonitor(String clusterId, String serviceId,
                                      AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
@@ -75,6 +81,14 @@ public abstract class AbstractClusterMonitor implements Runnable {
     }
 
     protected abstract void readConfigurations();
+    
+    public void startScheduler() {
+    	scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
+    }
+    
+    protected void stopScheduler() {
+    	scheduler.shutdownNow();
+    }
 
     protected abstract void monitor();
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/244030bb/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
index 3c81ba3..93580d9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
@@ -59,7 +59,6 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni
     @Override
     public void run() {
 
-        while (!isDestroyed()) {
             if (log.isDebugEnabled()) {
                 log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
             }
@@ -76,11 +75,6 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni
                 log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
                           e);
             }
-            try {
-                Thread.sleep(getMonitorIntervalMilliseconds());
-            } catch (InterruptedException ignore) {
-            }
-        }
     }
 
     @Override
@@ -148,6 +142,7 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni
         getMinCheckKnowledgeSession().dispose();
         getScaleCheckKnowledgeSession().dispose();
         setDestroyed(true);
+        stopScheduler();
         if (log.isDebugEnabled()) {
             log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/244030bb/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
index a0c66f0..f950f9d 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
@@ -57,7 +57,6 @@ public class VMLbClusterMonitor extends VMClusterMonitor {
     @Override
     public void run() {
 
-        while (!isDestroyed()) {
             if (log.isDebugEnabled()) {
                 log.debug("VMLbClusterMonitor is running.. " + this.toString());
             }
@@ -73,11 +72,6 @@ public class VMLbClusterMonitor extends VMClusterMonitor {
             } catch (Exception e) {
                 log.error("VMLbClusterMonitor : Monitor failed. " + this.toString(), e);
             }
-            try {
-                Thread.sleep(getMonitorIntervalMilliseconds());
-            } catch (InterruptedException ignore) {
-            }
-        }
     }
 
     @Override
@@ -116,6 +110,7 @@ public class VMLbClusterMonitor extends VMClusterMonitor {
         getMinCheckKnowledgeSession().dispose();
         getMinCheckKnowledgeSession().dispose();
         setDestroyed(true);
+        stopScheduler();
         if (log.isDebugEnabled()) {
             log.debug("VMLbClusterMonitor Drools session has been disposed. " + this.toString());
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/244030bb/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
index 0452e32..d8c9e69 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
@@ -67,7 +67,6 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
         } catch (InterruptedException ignore) {
         }
 
-        while (!isDestroyed()) {
             if (log.isDebugEnabled()) {
                 log.debug("VMServiceClusterMonitor is running.. " + this.toString());
             }
@@ -83,11 +82,6 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
             } catch (Exception e) {
                 log.error("VMServiceClusterMonitor : Monitor failed." + this.toString(), e);
             }
-            try {
-                Thread.sleep(getMonitorIntervalMilliseconds());
-            } catch (InterruptedException ignore) {
-            }
-        }
     }
 
     @Override
@@ -199,6 +193,7 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
         getMinCheckKnowledgeSession().dispose();
         getScaleCheckKnowledgeSession().dispose();
         setDestroyed(true);
+        stopScheduler();
         if (log.isDebugEnabled()) {
             log.debug("VMServiceClusterMonitor Drools session has been disposed. " + this.toString());
         }


[5/7] code review changes to cluster monitors

Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index 1603aef..e857eaf 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -19,29 +19,16 @@
 
 package org.apache.stratos.autoscaler.message.receiver.topology;
 
-import java.util.List;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
 import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
 import org.apache.stratos.autoscaler.exception.PartitionValidationException;
 import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.exception.TerminationException;
 import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.ClusterMonitorFactory;
-import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.event.Event;
@@ -112,7 +99,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
             @Override
             protected void onEvent(Event event) {
-
                 try {
                     TopologyManager.acquireReadLock();
                     for (Service service : TopologyManager.getTopology().getServices()) {
@@ -121,167 +107,108 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
                         }
                     }
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }
             }
-
-
         });
 
         topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
-                    MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent)event;
+                    MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
+                    String clusterId = memberReadyToShutdownEvent.getClusterId();
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
                     AbstractClusterMonitor monitor;
-                    String clusterId = memberReadyToShutdownEvent.getClusterId();
-                    String memberId = memberReadyToShutdownEvent.getMemberId();
-
-                    if(asCtx.clusterMonitorExist(clusterId)) {
-                        monitor = asCtx.getClusterMonitor(clusterId);
-                    } else {
-                        if(log.isDebugEnabled()){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
                             log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                            		+ "[cluster] %s", clusterId));
+                                                    + "[cluster] %s", clusterId));
                         }
                         return;
                     }
-                    
-                    TopologyManager.acquireReadLock();
-                    
-                    if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                    		|| monitor.getClusterType() == ClusterType.VMLbCluster) {
-                    	
-                        NetworkPartitionContext nwPartitionCtxt;
-                        String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
-						nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
-                        // start a new member in the same Partition
-                        String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId);
-                        PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-
-                        // terminate the shutdown ready member
-                        CloudControllerClient ccClient = CloudControllerClient.getInstance();
-                        ccClient.terminate(memberId);
-
-                        // remove from active member list
-                        partitionCtxt.removeActiveMemberById(memberId);
-                        
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member is terminated and removed from the active members list: "
-                            		+ "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
-                        }
-                    } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                    	// no need to do anything
-                    }
+                    monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+                } catch (Exception e) {
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
+                }
+            }
+        });
 
-                } catch (TerminationException e) {
-                    log.error(e);
+        topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    log.info("Event received: " + event);
+                    ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event;
+                    TopologyManager.acquireReadLock();
+                    Service service = TopologyManager.getTopology().getService(clusterCreatedEvent.getServiceName());
+                    Cluster cluster = service.getCluster(clusterCreatedEvent.getClusterId());
+                    startClusterMonitor(cluster);
+                } catch (Exception e) {
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }
             }
-
         });
 
-        topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
-                    @Override
-                    protected void onEvent(Event event) {
-                        try {
-                            log.info("Event received: " + event);
-                            ClusterCreatedEvent e = (ClusterCreatedEvent) event;
-                            TopologyManager.acquireReadLock();
-                            Service service = TopologyManager.getTopology().getService(e.getServiceName());
-                            Cluster cluster = service.getCluster(e.getClusterId());
-                            startClusterMonitor(cluster);
-                        } catch (Exception e) {
-                            log.error("Error processing event", e);
-                        } finally {
-                            TopologyManager.releaseReadLock();
-                        }
-                    }
-
-                });
-
         topologyEventReceiver.addEventListener(new ClusterMaintenanceModeEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
                     log.info("Event received: " + event);
-                    ClusterMaintenanceModeEvent e = (ClusterMaintenanceModeEvent) event;
+                    ClusterMaintenanceModeEvent clusterMaintenanceModeEvent = (ClusterMaintenanceModeEvent) event;
                     TopologyManager.acquireReadLock();
-                    Service service = TopologyManager.getTopology().getService(e.getServiceName());
-                    Cluster cluster = service.getCluster(e.getClusterId());
-                    if(AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) {
-                    	AutoscalerContext.getInstance().getClusterMonitor(e.getClusterId()).setStatus(e.getStatus());
-                    } else {
+                    Service service = TopologyManager.getTopology().getService(clusterMaintenanceModeEvent.getServiceName());
+                    Cluster cluster = service.getCluster(clusterMaintenanceModeEvent.getClusterId());
+                    AbstractClusterMonitor monitor;
+                    monitor = AutoscalerContext.getInstance().getClusterMonitor(cluster.getClusterId());
+                    if (null == monitor) {
                         log.error("cluster monitor not exists for the cluster: " + cluster.toString());
+                        return;
                     }
+                    monitor.setStatus(clusterMaintenanceModeEvent.getStatus());
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 } finally {
                     TopologyManager.releaseReadLock();
                 }
             }
-
-                });
+        });
 
         topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
-                    ClusterRemovedEvent e = (ClusterRemovedEvent) event;
-                    TopologyManager.acquireReadLock();
-
-                    String clusterId = e.getClusterId();
-                    String deploymentPolicy = e.getDeploymentPolicy();
-
-                    AbstractClusterMonitor monitor = null;
-
-                    if (e.isLbCluster()) {
-                        DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
-                        if (depPolicy != null) {
-                            List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
-                                    .getNetworkPartitionLbHolders(depPolicy);
-
-                            for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
-                                // removes lb cluster ids
-                                boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
-                                if (isRemoved) {
-                                    log.info("Removed the lb cluster [id]:"
-                                            + clusterId
-                                            + " reference from Network Partition [id]: "
-                                            + networkPartitionLbHolder
-                                            .getNetworkPartitionId());
-
-                                }
-                                if (log.isDebugEnabled()) {
-                                    log.debug(networkPartitionLbHolder);
-                                }
-
-                            }
+                    ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+                    String clusterId = clusterRemovedEvent.getClusterId();
+                    AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                    AbstractClusterMonitor monitor;
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("A cluster monitor is not found in autoscaler context "
+                                                    + "[cluster] %s", clusterId));
                         }
+                        return;
                     }
-                    
-                    monitor = AutoscalerContext.getInstance().removeClusterMonitor(clusterId);                               
-
-                    // runTerminateAllRule(monitor);
-                    if (monitor != null) {
-                        monitor.destroy();
-                        log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
-                                clusterId));
-                    }
+                    monitor.handleClusterRemovedEvent(clusterRemovedEvent);
+                    asCtx.removeClusterMonitor(clusterId);
+                    monitor.destroy();
+                    log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
+                                           clusterId));
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 }
             }
-
         });
 
         topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
@@ -295,70 +222,23 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
-
                 try {
-                    TopologyManager.acquireReadLock();
-                    MemberTerminatedEvent e = (MemberTerminatedEvent) event;
-                    String networkPartitionId = e.getNetworkPartitionId();
-                    String clusterId = e.getClusterId();
-                    String partitionId = e.getPartitionId();
-                    String memberId = e.getMemberId();
+                    MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+                    String clusterId = memberTerminatedEvent.getClusterId();
                     AbstractClusterMonitor monitor;
-                    
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
-
-                    if(asCtx.clusterMonitorExist(clusterId)) {
-                        monitor = asCtx.getClusterMonitor(clusterId);
-                    } else {
-                        if(log.isDebugEnabled()){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
                             log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                            		+ "[cluster] %s", clusterId));
+                                                    + "[cluster] %s", clusterId));
                         }
                         return;
                     }
-                    
-                    if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                    		|| monitor.getClusterType() == ClusterType.VMLbCluster) {
-                    	
-                        NetworkPartitionContext networkPartitionContext = 
-                        		((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
-                        PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
-                        partitionContext.removeMemberStatsContext(memberId);
-
-                        if (partitionContext.removeTerminationPendingMember(memberId)) {
-                            if (log.isDebugEnabled()) {
-                                log.debug(String.format("Member is removed from termination pending members list: "
-                                		+ "[member] %s", memberId));
-                            }
-                        } else if (partitionContext.removePendingMember(memberId)) {
-                            if (log.isDebugEnabled()) {
-                                log.debug(String.format("Member is removed from pending members list: "
-                                		+ "[member] %s", memberId));
-                            }
-                        } else if (partitionContext.removeActiveMemberById(memberId)) {
-                            log.warn(String.format("Member is in the wrong list and it is removed from "
-                            		+ "active members list", memberId));
-                        } else if (partitionContext.removeObsoleteMember(memberId)){
-                        	log.warn(String.format("Member's obsolated timeout has been expired and "
-                        			+ "it is removed from obsolated members list", memberId));
-                        } else {
-                            log.warn(String.format("Member is not available in any of the list active, "
-                            		+ "pending and termination pending", memberId));
-                        }
-
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member stat context has been removed successfully: "
-                            		+ "[member] %s", memberId));
-                        }
-                    } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                    	// no need to do anything
-                    }
-                    
+                    monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 }
             }
 
@@ -367,160 +247,47 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
-
                 try {
-                    TopologyManager.acquireReadLock();
-
-                    MemberActivatedEvent e = (MemberActivatedEvent) event;
-                    String memberId = e.getMemberId();
-                    String partitionId = e.getPartitionId();
-                    String networkPartitionId = e.getNetworkPartitionId();
-
-                    String clusterId = e.getClusterId();
+                    MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+                    String clusterId = memberActivatedEvent.getClusterId();
                     AbstractClusterMonitor monitor;
-                    
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                    if(asCtx.clusterMonitorExist(clusterId)) {
-                        monitor = asCtx.getClusterMonitor(clusterId);
-                    } else {
-                        if(log.isDebugEnabled()){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
                             log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                            		+ "[cluster] %s", clusterId));
+                                                    + "[cluster] %s", clusterId));
                         }
                         return;
                     }
-                    
-                    if (monitor.getClusterType() == ClusterType.VMServiceCluster 
-                    		|| monitor.getClusterType() == ClusterType.VMLbCluster) {    
-                    	PartitionContext partitionContext;
-                        partitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-                        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member stat context has been added successfully: "
-                            		+ "[member] %s", memberId));
-                        }
-                        partitionContext.movePendingMemberToActiveMembers(memberId);
-					} else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-						KubernetesClusterContext kubernetesClusterContext;
-						kubernetesClusterContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
-						kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                        if (log.isInfoEnabled()) {
-                            log.info(String.format("Member stat context has been added successfully: "
-                            		+ "[member] %s", memberId));
-                        }
-						kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
-					}
-                    
+                    monitor.handleMemberActivatedEvent(memberActivatedEvent);
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 }
             }
         });
 
-        topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
-           @Override
-           protected void onEvent(Event event) {
-               try {
-            	   TopologyManager.acquireReadLock();
-            	   
-                   MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent)event;
-                   AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                   AbstractClusterMonitor monitor;
-                   String clusterId = memberReadyToShutdownEvent.getClusterId();
-                   String memberId = memberReadyToShutdownEvent.getMemberId();
-
-                   if(asCtx.clusterMonitorExist(clusterId)) {
-                       monitor = asCtx.getClusterMonitor(clusterId);
-                   } else {
-                       if(log.isDebugEnabled()){
-                           log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                           		+ "[cluster] %s", clusterId));
-                       }
-                       return;
-                   }
-
-                   if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                		   || monitor.getClusterType() == ClusterType.VMLbCluster) {
-                	   
-                       NetworkPartitionContext nwPartitionCtxt;
-                       String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
-                       nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
-                       // start a new member in the same Partition
-                       String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId);
-                       PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-
-                       // terminate the shutdown ready member
-                       CloudControllerClient ccClient = CloudControllerClient.getInstance();
-                       ccClient.terminate(memberId);
-
-                       // remove from active member list
-                       partitionCtxt.removeActiveMemberById(memberId);
-
-                       if (log.isInfoEnabled()) {
-                           log.info(String.format("Member is terminated and removed from the active members list: "
-                           		+ "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
-                       }
-                   } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                	   // no need to do anything
-                   }
-
-               } catch (TerminationException e) {
-                   log.error(e);
-               }
-           }
-
-       });
-
-
         topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
             @Override
             protected void onEvent(Event event) {
-
                 try {
-                    TopologyManager.acquireReadLock();
-
-                    MemberMaintenanceModeEvent e = (MemberMaintenanceModeEvent) event;
-                    String memberId = e.getMemberId();
-                    String partitionId = e.getPartitionId();
-                    String networkPartitionId = e.getNetworkPartitionId();
-
-                    String clusterId = e.getClusterId();
+                    MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+                    String clusterId = maintenanceModeEvent.getClusterId();
                     AbstractClusterMonitor monitor;
-                    
                     AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                    if (asCtx.clusterMonitorExist(clusterId)) {
-                        monitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
-                    } else {
-                        if(log.isDebugEnabled()){
+                    monitor = asCtx.getClusterMonitor(clusterId);
+                    if (null == monitor) {
+                        if (log.isDebugEnabled()) {
                             log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                            		+ "[cluster] %s", clusterId));
+                                                    + "[cluster] %s", clusterId));
                         }
                         return;
                     }
-                    
-                    if(monitor.getClusterType() == ClusterType.VMServiceCluster 
-                 		   || monitor.getClusterType() == ClusterType.VMLbCluster) {
-                    	
-                    	PartitionContext partitionContext;
-                    	partitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-                        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("Member has been moved as pending termination: "
-                            		+ "[member] %s", memberId));
-                        }
-                        partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
-                    } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
-                    	// no need to do anything
-                    }
-
+                    monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
                 } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
+                    String msg = "Error processing event " + e.getLocalizedMessage();
+                    log.error(msg, e);
                 }
             }
         });
@@ -529,27 +296,14 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
         topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
-//                try {
-//                    TopologyManager.acquireReadLock();
-//
-//                    // Remove all clusters of given service from context
-//                    ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent)event;
-//                    for(Service service : TopologyManager.getTopology().getServices()) {
-//                        for(Cluster cluster : service.getClusters()) {
-//                            removeMonitor(cluster.getHostName());
-//                        }
-//                    }
-//                }
-//                finally {
-//                    TopologyManager.releaseReadLock();
-//                }
+
             }
         });
     }
 
     private class ClusterMonitorAdder implements Runnable {
         private Cluster cluster;
-        private String clusterMonitorType;
+
         public ClusterMonitorAdder(Cluster cluster) {
             this.cluster = cluster;
         }
@@ -567,38 +321,41 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
                 try {
                     monitor = ClusterMonitorFactory.getMonitor(cluster);
                     success = true;
-                    clusterMonitorType = monitor.getClusterType().name();
                 } catch (PolicyValidationException e) {
-                    String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
-                    log.debug(msg, e);
+                    if (log.isDebugEnabled()) {
+                        String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+                        log.debug(msg, e);
+                    }
                     retries--;
-
                 } catch (PartitionValidationException e) {
-                    String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
-                    log.debug(msg, e);
+                    if (log.isDebugEnabled()) {
+                        String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+                        log.debug(msg, e);
+                    }
                     retries--;
                 }
             } while (!success && retries != 0);
 
             if (monitor == null) {
                 String msg = "Cluster monitor creation failed, even after retrying for 5 times, "
-                        + "for cluster: " + cluster.getClusterId();
+                             + "for cluster: " + cluster.getClusterId();
                 log.error(msg);
                 throw new RuntimeException(msg);
             }
-
+            //TODO  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+            //		scheduler.scheduleAtFixedRate(monitor, 0, getMonitorInterval(), TimeUnit.MILLISECONDS);
             Thread th = new Thread(monitor);
             th.start();
             AutoscalerContext.getInstance().addClusterMonitor(monitor);
             if (log.isInfoEnabled()) {
-                log.info(String.format("%s monitor has been added successfully: [cluster] %s",
-                        clusterMonitorType, cluster.getClusterId()));
+                log.info(String.format("Cluster monitor has been added successfully: [cluster] %s",
+                                       cluster.getClusterId()));
             }
         }
     }
- 
+
     @SuppressWarnings("unused")
-	private void runTerminateAllRule(VMClusterMonitor monitor) {
+    private void runTerminateAllRule(VMClusterMonitor monitor) {
 
         FactHandle terminateAllFactHandle = null;
 
@@ -621,9 +378,13 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
 
     protected synchronized void startClusterMonitor(Cluster cluster) {
         Thread th = null;
-        if (!AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) {
-        	th = new Thread(new ClusterMonitorAdder(cluster));
-        } 
+
+        AbstractClusterMonitor monitor;
+        monitor = AutoscalerContext.getInstance().getClusterMonitor(cluster.getClusterId());
+
+        if (null == monitor) {
+            th = new Thread(new ClusterMonitorAdder(cluster));
+        }
         if (th != null) {
             th.start();
             try {
@@ -632,9 +393,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
             }
 
             if (log.isDebugEnabled()) {
-                log.debug(String
-                        .format("Cluster monitor thread has been started successfully: [cluster] %s ",
-                                cluster.getClusterId()));
+                log.debug(String.format("Cluster monitor thread has been started successfully: "
+                                        + "[cluster] %s ", cluster.getClusterId()));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
index cb60027..6061c3b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
@@ -19,130 +19,211 @@
 package org.apache.stratos.autoscaler.monitor;
 
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
 import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
 import org.drools.runtime.StatefulKnowledgeSession;
 import org.drools.runtime.rule.FactHandle;
 
 /*
  * Every cluster monitor, which are monitoring a cluster, should extend this class.
  */
-public abstract class AbstractClusterMonitor implements Runnable{
-	
+public abstract class AbstractClusterMonitor implements Runnable {
+
     private String clusterId;
     private String serviceId;
-    private ClusterType clusterType;
-	private ClusterStatus status;
-	private int monitorInterval;
-	
-	protected FactHandle minCheckFactHandle;
-	protected FactHandle scaleCheckFactHandle;
-	private StatefulKnowledgeSession minCheckKnowledgeSession;
-	private StatefulKnowledgeSession scaleCheckKnowledgeSession;
-	private boolean isDestroyed;
-	
-	private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
-	
-	protected AbstractClusterMonitor(String clusterId, String serviceId, ClusterType clusterType, 
-			AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
-		
-		super();
-		this.clusterId = clusterId;
-		this.serviceId = serviceId;
-		this.clusterType = clusterType;
-		this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+    private ClusterStatus status;
+    private int monitoringIntervalMilliseconds;
+
+    protected FactHandle minCheckFactHandle;
+    protected FactHandle scaleCheckFactHandle;
+    private StatefulKnowledgeSession minCheckKnowledgeSession;
+    private StatefulKnowledgeSession scaleCheckKnowledgeSession;
+    private boolean isDestroyed;
+
+    private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+
+    protected AbstractClusterMonitor(String clusterId, String serviceId,
+                                     AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+
+        super();
+        this.clusterId = clusterId;
+        this.serviceId = serviceId;
+        this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
         this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
         this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
-	}
+    }
+
+    protected abstract void readConfigurations();
+
+    protected abstract void monitor();
 
-	protected abstract void readConfigurations();
-	protected abstract void monitor();
     public abstract void destroy();
-    
-	public String getClusterId() {
-		return clusterId;
-	}
-	
-	public void setClusterId(String clusterId) {
-		this.clusterId = clusterId;
-	}
-	
-	public void setStatus(ClusterStatus status) {
-		this.status = status;
-	}
-
-	public ClusterType getClusterType() {
-		return clusterType;
-	}
-
-	public ClusterStatus getStatus() {
-		return status;
-	}
-	
-	public String getServiceId() {
-		return serviceId;
-	}
-	
-	public void setServiceId(String serviceId) {
-		this.serviceId = serviceId;
-	}
-	
-	public int getMonitorInterval() {
-		return monitorInterval;
-	}
-	
-	public void setMonitorInterval(int monitorInterval) {
-		this.monitorInterval = monitorInterval;
-	}
-
-	public FactHandle getMinCheckFactHandle() {
-		return minCheckFactHandle;
-	}
-	
-	public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
-		this.minCheckFactHandle = minCheckFactHandle;
-	}
-	
-	public FactHandle getScaleCheckFactHandle() {
-		return scaleCheckFactHandle;
-	}
-	
-	public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
-		this.scaleCheckFactHandle = scaleCheckFactHandle;
-	}
-	
-	public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
-		return minCheckKnowledgeSession;
-	}
-	
-	public void setMinCheckKnowledgeSession(
-			StatefulKnowledgeSession minCheckKnowledgeSession) {
-		this.minCheckKnowledgeSession = minCheckKnowledgeSession;
-	}
-	
-	public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
-		return scaleCheckKnowledgeSession;
-	}
-	
-	public void setScaleCheckKnowledgeSession(
-			StatefulKnowledgeSession scaleCheckKnowledgeSession) {
-		this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
-	}
-	
-	public boolean isDestroyed() {
-		return isDestroyed;
-	}
-	
-	public void setDestroyed(boolean isDestroyed) {
-		this.isDestroyed = isDestroyed;
-	}
-
-	public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
-		return autoscalerRuleEvaluator;
-	}
-
-	public void setAutoscalerRuleEvaluator(
-			AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
-		this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
-	}
+
+    //handle health events
+    public abstract void handleAverageLoadAverageEvent(
+            AverageLoadAverageEvent averageLoadAverageEvent);
+
+    public abstract void handleGradientOfLoadAverageEvent(
+            GradientOfLoadAverageEvent gradientOfLoadAverageEvent);
+
+    public abstract void handleSecondDerivativeOfLoadAverageEvent(
+            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent);
+
+    public abstract void handleAverageMemoryConsumptionEvent(
+            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent);
+
+    public abstract void handleGradientOfMemoryConsumptionEvent(
+            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent);
+
+    public abstract void handleSecondDerivativeOfMemoryConsumptionEvent(
+            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent);
+
+    public abstract void handleAverageRequestsInFlightEvent(
+            AverageRequestsInFlightEvent averageRequestsInFlightEvent);
+
+    public abstract void handleGradientOfRequestsInFlightEvent(
+            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent);
+
+    public abstract void handleSecondDerivativeOfRequestsInFlightEvent(
+            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent);
+
+    public abstract void handleMemberAverageMemoryConsumptionEvent(
+            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent);
+
+    public abstract void handleMemberGradientOfMemoryConsumptionEvent(
+            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent);
+
+    public abstract void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent);
+
+
+    public abstract void handleMemberAverageLoadAverageEvent(
+            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent);
+
+    public abstract void handleMemberGradientOfLoadAverageEvent(
+            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent);
+
+    public abstract void handleMemberSecondDerivativeOfLoadAverageEvent(
+            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent);
+
+    public abstract void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent);
+
+    //handle topology events
+    public abstract void handleMemberStartedEvent(MemberStartedEvent memberStartedEvent);
+
+    public abstract void handleMemberActivatedEvent(MemberActivatedEvent memberActivatedEvent);
+
+    public abstract void handleMemberMaintenanceModeEvent(
+            MemberMaintenanceModeEvent maintenanceModeEvent);
+
+    public abstract void handleMemberReadyToShutdownEvent(
+            MemberReadyToShutdownEvent memberReadyToShutdownEvent);
+
+    public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent memberTerminatedEvent);
+
+    public abstract void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent);
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public void setStatus(ClusterStatus status) {
+        this.status = status;
+    }
+
+    public ClusterStatus getStatus() {
+        return status;
+    }
+
+    public String getServiceId() {
+        return serviceId;
+    }
+
+    public void setServiceId(String serviceId) {
+        this.serviceId = serviceId;
+    }
+
+    public int getMonitorIntervalMilliseconds() {
+        return monitoringIntervalMilliseconds;
+    }
+
+    public void setMonitorIntervalMilliseconds(int monitorIntervalMilliseconds) {
+        this.monitoringIntervalMilliseconds = monitorIntervalMilliseconds;
+    }
+
+    public FactHandle getMinCheckFactHandle() {
+        return minCheckFactHandle;
+    }
+
+    public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
+        this.minCheckFactHandle = minCheckFactHandle;
+    }
+
+    public FactHandle getScaleCheckFactHandle() {
+        return scaleCheckFactHandle;
+    }
+
+    public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
+        this.scaleCheckFactHandle = scaleCheckFactHandle;
+    }
+
+    public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
+        return minCheckKnowledgeSession;
+    }
+
+    public void setMinCheckKnowledgeSession(
+            StatefulKnowledgeSession minCheckKnowledgeSession) {
+        this.minCheckKnowledgeSession = minCheckKnowledgeSession;
+    }
+
+    public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
+        return scaleCheckKnowledgeSession;
+    }
+
+    public void setScaleCheckKnowledgeSession(
+            StatefulKnowledgeSession scaleCheckKnowledgeSession) {
+        this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
+    }
+
+    public boolean isDestroyed() {
+        return isDestroyed;
+    }
+
+    public void setDestroyed(boolean isDestroyed) {
+        this.isDestroyed = isDestroyed;
+    }
+
+    public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
+        return autoscalerRuleEvaluator;
+    }
+
+    public void setAutoscalerRuleEvaluator(
+            AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+        this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
index bd01dc6..208e4ce 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
@@ -52,30 +52,32 @@ import org.apache.stratos.messaging.util.Constants;
  * Factory class for creating cluster monitors.
  */
 public class ClusterMonitorFactory {
-	
-	private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
-
-	/**
-	 * @param cluster the cluster to be monitored
-	 * @return the created cluster monitor
-	 * @throws PolicyValidationException when deployment policy is not valid
-	 * @throws PartitionValidationException when partition is not valid
-	 */
-	public static AbstractClusterMonitor getMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
-		
-		AbstractClusterMonitor clusterMonitor;
-		if(cluster.isKubernetesCluster()){
-			clusterMonitor = getDockerServiceClusterMonitor(cluster);
-		} else if (cluster.isLbCluster()){
-			clusterMonitor = getVMLbClusterMonitor(cluster);
-		} else {
-			clusterMonitor = getVMServiceClusterMonitor(cluster);
-		}
-		
-		return clusterMonitor;
-	}
-	
-    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
+
+    private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
+
+    /**
+     * @param cluster the cluster to be monitored
+     * @return the created cluster monitor
+     * @throws PolicyValidationException    when deployment policy is not valid
+     * @throws PartitionValidationException when partition is not valid
+     */
+    public static AbstractClusterMonitor getMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+
+        AbstractClusterMonitor clusterMonitor;
+        if (cluster.isKubernetesCluster()) {
+            clusterMonitor = getDockerServiceClusterMonitor(cluster);
+        } else if (cluster.isLbCluster()) {
+            clusterMonitor = getVMLbClusterMonitor(cluster);
+        } else {
+            clusterMonitor = getVMServiceClusterMonitor(cluster);
+        }
+
+        return clusterMonitor;
+    }
+
+    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
         // FIXME fix the following code to correctly update
         // AutoscalerContext context = AutoscalerContext.getInstance();
         if (null == cluster) {
@@ -91,11 +93,11 @@ public class ClusterMonitorFactory {
         }
 
         AutoscalePolicy policy =
-                                 PolicyManager.getInstance()
-                                              .getAutoscalePolicy(autoscalePolicyName);
+                PolicyManager.getInstance()
+                        .getAutoscalePolicy(autoscalePolicyName);
         DeploymentPolicy deploymentPolicy =
-                                            PolicyManager.getInstance()
-                                                         .getDeploymentPolicy(deploymentPolicyName);
+                PolicyManager.getInstance()
+                        .getDeploymentPolicy(deploymentPolicyName);
 
         if (deploymentPolicy == null) {
             String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
@@ -106,8 +108,8 @@ public class ClusterMonitorFactory {
         Partition[] allPartitions = deploymentPolicy.getAllPartitions();
         if (allPartitions == null) {
             String msg =
-                         "Deployment Policy's Partitions are null. Policy name: " +
-                                 deploymentPolicyName;
+                    "Deployment Policy's Partitions are null. Policy name: " +
+                    deploymentPolicyName;
             log.error(msg);
             throw new PolicyValidationException(msg);
         }
@@ -115,98 +117,100 @@ public class ClusterMonitorFactory {
         CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
 
         VMServiceClusterMonitor clusterMonitor =
-                                        new VMServiceClusterMonitor(cluster.getClusterId(),
-                                                           cluster.getServiceName(),
-                                                           deploymentPolicy, policy);
+                new VMServiceClusterMonitor(cluster.getClusterId(),
+                                            cluster.getServiceName(),
+                                            deploymentPolicy, policy);
         clusterMonitor.setStatus(ClusterStatus.Created);
-        
-        for (PartitionGroup partitionGroup: deploymentPolicy.getPartitionGroups()){
+
+        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
 
             NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-                    partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions());
+                                                                                          partitionGroup.getPartitionAlgo(),
+                                                                                          partitionGroup.getPartitions());
 
-            for(Partition partition: partitionGroup.getPartitions()){
+            for (Partition partition : partitionGroup.getPartitions()) {
                 PartitionContext partitionContext = new PartitionContext(partition);
                 partitionContext.setServiceName(cluster.getServiceName());
                 partitionContext.setProperties(cluster.getProperties());
                 partitionContext.setNetworkPartitionId(partitionGroup.getId());
-                
-                for (Member member: cluster.getMembers()){
+
+                for (Member member : cluster.getMembers()) {
                     String memberId = member.getMemberId();
-                    if(member.getPartitionId().equalsIgnoreCase(partition.getId())){
+                    if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
                         MemberContext memberContext = new MemberContext();
                         memberContext.setClusterId(member.getClusterId());
                         memberContext.setMemberId(memberId);
                         memberContext.setPartition(partition);
                         memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
-                        
-                        if(MemberStatus.Activated.equals(member.getStatus())){
+
+                        if (MemberStatus.Activated.equals(member.getStatus())) {
                             partitionContext.addActiveMember(memberContext);
 //                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
 //                            partitionContext.incrementCurrentActiveMemberCount(1);
 
-                        } else if(MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())){
+                        } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
                             partitionContext.addPendingMember(memberContext);
 
 //                            networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-                        } else if(MemberStatus.Suspended.equals(member.getStatus())){
+                        } else if (MemberStatus.Suspended.equals(member.getStatus())) {
 //                            partitionContext.addFaultyMember(memberId);
                         }
                         partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                        if(log.isInfoEnabled()){
+                        if (log.isInfoEnabled()) {
                             log.info(String.format("Member stat context has been added: [member] %s", memberId));
                         }
                     }
 
                 }
                 networkPartitionContext.addPartitionContext(partitionContext);
-                if(log.isInfoEnabled()){
+                if (log.isInfoEnabled()) {
                     log.info(String.format("Partition context has been added: [partition] %s",
-                            partitionContext.getPartitionId()));
+                                           partitionContext.getPartitionId()));
                 }
             }
 
             clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-            if(log.isInfoEnabled()){
+            if (log.isInfoEnabled()) {
                 log.info(String.format("Network partition context has been added: [network partition] %s",
-                            networkPartitionContext.getId()));
+                                       networkPartitionContext.getId()));
             }
         }
-        
-        
+
+
         // find lb reference type
         java.util.Properties props = cluster.getProperties();
-        
-        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+
+        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
             String value = props.getProperty(Constants.LOAD_BALANCER_REF);
             clusterMonitor.setLbReferenceType(value);
-            if(log.isDebugEnabled()) {
-                log.debug("Set the lb reference type: "+value);
+            if (log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: " + value);
             }
         }
-        
+
         // set hasPrimary property
         // hasPrimary is true if there are primary members available in that cluster
         clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
 
-        log.info("VMServiceClusterMonitor created: "+clusterMonitor.toString());
+        log.info("VMServiceClusterMonitor created: " + clusterMonitor.toString());
         return clusterMonitor;
     }
-    
+
     private static Properties convertMemberPropsToMemberContextProps(
-			java.util.Properties properties) {
-    	Properties props = new Properties();
-    	for (Map.Entry<Object, Object> e : properties.entrySet()	) {
-			Property prop = new Property();
-			prop.setName((String)e.getKey());
-			prop.setValue((String)e.getValue());
-			props.addProperties(prop);
-		}    	
-		return props;
-	}
-
-
-	private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
+            java.util.Properties properties) {
+        Properties props = new Properties();
+        for (Map.Entry<Object, Object> e : properties.entrySet()) {
+            Property prop = new Property();
+            prop.setName((String) e.getKey());
+            prop.setValue((String) e.getValue());
+            props.addProperties(prop);
+        }
+        return props;
+    }
+
+
+    private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
         // FIXME fix the following code to correctly update
         // AutoscalerContext context = AutoscalerContext.getInstance();
         if (null == cluster) {
@@ -222,11 +226,11 @@ public class ClusterMonitorFactory {
         }
 
         AutoscalePolicy policy =
-                                 PolicyManager.getInstance()
-                                              .getAutoscalePolicy(autoscalePolicyName);
+                PolicyManager.getInstance()
+                        .getAutoscalePolicy(autoscalePolicyName);
         DeploymentPolicy deploymentPolicy =
-                                            PolicyManager.getInstance()
-                                                         .getDeploymentPolicy(deploymentPolicyName);
+                PolicyManager.getInstance()
+                        .getDeploymentPolicy(deploymentPolicyName);
 
         if (deploymentPolicy == null) {
             String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
@@ -236,21 +240,21 @@ public class ClusterMonitorFactory {
 
         String clusterId = cluster.getClusterId();
         VMLbClusterMonitor clusterMonitor =
-                                        new VMLbClusterMonitor(clusterId,
-                                                           cluster.getServiceName(),
-                                                           deploymentPolicy, policy);
+                new VMLbClusterMonitor(clusterId,
+                                       cluster.getServiceName(),
+                                       deploymentPolicy, policy);
         clusterMonitor.setStatus(ClusterStatus.Created);
         // partition group = network partition context
         for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
 
             NetworkPartitionLbHolder networkPartitionLbHolder =
-                                                              PartitionManager.getInstance()
-                                                                              .getNetworkPartitionLbHolder(partitionGroup.getId());
+                    PartitionManager.getInstance()
+                            .getNetworkPartitionLbHolder(partitionGroup.getId());
 //                                                              PartitionManager.getInstance()
 //                                                                              .getNetworkPartitionLbHolder(partitionGroup.getId());
             // FIXME pick a random partition
             Partition partition =
-                                  partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
+                    partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
             PartitionContext partitionContext = new PartitionContext(partition);
             partitionContext.setServiceName(cluster.getServiceName());
             partitionContext.setProperties(cluster.getProperties());
@@ -258,7 +262,8 @@ public class ClusterMonitorFactory {
             partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
 
             NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-                    partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions()) ;
+                                                                                          partitionGroup.getPartitionAlgo(),
+                                                                                          partitionGroup.getPartitions());
             for (Member member : cluster.getMembers()) {
                 String memberId = member.getMemberId();
                 if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) {
@@ -280,23 +285,23 @@ public class ClusterMonitorFactory {
                     }
 
                     partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-                    if(log.isInfoEnabled()){
+                    if (log.isInfoEnabled()) {
                         log.info(String.format("Member stat context has been added: [member] %s", memberId));
                     }
                 }
 
             }
             networkPartitionContext.addPartitionContext(partitionContext);
-            
+
             // populate lb cluster id in network partition context.
             java.util.Properties props = cluster.getProperties();
 
             // get service type of load balanced cluster
             String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
-            
-            if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+
+            if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
                 String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-                
+
                 if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
                     networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
 
@@ -317,13 +322,17 @@ public class ClusterMonitorFactory {
             clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
         }
 
-        log.info("VMLbClusterMonitor created: "+clusterMonitor.toString());
+        log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
         return clusterMonitor;
     }
-	
-    private static DockerServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) {
 
-    	if (null == cluster) {
+    /**
+     * @param cluster - the cluster which needs to be monitored
+     * @return - the cluster monitor
+     */
+    private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) {
+
+        if (null == cluster) {
             return null;
         }
 
@@ -335,42 +344,43 @@ public class ClusterMonitorFactory {
         AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
         java.util.Properties props = cluster.getProperties();
         String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
-		KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID, 
-				cluster.getClusterId());
-
-        DockerServiceClusterMonitor dockerClusterMonitor = new DockerServiceClusterMonitor(
-        		kubernetesClusterCtxt, 
-        		cluster.getClusterId(), 
-        		cluster.getServiceName(), 
-        		policy);
-                                        
+        KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
+                                                                                      cluster.getClusterId());
+
+        KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor(
+                kubernetesClusterCtxt,
+                cluster.getClusterId(),
+                cluster.getServiceName(),
+                policy);
+
         dockerClusterMonitor.setStatus(ClusterStatus.Created);
-        
-		for (Member member : cluster.getMembers()) {
-			String memberId = member.getMemberId();
-			String clusterId = member.getClusterId();
-			MemberContext memberContext = new MemberContext();
-			memberContext.setMemberId(memberId);
-			memberContext.setClusterId(clusterId);
-
-			if (MemberStatus.Activated.equals(member.getStatus())) {
-				dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
-			} else if (MemberStatus.Created.equals(member.getStatus())
-					|| MemberStatus.Starting.equals(member.getStatus())) {
-				dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
-			}
-		}
+
+        //populate the members after restarting        
+        for (Member member : cluster.getMembers()) {
+            String memberId = member.getMemberId();
+            String clusterId = member.getClusterId();
+            MemberContext memberContext = new MemberContext();
+            memberContext.setMemberId(memberId);
+            memberContext.setClusterId(clusterId);
+
+            if (MemberStatus.Activated.equals(member.getStatus())) {
+                dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
+            } else if (MemberStatus.Created.equals(member.getStatus())
+                       || MemberStatus.Starting.equals(member.getStatus())) {
+                dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
+            }
+        }
 
         // find lb reference type
-        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
             String value = props.getProperty(Constants.LOAD_BALANCER_REF);
             dockerClusterMonitor.setLbReferenceType(value);
-            if(log.isDebugEnabled()) {
-                log.debug("Set the lb reference type: "+value);
+            if (log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: " + value);
             }
         }
-        
-        log.info("KubernetesServiceClusterMonitor created: "+ dockerClusterMonitor.toString());
+
+        log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
         return dockerClusterMonitor;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
deleted file mode 100644
index 2621690..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
-
-/*
- * Every container cluster monitor should extend this class
- */
-public abstract class ContainerClusterMonitor extends AbstractClusterMonitor {
-
-	private KubernetesClusterContext kubernetesClusterCtxt;
-	protected AutoscalePolicy autoscalePolicy;
-	
-	protected ContainerClusterMonitor(String clusterId, String serviceId, ClusterType clusterType, 
-			KubernetesClusterContext kubernetesClusterContext,
-			AutoscalerRuleEvaluator autoscalerRuleEvaluator, AutoscalePolicy autoscalePolicy){
-		
-		super(clusterId, serviceId, clusterType, autoscalerRuleEvaluator);
-		this.kubernetesClusterCtxt = kubernetesClusterContext;
-		this.autoscalePolicy = autoscalePolicy;
-	}
-    
-	public KubernetesClusterContext getKubernetesClusterCtxt() {
-		return kubernetesClusterCtxt;
-	}
-
-	public void setKubernetesClusterCtxt(
-			KubernetesClusterContext kubernetesClusterCtxt) {
-		this.kubernetesClusterCtxt = kubernetesClusterCtxt;
-	}
-	
-	public AutoscalePolicy getAutoscalePolicy() {
-		return autoscalePolicy;
-	}
-
-	public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
-		this.autoscalePolicy = autoscalePolicy;
-	}
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
deleted file mode 100644
index 850a295..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.Properties;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.common.enums.ClusterType;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-/*
- * It is monitoring a kubernetes service cluster periodically.
- */
-public final class DockerServiceClusterMonitor extends ContainerClusterMonitor{
-	
-	private static final Log log = LogFactory.getLog(DockerServiceClusterMonitor.class);
-
-	private String lbReferenceType;
-    private int numberOfReplicasInServiceCluster = 0;
-	int retryInterval = 60000;
-	
-    public DockerServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt, 
-    		String serviceClusterID, String serviceId, AutoscalePolicy autoscalePolicy) {
-    	super(serviceClusterID, serviceId, ClusterType.DockerServiceCluster, kubernetesClusterCtxt,
-    			new AutoscalerRuleEvaluator(), autoscalePolicy);
-        readConfigurations();
-    }
-
-	@Override
-	public void run() {
-		try {
-			// TODO make this configurable,
-			// this is the delay the min check of normal cluster monitor to wait
-			// until LB monitor is added
-			Thread.sleep(60000);
-		} catch (InterruptedException ignore) {
-		}
-
-		while (!isDestroyed()) {
-			if (log.isDebugEnabled()) {
-				log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
-			}
-			try {
-				if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
-					monitor();
-				} else {
-					if (log.isDebugEnabled()) {
-						log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
-								+ ClusterStatus.In_Maintenance + " mode......");
-					}
-				}
-			} catch (Exception e) {
-				log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
-						e);
-			}
-			try {
-				Thread.sleep(getMonitorInterval());
-			} catch (InterruptedException ignore) {
-			}
-		}
-	}
-	
-	@Override
-	protected void monitor() {
-		
-	    // is container created successfully?
-		boolean success = false;
-		String kubernetesClusterId = getKubernetesClusterCtxt().getKubernetesClusterID();
-		
-		try {
-			TopologyManager.acquireReadLock();
-			Properties props = TopologyManager.getTopology().getService(getServiceId()).getCluster(getClusterId()).getProperties();
-			int minReplicas = Integer.parseInt(props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS));
-			
-			int nonTerminatedMembers = getKubernetesClusterCtxt().getActiveMembers().size() + getKubernetesClusterCtxt().getPendingMembers().size();
-
-			if (nonTerminatedMembers == 0) {
-				
-				while (!success) {
-					try {
-
-						MemberContext memberContext = CloudControllerClient.getInstance().createContainer(kubernetesClusterId, getClusterId());
-						if(null != memberContext) {
-							getKubernetesClusterCtxt().addPendingMember(memberContext);
-							success = true;
-							numberOfReplicasInServiceCluster = minReplicas;
-							if(log.isDebugEnabled()){
-								log.debug(String.format("Pending member added, [member] %s [kub cluster] %s", 
-										memberContext.getMemberId(), getKubernetesClusterCtxt().getKubernetesClusterID()));
-							}
-						} else {
-							if (log.isDebugEnabled()) {
-								log.debug("Returned member context is null, did not add to pending members");
-							}
-						}
-					} catch (Throwable e) {
-						if (log.isDebugEnabled()) {
-							String message = "Cannot create a container, will retry in "+(retryInterval/1000)+"s";
-							log.debug(message, e);
-						}
-					}
-					
-	                try {
-	                    Thread.sleep(retryInterval);
-	                } catch (InterruptedException e1) {
-	                }
-				}
-			}
-		} finally {
-			TopologyManager.releaseReadLock();
-		}
-	}
-	
-	@Override
-	public void destroy() {
-        getMinCheckKnowledgeSession().dispose();
-        getScaleCheckKnowledgeSession().dispose();
-        setDestroyed(true);
-        if(log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. "+this.toString());
-        }		
-	}
-	
-    @Override
-    protected void readConfigurations () {
-        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
-        int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
-        setMonitorInterval(monitorInterval);
-        if (log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor task interval: " + getMonitorInterval());
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "KubernetesServiceClusterMonitor "
-        		+ "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
-        		+ ", clusterId=" + getClusterId() 
-        		+ ", serviceId=" + getServiceId() + "]";
-    }
-    
-	public String getLbReferenceType() {
-		return lbReferenceType;
-	}
-
-	public void setLbReferenceType(String lbReferenceType) {
-		this.lbReferenceType = lbReferenceType;
-	}
-}
\ No newline at end of file