You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by sa...@apache.org on 2014/10/06 19:21:57 UTC

[05/11] git commit: code review changes to cluster monitors

code review changes to cluster monitors


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b80861ba
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b80861ba
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b80861ba

Branch: refs/heads/master
Commit: b80861baab243db0537d3f0720543d702171ec47
Parents: 0b4c95e
Author: R-Rajkumar <rr...@gmail.com>
Authored: Sun Oct 5 15:04:57 2014 +0530
Committer: R-Rajkumar <rr...@gmail.com>
Committed: Sun Oct 5 15:04:57 2014 +0530

----------------------------------------------------------------------
 .../stratos/autoscaler/AutoscalerContext.java   |  36 +-
 .../autoscaler/KubernetesClusterContext.java    | 863 ++++++++--------
 .../stratos/autoscaler/MemberStatsContext.java  |  29 +-
 .../AutoscalerHealthStatEventReceiver.java      | 991 +++++--------------
 .../AutoscalerTopologyEventReceiver.java        | 458 ++-------
 .../monitor/AbstractClusterMonitor.java         | 307 +++---
 .../monitor/ClusterMonitorFactory.java          | 250 ++---
 .../monitor/ContainerClusterMonitor.java        |  59 --
 .../monitor/DockerServiceClusterMonitor.java    | 176 ----
 .../monitor/KubernetesClusterMonitor.java       | 427 ++++++++
 .../KubernetesServiceClusterMonitor.java        | 181 ++++
 .../autoscaler/monitor/VMClusterMonitor.java    | 597 ++++++++++-
 .../autoscaler/monitor/VMLbClusterMonitor.java  |  87 +-
 .../monitor/VMServiceClusterMonitor.java        |  73 +-
 .../stratos/autoscaler/util/AutoscalerUtil.java | 391 +-------
 .../stratos/common/enums/ClusterType.java       |   5 -
 16 files changed, 2440 insertions(+), 2490 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/b80861ba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
index 581d633..2d10954 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
@@ -33,6 +33,8 @@ import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
 public class AutoscalerContext {
 
     private static final Log log = LogFactory.getLog(AutoscalerContext.class);
+    private static final AutoscalerContext INSTANCE = new AutoscalerContext();
+
     private AutoscalerContext() {
         try {
             setClusterMonitors(new HashMap<String, AbstractClusterMonitor>());
@@ -40,17 +42,13 @@ public class AutoscalerContext {
             log.error("Rule evaluateMinCheck error", e);
         }
     }
-    
+
     // Map<ClusterId, AbstractClusterMonitor>
     private Map<String, AbstractClusterMonitor> clusterMonitors;
 
-	private static class Holder {
-		private static final AutoscalerContext INSTANCE = new AutoscalerContext();
-	}
-
-	public static AutoscalerContext getInstance() {
-		return Holder.INSTANCE;
-	}
+    public static AutoscalerContext getInstance() {
+        return INSTANCE;
+    }
 
     public void addClusterMonitor(AbstractClusterMonitor clusterMonitor) {
         clusterMonitors.put(clusterMonitor.getClusterId(), clusterMonitor);
@@ -59,11 +57,7 @@ public class AutoscalerContext {
     public AbstractClusterMonitor getClusterMonitor(String clusterId) {
         return clusterMonitors.get(clusterId);
     }
-    
-    public boolean clusterMonitorExist(String clusterId) {
-        return clusterMonitors.containsKey(clusterId);
-    }
-    
+
     public Map<String, AbstractClusterMonitor> getClusterMonitors() {
         return clusterMonitors;
     }
@@ -71,13 +65,15 @@ public class AutoscalerContext {
     public void setClusterMonitors(Map<String, AbstractClusterMonitor> clusterMonitors) {
         this.clusterMonitors = clusterMonitors;
     }
-    
+
     public AbstractClusterMonitor removeClusterMonitor(String clusterId) {
-    	if(!clusterMonitorExist(clusterId)) {
-    		log.fatal("ClusterMonitor not found for cluster id: "+clusterId);
-    		return null;
-    	}
-    	log.info("Removed ClusterMonitor [cluster id]: " + clusterId);
-        return clusterMonitors.remove(clusterId);
+
+        AbstractClusterMonitor monitor = clusterMonitors.remove(clusterId);
+        if (monitor == null) {
+            log.fatal("ClusterMonitor not found for cluster id: " + clusterId);
+        } else {
+            log.info("Removed ClusterMonitor [cluster id]: " + clusterId);
+        }
+        return monitor;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/b80861ba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
index 16bc653..c8b6e39 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
@@ -40,474 +40,475 @@ import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
 /*
  * It holds the runtime data of a kubernetes cluster
  */
-public class KubernetesClusterContext implements Serializable{
-	
-	private static final long serialVersionUID = 808741789615481596L;
-	private static final Log log = LogFactory.getLog(KubernetesClusterContext.class);
-	
-	private String kubernetesClusterId;
-	private String serviceName;
-	
+public class KubernetesClusterContext implements Serializable {
+
+    private static final long serialVersionUID = 808741789615481596L;
+    private static final Log log = LogFactory.getLog(KubernetesClusterContext.class);
+
+    private String kubernetesClusterId;
+    private String serviceName;
+
     private int minReplicas;
     private int maxReplicas;
     private int currentReplicas = 0;
-    
+
     // properties
     private Properties properties;
-    
+
     // 15 mints as the default
     private long expiryTime;
     // pending members
     private List<MemberContext> pendingMembers;
-    
+
     // active members
     private List<MemberContext> activeMembers;
 
     //Keep statistics come from CEP
     private Map<String, MemberStatsContext> memberStatsContexts;
-	
+
     //Following information will keep events details
     private RequestsInFlight requestsInFlight;
     private MemoryConsumption memoryConsumption;
     private LoadAverage loadAverage;
-    
+
     // cluster id
     private String clusterId;
-    
+
     //boolean values to keep whether the requests in flight parameters are reset or not
-    private boolean rifReset = false, averageRifReset = false, 
-    		gradientRifReset = false, secondDerivativeRifRest = false;
+    private boolean rifReset = false, averageRifReset = false,
+            gradientRifReset = false, secondDerivativeRifRest = false;
     //boolean values to keep whether the memory consumption parameters are reset or not
     private boolean memoryConsumptionReset = false, averageMemoryConsumptionReset = false,
             gradientMemoryConsumptionReset = false, secondDerivativeMemoryConsumptionRest = false;
     //boolean values to keep whether the load average parameters are reset or not
-    private boolean loadAverageReset = false, averageLoadAverageReset = false, 
-    		gradientLoadAverageReset = false, secondDerivativeLoadAverageRest = false;
-    
-	public KubernetesClusterContext(String kubernetesClusterId, String clusterId){
-		this.kubernetesClusterId = kubernetesClusterId;
-		this.clusterId = clusterId;
+    private boolean loadAverageReset = false, averageLoadAverageReset = false,
+            gradientLoadAverageReset = false, secondDerivativeLoadAverageRest = false;
+
+    public KubernetesClusterContext(String kubernetesClusterId, String clusterId) {
+        this.kubernetesClusterId = kubernetesClusterId;
+        this.clusterId = clusterId;
         this.pendingMembers = new ArrayList<MemberContext>();
         this.activeMembers = new ArrayList<MemberContext>();
         this.memberStatsContexts = new ConcurrentHashMap<String, MemberStatsContext>();
         this.requestsInFlight = new RequestsInFlight();
         this.loadAverage = new LoadAverage();
         this.memoryConsumption = new MemoryConsumption();
-        
+
         // check if a different value has been set for expiryTime
         XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
         expiryTime = conf.getLong("autoscaler.member.expiryTimeout", 300000);
         if (log.isDebugEnabled()) {
             log.debug("Member expiry time is set to: " + expiryTime);
         }
-        
+
         Thread th = new Thread(new PendingMemberWatcher(this));
         th.start();
-	}
-	
-	public String getKubernetesClusterID() {
-		return kubernetesClusterId;
-	}
-	public void setKubernetesClusterID(String kubernetesClusterId) {
-		this.kubernetesClusterId = kubernetesClusterId;
-	}
-	
-	public List<MemberContext> getPendingMembers() {
-		return pendingMembers;
-	}
-
-	public void setPendingMembers(List<MemberContext> pendingMembers) {
-		this.pendingMembers = pendingMembers;
-	}
-
-	public int getActiveMemberCount() {
-		return activeMembers.size();
-	}
-
-	public void setActiveMembers(List<MemberContext> activeMembers) {
-		this.activeMembers = activeMembers;
-	}
-	    
-	public int getMinReplicas() {
-		return minReplicas;
-	}
-
-	public void setMinReplicas(int minReplicas) {
-		this.minReplicas = minReplicas;
-	}
-
-	public int getMaxReplicas() {
-		return maxReplicas;
-	}
-
-	public void setMaxReplicas(int maxReplicas) {
-		this.maxReplicas = maxReplicas;
-	}
-
-	public int getCurrentReplicas() {
-		return currentReplicas;
-	}
-
-	public void setCurrentReplicas(int currentReplicas) {
-		this.currentReplicas = currentReplicas;
-	}
-
-	public void addPendingMember(MemberContext ctxt) {
-		this.pendingMembers.add(ctxt);
-	}
-	    
-	public boolean removePendingMember(String id) {
-		if (id == null) {
-			return false;
-		}
-		for (Iterator<MemberContext> iterator = pendingMembers.iterator(); iterator.hasNext();) {
-			MemberContext pendingMember = (MemberContext) iterator.next();
-			if (id.equals(pendingMember.getMemberId())) {
-				iterator.remove();
-				return true;
-			}
-
-		}
-
-		return false;
-	}
-
-	public void movePendingMemberToActiveMembers(String memberId) {
-		if (memberId == null) {
-			return;
-		}
-		Iterator<MemberContext> iterator = pendingMembers.listIterator();
-		while (iterator.hasNext()) {
-			MemberContext pendingMember = iterator.next();
-			if (pendingMember == null) {
-				iterator.remove();
-				continue;
-			}
-			if (memberId.equals(pendingMember.getMemberId())) {
-				// member is activated
-				// remove from pending list
-				iterator.remove();
-				// add to the activated list
-				this.activeMembers.add(pendingMember);
-				if (log.isDebugEnabled()) {
-					log.debug(String.format(
-							"Pending member is removed and added to the "
-									+ "activated member list. [Member Id] %s",
-							memberId));
-				}
-				break;
-			}
-		}
-	}
-
-	public void addActiveMember(MemberContext ctxt) {
-		this.activeMembers.add(ctxt);
-	}
-
-	public void removeActiveMember(MemberContext ctxt) {
-		this.activeMembers.remove(ctxt);
-	}
-
-	public long getExpiryTime() {
-		return expiryTime;
-	}
-
-	public void setExpiryTime(long expiryTime) {
-		this.expiryTime = expiryTime;
-	}
-	    
-	public Map<String, MemberStatsContext> getMemberStatsContexts() {
-		return memberStatsContexts;
-	}
-
-	public MemberStatsContext getMemberStatsContext(String memberId) {
-		return memberStatsContexts.get(memberId);
-	}
-
-	public void addMemberStatsContext(MemberStatsContext ctxt) {
-		this.memberStatsContexts.put(ctxt.getMemberId(), ctxt);
-	}
-
-	public void removeMemberStatsContext(String memberId) {
-		this.memberStatsContexts.remove(memberId);
-	}
-
-	public Properties getProperties() {
-		return properties;
-	}
-
-	public void setProperties(Properties properties) {
-		this.properties = properties;
-	}
-
-	public String getServiceName() {
-		return serviceName;
-	}
-
-	public void setServiceName(String serviceName) {
-		this.serviceName = serviceName;
-	}
-
-	public List<MemberContext> getActiveMembers() {
-		return activeMembers;
-	}
-
-	public boolean removeActiveMemberById(String memberId) {
-		boolean removeActiveMember = false;
-		synchronized (activeMembers) {
-			Iterator<MemberContext> iterator = activeMembers.listIterator();
-			while (iterator.hasNext()) {
-				MemberContext memberContext = iterator.next();
-				if (memberId.equals(memberContext.getMemberId())) {
-					iterator.remove();
-					removeActiveMember = true;
-
-					break;
-				}
-			}
-		}
-		return removeActiveMember;
-	}
-
-	public boolean activeMemberExist(String memberId) {
-
-		for (MemberContext memberContext : activeMembers) {
-			if (memberId.equals(memberContext.getMemberId())) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-	private class PendingMemberWatcher implements Runnable {
-		private KubernetesClusterContext ctxt;
-
-		public PendingMemberWatcher(KubernetesClusterContext ctxt) {
-			this.ctxt = ctxt;
-		}
-
-		@Override
-		public void run() {
-
-			while (true) {
-				long expiryTime = ctxt.getExpiryTime();
-				List<MemberContext> pendingMembers = ctxt.getPendingMembers();
-
-				synchronized (pendingMembers) {
-					Iterator<MemberContext> iterator = pendingMembers
-							.listIterator();
-					while (iterator.hasNext()) {
-						MemberContext pendingMember = iterator.next();
-
-						if (pendingMember == null) {
-							continue;
-						}
-						long pendingTime = System.currentTimeMillis()
-								- pendingMember.getInitTime();
-						if (pendingTime >= expiryTime) {
-
-							// terminate all containers of this cluster
-							try {
-								CloudControllerClient.getInstance().terminateAllContainers(clusterId);
-								iterator.remove();
-							} catch (TerminationException e) {
-								log.error(e.getMessage(), e);
-							}
-							
-						}
-					}
-				}
-
-				try {
-					// TODO find a constant
-					Thread.sleep(15000);
-				} catch (InterruptedException ignore) {
-				}
-			}
-		}
-
-	}
-
-	public float getAverageRequestsInFlight() {
-		return requestsInFlight.getAverage();
-	}
-
-	public void setAverageRequestsInFlight(float averageRequestsInFlight) {
-		requestsInFlight.setAverage(averageRequestsInFlight);
-		averageRifReset = true;
-		if (secondDerivativeRifRest && gradientRifReset) {
-			rifReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Requests in flights stats are reset, "
-						+ "ready to do scale check [kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public float getRequestsInFlightSecondDerivative() {
-		return requestsInFlight.getSecondDerivative();
-	}
-
-	public void setRequestsInFlightSecondDerivative(
-			float requestsInFlightSecondDerivative) {
-		requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative);
-		secondDerivativeRifRest = true;
-		if (averageRifReset && gradientRifReset) {
-			rifReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
-								+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public float getRequestsInFlightGradient() {
-		return requestsInFlight.getGradient();
-	}
-
-	public void setRequestsInFlightGradient(float requestsInFlightGradient) {
-		requestsInFlight.setGradient(requestsInFlightGradient);
-		gradientRifReset = true;
-		if (secondDerivativeRifRest && averageRifReset) {
-			rifReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
-								+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public boolean isRifReset() {
-		return rifReset;
-	}
-
-	public void setRifReset(boolean rifReset) {
-		this.rifReset = rifReset;
-		this.averageRifReset = rifReset;
-		this.gradientRifReset = rifReset;
-		this.secondDerivativeRifRest = rifReset;
-	}
-
-	public float getAverageMemoryConsumption() {
-		return memoryConsumption.getAverage();
-	}
-
-	public void setAverageMemoryConsumption(float averageMemoryConsumption) {
-		memoryConsumption.setAverage(averageMemoryConsumption);
-		averageMemoryConsumptionReset = true;
-		if (secondDerivativeMemoryConsumptionRest
-				&& gradientMemoryConsumptionReset) {
-			memoryConsumptionReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
-								+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public float getMemoryConsumptionSecondDerivative() {
-		return memoryConsumption.getSecondDerivative();
-	}
-
-	public void setMemoryConsumptionSecondDerivative(
-			float memoryConsumptionSecondDerivative) {
-		memoryConsumption
-				.setSecondDerivative(memoryConsumptionSecondDerivative);
-		secondDerivativeMemoryConsumptionRest = true;
-		if (averageMemoryConsumptionReset && gradientMemoryConsumptionReset) {
-			memoryConsumptionReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
-								+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public float getMemoryConsumptionGradient() {
-		return memoryConsumption.getGradient();
-	}
-
-	public void setMemoryConsumptionGradient(float memoryConsumptionGradient) {
-		memoryConsumption.setGradient(memoryConsumptionGradient);
-		gradientMemoryConsumptionReset = true;
-		if (secondDerivativeMemoryConsumptionRest
-				&& averageMemoryConsumptionReset) {
-			memoryConsumptionReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
-								+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public boolean isMemoryConsumptionReset() {
-		return memoryConsumptionReset;
-	}
-
-	public void setMemoryConsumptionReset(boolean memoryConsumptionReset) {
-		this.memoryConsumptionReset = memoryConsumptionReset;
-		this.averageMemoryConsumptionReset = memoryConsumptionReset;
-		this.gradientMemoryConsumptionReset = memoryConsumptionReset;
-		this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset;
-	}
-
-
-	public float getAverageLoadAverage() {
-		return loadAverage.getAverage();
-	}
-
-	public void setAverageLoadAverage(float averageLoadAverage) {
-		loadAverage.setAverage(averageLoadAverage);
-		averageLoadAverageReset = true;
-		if (secondDerivativeLoadAverageRest && gradientLoadAverageReset) {
-			loadAverageReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Load average stats are reset, ready to do scale check "
-						+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public float getLoadAverageSecondDerivative() {
-		return loadAverage.getSecondDerivative();
-	}
-
-	public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) {
-		loadAverage.setSecondDerivative(loadAverageSecondDerivative);
-		secondDerivativeLoadAverageRest = true;
-		if (averageLoadAverageReset && gradientLoadAverageReset) {
-			loadAverageReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Load average stats are reset, ready to do scale check "
-						+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public float getLoadAverageGradient() {
-		return loadAverage.getGradient();
-	}
-
-	public void setLoadAverageGradient(float loadAverageGradient) {
-		loadAverage.setGradient(loadAverageGradient);
-		gradientLoadAverageReset = true;
-		if (secondDerivativeLoadAverageRest && averageLoadAverageReset) {
-			loadAverageReset = true;
-			if (log.isDebugEnabled()) {
-				log.debug(String.format("Load average stats are reset, ready to do scale check "
-						+ "[kub cluster] %s", this.kubernetesClusterId));
-			}
-		}
-	}
-
-	public boolean isLoadAverageReset() {
-		return loadAverageReset;
-	}
-
-	public void setLoadAverageReset(boolean loadAverageReset) {
-		this.loadAverageReset = loadAverageReset;
-		this.averageLoadAverageReset = loadAverageReset;
-		this.gradientLoadAverageReset = loadAverageReset;
-		this.secondDerivativeLoadAverageRest = loadAverageReset;
-	}
+    }
+
+    public String getKubernetesClusterID() {
+        return kubernetesClusterId;
+    }
+
+    public void setKubernetesClusterID(String kubernetesClusterId) {
+        this.kubernetesClusterId = kubernetesClusterId;
+    }
+
+    public List<MemberContext> getPendingMembers() {
+        return pendingMembers;
+    }
+
+    public void setPendingMembers(List<MemberContext> pendingMembers) {
+        this.pendingMembers = pendingMembers;
+    }
+
+    public int getActiveMemberCount() {
+        return activeMembers.size();
+    }
+
+    public void setActiveMembers(List<MemberContext> activeMembers) {
+        this.activeMembers = activeMembers;
+    }
+
+    public int getMinReplicas() {
+        return minReplicas;
+    }
+
+    public void setMinReplicas(int minReplicas) {
+        this.minReplicas = minReplicas;
+    }
+
+    public int getMaxReplicas() {
+        return maxReplicas;
+    }
+
+    public void setMaxReplicas(int maxReplicas) {
+        this.maxReplicas = maxReplicas;
+    }
+
+    public int getCurrentReplicas() {
+        return currentReplicas;
+    }
+
+    public void setCurrentReplicas(int currentReplicas) {
+        this.currentReplicas = currentReplicas;
+    }
+
+    public void addPendingMember(MemberContext ctxt) {
+        this.pendingMembers.add(ctxt);
+    }
+
+    public boolean removePendingMember(String id) {
+        if (id == null) {
+            return false;
+        }
+        for (Iterator<MemberContext> iterator = pendingMembers.iterator(); iterator.hasNext(); ) {
+            MemberContext pendingMember = (MemberContext) iterator.next();
+            if (id.equals(pendingMember.getMemberId())) {
+                iterator.remove();
+                return true;
+            }
+
+        }
+
+        return false;
+    }
+
+    public void movePendingMemberToActiveMembers(String memberId) {
+        if (memberId == null) {
+            return;
+        }
+        Iterator<MemberContext> iterator = pendingMembers.listIterator();
+        while (iterator.hasNext()) {
+            MemberContext pendingMember = iterator.next();
+            if (pendingMember == null) {
+                iterator.remove();
+                continue;
+            }
+            if (memberId.equals(pendingMember.getMemberId())) {
+                // member is activated
+                // remove from pending list
+                iterator.remove();
+                // add to the activated list
+                this.activeMembers.add(pendingMember);
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format(
+                            "Pending member is removed and added to the "
+                            + "activated member list. [Member Id] %s",
+                            memberId));
+                }
+                break;
+            }
+        }
+    }
+
+    public void addActiveMember(MemberContext ctxt) {
+        this.activeMembers.add(ctxt);
+    }
+
+    public void removeActiveMember(MemberContext ctxt) {
+        this.activeMembers.remove(ctxt);
+    }
+
+    public long getExpiryTime() {
+        return expiryTime;
+    }
+
+    public void setExpiryTime(long expiryTime) {
+        this.expiryTime = expiryTime;
+    }
+
+    public Map<String, MemberStatsContext> getMemberStatsContexts() {
+        return memberStatsContexts;
+    }
+
+    public MemberStatsContext getMemberStatsContext(String memberId) {
+        return memberStatsContexts.get(memberId);
+    }
+
+    public void addMemberStatsContext(MemberStatsContext ctxt) {
+        this.memberStatsContexts.put(ctxt.getMemberId(), ctxt);
+    }
+
+    public void removeMemberStatsContext(String memberId) {
+        this.memberStatsContexts.remove(memberId);
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    public void setServiceName(String serviceName) {
+        this.serviceName = serviceName;
+    }
+
+    public List<MemberContext> getActiveMembers() {
+        return activeMembers;
+    }
+
+    public boolean removeActiveMemberById(String memberId) {
+        boolean removeActiveMember = false;
+        synchronized (activeMembers) {
+            Iterator<MemberContext> iterator = activeMembers.listIterator();
+            while (iterator.hasNext()) {
+                MemberContext memberContext = iterator.next();
+                if (memberId.equals(memberContext.getMemberId())) {
+                    iterator.remove();
+                    removeActiveMember = true;
+
+                    break;
+                }
+            }
+        }
+        return removeActiveMember;
+    }
+
+    public boolean activeMemberExist(String memberId) {
+
+        for (MemberContext memberContext : activeMembers) {
+            if (memberId.equals(memberContext.getMemberId())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private class PendingMemberWatcher implements Runnable {
+        private KubernetesClusterContext ctxt;
+
+        public PendingMemberWatcher(KubernetesClusterContext ctxt) {
+            this.ctxt = ctxt;
+        }
+
+        @Override
+        public void run() {
+
+            while (true) {
+                long expiryTime = ctxt.getExpiryTime();
+                List<MemberContext> pendingMembers = ctxt.getPendingMembers();
+
+                synchronized (pendingMembers) {
+                    Iterator<MemberContext> iterator = pendingMembers
+                            .listIterator();
+                    while (iterator.hasNext()) {
+                        MemberContext pendingMember = iterator.next();
+
+                        if (pendingMember == null) {
+                            continue;
+                        }
+                        long pendingTime = System.currentTimeMillis()
+                                           - pendingMember.getInitTime();
+                        if (pendingTime >= expiryTime) {
+
+                            // terminate all containers of this cluster
+                            try {
+                                CloudControllerClient.getInstance().terminateAllContainers(clusterId);
+                                iterator.remove();
+                            } catch (TerminationException e) {
+                                log.error(e.getMessage(), e);
+                            }
+
+                        }
+                    }
+                }
+
+                try {
+                    // TODO find a constant
+                    Thread.sleep(15000);
+                } catch (InterruptedException ignore) {
+                }
+            }
+        }
+
+    }
+
+    public float getAverageRequestsInFlight() {
+        return requestsInFlight.getAverage();
+    }
+
+    public void setAverageRequestsInFlight(float averageRequestsInFlight) {
+        requestsInFlight.setAverage(averageRequestsInFlight);
+        averageRifReset = true;
+        if (secondDerivativeRifRest && gradientRifReset) {
+            rifReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Requests in flights stats are reset, "
+                                        + "ready to do scale check [kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public float getRequestsInFlightSecondDerivative() {
+        return requestsInFlight.getSecondDerivative();
+    }
+
+    public void setRequestsInFlightSecondDerivative(
+            float requestsInFlightSecondDerivative) {
+        requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative);
+        secondDerivativeRifRest = true;
+        if (averageRifReset && gradientRifReset) {
+            rifReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public float getRequestsInFlightGradient() {
+        return requestsInFlight.getGradient();
+    }
+
+    public void setRequestsInFlightGradient(float requestsInFlightGradient) {
+        requestsInFlight.setGradient(requestsInFlightGradient);
+        gradientRifReset = true;
+        if (secondDerivativeRifRest && averageRifReset) {
+            rifReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public boolean isRifReset() {
+        return rifReset;
+    }
+
+    public void setRifReset(boolean rifReset) {
+        this.rifReset = rifReset;
+        this.averageRifReset = rifReset;
+        this.gradientRifReset = rifReset;
+        this.secondDerivativeRifRest = rifReset;
+    }
+
+    public float getAverageMemoryConsumption() {
+        return memoryConsumption.getAverage();
+    }
+
+    public void setAverageMemoryConsumption(float averageMemoryConsumption) {
+        memoryConsumption.setAverage(averageMemoryConsumption);
+        averageMemoryConsumptionReset = true;
+        if (secondDerivativeMemoryConsumptionRest
+            && gradientMemoryConsumptionReset) {
+            memoryConsumptionReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public float getMemoryConsumptionSecondDerivative() {
+        return memoryConsumption.getSecondDerivative();
+    }
+
+    public void setMemoryConsumptionSecondDerivative(
+            float memoryConsumptionSecondDerivative) {
+        memoryConsumption
+                .setSecondDerivative(memoryConsumptionSecondDerivative);
+        secondDerivativeMemoryConsumptionRest = true;
+        if (averageMemoryConsumptionReset && gradientMemoryConsumptionReset) {
+            memoryConsumptionReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public float getMemoryConsumptionGradient() {
+        return memoryConsumption.getGradient();
+    }
+
+    public void setMemoryConsumptionGradient(float memoryConsumptionGradient) {
+        memoryConsumption.setGradient(memoryConsumptionGradient);
+        gradientMemoryConsumptionReset = true;
+        if (secondDerivativeMemoryConsumptionRest
+            && averageMemoryConsumptionReset) {
+            memoryConsumptionReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public boolean isMemoryConsumptionReset() {
+        return memoryConsumptionReset;
+    }
+
+    public void setMemoryConsumptionReset(boolean memoryConsumptionReset) {
+        this.memoryConsumptionReset = memoryConsumptionReset;
+        this.averageMemoryConsumptionReset = memoryConsumptionReset;
+        this.gradientMemoryConsumptionReset = memoryConsumptionReset;
+        this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset;
+    }
+
+
+    public float getAverageLoadAverage() {
+        return loadAverage.getAverage();
+    }
+
+    public void setAverageLoadAverage(float averageLoadAverage) {
+        loadAverage.setAverage(averageLoadAverage);
+        averageLoadAverageReset = true;
+        if (secondDerivativeLoadAverageRest && gradientLoadAverageReset) {
+            loadAverageReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Load average stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public float getLoadAverageSecondDerivative() {
+        return loadAverage.getSecondDerivative();
+    }
+
+    public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) {
+        loadAverage.setSecondDerivative(loadAverageSecondDerivative);
+        secondDerivativeLoadAverageRest = true;
+        if (averageLoadAverageReset && gradientLoadAverageReset) {
+            loadAverageReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Load average stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public float getLoadAverageGradient() {
+        return loadAverage.getGradient();
+    }
+
+    public void setLoadAverageGradient(float loadAverageGradient) {
+        loadAverage.setGradient(loadAverageGradient);
+        gradientLoadAverageReset = true;
+        if (secondDerivativeLoadAverageRest && averageLoadAverageReset) {
+            loadAverageReset = true;
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Load average stats are reset, ready to do scale check "
+                                        + "[kub cluster] %s", this.kubernetesClusterId));
+            }
+        }
+    }
+
+    public boolean isLoadAverageReset() {
+        return loadAverageReset;
+    }
+
+    public void setLoadAverageReset(boolean loadAverageReset) {
+        this.loadAverageReset = loadAverageReset;
+        this.averageLoadAverageReset = loadAverageReset;
+        this.gradientLoadAverageReset = loadAverageReset;
+        this.secondDerivativeLoadAverageRest = loadAverageReset;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/b80861ba/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
index ac8b61a..bd3a6c3 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
@@ -31,10 +31,10 @@ public class MemberStatsContext {
     private MemoryConsumption memoryConsumption;
     private String memberId;
 
-    public MemberStatsContext(String memberId){
+    public MemberStatsContext(String memberId) {
         this.memberId = memberId;
         memoryConsumption = new MemoryConsumption();
-        loadAverage =  new LoadAverage();
+        loadAverage = new LoadAverage();
     }
 
     public String getMemberId() {
@@ -52,4 +52,29 @@ public class MemberStatsContext {
     public MemoryConsumption getMemoryConsumption() {
         return memoryConsumption;
     }
+
+    public void setAverageLoadAverage(float value) {
+        loadAverage.setAverage(value);
+    }
+
+    public void setAverageMemoryConsumption(float value) {
+        memoryConsumption.setAverage(value);
+    }
+
+    public void setGradientOfLoadAverage(float value) {
+        loadAverage.setGradient(value);
+    }
+
+    public void setGradientOfMemoryConsumption(float value) {
+        memoryConsumption.setGradient(value);
+    }
+
+    public void setSecondDerivativeOfLoadAverage(float value) {
+        loadAverage.setSecondDerivative(value);
+    }
+
+    public void setSecondDerivativeOfMemoryConsumption(float value) {
+        memoryConsumption.setSecondDerivative(value);
+    }
+
 }