You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ni...@apache.org on 2014/10/11 06:13:23 UTC
[37/50] [abbrv] git commit: adding terminateContainer logic to AS and
refactoring drools logging
adding terminateContainer logic to AS and refactoring drools logging
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4aa518df
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4aa518df
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4aa518df
Branch: refs/heads/master
Commit: 4aa518df76532a9fdc8aeeeb0f5e54af3db5338f
Parents: 67313a1
Author: R-Rajkumar <rr...@gmail.com>
Authored: Fri Oct 10 16:38:52 2014 +0530
Committer: Nirmal Fernando <ni...@gmail.com>
Committed: Sat Oct 11 09:30:57 2014 +0530
----------------------------------------------------------------------
.../cloud/controller/CloudControllerClient.java | 16 ++++++
.../monitor/KubernetesClusterMonitor.java | 33 +++++++++--
.../autoscaler/rule/RuleTasksDelegator.java | 9 +++
.../src/main/conf/container-mincheck.drl | 31 +++++++---
.../src/main/conf/container-scaling.drl | 60 ++++++++++----------
5 files changed, 107 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/4aa518df/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
index ce69875..8ec9f8e 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
@@ -38,6 +38,7 @@ import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidClu
import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidIaasProviderExceptionException;
import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidMemberExceptionException;
import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidPartitionExceptionException;
+import org.apache.stratos.cloud.controller.stub.CloudControllerServiceMemberTerminationFailedExceptionException;
import org.apache.stratos.cloud.controller.stub.CloudControllerServiceStub;
import org.apache.stratos.cloud.controller.stub.CloudControllerServiceUnregisteredCartridgeExceptionException;
import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
@@ -328,4 +329,19 @@ public class CloudControllerClient {
throw new SpawningException(msg, e);
}
}
+
+ public synchronized void terminateContainer(String memberId) throws TerminationException{ // asks the cloud controller stub to terminate the container of the given member
+ try {
+ stub.terminateContainer(memberId);
+ } catch (RemoteException e) { // the remote call to the cloud controller service itself failed
+ String msg = "Error while terminating container, cannot communicate with " +
+ "cloud controller service";
+ log.error(msg, e);
+ throw new TerminationException(msg, e); // keep the cause so callers can see the underlying failure
+ } catch (CloudControllerServiceMemberTerminationFailedExceptionException e) { // the service reported that termination failed
+ String msg = "Error while terminating container, member not valid for member id : " + memberId;
+ log.error(msg, e);
+ throw new TerminationException(msg, e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4aa518df/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
index d90e0b6..9375a8e 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
@@ -381,7 +381,7 @@ public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
// no need to do anything here
// we will not be receiving this event for containers
- // because we just kill the containers
+ // we will only receive member terminated event
}
@Override
@@ -390,16 +390,39 @@ public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
// no need to do anything here
// we will not be receiving this event for containers
- // because we just kill the containers
+ // we will only receive member terminated event
}
@Override
public void handleMemberTerminatedEvent(
MemberTerminatedEvent memberTerminatedEvent) {
- // no need to do anything here
- // we will not be receiving this event for containers
- // because we just kill the containers
+ String memberId = memberTerminatedEvent.getMemberId(); // the member is removed from the first list that contains it, checked in the order below
+ if (getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from termination pending members list: "
+ + "[member] %s", memberId));
+ }
+ } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from pending members list: "
+ + "[member] %s", memberId));
+ }
+ } else if (getKubernetesClusterCtxt().removeActiveMemberById(memberId)) {
+ log.warn(String.format("Member is in the wrong list and it is removed from "
+ + "active members list: [member] %s", memberId));
+ } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) {
+ log.warn(String.format("Member's obsolated timeout has been expired and "
+ + "it is removed from obsolated members list: [member] %s", memberId));
+ } else {
+ log.warn(String.format("Member is not available in any of the list active, "
+ + "pending and termination pending: [member] %s", memberId));
+ }
+
+ if (log.isInfoEnabled()) { // NOTE(review): this is logged on every path, including when the member was found in no list
+ log.info(String.format("Member stat context has been removed successfully: "
+ + "[member] %s", memberId));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/stratos/blob/4aa518df/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
index 9d3227a..0a9bde3 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
@@ -264,6 +264,15 @@ public class RuleTasksDelegator {
log.error("Cannot update kubernetes controller ", e);
}
}
+
+ public void delegateTerminateContainer(KubernetesClusterContext kubernetesClusterContext, String memberId) { // terminates the member's container via the cloud controller; kubernetesClusterContext is not read here
+ try {
+ CloudControllerClient ccClient = CloudControllerClient.getInstance();
+ ccClient.terminateContainer(memberId);
+ } catch (Throwable e) { // best-effort: the failure is logged and not propagated to the caller
+ log.error("Cannot delete container [member] " + memberId, e);
+ }
+ }
public int getPredictedReplicasForStat(int minReplicas, float statUpperLimit, float statPredictedValue) {
if (statUpperLimit == 0) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/4aa518df/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl b/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl
index 605c553..9798852 100644
--- a/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl
+++ b/products/stratos/modules/distribution/src/main/conf/container-mincheck.drl
@@ -26,7 +26,7 @@ global org.apache.stratos.autoscaler.rule.RuleLog log;
global org.apache.stratos.autoscaler.rule.RuleTasksDelegator $delegator;
global java.lang.String clusterId;
-rule "Minimum Rule"
+rule "Container Minimum Rule"
dialect "mvel"
when
$kubernetesClusterContext : KubernetesClusterContext ()
@@ -36,20 +36,37 @@ dialect "mvel"
isServiceClusterCreated : Boolean() from $kubernetesClusterContext.isServiceClusterCreated()
eval(log.debug("Running minimum rule: [kub-cluster] " +kubernetesClusterId + " [cluster] " + clusterId))
- eval(log.debug("[min-check] " + " [cluster] " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas))
- eval(log.debug("[min-check] " + " [cluster] " + clusterId + " [Replicas] minReplicas : " + minReplicas))
+ eval(log.debug("[min-check] " + " [cluster] : " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas))
+ eval(log.debug("[min-check] " + " [cluster] : " + clusterId + " [Replicas] minReplicas : " + minReplicas))
eval(nonTerminatedReplicas < minReplicas)
then
if (isServiceClusterCreated) {
// we suceeded calling startContainer() once, can't call it again
- log.info("[min-check] Decided to scale-up : [cluster] " + clusterId);
- log.info("[min-check] " + " [cluster] " + clusterId + " min-rule not satisfied, expanding cluster to minReplicas : " + minReplicas);
+ log.info("[min-check] Decided to scale-up : [cluster] : " + clusterId);
+ log.info("[min-check] " + " [cluster] : " + clusterId + " ; min-rule not satisfied, expanding cluster to minReplicas : " + minReplicas);
$delegator.delegateUpdateContainers($kubernetesClusterContext, minReplicas);
} else {
// we should call startContainer
- log.info("[min-check] Decided to create the cluster : [cluster] " + clusterId);
- log.info("[min-check] " + " [cluster] " + clusterId + " : min-rule not satisfied, no containers created yet, creating minReplicas : " + minReplicas);
+ log.info("[min-check] Decided to create the cluster : [cluster] : " + clusterId);
+ log.info("[min-check] " + " [cluster] : " + clusterId + " ; min-rule not satisfied, no containers created yet, creating minReplicas : " + minReplicas);
$delegator.delegateStartContainers($kubernetesClusterContext);
}
end
+rule "Terminate Obsoleted Containers" // asks the delegator to terminate the container of members listed as obsoleted in the cluster context
+dialect "mvel"
+ when
+ $kubernetesClusterContext : KubernetesClusterContext ()
+ kubernetesClusterId : String() from $kubernetesClusterContext.getKubernetesClusterID()
+ obsoleteReplicas : Integer() from $kubernetesClusterContext.getObsoletedMembers().size() // used only in the debug output below
+
+ eval(log.debug("Running obsolete containers rule [kub-cluster] : " + kubernetesClusterId + " [cluster] : " + clusterId)) // NOTE(review): eval() needs a true result, so log.debug presumably returns true — confirm in RuleLog
+ eval(log.debug("[obsolete-check] " + "[cluster] : " + clusterId + " [Replicas] obsoleteReplicas : " + obsoleteReplicas))
+ eval($kubernetesClusterContext.getObsoletedMembers().keySet().size() > 0) // continue only when at least one obsoleted member is recorded
+ memberId : String() from $kubernetesClusterContext.getObsoletedMembers().keySet() // presumably matches once per obsoleted member id ('from' over a collection) — confirm against Drools docs
+ eval(log.debug("[obsolete-check] [kub-cluster] : " + kubernetesClusterId + " [cluster] : " + clusterId + " Member id : " + memberId))
+ then
+ $delegator.delegateTerminateContainer($kubernetesClusterContext, memberId); // hands the bound member id to the delegator for termination
+end
+
+
http://git-wip-us.apache.org/repos/asf/stratos/blob/4aa518df/products/stratos/modules/distribution/src/main/conf/container-scaling.drl
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/container-scaling.drl b/products/stratos/modules/distribution/src/main/conf/container-scaling.drl
index 398049b..46d9aeb 100644
--- a/products/stratos/modules/distribution/src/main/conf/container-scaling.drl
+++ b/products/stratos/modules/distribution/src/main/conf/container-scaling.drl
@@ -40,7 +40,7 @@ dialect "mvel"
maxReplicas : Integer() from $kubernetesClusterContext.getMaxReplicas()
nonTerminatedReplicas : Integer() from $kubernetesClusterContext.getNonTerminatedMemberCount()
- eval(log.debug("Running scaling rule : [kub-cluster] " + kubernetesClusterId + " [cluster] " + clusterId))
+ eval(log.debug("Running scaling rule [kub-cluster] : " + kubernetesClusterId + " [cluster] : " + clusterId))
$loadThresholds : LoadThresholds() from autoscalePolicy.getLoadThresholds()
@@ -79,81 +79,81 @@ dialect "mvel"
scaleUp : Boolean() from (scaleUpForRif || scaleUpForMc || scaleUpForLa)
scaleDown : Boolean() from (scaleDownForRif && scaleDownForMc && scaleDownForLa)
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [Replicas] minReplicas : " + minReplicas))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [Replicas] maxReplicas : " + maxReplicas))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [Replicas] minReplicas : " + minReplicas))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [Replicas] maxReplicas : " + maxReplicas))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] resetted ? : " + rifReset))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] predicted value : " + rifPredictedValue))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] upper limit : " + rifUpperLimit))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] lower limit : " + rifLowerLimit))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] resetted ? : " + rifReset))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] predicted value : " + rifPredictedValue))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] upper limit : " + rifUpperLimit))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] lower limit : " + rifLowerLimit))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] resetted ? : " + mcReset))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] predicted value : " + mcPredictedValue))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] upper limit : " + mcUpperLimit))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] lower limit : " + mcLowerLimit))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] resetted ? : " + mcReset))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] predicted value : " + mcPredictedValue))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] upper limit : " + mcUpperLimit))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] lower limit : " + mcLowerLimit))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] resetted ? : " + laReset))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] predicted value : " + laPredictedValue))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] upper limit : " + laUpperLimit))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] lower limit : " + laLowerLimit))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] resetted ? : " + laReset))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] predicted value : " + laPredictedValue))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] upper limit : " + laUpperLimit))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] lower limit : " + laLowerLimit))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " Scale-up action : " + scaleUp))
- eval(log.debug("[scaling] " + " [cluster] " + clusterId + " Scale-down action : " + scaleDown))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " scale-up action : " + scaleUp))
+ eval(log.debug("[scaling] " + " [cluster] : " + clusterId + " scale-down action : " + scaleDown))
then
if (scaleUp) {
int requiredReplicas = 0;
if (scaleUpForRif) {
int predictedReplicasForRif = $delegator.getPredictedReplicasForStat(minReplicas, rifUpperLimit, rifPredictedValue);
- log.info("[scaling] " + " [cluster] " + clusterId + " [RequestInFlight] predicted replicas : " + predictedReplicasForRif);
+ log.info("[scaling] " + " [cluster] : " + clusterId + " [RequestInFlight] predicted replicas : " + predictedReplicasForRif);
if (predictedReplicasForRif > requiredReplicas ) {
requiredReplicas = predictedReplicasForRif;
}
}
if (scaleUpForMc) {
int predictedReplicasForMc = $delegator.getPredictedReplicasForStat(minReplicas, mcUpperLimit, mcPredictedValue);
- log.info("[scaling] " + " [cluster] " + clusterId + " [MemoryConsumption] predicted replicas : " + predictedReplicasForMc);
+ log.info("[scaling] " + " [cluster] : " + clusterId + " [MemoryConsumption] predicted replicas : " + predictedReplicasForMc);
if (predictedReplicasForMc > requiredReplicas ) {
requiredReplicas = predictedReplicasForMc;
}
}
if (scaleUpForLa) {
int predictedReplicasForLa = $delegator.getPredictedReplicasForStat(minReplicas, laUpperLimit, laPredictedValue);
- log.info("[scaling] " + " [cluster] " + clusterId + " [LoadAverage] predicted replicas : " + predictedReplicasForLa);
+ log.info("[scaling] " + " [cluster] : " + clusterId + " [LoadAverage] predicted replicas : " + predictedReplicasForLa);
if (predictedReplicasForLa > requiredReplicas ) {
requiredReplicas = predictedReplicasForLa;
}
}
//max-check
if (requiredReplicas > maxReplicas) {
- log.info("[scaling] " + " [cluster] " + clusterId + " predicted replicas > max replicas : ");
+ log.info("[scaling] " + " [cluster] : " + clusterId + " predicted replicas > max replicas : ");
requiredReplicas = maxReplicas;
}
//min-check
if (requiredReplicas < minReplicas) {
- log.info("[scaling] " + " [cluster] " + clusterId + " predicted replicas < min replicas : ");
+ log.info("[scaling] " + " [cluster] : " + clusterId + " predicted replicas < min replicas : ");
requiredReplicas = minReplicas;
}
//expand the cluster
if (requiredReplicas > nonTerminatedReplicas) {
- log.info("[scaling] Decided to scale-up : [cluster] " + clusterId);
- log.info("[scaling-up] " + " [cluster] " + clusterId + " valid number of replicas to expand : " + requiredReplicas);
+ log.info("[scaling] Decided to scale-up : [cluster] : " + clusterId);
+ log.info("[scaling-up] " + " [cluster] : " + clusterId + " valid number of replicas to expand : " + requiredReplicas);
$delegator.delegateUpdateContainers($kubernetesClusterContext, requiredReplicas);
}
//shrink the cluster
if (requiredReplicas < nonTerminatedReplicas) {
- log.info("[scaling] Decided to scale-down : [cluster] " + clusterId);
- log.info("[scaling-down] " + " [cluster] " + clusterId + " valid number of replicas to shrink : " + requiredReplicas);
+ log.info("[scaling] Decided to scale-down : [cluster] : " + clusterId);
+ log.info("[scaling-down] " + " [cluster] : " + clusterId + " valid number of replicas to shrink : " + requiredReplicas);
$delegator.delegateUpdateContainers($kubernetesClusterContext, requiredReplicas);
}
if (requiredReplicas == nonTerminatedReplicas) {
- log.info("[scaling] " + " [cluster] " + clusterId + "non terminated replicas and predicted replicas are same");
+ log.info("[scaling] " + " [cluster] : " + clusterId + "non terminated replicas and predicted replicas are same");
}
} else if (scaleDown) {
- log.info("[scaling] Decided to scale-down : [cluster] " + clusterId);
- log.info("[scaling-down] " + " [cluster] " + clusterId + " shrink the cluster to minReplicas : " + minReplicas);
+ log.info("[scaling] Decided to scale-down [cluster] : " + clusterId);
+ log.info("[scaling-down] " + " [cluster] : " + clusterId + " shrink the cluster to minReplicas : " + minReplicas);
//shrink the cluster to minReplicas
$delegator.delegateUpdateContainers($kubernetesClusterContext, minReplicas);
} else {