You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ni...@apache.org on 2014/10/11 06:13:23 UTC

[37/50] [abbrv] git commit: adding terminateContainer logic to AS and refactoring drools logging

adding terminateContainer logic to AS and refactoring drools logging


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4aa518df
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4aa518df
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4aa518df

Branch: refs/heads/master
Commit: 4aa518df76532a9fdc8aeeeb0f5e54af3db5338f
Parents: 67313a1
Author: R-Rajkumar <rr...@gmail.com>
Authored: Fri Oct 10 16:38:52 2014 +0530
Committer: Nirmal Fernando <ni...@gmail.com>
Committed: Sat Oct 11 09:30:57 2014 +0530

----------------------------------------------------------------------
 .../cloud/controller/CloudControllerClient.java | 16 ++++++
 .../monitor/KubernetesClusterMonitor.java       | 33 +++++++++--
 .../autoscaler/rule/RuleTasksDelegator.java     |  9 +++
 .../src/main/conf/container-mincheck.drl        | 31 +++++++---
 .../src/main/conf/container-scaling.drl         | 60 ++++++++++----------
 5 files changed, 107 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/4aa518df/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
index ce69875..8ec9f8e 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
@@ -38,6 +38,7 @@ import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidClu
 import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidIaasProviderExceptionException;
 import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidMemberExceptionException;
 import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidPartitionExceptionException;
+import org.apache.stratos.cloud.controller.stub.CloudControllerServiceMemberTerminationFailedExceptionException;
 import org.apache.stratos.cloud.controller.stub.CloudControllerServiceStub;
 import org.apache.stratos.cloud.controller.stub.CloudControllerServiceUnregisteredCartridgeExceptionException;
 import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
@@ -328,4 +329,19 @@ public class CloudControllerClient {
             throw new SpawningException(msg, e);
         } 
     }
+    
+    public synchronized void terminateContainer(String memberId) throws TerminationException{
+    	try {
+			stub.terminateContainer(memberId);
+		} catch (RemoteException e) {
+            String msg = "Error while updating kubernetes controller, cannot communicate with " +
+                    "cloud controller service";
+            log.error(msg, e);
+            throw new TerminationException(msg, e);
+		} catch (CloudControllerServiceMemberTerminationFailedExceptionException e) {
+            String msg = "Error while terminating container, member not valid for member id : " + memberId;
+            log.error(msg, e);
+            throw new TerminationException(msg, e);
+		}
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4aa518df/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
index d90e0b6..9375a8e 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
@@ -381,7 +381,7 @@ public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
 
         // no need to do anything here
         // we will not be receiving this event for containers
-        // because we just kill the containers
+        // we will only receive member terminated event
     }
 
     @Override
@@ -390,16 +390,39 @@ public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
 
         // no need to do anything here
         // we will not be receiving this event for containers
-        // because we just kill the containers
+    	// we will only receive member terminated event
     }
 
     @Override
     public void handleMemberTerminatedEvent(
             MemberTerminatedEvent memberTerminatedEvent) {
 
-        // no need to do anything here
-        // we will not be receiving this event for containers
-        // because we just kill the containers
+        // Removes the terminated member from the cluster context. The lists are tried in
+        // order (termination pending, pending, active, obsolete) and the else-if chain
+        // stops at the first remove* call that returns true.
+        String memberId = memberTerminatedEvent.getMemberId();
+        if (getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from termination pending members list: "
+                                        + "[member] %s", memberId));
+            }
+        } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member is removed from pending members list: "
+                                        + "[member] %s", memberId));
+            }
+        } else if (getKubernetesClusterCtxt().removeActiveMemberById(memberId)) {
+            // The format strings below need a %s placeholder; without one String.format
+            // silently ignores the extra argument and the member id never reaches the log.
+            log.warn(String.format("Member is in the wrong list and it is removed from "
+                                   + "active members list: [member] %s", memberId));
+        } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) {
+            log.warn(String.format("Member's obsolated timeout has been expired and "
+                                   + "it is removed from obsolated members list: [member] %s", memberId));
+        } else {
+            log.warn(String.format("Member is not available in any of the list active, "
+                                   + "pending and termination pending: [member] %s", memberId));
+        }
+
+        // NOTE(review): this is logged on every path, including the "not available in any
+        // list" branch, and nothing visible in this method touches a stat context -
+        // confirm the message is still accurate for containers.
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member stat context has been removed successfully: "
+                                   + "[member] %s", memberId));
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/stratos/blob/4aa518df/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
index 9d3227a..0a9bde3 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
@@ -264,6 +264,15 @@ public class RuleTasksDelegator {
             log.error("Cannot update kubernetes controller ", e);
         }
     }
+    
+    /**
+     * Requests termination of the container belonging to the given member through the
+     * shared {@code CloudControllerClient} instance.
+     *
+     * @param kubernetesClusterContext context of the cluster the member belongs to; not
+     *                                 read by this method
+     * @param memberId                 id of the member whose container should be terminated
+     */
+    public void delegateTerminateContainer(KubernetesClusterContext kubernetesClusterContext, String memberId) {
+        try {
+            CloudControllerClient.getInstance().terminateContainer(memberId);
+        } catch (Throwable failure) {
+            // Any failure is logged here and not propagated to the caller.
+            log.error("Cannot delete container ", failure);
+        }
+    }
 
     public int getPredictedReplicasForStat(int minReplicas, float statUpperLimit, float statPredictedValue) {
         if (statUpperLimit == 0) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/4aa518df/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl b/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl
index 605c553..9798852 100644
--- a/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl
+++ b/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl
@@ -26,7 +26,7 @@ global org.apache.stratos.autoscaler.rule.RuleLog log;
 global org.apache.stratos.autoscaler.rule.RuleTasksDelegator $delegator;
 global java.lang.String clusterId;
  
-rule "Minimum Rule"
+rule "Container Minimum Rule"
 dialect "mvel"
 	when
            $kubernetesClusterContext : KubernetesClusterContext ()
@@ -36,20 +36,37 @@ dialect "mvel"
            isServiceClusterCreated : Boolean() from $kubernetesClusterContext.isServiceClusterCreated()
 	   
            eval(log.debug("Running minimum rule: [kub-cluster] " +kubernetesClusterId + " [cluster] " + clusterId))
-           eval(log.debug("[min-check] " + " [cluster] " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas))
-	   eval(log.debug("[min-check] " + " [cluster] " + clusterId + " [Replicas] minReplicas : " + minReplicas))
+           eval(log.debug("[min-check] " + " [cluster] : " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas))
+	   eval(log.debug("[min-check] " + " [cluster] : " + clusterId + " [Replicas] minReplicas : " + minReplicas))
 	   eval(nonTerminatedReplicas < minReplicas)
        then
            if (isServiceClusterCreated) {
              // we suceeded calling startContainer() once, can't call it again
-              log.info("[min-check] Decided to scale-up : [cluster] " + clusterId);
- 	      log.info("[min-check] " + " [cluster] " + clusterId + " min-rule not satisfied, expanding cluster to minReplicas : " + minReplicas);
+              log.info("[min-check] Decided to scale-up : [cluster] : " + clusterId);
+ 	      log.info("[min-check] " + " [cluster] : " + clusterId + " ; min-rule not satisfied, expanding cluster to minReplicas : " + minReplicas);
               $delegator.delegateUpdateContainers($kubernetesClusterContext, minReplicas);
 	   } else {
              // we should call startContainer
-              log.info("[min-check] Decided to create the cluster : [cluster] " + clusterId);
- 	      log.info("[min-check] " + " [cluster] " + clusterId + " : min-rule not satisfied, no containers created yet, creating minReplicas : " + minReplicas);
+              log.info("[min-check] Decided to create the cluster : [cluster] : " + clusterId);
+ 	      log.info("[min-check] " + " [cluster] : " + clusterId + " ; min-rule not satisfied, no containers created yet, creating minReplicas : " + minReplicas);
               $delegator.delegateStartContainers($kubernetesClusterContext);
            }
 end
 
+// Asks the delegator to terminate the container of each member id found in the
+// cluster context's obsoleted-members map.
+rule "Terminate Obsoleted Containers"
+dialect "mvel"
+        when
+           // Bind the cluster context fact, its kubernetes cluster id and the number of
+           // obsoleted members (the count is only used for the debug output below).
+           $kubernetesClusterContext : KubernetesClusterContext ()
+           kubernetesClusterId : String() from $kubernetesClusterContext.getKubernetesClusterID()
+           obsoleteReplicas : Integer() from $kubernetesClusterContext.getObsoletedMembers().size()
+
+           // The log calls are wrapped in eval(), so they act as conditions too;
+           // presumably RuleLog.debug returns true so they never block the rule - TODO confirm.
+           eval(log.debug("Running obsolete containers rule [kub-cluster] : " + kubernetesClusterId + " [cluster] : " + clusterId))
+           eval(log.debug("[obsolete-check] " + "[cluster] : " + clusterId + " [Replicas] obsoleteReplicas : " + obsoleteReplicas))
+           // Only continue when at least one obsoleted member is recorded.
+           eval($kubernetesClusterContext.getObsoletedMembers().keySet().size() > 0)
+           // Bind memberId from the key set of the obsoleted-members map.
+           memberId : String() from $kubernetesClusterContext.getObsoletedMembers().keySet()
+           eval(log.debug("[obsolete-check] [kub-cluster] : " + kubernetesClusterId + " [cluster] : " + clusterId + " Member id : " + memberId))
+        then
+           // Hand the bound member id to the delegator for termination.
+           $delegator.delegateTerminateContainer($kubernetesClusterContext, memberId);
+end
+
+

http://git-wip-us.apache.org/repos/asf/stratos/blob/4aa518df/products/stratos/modules/distribution/src/main/conf/container-scaling.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/container-scaling.drl b/products/stratos/modules/distribution/src/main/conf/container-scaling.drl
index 398049b..46d9aeb 100644
--- a/products/stratos/modules/distribution/src/main/conf/container-scaling.drl
+++ b/products/stratos/modules/distribution/src/main/conf/container-scaling.drl
@@ -40,7 +40,7 @@ dialect "mvel"
         maxReplicas : Integer() from $kubernetesClusterContext.getMaxReplicas()
         nonTerminatedReplicas : Integer() from $kubernetesClusterContext.getNonTerminatedMemberCount()
 
-        eval(log.debug("Running scaling rule : [kub-cluster] " + kubernetesClusterId + " [cluster] " + clusterId))
+        eval(log.debug("Running scaling rule [kub-cluster] : " + kubernetesClusterId + " [cluster] : " + clusterId))
 	
 	$loadThresholds : LoadThresholds() from  autoscalePolicy.getLoadThresholds()
 
@@ -79,81 +79,81 @@ dialect "mvel"
         scaleUp : Boolean() from (scaleUpForRif || scaleUpForMc || scaleUpForLa)
         scaleDown : Boolean() from (scaleDownForRif && scaleDownForMc &&  scaleDownForLa)
 
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas))
-	eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [Replicas] minReplicas : " + minReplicas))
-	eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [Replicas] maxReplicas : " + maxReplicas))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas))
+	eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [Replicas] minReplicas : " + minReplicas))
+	eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [Replicas] maxReplicas : " + maxReplicas))
 
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] resetted ? : " + rifReset))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] predicted value : " + rifPredictedValue))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] upper limit : " + rifUpperLimit))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] lower limit : " + rifLowerLimit))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] resetted ? : " + rifReset))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] predicted value : " + rifPredictedValue))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] upper limit : " + rifUpperLimit))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] lower limit : " + rifLowerLimit))
 
-	eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] resetted ? : " + mcReset))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] predicted value : " + mcPredictedValue))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] upper limit : " + mcUpperLimit))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] lower limit : " + mcLowerLimit))
+	eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] resetted ? : " + mcReset))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] predicted value : " + mcPredictedValue))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] upper limit : " + mcUpperLimit))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] lower limit : " + mcLowerLimit))
 
-	eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] resetted ? : " + laReset))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] predicted value : " + laPredictedValue))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] upper limit : " + laUpperLimit))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] lower limit : " + laLowerLimit))
+	eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] resetted ? : " + laReset))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] predicted value : " + laPredictedValue))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] upper limit : " + laUpperLimit))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] lower limit : " + laLowerLimit))
 
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " Scale-up action : " + scaleUp))
-        eval(log.debug("[scaling] " + " [cluster] " + clusterId + " Scale-down action : " + scaleDown))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " scale-up action : " + scaleUp))
+        eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " scale-down action : " + scaleDown))
 
 	then
         if (scaleUp) {
 	    int requiredReplicas = 0;
 	    if (scaleUpForRif) {
               int predictedReplicasForRif = $delegator.getPredictedReplicasForStat(minReplicas, rifUpperLimit, rifPredictedValue);
-	      log.info("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] predicted replicas : " + predictedReplicasForRif);
+	      log.info("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] predicted replicas : " + predictedReplicasForRif);
               if (predictedReplicasForRif > requiredReplicas ) {
                 requiredReplicas = predictedReplicasForRif;
               }
 	    } 
 	    if (scaleUpForMc) {
               int predictedReplicasForMc = $delegator.getPredictedReplicasForStat(minReplicas, mcUpperLimit, mcPredictedValue);
-              log.info("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] predicted replicas : " + predictedReplicasForMc);
+              log.info("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] predicted replicas : " + predictedReplicasForMc);
               if (predictedReplicasForMc > requiredReplicas ) {
                 requiredReplicas = predictedReplicasForMc;
               }
             }
             if (scaleUpForLa) {
               int predictedReplicasForLa = $delegator.getPredictedReplicasForStat(minReplicas, laUpperLimit, laPredictedValue);
-              log.info("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] predicted replicas : " + predictedReplicasForLa);
+              log.info("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] predicted replicas : " + predictedReplicasForLa);
               if (predictedReplicasForLa > requiredReplicas ) {
                 requiredReplicas = predictedReplicasForLa;
               }
             }
 	    //max-check
 	    if (requiredReplicas > maxReplicas) {
-	      log.info("[scaling] " + " [cluster] " + clusterId + " predicted replicas > max replicas : ");
+	      log.info("[scaling] " + " [cluster] : " + clusterId + " predicted replicas > max replicas : ");
               requiredReplicas = maxReplicas;
             }
 	    //min-check
 	    if (requiredReplicas < minReplicas) {
-	      log.info("[scaling] " + " [cluster] " + clusterId + " predicted replicas < min replicas : ");
+	      log.info("[scaling] " + " [cluster] : " + clusterId + " predicted replicas < min replicas : ");
               requiredReplicas = minReplicas;
             }
             //expand the cluster
             if (requiredReplicas > nonTerminatedReplicas) {
-              log.info("[scaling] Decided to scale-up : [cluster] " + clusterId);
- 	      log.info("[scaling-up] " + " [cluster] " + clusterId + " valid number of replicas to expand : " + requiredReplicas);
+              log.info("[scaling] Decided to scale-up : [cluster] : " + clusterId);
+ 	      log.info("[scaling-up] " + " [cluster] : " + clusterId + " valid number of replicas to expand : " + requiredReplicas);
 	      $delegator.delegateUpdateContainers($kubernetesClusterContext, requiredReplicas);
             }
             //shrink the cluster
             if (requiredReplicas < nonTerminatedReplicas) {
-              log.info("[scaling] Decided to scale-down : [cluster] " + clusterId);
- 	      log.info("[scaling-down] " + " [cluster] " + clusterId + " valid number of replicas to shrink : " + requiredReplicas);
+              log.info("[scaling] Decided to scale-down : [cluster] : " + clusterId);
+ 	      log.info("[scaling-down] " + " [cluster] : " + clusterId + " valid number of replicas to shrink : " + requiredReplicas);
 	      $delegator.delegateUpdateContainers($kubernetesClusterContext, requiredReplicas);
             }
             if (requiredReplicas == nonTerminatedReplicas) {
- 	      log.info("[scaling] " + " [cluster] " + clusterId + "non terminated replicas and predicted replicas are same");
+ 	      log.info("[scaling] " + " [cluster] : " + clusterId + "non terminated replicas and predicted replicas are same");
             }
             
         } else if (scaleDown) {
-            log.info("[scaling] Decided to scale-down : [cluster] " + clusterId);
-            log.info("[scaling-down] " + " [cluster] " + clusterId + " shrink the cluster to minReplicas : " + minReplicas);
+            log.info("[scaling] Decided to scale-down [cluster] : " + clusterId);
+            log.info("[scaling-down] " + " [cluster] : " + clusterId + " shrink the cluster to minReplicas : " + minReplicas);
             //shrink the cluster to minReplicas
             $delegator.delegateUpdateContainers($kubernetesClusterContext, minReplicas);
         } else {