You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by sa...@apache.org on 2014/10/06 19:43:07 UTC
[1/7] git commit: executor service instead of thread sleep
Repository: stratos
Updated Branches:
refs/heads/container-autoscaling 7d616494a -> fb68de94a
executor service instead of thread sleep
Signed-off-by: sajhak <sa...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/fb68de94
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/fb68de94
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/fb68de94
Branch: refs/heads/container-autoscaling
Commit: fb68de94a1df0a7c679d5881f680dba5671340eb
Parents: 244030b
Author: R-Rajkumar <rr...@gmail.com>
Authored: Mon Oct 6 11:25:54 2014 +0530
Committer: sajhak <sa...@gmail.com>
Committed: Mon Oct 6 23:11:29 2014 +0530
----------------------------------------------------------------------
.../monitor/AbstractClusterMonitor.java | 10 +++----
.../KubernetesServiceClusterMonitor.java | 28 ++++++++++----------
.../autoscaler/monitor/VMLbClusterMonitor.java | 26 +++++++++---------
.../monitor/VMServiceClusterMonitor.java | 26 +++++++++---------
4 files changed, 45 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/fb68de94/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
index e44bd72..3238d46 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
@@ -66,7 +66,7 @@ public abstract class AbstractClusterMonitor implements Runnable {
private boolean isDestroyed;
private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
-
+
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
protected AbstractClusterMonitor(String clusterId, String serviceId,
@@ -81,13 +81,13 @@ public abstract class AbstractClusterMonitor implements Runnable {
}
protected abstract void readConfigurations();
-
+
public void startScheduler() {
- scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
+ scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
}
-
+
protected void stopScheduler() {
- scheduler.shutdownNow();
+ scheduler.shutdownNow();
}
protected abstract void monitor();
http://git-wip-us.apache.org/repos/asf/stratos/blob/fb68de94/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
index 93580d9..6e14ce0 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
@@ -59,22 +59,22 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni
@Override
public void run() {
- if (log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
- }
- try {
- if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
- + ClusterStatus.In_Maintenance + " mode......");
- }
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
+ }
+ try {
+ if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
+ monitor();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
+ + ClusterStatus.In_Maintenance + " mode......");
}
- } catch (Exception e) {
- log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
- e);
}
+ } catch (Exception e) {
+ log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
+ e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/stratos/blob/fb68de94/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
index f950f9d..1c27380 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
@@ -57,21 +57,21 @@ public class VMLbClusterMonitor extends VMClusterMonitor {
@Override
public void run() {
- if (log.isDebugEnabled()) {
- log.debug("VMLbClusterMonitor is running.. " + this.toString());
- }
- try {
- if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("VMLbClusterMonitor is suspended as the cluster is in " +
- ClusterStatus.In_Maintenance + " mode......");
- }
+ if (log.isDebugEnabled()) {
+ log.debug("VMLbClusterMonitor is running.. " + this.toString());
+ }
+ try {
+ if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
+ monitor();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("VMLbClusterMonitor is suspended as the cluster is in " +
+ ClusterStatus.In_Maintenance + " mode......");
}
- } catch (Exception e) {
- log.error("VMLbClusterMonitor : Monitor failed. " + this.toString(), e);
}
+ } catch (Exception e) {
+ log.error("VMLbClusterMonitor : Monitor failed. " + this.toString(), e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/stratos/blob/fb68de94/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
index d8c9e69..6f9fb26 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
@@ -67,21 +67,21 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
} catch (InterruptedException ignore) {
}
- if (log.isDebugEnabled()) {
- log.debug("VMServiceClusterMonitor is running.. " + this.toString());
- }
- try {
- if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("VMServiceClusterMonitor is suspended as the cluster is in " +
- ClusterStatus.In_Maintenance + " mode......");
- }
+ if (log.isDebugEnabled()) {
+ log.debug("VMServiceClusterMonitor is running.. " + this.toString());
+ }
+ try {
+ if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
+ monitor();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("VMServiceClusterMonitor is suspended as the cluster is in " +
+ ClusterStatus.In_Maintenance + " mode......");
}
- } catch (Exception e) {
- log.error("VMServiceClusterMonitor : Monitor failed." + this.toString(), e);
}
+ } catch (Exception e) {
+ log.error("VMServiceClusterMonitor : Monitor failed." + this.toString(), e);
+ }
}
@Override
[6/7] code review changes to cluster monitors
Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
index e1d7cc5..88d8dee 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -21,24 +21,41 @@ package org.apache.stratos.autoscaler.message.receiver.health;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.exception.TerminationException;
import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.DockerServiceClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
-import org.apache.stratos.autoscaler.policy.model.LoadAverage;
-import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
-import org.apache.stratos.common.enums.ClusterType;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.health.stat.*;
-import org.apache.stratos.messaging.listener.health.stat.*;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.listener.health.stat.AverageLoadAverageEventListener;
+import org.apache.stratos.messaging.listener.health.stat.AverageMemoryConsumptionEventListener;
+import org.apache.stratos.messaging.listener.health.stat.AverageRequestsInFlightEventListener;
+import org.apache.stratos.messaging.listener.health.stat.GradientOfLoadAverageEventListener;
+import org.apache.stratos.messaging.listener.health.stat.GradientOfMemoryConsumptionEventListener;
+import org.apache.stratos.messaging.listener.health.stat.GradientOfRequestsInFlightEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberAverageLoadAverageEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberAverageMemoryConsumptionEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberFaultEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberGradientOfLoadAverageEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberGradientOfMemoryConsumptionEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberSecondDerivativeOfLoadAverageEventListener;
+import org.apache.stratos.messaging.listener.health.stat.MemberSecondDerivativeOfMemoryConsumptionEventListener;
+import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfLoadAverageEventListener;
+import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfMemoryConsumptionEventListener;
+import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfRequestsInFlightEventListener;
import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
@@ -54,7 +71,7 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
private HealthStatEventReceiver healthStatEventReceiver;
public AutoscalerHealthStatEventReceiver() {
- this.healthStatEventReceiver = new HealthStatEventReceiver();
+ this.healthStatEventReceiver = new HealthStatEventReceiver();
addEventListeners();
}
@@ -67,18 +84,18 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
}
Thread thread = new Thread(healthStatEventReceiver);
thread.start();
- if(log.isInfoEnabled()) {
+ if (log.isInfoEnabled()) {
log.info("Autoscaler health stat event receiver thread started");
}
// Keep the thread live until terminated
- while (!terminated){
- try {
+ while (!terminated) {
+ try {
Thread.sleep(1000);
} catch (InterruptedException ignore) {
}
}
- if(log.isInfoEnabled()) {
+ if (log.isInfoEnabled()) {
log.info("Autoscaler health stat event receiver thread terminated");
}
}
@@ -88,876 +105,396 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
healthStatEventReceiver.addEventListener(new AverageLoadAverageEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- AverageLoadAverageEvent e = (AverageLoadAverageEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
+ AverageLoadAverageEvent averageLoadAverageEvent = (AverageLoadAverageEvent) event;
+ String clusterId = averageLoadAverageEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
AbstractClusterMonitor monitor;
-
- if(asCtx.clusterMonitorExist(clusterId)){
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
-
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setAverageLoadAverage(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- KubernetesClusterContext kubernetesClusterContext =
- ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setAverageLoadAverage(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
+ monitor.handleAverageLoadAverageEvent(averageLoadAverageEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new AverageMemoryConsumptionEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
- AverageMemoryConsumptionEvent e = (AverageMemoryConsumptionEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s "
- + "[value] %s", clusterId, networkPartitionId, floatValue));
- }
+ AverageMemoryConsumptionEvent averageMemoryConsumptionEvent = (AverageMemoryConsumptionEvent) event;
+ String clusterId = averageMemoryConsumptionEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
AbstractClusterMonitor monitor;
-
- if(asCtx.clusterMonitorExist(clusterId)){
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
-
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setAverageMemoryConsumption(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- KubernetesClusterContext kubernetesClusterContext =
- ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setAverageMemoryConsumption(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
+ monitor.handleAverageMemoryConsumptionEvent(averageMemoryConsumptionEvent);
}
});
+
healthStatEventReceiver.addEventListener(new AverageRequestsInFlightEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
- AverageRequestsInFlightEvent e = (AverageRequestsInFlightEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
- Float floatValue = e.getValue();
-
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
+ AverageRequestsInFlightEvent averageRequestsInFlightEvent = (AverageRequestsInFlightEvent) event;
+ String clusterId = averageRequestsInFlightEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
AbstractClusterMonitor monitor;
-
- if(asCtx.clusterMonitorExist(clusterId)){
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
-
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setAverageRequestsInFlight(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- KubernetesClusterContext kubernetesClusterContext =
- ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setAverageRequestsInFlight(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
+ monitor.handleAverageRequestsInFlightEvent(averageRequestsInFlightEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new GradientOfLoadAverageEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- GradientOfLoadAverageEvent e = (GradientOfLoadAverageEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
+ GradientOfLoadAverageEvent gradientOfLoadAverageEvent = (GradientOfLoadAverageEvent) event;
+ String clusterId = gradientOfLoadAverageEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
AbstractClusterMonitor monitor;
-
- if(asCtx.clusterMonitorExist(clusterId)){
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster){
-
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setLoadAverageGradient(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- KubernetesClusterContext kubernetesClusterContext =
- ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setLoadAverageGradient(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
+ monitor.handleGradientOfLoadAverageEvent(gradientOfLoadAverageEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new GradientOfMemoryConsumptionEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
- GradientOfMemoryConsumptionEvent e = (GradientOfMemoryConsumptionEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
- + "[network-partition] %s [value] %s", clusterId, networkPartitionId, floatValue));
- }
+ GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent = (GradientOfMemoryConsumptionEvent) event;
+ String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
AbstractClusterMonitor monitor;
-
- if(asCtx.clusterMonitorExist(clusterId)){
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster){
-
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setMemoryConsumptionGradient(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- KubernetesClusterContext kubernetesClusterContext =
- ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setMemoryConsumptionGradient(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
+ monitor.handleGradientOfMemoryConsumptionEvent(gradientOfMemoryConsumptionEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new GradientOfRequestsInFlightEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- GradientOfRequestsInFlightEvent e = (GradientOfRequestsInFlightEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
+ GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent = (GradientOfRequestsInFlightEvent) event;
+ String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
AbstractClusterMonitor monitor;
-
- if(asCtx.clusterMonitorExist(clusterId)){
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster){
-
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setRequestsInFlightGradient(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- KubernetesClusterContext kubernetesClusterContext =
- ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setRequestsInFlightGradient(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
+ monitor.handleGradientOfRequestsInFlightEvent(gradientOfRequestsInFlightEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new MemberAverageLoadAverageEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- MemberAverageLoadAverageEvent e = (MemberAverageLoadAverageEvent) event;
- LoadAverage loadAverage = findLoadAverage(e.getMemberId());
- if(loadAverage != null) {
-
- Float floatValue = e.getValue();
- loadAverage.setAverage(floatValue);
-
+ MemberAverageLoadAverageEvent memberAverageLoadAverageEvent = (MemberAverageLoadAverageEvent) event;
+ String memberId = memberAverageLoadAverageEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ if (null == member) {
if (log.isDebugEnabled()) {
- log.debug(String.format("Member avg of load avg event: [member] %s [value] %s",
- e.getMemberId(), floatValue));
+ log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
}
+ return;
}
-
+ if (!member.isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member activated event has not received for the member %s. "
+ + "Therefore ignoring" + " the health stat", memberId));
+ }
+ return;
+ }
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ String clusterId = member.getClusterId();
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ return;
+ }
+ monitor.handleMemberAverageLoadAverageEvent(memberAverageLoadAverageEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new MemberAverageMemoryConsumptionEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- MemberAverageMemoryConsumptionEvent e = (MemberAverageMemoryConsumptionEvent) event;
- MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
- if(memoryConsumption != null) {
-
- Float floatValue = e.getValue();
- memoryConsumption.setAverage(floatValue);
-
+ MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent = (MemberAverageMemoryConsumptionEvent) event;
+ String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ if (null == member) {
if (log.isDebugEnabled()) {
- log.debug(String.format("Member avg Memory Consumption event: [member] %s [value] %s",
- e.getMemberId(), floatValue));
+ log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
}
+ return;
}
-
+ if (!member.isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member activated event has not received for the member %s. "
+ + "Therefore ignoring" + " the health stat", memberId));
+ }
+ return;
+ }
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ String clusterId = member.getClusterId();
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ return;
+ }
+ monitor.handleMemberAverageMemoryConsumptionEvent(memberAverageMemoryConsumptionEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new MemberFaultEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- MemberFaultEvent e = (MemberFaultEvent) event;
- String clusterId = e.getClusterId();
- String memberId = e.getMemberId();
-
+ MemberFaultEvent memberFaultEvent = (MemberFaultEvent) event;
+ String clusterId = memberFaultEvent.getClusterId();
+ String memberId = memberFaultEvent.getMemberId();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member fault event: [member] %s ", memberId));
+ }
if (memberId == null || memberId.isEmpty()) {
- if(log.isErrorEnabled()) {
- log.error("Member id not found in received message");
- }
- } else {
-
+ log.error("Member id not found in received message");
+ return;
+ }
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
if (log.isDebugEnabled()) {
- log.debug(String.format("Member fault event: [member] %s ", e.getMemberId()));
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
}
- handleMemberFaultEvent(clusterId, memberId);
+ return;
}
+ monitor.handleMemberFaultEvent(memberFaultEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new MemberGradientOfLoadAverageEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- MemberGradientOfLoadAverageEvent e = (MemberGradientOfLoadAverageEvent) event;
- LoadAverage loadAverage = findLoadAverage(e.getMemberId());
- if(loadAverage != null) {
-
- Float floatValue = e.getValue();
- loadAverage.setGradient(floatValue);
-
+ MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent = (MemberGradientOfLoadAverageEvent) event;
+ String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ if (null == member) {
if (log.isDebugEnabled()) {
- log.debug(String.format("Member grad of load avg event: [member] %s "
- + "[value] %s", e.getMemberId(), floatValue));
+ log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
}
+ return;
}
-
+ if (!member.isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member activated event has not received for the member %s. "
+ + "Therefore ignoring" + " the health stat", memberId));
+ }
+ return;
+ }
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ String clusterId = member.getClusterId();
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ return;
+ }
+ monitor.handleMemberGradientOfLoadAverageEvent(memberGradientOfLoadAverageEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new MemberGradientOfMemoryConsumptionEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- MemberGradientOfMemoryConsumptionEvent e = (MemberGradientOfMemoryConsumptionEvent) event;
- MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
- if(memoryConsumption != null) {
-
- Float floatValue = e.getValue();
- memoryConsumption.setGradient(floatValue);
-
+ MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent = (MemberGradientOfMemoryConsumptionEvent) event;
+ String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ if (null == member) {
if (log.isDebugEnabled()) {
- log.debug(String.format("Member grad of Memory Consumption event: [member] %s "
- + "[value] %s", e.getMemberId(), floatValue));
+ log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
}
+ return;
}
-
+ if (!member.isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member activated event has not received for the member %s. "
+ + "Therefore ignoring" + " the health stat", memberId));
+ }
+ return;
+ }
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ String clusterId = member.getClusterId();
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ return;
+ }
+ monitor.handleMemberGradientOfMemoryConsumptionEvent(memberGradientOfMemoryConsumptionEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfLoadAverageEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- MemberSecondDerivativeOfLoadAverageEvent e = (MemberSecondDerivativeOfLoadAverageEvent) event;
- LoadAverage loadAverage = findLoadAverage(e.getMemberId());
- if(loadAverage != null) {
-
- Float floatValue = e.getValue();
- loadAverage.setSecondDerivative(floatValue);
-
+ MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent = (MemberSecondDerivativeOfLoadAverageEvent) event;
+ String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ if (null == member) {
if (log.isDebugEnabled()) {
- log.debug(String.format("Member Second Derivation of load avg event: [member] %s "
- + "[value] %s", e.getMemberId(), floatValue));
+ log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
}
+ return;
+ }
+ if (!member.isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member activated event has not received for the member %s. "
+ + "Therefore ignoring" + " the health stat", memberId));
+ }
+ return;
+ }
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ String clusterId = member.getClusterId();
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ return;
}
+ monitor.handleMemberSecondDerivativeOfLoadAverageEvent(memberSecondDerivativeOfLoadAverageEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfMemoryConsumptionEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- }
+ }
});
+
healthStatEventReceiver.addEventListener(new SecondDerivativeOfLoadAverageEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
- SecondDerivativeOfLoadAverageEvent e = (SecondDerivativeOfLoadAverageEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
- + "[network-partition] %s [value] %s", clusterId, networkPartitionId, floatValue));
- }
+ SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent = (SecondDerivativeOfLoadAverageEvent) event;
+ String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
AbstractClusterMonitor monitor;
-
- if(asCtx.clusterMonitorExist(clusterId)){
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster){
-
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- KubernetesClusterContext kubernetesClusterContext =
- ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setLoadAverageSecondDerivative(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
+ monitor.handleSecondDerivativeOfLoadAverageEvent(secondDerivativeOfLoadAverageEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new SecondDerivativeOfMemoryConsumptionEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
- SecondDerivativeOfMemoryConsumptionEvent e = (SecondDerivativeOfMemoryConsumptionEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
- + "[network-partition] %s [value] %s", clusterId, networkPartitionId, floatValue));
- }
+ SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent = (SecondDerivativeOfMemoryConsumptionEvent) event;
+ String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
AbstractClusterMonitor monitor;
-
- if(asCtx.clusterMonitorExist(clusterId)){
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster){
-
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- KubernetesClusterContext kubernetesClusterContext =
- ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setMemoryConsumptionSecondDerivative(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
+ monitor.handleSecondDerivativeOfMemoryConsumptionEvent(secondDerivativeOfMemoryConsumptionEvent);
}
-
});
+
healthStatEventReceiver.addEventListener(new SecondDerivativeOfRequestsInFlightEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- SecondDerivativeOfRequestsInFlightEvent e = (SecondDerivativeOfRequestsInFlightEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second derivative of Rif event: [cluster] %s "
- + "[network-partition] %s [value] %s", clusterId, networkPartitionId, floatValue));
- }
+ SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent = (SecondDerivativeOfRequestsInFlightEvent) event;
+ String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
AbstractClusterMonitor monitor;
-
- if(asCtx.clusterMonitorExist(clusterId)){
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster){
-
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- KubernetesClusterContext kubernetesClusterContext =
- ((DockerServiceClusterMonitor) monitor).getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setRequestsInFlightSecondDerivative(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
+ monitor.handleSecondDerivativeOfRequestsInFlightEvent(secondDerivativeOfRequestsInFlightEvent);
}
});
}
-
- private LoadAverage findLoadAverage(String memberId) {
-// String memberId = event.getProperties().get("member_id");
- Member member = findMember(memberId);
-
- if(null == member){
- if(log.isDebugEnabled()) {
- log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
- }
- return null;
- }
- String clusterId = member.getClusterId();
-
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractClusterMonitor monitor;
-
- if(asCtx.clusterMonitorExist(clusterId)){
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- return null;
- }
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster){
-
- String networkPartitionId = findNetworkPartitionId(memberId);
- MemberStatsContext memberStatsContext =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId)
- .getPartitionCtxt(member.getPartitionId())
- .getMemberStatsContext(memberId);
- if(null == memberStatsContext){
- if(log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return null;
- }
- else if(!member.isActive()){
- if(log.isDebugEnabled()){
- log.debug(String.format("Member activated event has not received for the member %s. "
- + "Therefore ignoring" + " the health stat", memberId));
- }
- return null;
- }
-
- LoadAverage loadAverage = memberStatsContext.getLoadAverage();
- return loadAverage;
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- MemberStatsContext memberStatsContext =
- ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt().getMemberStatsContext(memberId);
- if(null == memberStatsContext){
- if(log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return null;
- }
- else if(!member.isActive()){
- if(log.isDebugEnabled()){
- log.debug(String.format("Member activated event has not received for the member %s. "
- + "Therefore ignoring" + " the health stat", memberId));
- }
- return null;
- }
-
- LoadAverage loadAverage = memberStatsContext.getLoadAverage();
- return loadAverage;
- }
-
- return null;
- }
-
- private MemoryConsumption findMemoryConsumption(String memberId) {
-// String memberId = event.getProperties().get("member_id");
- Member member = findMember(memberId);
-
- if(null == member){
- if(log.isDebugEnabled()) {
- log.debug(String.format("Member not found in the Topology : [member] %s", memberId));
- }
- return null;
- }
-
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractClusterMonitor monitor;
-
- if(asCtx.clusterMonitorExist(member.getClusterId())){
- monitor = asCtx.getClusterMonitor(member.getClusterId());
- } else {
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", member.getClusterId()));
- }
- return null;
- }
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster){
-
- String networkPartitionId = findNetworkPartitionId(memberId);
- NetworkPartitionContext networkPartitionCtxt =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
- PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if(null == memberStatsContext){
- if(log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return null;
- }else if(!member.isActive()){
- if(log.isDebugEnabled()){
- log.debug(String.format("Member activated event has not received for the member %s. "
- + "Therefore ignoring" + " the health stat", memberId));
- }
- return null;
- }
- MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption();
-
- return memoryConsumption;
- } else if (monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- KubernetesClusterContext kubernetesClusterCtxt =
- ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
- MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
- if(null == memberStatsContext){
- if(log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return null;
- }else if(!member.isActive()){
- if(log.isDebugEnabled()){
- log.debug(String.format("Member activated event has not received for the member %s. "
- + "Therefore ignoring" + " the health stat", memberId));
- }
- return null;
- }
- MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption();
-
- return memoryConsumption;
- }
-
- return null;
- }
-
- private String findNetworkPartitionId(String memberId) {
- for(Service service: TopologyManager.getTopology().getServices()){
- for(Cluster cluster: service.getClusters()){
- if(cluster.memberExists(memberId)){
- return cluster.getMember(memberId).getNetworkPartitionId();
- }
- }
- }
- return null;
- }
-
- private Member findMember(String memberId) {
+ private Member getMemberByMemberId(String memberId) {
try {
TopologyManager.acquireReadLock();
- for(Service service : TopologyManager.getTopology().getServices()) {
- for(Cluster cluster : service.getClusters()) {
- if(cluster.memberExists(memberId)) {
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.memberExists(memberId)) {
return cluster.getMember(memberId);
}
}
}
return null;
- }
- finally {
+ } finally {
TopologyManager.releaseReadLock();
}
}
- private void handleMemberFaultEvent(String clusterId, String memberId) {
- try {
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractClusterMonitor monitor;
-
- if(asCtx.clusterMonitorExist(clusterId)){
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- return;
- }
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster){
-
- NetworkPartitionContext nwPartitionCtxt;
- try{
- TopologyManager.acquireReadLock();
- Member member = findMember(memberId);
-
- if(null == member){
- return;
- }
- if(!member.isActive()){
- if(log.isDebugEnabled()){
- log.debug(String.format("Member activated event has not received for the member %s. "
- + "Therefore ignoring" + " the member fault health stat", memberId));
- }
- return;
- }
-
- nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(member);
-
- }finally{
- TopologyManager.releaseReadLock();
- }
- // start a new member in the same Partition
- String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId);
- PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
- if(!partitionCtxt.activeMemberExist(memberId)){
- if(log.isDebugEnabled()){
- log.debug(String.format("Could not find the active member in partition context, "
- + "[member] %s ", memberId));
- }
- return;
- }
- // terminate the faulty member
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- ccClient.terminate(memberId);
-
- // remove from active member list
- partitionCtxt.removeActiveMemberById(memberId);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Faulty member is terminated and removed from the active members list: "
- + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- // no need to do anything
- }
-
- } catch (TerminationException e) {
- log.error(e);
- }
- }
-
- public void terminate(){
- this.terminated = true;
+ public void terminate() {
+ this.terminated = true;
}
}
[7/7] git commit: code review changes to cluster monitors
Posted by sa...@apache.org.
code review changes to cluster monitors
Signed-off-by: sajhak <sa...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/31056109
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/31056109
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/31056109
Branch: refs/heads/container-autoscaling
Commit: 31056109c27256e0fc84319e428390eef09f28cf
Parents: 7d61649
Author: R-Rajkumar <rr...@gmail.com>
Authored: Sun Oct 5 15:04:57 2014 +0530
Committer: sajhak <sa...@gmail.com>
Committed: Mon Oct 6 23:11:29 2014 +0530
----------------------------------------------------------------------
.../stratos/autoscaler/AutoscalerContext.java | 36 +-
.../autoscaler/KubernetesClusterContext.java | 863 ++++++++--------
.../stratos/autoscaler/MemberStatsContext.java | 29 +-
.../AutoscalerHealthStatEventReceiver.java | 991 +++++--------------
.../AutoscalerTopologyEventReceiver.java | 458 ++-------
.../monitor/AbstractClusterMonitor.java | 307 +++---
.../monitor/ClusterMonitorFactory.java | 250 ++---
.../monitor/ContainerClusterMonitor.java | 59 --
.../monitor/DockerServiceClusterMonitor.java | 176 ----
.../monitor/KubernetesClusterMonitor.java | 427 ++++++++
.../KubernetesServiceClusterMonitor.java | 181 ++++
.../autoscaler/monitor/VMClusterMonitor.java | 597 ++++++++++-
.../autoscaler/monitor/VMLbClusterMonitor.java | 87 +-
.../monitor/VMServiceClusterMonitor.java | 73 +-
.../stratos/autoscaler/util/AutoscalerUtil.java | 391 +-------
.../stratos/common/enums/ClusterType.java | 5 -
16 files changed, 2440 insertions(+), 2490 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
index 581d633..2d10954 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
@@ -33,6 +33,8 @@ import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
public class AutoscalerContext {
private static final Log log = LogFactory.getLog(AutoscalerContext.class);
+ private static final AutoscalerContext INSTANCE = new AutoscalerContext();
+
private AutoscalerContext() {
try {
setClusterMonitors(new HashMap<String, AbstractClusterMonitor>());
@@ -40,17 +42,13 @@ public class AutoscalerContext {
log.error("Rule evaluateMinCheck error", e);
}
}
-
+
// Map<ClusterId, AbstractClusterMonitor>
private Map<String, AbstractClusterMonitor> clusterMonitors;
- private static class Holder {
- private static final AutoscalerContext INSTANCE = new AutoscalerContext();
- }
-
- public static AutoscalerContext getInstance() {
- return Holder.INSTANCE;
- }
+ public static AutoscalerContext getInstance() {
+ return INSTANCE;
+ }
public void addClusterMonitor(AbstractClusterMonitor clusterMonitor) {
clusterMonitors.put(clusterMonitor.getClusterId(), clusterMonitor);
@@ -59,11 +57,7 @@ public class AutoscalerContext {
public AbstractClusterMonitor getClusterMonitor(String clusterId) {
return clusterMonitors.get(clusterId);
}
-
- public boolean clusterMonitorExist(String clusterId) {
- return clusterMonitors.containsKey(clusterId);
- }
-
+
public Map<String, AbstractClusterMonitor> getClusterMonitors() {
return clusterMonitors;
}
@@ -71,13 +65,15 @@ public class AutoscalerContext {
public void setClusterMonitors(Map<String, AbstractClusterMonitor> clusterMonitors) {
this.clusterMonitors = clusterMonitors;
}
-
+
public AbstractClusterMonitor removeClusterMonitor(String clusterId) {
- if(!clusterMonitorExist(clusterId)) {
- log.fatal("ClusterMonitor not found for cluster id: "+clusterId);
- return null;
- }
- log.info("Removed ClusterMonitor [cluster id]: " + clusterId);
- return clusterMonitors.remove(clusterId);
+
+ AbstractClusterMonitor monitor = clusterMonitors.remove(clusterId);
+ if (monitor == null) {
+ log.fatal("ClusterMonitor not found for cluster id: " + clusterId);
+ } else {
+ log.info("Removed ClusterMonitor [cluster id]: " + clusterId);
+ }
+ return monitor;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
index 16bc653..c8b6e39 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/KubernetesClusterContext.java
@@ -40,474 +40,475 @@ import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
/*
* It holds the runtime data of a kubernetes cluster
*/
-public class KubernetesClusterContext implements Serializable{
-
- private static final long serialVersionUID = 808741789615481596L;
- private static final Log log = LogFactory.getLog(KubernetesClusterContext.class);
-
- private String kubernetesClusterId;
- private String serviceName;
-
+public class KubernetesClusterContext implements Serializable {
+
+ private static final long serialVersionUID = 808741789615481596L;
+ private static final Log log = LogFactory.getLog(KubernetesClusterContext.class);
+
+ private String kubernetesClusterId;
+ private String serviceName;
+
private int minReplicas;
private int maxReplicas;
private int currentReplicas = 0;
-
+
// properties
private Properties properties;
-
+
// 15 mints as the default
private long expiryTime;
// pending members
private List<MemberContext> pendingMembers;
-
+
// active members
private List<MemberContext> activeMembers;
//Keep statistics come from CEP
private Map<String, MemberStatsContext> memberStatsContexts;
-
+
//Following information will keep events details
private RequestsInFlight requestsInFlight;
private MemoryConsumption memoryConsumption;
private LoadAverage loadAverage;
-
+
// cluster id
private String clusterId;
-
+
//boolean values to keep whether the requests in flight parameters are reset or not
- private boolean rifReset = false, averageRifReset = false,
- gradientRifReset = false, secondDerivativeRifRest = false;
+ private boolean rifReset = false, averageRifReset = false,
+ gradientRifReset = false, secondDerivativeRifRest = false;
//boolean values to keep whether the memory consumption parameters are reset or not
private boolean memoryConsumptionReset = false, averageMemoryConsumptionReset = false,
gradientMemoryConsumptionReset = false, secondDerivativeMemoryConsumptionRest = false;
//boolean values to keep whether the load average parameters are reset or not
- private boolean loadAverageReset = false, averageLoadAverageReset = false,
- gradientLoadAverageReset = false, secondDerivativeLoadAverageRest = false;
-
- public KubernetesClusterContext(String kubernetesClusterId, String clusterId){
- this.kubernetesClusterId = kubernetesClusterId;
- this.clusterId = clusterId;
+ private boolean loadAverageReset = false, averageLoadAverageReset = false,
+ gradientLoadAverageReset = false, secondDerivativeLoadAverageRest = false;
+
+ public KubernetesClusterContext(String kubernetesClusterId, String clusterId) {
+ this.kubernetesClusterId = kubernetesClusterId;
+ this.clusterId = clusterId;
this.pendingMembers = new ArrayList<MemberContext>();
this.activeMembers = new ArrayList<MemberContext>();
this.memberStatsContexts = new ConcurrentHashMap<String, MemberStatsContext>();
this.requestsInFlight = new RequestsInFlight();
this.loadAverage = new LoadAverage();
this.memoryConsumption = new MemoryConsumption();
-
+
// check if a different value has been set for expiryTime
XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
expiryTime = conf.getLong("autoscaler.member.expiryTimeout", 300000);
if (log.isDebugEnabled()) {
log.debug("Member expiry time is set to: " + expiryTime);
}
-
+
Thread th = new Thread(new PendingMemberWatcher(this));
th.start();
- }
-
- public String getKubernetesClusterID() {
- return kubernetesClusterId;
- }
- public void setKubernetesClusterID(String kubernetesClusterId) {
- this.kubernetesClusterId = kubernetesClusterId;
- }
-
- public List<MemberContext> getPendingMembers() {
- return pendingMembers;
- }
-
- public void setPendingMembers(List<MemberContext> pendingMembers) {
- this.pendingMembers = pendingMembers;
- }
-
- public int getActiveMemberCount() {
- return activeMembers.size();
- }
-
- public void setActiveMembers(List<MemberContext> activeMembers) {
- this.activeMembers = activeMembers;
- }
-
- public int getMinReplicas() {
- return minReplicas;
- }
-
- public void setMinReplicas(int minReplicas) {
- this.minReplicas = minReplicas;
- }
-
- public int getMaxReplicas() {
- return maxReplicas;
- }
-
- public void setMaxReplicas(int maxReplicas) {
- this.maxReplicas = maxReplicas;
- }
-
- public int getCurrentReplicas() {
- return currentReplicas;
- }
-
- public void setCurrentReplicas(int currentReplicas) {
- this.currentReplicas = currentReplicas;
- }
-
- public void addPendingMember(MemberContext ctxt) {
- this.pendingMembers.add(ctxt);
- }
-
- public boolean removePendingMember(String id) {
- if (id == null) {
- return false;
- }
- for (Iterator<MemberContext> iterator = pendingMembers.iterator(); iterator.hasNext();) {
- MemberContext pendingMember = (MemberContext) iterator.next();
- if (id.equals(pendingMember.getMemberId())) {
- iterator.remove();
- return true;
- }
-
- }
-
- return false;
- }
-
- public void movePendingMemberToActiveMembers(String memberId) {
- if (memberId == null) {
- return;
- }
- Iterator<MemberContext> iterator = pendingMembers.listIterator();
- while (iterator.hasNext()) {
- MemberContext pendingMember = iterator.next();
- if (pendingMember == null) {
- iterator.remove();
- continue;
- }
- if (memberId.equals(pendingMember.getMemberId())) {
- // member is activated
- // remove from pending list
- iterator.remove();
- // add to the activated list
- this.activeMembers.add(pendingMember);
- if (log.isDebugEnabled()) {
- log.debug(String.format(
- "Pending member is removed and added to the "
- + "activated member list. [Member Id] %s",
- memberId));
- }
- break;
- }
- }
- }
-
- public void addActiveMember(MemberContext ctxt) {
- this.activeMembers.add(ctxt);
- }
-
- public void removeActiveMember(MemberContext ctxt) {
- this.activeMembers.remove(ctxt);
- }
-
- public long getExpiryTime() {
- return expiryTime;
- }
-
- public void setExpiryTime(long expiryTime) {
- this.expiryTime = expiryTime;
- }
-
- public Map<String, MemberStatsContext> getMemberStatsContexts() {
- return memberStatsContexts;
- }
-
- public MemberStatsContext getMemberStatsContext(String memberId) {
- return memberStatsContexts.get(memberId);
- }
-
- public void addMemberStatsContext(MemberStatsContext ctxt) {
- this.memberStatsContexts.put(ctxt.getMemberId(), ctxt);
- }
-
- public void removeMemberStatsContext(String memberId) {
- this.memberStatsContexts.remove(memberId);
- }
-
- public Properties getProperties() {
- return properties;
- }
-
- public void setProperties(Properties properties) {
- this.properties = properties;
- }
-
- public String getServiceName() {
- return serviceName;
- }
-
- public void setServiceName(String serviceName) {
- this.serviceName = serviceName;
- }
-
- public List<MemberContext> getActiveMembers() {
- return activeMembers;
- }
-
- public boolean removeActiveMemberById(String memberId) {
- boolean removeActiveMember = false;
- synchronized (activeMembers) {
- Iterator<MemberContext> iterator = activeMembers.listIterator();
- while (iterator.hasNext()) {
- MemberContext memberContext = iterator.next();
- if (memberId.equals(memberContext.getMemberId())) {
- iterator.remove();
- removeActiveMember = true;
-
- break;
- }
- }
- }
- return removeActiveMember;
- }
-
- public boolean activeMemberExist(String memberId) {
-
- for (MemberContext memberContext : activeMembers) {
- if (memberId.equals(memberContext.getMemberId())) {
- return true;
- }
- }
- return false;
- }
-
- private class PendingMemberWatcher implements Runnable {
- private KubernetesClusterContext ctxt;
-
- public PendingMemberWatcher(KubernetesClusterContext ctxt) {
- this.ctxt = ctxt;
- }
-
- @Override
- public void run() {
-
- while (true) {
- long expiryTime = ctxt.getExpiryTime();
- List<MemberContext> pendingMembers = ctxt.getPendingMembers();
-
- synchronized (pendingMembers) {
- Iterator<MemberContext> iterator = pendingMembers
- .listIterator();
- while (iterator.hasNext()) {
- MemberContext pendingMember = iterator.next();
-
- if (pendingMember == null) {
- continue;
- }
- long pendingTime = System.currentTimeMillis()
- - pendingMember.getInitTime();
- if (pendingTime >= expiryTime) {
-
- // terminate all containers of this cluster
- try {
- CloudControllerClient.getInstance().terminateAllContainers(clusterId);
- iterator.remove();
- } catch (TerminationException e) {
- log.error(e.getMessage(), e);
- }
-
- }
- }
- }
-
- try {
- // TODO find a constant
- Thread.sleep(15000);
- } catch (InterruptedException ignore) {
- }
- }
- }
-
- }
-
- public float getAverageRequestsInFlight() {
- return requestsInFlight.getAverage();
- }
-
- public void setAverageRequestsInFlight(float averageRequestsInFlight) {
- requestsInFlight.setAverage(averageRequestsInFlight);
- averageRifReset = true;
- if (secondDerivativeRifRest && gradientRifReset) {
- rifReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Requests in flights stats are reset, "
- + "ready to do scale check [kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public float getRequestsInFlightSecondDerivative() {
- return requestsInFlight.getSecondDerivative();
- }
-
- public void setRequestsInFlightSecondDerivative(
- float requestsInFlightSecondDerivative) {
- requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative);
- secondDerivativeRifRest = true;
- if (averageRifReset && gradientRifReset) {
- rifReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public float getRequestsInFlightGradient() {
- return requestsInFlight.getGradient();
- }
-
- public void setRequestsInFlightGradient(float requestsInFlightGradient) {
- requestsInFlight.setGradient(requestsInFlightGradient);
- gradientRifReset = true;
- if (secondDerivativeRifRest && averageRifReset) {
- rifReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public boolean isRifReset() {
- return rifReset;
- }
-
- public void setRifReset(boolean rifReset) {
- this.rifReset = rifReset;
- this.averageRifReset = rifReset;
- this.gradientRifReset = rifReset;
- this.secondDerivativeRifRest = rifReset;
- }
-
- public float getAverageMemoryConsumption() {
- return memoryConsumption.getAverage();
- }
-
- public void setAverageMemoryConsumption(float averageMemoryConsumption) {
- memoryConsumption.setAverage(averageMemoryConsumption);
- averageMemoryConsumptionReset = true;
- if (secondDerivativeMemoryConsumptionRest
- && gradientMemoryConsumptionReset) {
- memoryConsumptionReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public float getMemoryConsumptionSecondDerivative() {
- return memoryConsumption.getSecondDerivative();
- }
-
- public void setMemoryConsumptionSecondDerivative(
- float memoryConsumptionSecondDerivative) {
- memoryConsumption
- .setSecondDerivative(memoryConsumptionSecondDerivative);
- secondDerivativeMemoryConsumptionRest = true;
- if (averageMemoryConsumptionReset && gradientMemoryConsumptionReset) {
- memoryConsumptionReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public float getMemoryConsumptionGradient() {
- return memoryConsumption.getGradient();
- }
-
- public void setMemoryConsumptionGradient(float memoryConsumptionGradient) {
- memoryConsumption.setGradient(memoryConsumptionGradient);
- gradientMemoryConsumptionReset = true;
- if (secondDerivativeMemoryConsumptionRest
- && averageMemoryConsumptionReset) {
- memoryConsumptionReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public boolean isMemoryConsumptionReset() {
- return memoryConsumptionReset;
- }
-
- public void setMemoryConsumptionReset(boolean memoryConsumptionReset) {
- this.memoryConsumptionReset = memoryConsumptionReset;
- this.averageMemoryConsumptionReset = memoryConsumptionReset;
- this.gradientMemoryConsumptionReset = memoryConsumptionReset;
- this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset;
- }
-
-
- public float getAverageLoadAverage() {
- return loadAverage.getAverage();
- }
-
- public void setAverageLoadAverage(float averageLoadAverage) {
- loadAverage.setAverage(averageLoadAverage);
- averageLoadAverageReset = true;
- if (secondDerivativeLoadAverageRest && gradientLoadAverageReset) {
- loadAverageReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Load average stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public float getLoadAverageSecondDerivative() {
- return loadAverage.getSecondDerivative();
- }
-
- public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) {
- loadAverage.setSecondDerivative(loadAverageSecondDerivative);
- secondDerivativeLoadAverageRest = true;
- if (averageLoadAverageReset && gradientLoadAverageReset) {
- loadAverageReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Load average stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public float getLoadAverageGradient() {
- return loadAverage.getGradient();
- }
-
- public void setLoadAverageGradient(float loadAverageGradient) {
- loadAverage.setGradient(loadAverageGradient);
- gradientLoadAverageReset = true;
- if (secondDerivativeLoadAverageRest && averageLoadAverageReset) {
- loadAverageReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Load average stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public boolean isLoadAverageReset() {
- return loadAverageReset;
- }
-
- public void setLoadAverageReset(boolean loadAverageReset) {
- this.loadAverageReset = loadAverageReset;
- this.averageLoadAverageReset = loadAverageReset;
- this.gradientLoadAverageReset = loadAverageReset;
- this.secondDerivativeLoadAverageRest = loadAverageReset;
- }
+ }
+
+ public String getKubernetesClusterID() {
+ return kubernetesClusterId;
+ }
+
+ public void setKubernetesClusterID(String kubernetesClusterId) {
+ this.kubernetesClusterId = kubernetesClusterId;
+ }
+
+ public List<MemberContext> getPendingMembers() {
+ return pendingMembers;
+ }
+
+ public void setPendingMembers(List<MemberContext> pendingMembers) {
+ this.pendingMembers = pendingMembers;
+ }
+
+ public int getActiveMemberCount() {
+ return activeMembers.size();
+ }
+
+ public void setActiveMembers(List<MemberContext> activeMembers) {
+ this.activeMembers = activeMembers;
+ }
+
+ public int getMinReplicas() {
+ return minReplicas;
+ }
+
+ public void setMinReplicas(int minReplicas) {
+ this.minReplicas = minReplicas;
+ }
+
+ public int getMaxReplicas() {
+ return maxReplicas;
+ }
+
+ public void setMaxReplicas(int maxReplicas) {
+ this.maxReplicas = maxReplicas;
+ }
+
+ public int getCurrentReplicas() {
+ return currentReplicas;
+ }
+
+ public void setCurrentReplicas(int currentReplicas) {
+ this.currentReplicas = currentReplicas;
+ }
+
+ public void addPendingMember(MemberContext ctxt) {
+ this.pendingMembers.add(ctxt);
+ }
+
+ public boolean removePendingMember(String id) {
+ if (id == null) {
+ return false;
+ }
+ for (Iterator<MemberContext> iterator = pendingMembers.iterator(); iterator.hasNext(); ) {
+ MemberContext pendingMember = (MemberContext) iterator.next();
+ if (id.equals(pendingMember.getMemberId())) {
+ iterator.remove();
+ return true;
+ }
+
+ }
+
+ return false;
+ }
+
+ public void movePendingMemberToActiveMembers(String memberId) {
+ if (memberId == null) {
+ return;
+ }
+ Iterator<MemberContext> iterator = pendingMembers.listIterator();
+ while (iterator.hasNext()) {
+ MemberContext pendingMember = iterator.next();
+ if (pendingMember == null) {
+ iterator.remove();
+ continue;
+ }
+ if (memberId.equals(pendingMember.getMemberId())) {
+ // member is activated
+ // remove from pending list
+ iterator.remove();
+ // add to the activated list
+ this.activeMembers.add(pendingMember);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Pending member is removed and added to the "
+ + "activated member list. [Member Id] %s",
+ memberId));
+ }
+ break;
+ }
+ }
+ }
+
+ public void addActiveMember(MemberContext ctxt) {
+ this.activeMembers.add(ctxt);
+ }
+
+ public void removeActiveMember(MemberContext ctxt) {
+ this.activeMembers.remove(ctxt);
+ }
+
+ public long getExpiryTime() {
+ return expiryTime;
+ }
+
+ public void setExpiryTime(long expiryTime) {
+ this.expiryTime = expiryTime;
+ }
+
+ public Map<String, MemberStatsContext> getMemberStatsContexts() {
+ return memberStatsContexts;
+ }
+
+ public MemberStatsContext getMemberStatsContext(String memberId) {
+ return memberStatsContexts.get(memberId);
+ }
+
+ public void addMemberStatsContext(MemberStatsContext ctxt) {
+ this.memberStatsContexts.put(ctxt.getMemberId(), ctxt);
+ }
+
+ public void removeMemberStatsContext(String memberId) {
+ this.memberStatsContexts.remove(memberId);
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ public List<MemberContext> getActiveMembers() {
+ return activeMembers;
+ }
+
+ public boolean removeActiveMemberById(String memberId) {
+ boolean removeActiveMember = false;
+ synchronized (activeMembers) {
+ Iterator<MemberContext> iterator = activeMembers.listIterator();
+ while (iterator.hasNext()) {
+ MemberContext memberContext = iterator.next();
+ if (memberId.equals(memberContext.getMemberId())) {
+ iterator.remove();
+ removeActiveMember = true;
+
+ break;
+ }
+ }
+ }
+ return removeActiveMember;
+ }
+
+ public boolean activeMemberExist(String memberId) {
+
+ for (MemberContext memberContext : activeMembers) {
+ if (memberId.equals(memberContext.getMemberId())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private class PendingMemberWatcher implements Runnable {
+ private KubernetesClusterContext ctxt;
+
+ public PendingMemberWatcher(KubernetesClusterContext ctxt) {
+ this.ctxt = ctxt;
+ }
+
+ @Override
+ public void run() {
+
+ while (true) {
+ long expiryTime = ctxt.getExpiryTime();
+ List<MemberContext> pendingMembers = ctxt.getPendingMembers();
+
+ synchronized (pendingMembers) {
+ Iterator<MemberContext> iterator = pendingMembers
+ .listIterator();
+ while (iterator.hasNext()) {
+ MemberContext pendingMember = iterator.next();
+
+ if (pendingMember == null) {
+ continue;
+ }
+ long pendingTime = System.currentTimeMillis()
+ - pendingMember.getInitTime();
+ if (pendingTime >= expiryTime) {
+
+ // terminate all containers of this cluster
+ try {
+ CloudControllerClient.getInstance().terminateAllContainers(clusterId);
+ iterator.remove();
+ } catch (TerminationException e) {
+ log.error(e.getMessage(), e);
+ }
+
+ }
+ }
+ }
+
+ try {
+ // TODO find a constant
+ Thread.sleep(15000);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+
+ }
+
+ public float getAverageRequestsInFlight() {
+ return requestsInFlight.getAverage();
+ }
+
+ public void setAverageRequestsInFlight(float averageRequestsInFlight) {
+ requestsInFlight.setAverage(averageRequestsInFlight);
+ averageRifReset = true;
+ if (secondDerivativeRifRest && gradientRifReset) {
+ rifReset = true;
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Requests in flights stats are reset, "
+ + "ready to do scale check [kub cluster] %s", this.kubernetesClusterId));
+ }
+ }
+ }
+
+ public float getRequestsInFlightSecondDerivative() {
+ return requestsInFlight.getSecondDerivative();
+ }
+
+ public void setRequestsInFlightSecondDerivative(
+ float requestsInFlightSecondDerivative) {
+ requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative);
+ secondDerivativeRifRest = true;
+ if (averageRifReset && gradientRifReset) {
+ rifReset = true;
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
+ + "[kub cluster] %s", this.kubernetesClusterId));
+ }
+ }
+ }
+
+ public float getRequestsInFlightGradient() {
+ return requestsInFlight.getGradient();
+ }
+
+ public void setRequestsInFlightGradient(float requestsInFlightGradient) {
+ requestsInFlight.setGradient(requestsInFlightGradient);
+ gradientRifReset = true;
+ if (secondDerivativeRifRest && averageRifReset) {
+ rifReset = true;
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
+ + "[kub cluster] %s", this.kubernetesClusterId));
+ }
+ }
+ }
+
+ public boolean isRifReset() {
+ return rifReset;
+ }
+
+ public void setRifReset(boolean rifReset) {
+ this.rifReset = rifReset;
+ this.averageRifReset = rifReset;
+ this.gradientRifReset = rifReset;
+ this.secondDerivativeRifRest = rifReset;
+ }
+
+ public float getAverageMemoryConsumption() {
+ return memoryConsumption.getAverage();
+ }
+
+ public void setAverageMemoryConsumption(float averageMemoryConsumption) {
+ memoryConsumption.setAverage(averageMemoryConsumption);
+ averageMemoryConsumptionReset = true;
+ if (secondDerivativeMemoryConsumptionRest
+ && gradientMemoryConsumptionReset) {
+ memoryConsumptionReset = true;
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
+ + "[kub cluster] %s", this.kubernetesClusterId));
+ }
+ }
+ }
+
+ public float getMemoryConsumptionSecondDerivative() {
+ return memoryConsumption.getSecondDerivative();
+ }
+
+ public void setMemoryConsumptionSecondDerivative(
+ float memoryConsumptionSecondDerivative) {
+ memoryConsumption
+ .setSecondDerivative(memoryConsumptionSecondDerivative);
+ secondDerivativeMemoryConsumptionRest = true;
+ if (averageMemoryConsumptionReset && gradientMemoryConsumptionReset) {
+ memoryConsumptionReset = true;
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
+ + "[kub cluster] %s", this.kubernetesClusterId));
+ }
+ }
+ }
+
+ public float getMemoryConsumptionGradient() {
+ return memoryConsumption.getGradient();
+ }
+
+ public void setMemoryConsumptionGradient(float memoryConsumptionGradient) {
+ memoryConsumption.setGradient(memoryConsumptionGradient);
+ gradientMemoryConsumptionReset = true;
+ if (secondDerivativeMemoryConsumptionRest
+ && averageMemoryConsumptionReset) {
+ memoryConsumptionReset = true;
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
+ + "[kub cluster] %s", this.kubernetesClusterId));
+ }
+ }
+ }
+
+ public boolean isMemoryConsumptionReset() {
+ return memoryConsumptionReset;
+ }
+
+ public void setMemoryConsumptionReset(boolean memoryConsumptionReset) {
+ this.memoryConsumptionReset = memoryConsumptionReset;
+ this.averageMemoryConsumptionReset = memoryConsumptionReset;
+ this.gradientMemoryConsumptionReset = memoryConsumptionReset;
+ this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset;
+ }
+
+
+ public float getAverageLoadAverage() {
+ return loadAverage.getAverage();
+ }
+
+ public void setAverageLoadAverage(float averageLoadAverage) {
+ loadAverage.setAverage(averageLoadAverage);
+ averageLoadAverageReset = true;
+ if (secondDerivativeLoadAverageRest && gradientLoadAverageReset) {
+ loadAverageReset = true;
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Load average stats are reset, ready to do scale check "
+ + "[kub cluster] %s", this.kubernetesClusterId));
+ }
+ }
+ }
+
+ public float getLoadAverageSecondDerivative() {
+ return loadAverage.getSecondDerivative();
+ }
+
+ public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) {
+ loadAverage.setSecondDerivative(loadAverageSecondDerivative);
+ secondDerivativeLoadAverageRest = true;
+ if (averageLoadAverageReset && gradientLoadAverageReset) {
+ loadAverageReset = true;
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Load average stats are reset, ready to do scale check "
+ + "[kub cluster] %s", this.kubernetesClusterId));
+ }
+ }
+ }
+
+ public float getLoadAverageGradient() {
+ return loadAverage.getGradient();
+ }
+
+ public void setLoadAverageGradient(float loadAverageGradient) {
+ loadAverage.setGradient(loadAverageGradient);
+ gradientLoadAverageReset = true;
+ if (secondDerivativeLoadAverageRest && averageLoadAverageReset) {
+ loadAverageReset = true;
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Load average stats are reset, ready to do scale check "
+ + "[kub cluster] %s", this.kubernetesClusterId));
+ }
+ }
+ }
+
+ public boolean isLoadAverageReset() {
+ return loadAverageReset;
+ }
+
+ public void setLoadAverageReset(boolean loadAverageReset) {
+ this.loadAverageReset = loadAverageReset;
+ this.averageLoadAverageReset = loadAverageReset;
+ this.gradientLoadAverageReset = loadAverageReset;
+ this.secondDerivativeLoadAverageRest = loadAverageReset;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
index ac8b61a..bd3a6c3 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/MemberStatsContext.java
@@ -31,10 +31,10 @@ public class MemberStatsContext {
private MemoryConsumption memoryConsumption;
private String memberId;
- public MemberStatsContext(String memberId){
+ public MemberStatsContext(String memberId) {
this.memberId = memberId;
memoryConsumption = new MemoryConsumption();
- loadAverage = new LoadAverage();
+ loadAverage = new LoadAverage();
}
public String getMemberId() {
@@ -52,4 +52,29 @@ public class MemberStatsContext {
public MemoryConsumption getMemoryConsumption() {
return memoryConsumption;
}
+
+ public void setAverageLoadAverage(float value) {
+ loadAverage.setAverage(value);
+ }
+
+ public void setAverageMemoryConsumption(float value) {
+ memoryConsumption.setAverage(value);
+ }
+
+ public void setGradientOfLoadAverage(float value) {
+ loadAverage.setGradient(value);
+ }
+
+ public void setGradientOfMemoryConsumption(float value) {
+ memoryConsumption.setGradient(value);
+ }
+
+ public void setSecondDerivativeOfLoadAverage(float value) {
+ loadAverage.setSecondDerivative(value);
+ }
+
+ public void setSecondDerivativeOfMemoryConsumption(float value) {
+ memoryConsumption.setSecondDerivative(value);
+ }
+
}
[4/7] code review changes to cluster monitors
Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
new file mode 100644
index 0000000..d90e0b6
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+
+/*
+ * Every kubernetes cluster monitor should extend this class
+ */
+public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
+
+ private static final Log log = LogFactory.getLog(KubernetesClusterMonitor.class);
+
+ private KubernetesClusterContext kubernetesClusterCtxt;
+ protected AutoscalePolicy autoscalePolicy;
+
+ protected KubernetesClusterMonitor(String clusterId, String serviceId,
+ KubernetesClusterContext kubernetesClusterContext,
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator,
+ AutoscalePolicy autoscalePolicy) {
+
+ super(clusterId, serviceId, autoscalerRuleEvaluator);
+ this.kubernetesClusterCtxt = kubernetesClusterContext;
+ this.autoscalePolicy = autoscalePolicy;
+ }
+
+ @Override
+ public void handleAverageLoadAverageEvent(
+ AverageLoadAverageEvent averageLoadAverageEvent) {
+
+ String clusterId = averageLoadAverageEvent.getClusterId();
+ float value = averageLoadAverageEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Avg load avg event: [cluster] %s [value] %s",
+ clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setAverageLoadAverage(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+
+ }
+
+ @Override
+ public void handleGradientOfLoadAverageEvent(
+ GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+ String clusterId = gradientOfLoadAverageEvent.getClusterId();
+ float value = gradientOfLoadAverageEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s",
+ clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setLoadAverageGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfLoadAverageEvent(
+ SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
+
+ String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+ float value = secondDerivativeOfLoadAverageEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
+ + "[value] %s", clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setLoadAverageSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleAverageMemoryConsumptionEvent(
+ AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+ String clusterId = averageMemoryConsumptionEvent.getClusterId();
+ float value = averageMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Avg Memory Consumption event: [cluster] %s "
+ + "[value] %s", averageMemoryConsumptionEvent.getClusterId(), value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setAverageMemoryConsumption(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleGradientOfMemoryConsumptionEvent(
+ GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
+
+ String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+ float value = gradientOfMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
+ + "[value] %s", clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setMemoryConsumptionGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfMemoryConsumptionEvent(
+ SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
+
+ String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+ float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
+ + "[value] %s", clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setMemoryConsumptionSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleAverageRequestsInFlightEvent(
+ AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+ float value = averageRequestsInFlightEvent.getValue();
+ String clusterId = averageRequestsInFlightEvent.getClusterId();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Average Rif event: [cluster] %s [value] %s",
+ clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setAverageRequestsInFlight(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleGradientOfRequestsInFlightEvent(
+ GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+ String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
+ float value = gradientOfRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s",
+ clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setRequestsInFlightGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfRequestsInFlightEvent(
+ SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
+
+ String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
+ float value = secondDerivativeOfRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second derivative of Rif event: [cluster] %s "
+ + "[value] %s", clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setRequestsInFlightSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleMemberAverageMemoryConsumptionEvent(
+ MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
+
+ String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+ KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+ MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberAverageMemoryConsumptionEvent.getValue();
+ memberStatsContext.setAverageMemoryConsumption(value);
+ }
+
+ @Override
+ public void handleMemberGradientOfMemoryConsumptionEvent(
+ MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
+
+ String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+ KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+ MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberGradientOfMemoryConsumptionEvent.getValue();
+ memberStatsContext.setGradientOfMemoryConsumption(value);
+ }
+
+ @Override
+ public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+ MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
+
+ }
+
+ @Override
+ public void handleMemberAverageLoadAverageEvent(
+ MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+ KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+ String memberId = memberAverageLoadAverageEvent.getMemberId();
+ float value = memberAverageLoadAverageEvent.getValue();
+ MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ memberStatsContext.setAverageLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberGradientOfLoadAverageEvent(
+ MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
+
+ String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+ KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+ MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberGradientOfLoadAverageEvent.getValue();
+ memberStatsContext.setGradientOfLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberSecondDerivativeOfLoadAverageEvent(
+ MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
+
+ String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+ KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+ MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+ memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+
+ // kill the container
+ }
+
+ @Override
+ public void handleMemberStartedEvent(
+ MemberStartedEvent memberStartedEvent) {
+
+ }
+
+ @Override
+ public void handleMemberActivatedEvent(
+ MemberActivatedEvent memberActivatedEvent) {
+
+ KubernetesClusterContext kubernetesClusterContext;
+ kubernetesClusterContext = getKubernetesClusterCtxt();
+ String memberId = memberActivatedEvent.getMemberId();
+ kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been added successfully: "
+ + "[member] %s", memberId));
+ }
+ kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
+ }
+
+ @Override
+ public void handleMemberMaintenanceModeEvent(
+ MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+ // no need to do anything here
+ // we will not be receiving this event for containers
+ // because we just kill the containers
+ }
+
+ @Override
+ public void handleMemberReadyToShutdownEvent(
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+ // no need to do anything here
+ // we will not be receiving this event for containers
+ // because we just kill the containers
+ }
+
+ @Override
+ public void handleMemberTerminatedEvent(
+ MemberTerminatedEvent memberTerminatedEvent) {
+
+ // no need to do anything here
+ // we will not be receiving this event for containers
+ // because we just kill the containers
+ }
+
+ @Override
+ public void handleClusterRemovedEvent(
+ ClusterRemovedEvent clusterRemovedEvent) {
+
+ }
+
+ public KubernetesClusterContext getKubernetesClusterCtxt() {
+ return kubernetesClusterCtxt;
+ }
+
+ public void setKubernetesClusterCtxt(
+ KubernetesClusterContext kubernetesClusterCtxt) {
+ this.kubernetesClusterCtxt = kubernetesClusterCtxt;
+ }
+
+ public AutoscalePolicy getAutoscalePolicy() {
+ return autoscalePolicy;
+ }
+
+ public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
+ this.autoscalePolicy = autoscalePolicy;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
new file mode 100644
index 0000000..3c81ba3
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor;
+
+import java.util.Properties;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.exception.SpawningException;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/*
+ * It is monitoring a kubernetes service cluster periodically.
+ */
+public final class KubernetesServiceClusterMonitor extends KubernetesClusterMonitor {
+
+ private static final Log log = LogFactory.getLog(KubernetesServiceClusterMonitor.class);
+
+ private String lbReferenceType;
+ private int numberOfReplicasInServiceCluster = 0;
+ int retryInterval = 60000;
+
+ public KubernetesServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt,
+ String serviceClusterID, String serviceId,
+ AutoscalePolicy autoscalePolicy) {
+ super(serviceClusterID, serviceId, kubernetesClusterCtxt,
+ new AutoscalerRuleEvaluator(), autoscalePolicy);
+ readConfigurations();
+ }
+
+ @Override
+ public void run() {
+
+ while (!isDestroyed()) {
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
+ }
+ try {
+ if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
+ monitor();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
+ + ClusterStatus.In_Maintenance + " mode......");
+ }
+ }
+ } catch (Exception e) {
+ log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
+ e);
+ }
+ try {
+ Thread.sleep(getMonitorIntervalMilliseconds());
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+
+ @Override
+ protected void monitor() {
+
+ int minReplicas;
+ try {
+ TopologyManager.acquireReadLock();
+ Service service = TopologyManager.getTopology().getService(getServiceId());
+ Cluster cluster = service.getCluster(getClusterId());
+ Properties props = cluster.getProperties();
+ minReplicas = Integer.parseInt(props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS));
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+
+ // is container created successfully?
+ boolean success = false;
+ String kubernetesClusterId = getKubernetesClusterCtxt().getKubernetesClusterID();
+ int activeMembers = getKubernetesClusterCtxt().getActiveMembers().size();
+ int pendingMembers = getKubernetesClusterCtxt().getPendingMembers().size();
+ int nonTerminatedMembers = activeMembers + pendingMembers;
+
+ if (nonTerminatedMembers == 0) {
+ while (!success) {
+ try {
+ CloudControllerClient ccClient = CloudControllerClient.getInstance();
+ MemberContext memberContext = ccClient.createContainer(kubernetesClusterId, getClusterId());
+ if (null != memberContext) {
+ getKubernetesClusterCtxt().addPendingMember(memberContext);
+ success = true;
+ numberOfReplicasInServiceCluster = minReplicas;
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Pending member added, [member] %s [kub cluster] %s",
+ memberContext.getMemberId(), kubernetesClusterId));
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Returned member context is null, did not add to pending members");
+ }
+ }
+ } catch (SpawningException spawningException) {
+ if (log.isDebugEnabled()) {
+ String message = "Cannot create containers, will retry in "
+ + (retryInterval / 1000) + "s";
+ log.debug(message, spawningException);
+ }
+ } catch (Exception exception) {
+ if (log.isDebugEnabled()) {
+ String message = "Error while creating containers, will retry in "
+ + (retryInterval / 1000) + "s";
+ log.debug(message, exception);
+ }
+ }
+ try {
+ Thread.sleep(retryInterval);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+ }
+
+ @Override
+ public void destroy() {
+ getMinCheckKnowledgeSession().dispose();
+ getScaleCheckKnowledgeSession().dispose();
+ setDestroyed(true);
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
+ }
+ }
+
+ @Override
+ protected void readConfigurations() {
+ XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+ int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
+ setMonitorIntervalMilliseconds(monitorInterval);
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor task interval: " + getMonitorIntervalMilliseconds());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "KubernetesServiceClusterMonitor "
+ + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
+ + ", clusterId=" + getClusterId()
+ + ", serviceId=" + getServiceId() + "]";
+ }
+
+ public String getLbReferenceType() {
+ return lbReferenceType;
+ }
+
+ public void setLbReferenceType(String lbReferenceType) {
+ this.lbReferenceType = lbReferenceType;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
index ffd6713..38ed1a6 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
@@ -22,62 +22,581 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.MemberStatsContext;
import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.TerminationException;
import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
/**
* Is responsible for monitoring a service cluster. This runs periodically
* and perform minimum instance check and scaling check using the underlying
* rules engine.
- *
*/
- abstract public class VMClusterMonitor extends AbstractClusterMonitor{
-
- private static final Log log = LogFactory.getLog(VMClusterMonitor.class);
- // Map<NetworkpartitionId, Network Partition Context>
- protected Map<String, NetworkPartitionContext> networkPartitionCtxts;
- protected DeploymentPolicy deploymentPolicy;
- protected AutoscalePolicy autoscalePolicy;
-
- protected VMClusterMonitor(String clusterId, String serviceId, ClusterType clusterType,
- AutoscalerRuleEvaluator autoscalerRuleEvaluator,
- DeploymentPolicy deploymentPolicy, AutoscalePolicy autoscalePolicy,
- Map<String, NetworkPartitionContext> networkPartitionCtxts) {
- super(clusterId, serviceId, clusterType, autoscalerRuleEvaluator);
- this.deploymentPolicy = deploymentPolicy;
- this.autoscalePolicy = autoscalePolicy;
- this.networkPartitionCtxts = networkPartitionCtxts;
- }
-
- public NetworkPartitionContext getNetworkPartitionCtxt(Member member) {
- log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId());
- String networkPartitionId = member.getNetworkPartitionId();
- if(networkPartitionCtxts.containsKey(networkPartitionId)) {
- log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId));
- return networkPartitionCtxts.get(networkPartitionId);
- }
- log.info("returning null getNetworkPartitionCtxt");
- return null;
- }
-
- public String getPartitionOfMember(String memberId){
- for(Service service: TopologyManager.getTopology().getServices()){
- for(Cluster cluster: service.getClusters()){
- if(cluster.memberExists(memberId)){
+abstract public class VMClusterMonitor extends AbstractClusterMonitor {
+
+ private static final Log log = LogFactory.getLog(VMClusterMonitor.class);
+ // Map<NetworkpartitionId, Network Partition Context>
+ protected Map<String, NetworkPartitionContext> networkPartitionCtxts;
+ protected DeploymentPolicy deploymentPolicy;
+ protected AutoscalePolicy autoscalePolicy;
+
+ protected VMClusterMonitor(String clusterId, String serviceId,
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator,
+ DeploymentPolicy deploymentPolicy, AutoscalePolicy autoscalePolicy,
+ Map<String, NetworkPartitionContext> networkPartitionCtxts) {
+ super(clusterId, serviceId, autoscalerRuleEvaluator);
+ this.deploymentPolicy = deploymentPolicy;
+ this.autoscalePolicy = autoscalePolicy;
+ this.networkPartitionCtxts = networkPartitionCtxts;
+ }
+
+ @Override
+ public void handleAverageLoadAverageEvent(
+ AverageLoadAverageEvent averageLoadAverageEvent) {
+
+ String networkPartitionId = averageLoadAverageEvent.getNetworkPartitionId();
+ String clusterId = averageLoadAverageEvent.getClusterId();
+ float value = averageLoadAverageEvent.getValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, value));
+ }
+
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setAverageLoadAverage(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+
+ }
+
+ @Override
+ public void handleGradientOfLoadAverageEvent(
+ GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+ String networkPartitionId = gradientOfLoadAverageEvent.getNetworkPartitionId();
+ String clusterId = gradientOfLoadAverageEvent.getClusterId();
+ float value = gradientOfLoadAverageEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setLoadAverageGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfLoadAverageEvent(
+ SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
+
+ String networkPartitionId = secondDerivativeOfLoadAverageEvent.getNetworkPartitionId();
+ String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+ float value = secondDerivativeOfLoadAverageEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
+ + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setLoadAverageSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleAverageMemoryConsumptionEvent(
+ AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+ String networkPartitionId = averageMemoryConsumptionEvent.getNetworkPartitionId();
+ String clusterId = averageMemoryConsumptionEvent.getClusterId();
+ float value = averageMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s "
+ + "[value] %s", clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setAverageMemoryConsumption(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String
+ .format("Network partition context is not available for :"
+ + " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleGradientOfMemoryConsumptionEvent(
+ GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
+
+ String networkPartitionId = gradientOfMemoryConsumptionEvent.getNetworkPartitionId();
+ String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+ float value = gradientOfMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
+ + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setMemoryConsumptionGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfMemoryConsumptionEvent(
+ SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
+
+ String networkPartitionId = secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId();
+ String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+ float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
+ + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setMemoryConsumptionSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleAverageRequestsInFlightEvent(
+ AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+ String networkPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId();
+ String clusterId = averageRequestsInFlightEvent.getClusterId();
+ float value = averageRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setAverageRequestsInFlight(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleGradientOfRequestsInFlightEvent(
+ GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+ String networkPartitionId = gradientOfRequestsInFlightEvent.getNetworkPartitionId();
+ String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
+ float value = gradientOfRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setRequestsInFlightGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfRequestsInFlightEvent(
+ SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
+
+ String networkPartitionId = secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId();
+ String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
+ float value = secondDerivativeOfRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second derivative of Rif event: [cluster] %s "
+ + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setRequestsInFlightSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleMemberAverageMemoryConsumptionEvent(
+ MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
+
+ String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberAverageMemoryConsumptionEvent.getValue();
+ memberStatsContext.setAverageMemoryConsumption(value);
+ }
+
+ @Override
+ public void handleMemberGradientOfMemoryConsumptionEvent(
+ MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
+
+ String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberGradientOfMemoryConsumptionEvent.getValue();
+ memberStatsContext.setGradientOfMemoryConsumption(value);
+ }
+
+ @Override
+ public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+ MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
+
+ }
+
+ @Override
+ public void handleMemberAverageLoadAverageEvent(
+ MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+ String memberId = memberAverageLoadAverageEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberAverageLoadAverageEvent.getValue();
+ memberStatsContext.setAverageLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberGradientOfLoadAverageEvent(
+ MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
+
+ String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberGradientOfLoadAverageEvent.getValue();
+ memberStatsContext.setGradientOfLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberSecondDerivativeOfLoadAverageEvent(
+ MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
+
+ String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+ memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+
+ String memberId = memberFaultEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ if (null == member) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+ }
+ return;
+ }
+ if (!member.isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member activated event has not received for the member %s. "
+ + "Therefore ignoring" + " the member fault health stat", memberId));
+ }
+ return;
+ }
+
+ NetworkPartitionContext nwPartitionCtxt;
+ nwPartitionCtxt = getNetworkPartitionCtxt(member);
+ String partitionId = getPartitionOfMember(memberId);
+ PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+ if (!partitionCtxt.activeMemberExist(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Could not find the active member in partition context, "
+ + "[member] %s ", memberId));
+ }
+ return;
+ }
+ // terminate the faulty member
+ CloudControllerClient ccClient = CloudControllerClient.getInstance();
+ try {
+ ccClient.terminate(memberId);
+ } catch (TerminationException e) {
+ String msg = "TerminationException " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ // remove from active member list
+ partitionCtxt.removeActiveMemberById(memberId);
+ if (log.isInfoEnabled()) {
+ String clusterId = memberFaultEvent.getClusterId();
+ log.info(String.format("Faulty member is terminated and removed from the active members list: "
+ + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+ }
+ }
+
+ @Override
+ public void handleMemberStartedEvent(
+ MemberStartedEvent memberStartedEvent) {
+
+ }
+
+ @Override
+ public void handleMemberActivatedEvent(
+ MemberActivatedEvent memberActivatedEvent) {
+
+ String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
+ String partitionId = memberActivatedEvent.getPartitionId();
+ String memberId = memberActivatedEvent.getMemberId();
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionContext;
+ partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
+ partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been added successfully: "
+ + "[member] %s", memberId));
+ }
+ partitionContext.movePendingMemberToActiveMembers(memberId);
+ }
+
+ @Override
+ public void handleMemberMaintenanceModeEvent(
+ MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+ String networkPartitionId = maintenanceModeEvent.getNetworkPartitionId();
+ String partitionId = maintenanceModeEvent.getPartitionId();
+ String memberId = maintenanceModeEvent.getMemberId();
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
+ partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member has been moved as pending termination: "
+ + "[member] %s", memberId));
+ }
+ partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+ }
+
+ @Override
+ public void handleMemberReadyToShutdownEvent(
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+ NetworkPartitionContext nwPartitionCtxt;
+ String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
+ nwPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+
+ // start a new member in the same Partition
+ String memberId = memberReadyToShutdownEvent.getMemberId();
+ String partitionId = getPartitionOfMember(memberId);
+ PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+ // terminate the shutdown ready member
+ CloudControllerClient ccClient = CloudControllerClient.getInstance();
+ try {
+ ccClient.terminate(memberId);
+ // remove from active member list
+ partitionCtxt.removeActiveMemberById(memberId);
+
+ String clusterId = memberReadyToShutdownEvent.getClusterId();
+ log.info(String.format("Member is terminated and removed from the active members list: "
+ + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+ } catch (TerminationException e) {
+ String msg = "TerminationException" + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+
+ @Override
+ public void handleMemberTerminatedEvent(
+ MemberTerminatedEvent memberTerminatedEvent) {
+
+ String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
+ String memberId = memberTerminatedEvent.getMemberId();
+ String partitionId = memberTerminatedEvent.getPartitionId();
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
+ partitionContext.removeMemberStatsContext(memberId);
+
+ if (partitionContext.removeTerminationPendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from termination pending members list: "
+ + "[member] %s", memberId));
+ }
+ } else if (partitionContext.removePendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from pending members list: "
+ + "[member] %s", memberId));
+ }
+ } else if (partitionContext.removeActiveMemberById(memberId)) {
+ log.warn(String.format("Member is in the wrong list and it is removed from "
+ + "active members list", memberId));
+ } else if (partitionContext.removeObsoleteMember(memberId)) {
+ log.warn(String.format("Member's obsolated timeout has been expired and "
+ + "it is removed from obsolated members list", memberId));
+ } else {
+ log.warn(String.format("Member is not available in any of the list active, "
+ + "pending and termination pending", memberId));
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been removed successfully: "
+ + "[member] %s", memberId));
+ }
+ }
+
+ @Override
+ public void handleClusterRemovedEvent(
+ ClusterRemovedEvent clusterRemovedEvent) {
+
+ }
+
+ private String getNetworkPartitionIdByMemberId(String memberId) {
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.memberExists(memberId)) {
+ return cluster.getMember(memberId).getNetworkPartitionId();
+ }
+ }
+ }
+ return null;
+ }
+
+ private Member getMemberByMemberId(String memberId) {
+ try {
+ TopologyManager.acquireReadLock();
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.memberExists(memberId)) {
+ return cluster.getMember(memberId);
+ }
+ }
+ }
+ return null;
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ public NetworkPartitionContext getNetworkPartitionCtxt(Member member) {
+ log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId());
+ String networkPartitionId = member.getNetworkPartitionId();
+ if (networkPartitionCtxts.containsKey(networkPartitionId)) {
+ log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId));
+ return networkPartitionCtxts.get(networkPartitionId);
+ }
+ log.info("returning null getNetworkPartitionCtxt");
+ return null;
+ }
+
+ public String getPartitionOfMember(String memberId) {
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.memberExists(memberId)) {
return cluster.getMember(memberId).getPartitionId();
}
}
}
return null;
- }
-
+ }
+
public DeploymentPolicy getDeploymentPolicy() {
return deploymentPolicy;
}
@@ -92,7 +611,7 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
this.autoscalePolicy = autoscalePolicy;
- }
+ }
public Map<String, NetworkPartitionContext> getNetworkPartitionCtxts() {
return networkPartitionCtxts;
@@ -113,7 +632,7 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
public void addNetworkPartitionCtxt(NetworkPartitionContext ctxt) {
this.networkPartitionCtxts.put(ctxt.getId(), ctxt);
}
-
+
public NetworkPartitionContext getPartitionCtxt(String id) {
return this.networkPartitionCtxts.get(id);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
index f547cb1..a0c66f0 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
@@ -18,36 +18,39 @@
*/
package org.apache.stratos.autoscaler.monitor;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
import org.apache.stratos.autoscaler.PartitionContext;
import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
import org.apache.stratos.autoscaler.util.AutoScalerConstants;
import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.common.enums.ClusterType;
import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
/**
* Is responsible for monitoring a service cluster. This runs periodically
* and perform minimum instance check and scaling check using the underlying
* rules engine.
- *
*/
-public class VMLbClusterMonitor extends VMClusterMonitor{
+public class VMLbClusterMonitor extends VMClusterMonitor {
private static final Log log = LogFactory.getLog(VMLbClusterMonitor.class);
public VMLbClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
- AutoscalePolicy autoscalePolicy) {
- super(clusterId, serviceId, ClusterType.VMLbCluster, new AutoscalerRuleEvaluator(),
- deploymentPolicy, autoscalePolicy,
- new ConcurrentHashMap<String, NetworkPartitionContext>());
+ AutoscalePolicy autoscalePolicy) {
+ super(clusterId, serviceId, new AutoscalerRuleEvaluator(),
+ deploymentPolicy, autoscalePolicy,
+ new ConcurrentHashMap<String, NetworkPartitionContext>());
readConfigurations();
}
@@ -56,27 +59,27 @@ public class VMLbClusterMonitor extends VMClusterMonitor{
while (!isDestroyed()) {
if (log.isDebugEnabled()) {
- log.debug("VMLbClusterMonitor is running.. "+this.toString());
+ log.debug("VMLbClusterMonitor is running.. " + this.toString());
}
try {
- if( !ClusterStatus.In_Maintenance.equals(getStatus())) {
+ if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
monitor();
} else {
if (log.isDebugEnabled()) {
log.debug("VMLbClusterMonitor is suspended as the cluster is in " +
- ClusterStatus.In_Maintenance + " mode......");
+ ClusterStatus.In_Maintenance + " mode......");
}
}
} catch (Exception e) {
- log.error("VMLbClusterMonitor : Monitor failed. "+this.toString(), e);
+ log.error("VMLbClusterMonitor : Monitor failed. " + this.toString(), e);
}
try {
- Thread.sleep(getMonitorInterval());
+ Thread.sleep(getMonitorIntervalMilliseconds());
} catch (InterruptedException ignore) {
}
}
}
-
+
@Override
protected void monitor() {
// TODO make this concurrent
@@ -84,21 +87,21 @@ public class VMLbClusterMonitor extends VMClusterMonitor{
// minimum check per partition
for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts()
- .values()) {
+ .values()) {
if (partitionContext != null) {
getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
getMinCheckKnowledgeSession().setGlobal("isPrimary", false);
-
+
if (log.isDebugEnabled()) {
log.debug(String.format("Running minimum check for partition %s ",
partitionContext.getPartitionId()));
}
minCheckFactHandle =
- AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession(),
- minCheckFactHandle,
- partitionContext);
+ AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession(),
+ minCheckFactHandle,
+ partitionContext);
// start only in the first partition context
break;
}
@@ -106,25 +109,55 @@ public class VMLbClusterMonitor extends VMClusterMonitor{
}
}
- }
-
- @Override
+ }
+
+ @Override
public void destroy() {
getMinCheckKnowledgeSession().dispose();
getMinCheckKnowledgeSession().dispose();
setDestroyed(true);
- if(log.isDebugEnabled()) {
- log.debug("VMLbClusterMonitor Drools session has been disposed. "+this.toString());
+ if (log.isDebugEnabled()) {
+ log.debug("VMLbClusterMonitor Drools session has been disposed. " + this.toString());
}
}
-
+
@Override
- protected void readConfigurations () {
+ protected void readConfigurations() {
XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
- setMonitorInterval(monitorInterval);
+ setMonitorIntervalMilliseconds(monitorInterval);
if (log.isDebugEnabled()) {
- log.debug("VMLbClusterMonitor task interval: " + getMonitorInterval());
+ log.debug("VMLbClusterMonitor task interval: " + getMonitorIntervalMilliseconds());
+ }
+ }
+
+ @Override
+ public void handleClusterRemovedEvent(
+ ClusterRemovedEvent clusterRemovedEvent) {
+
+ String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy();
+ String clusterId = clusterRemovedEvent.getClusterId();
+ DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
+ if (depPolicy != null) {
+ List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
+ .getNetworkPartitionLbHolders(depPolicy);
+
+ for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
+ // removes lb cluster ids
+ boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
+ if (isRemoved) {
+ log.info("Removed the lb cluster [id]:"
+ + clusterId
+ + " reference from Network Partition [id]: "
+ + networkPartitionLbHolder
+ .getNetworkPartitionId());
+
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(networkPartitionLbHolder);
+ }
+
+ }
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
index 9e97e19..0452e32 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
@@ -35,14 +35,12 @@ import org.apache.stratos.autoscaler.util.ConfUtil;
import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
import org.apache.stratos.cloud.controller.stub.pojo.Properties;
import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.common.enums.ClusterType;
import org.apache.stratos.messaging.domain.topology.ClusterStatus;
/**
* Is responsible for monitoring a service cluster. This runs periodically
* and perform minimum instance check and scaling check using the underlying
* rules engine.
- *
*/
public class VMServiceClusterMonitor extends VMClusterMonitor {
@@ -50,11 +48,12 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
private String lbReferenceType;
private boolean hasPrimary;
- public VMServiceClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
- AutoscalePolicy autoscalePolicy) {
- super(clusterId, serviceId, ClusterType.VMServiceCluster, new AutoscalerRuleEvaluator(),
- deploymentPolicy, autoscalePolicy,
- new ConcurrentHashMap<String, NetworkPartitionContext>());
+ public VMServiceClusterMonitor(String clusterId, String serviceId,
+ DeploymentPolicy deploymentPolicy,
+ AutoscalePolicy autoscalePolicy) {
+ super(clusterId, serviceId, new AutoscalerRuleEvaluator(),
+ deploymentPolicy, autoscalePolicy,
+ new ConcurrentHashMap<String, NetworkPartitionContext>());
readConfigurations();
}
@@ -73,19 +72,19 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
log.debug("VMServiceClusterMonitor is running.. " + this.toString());
}
try {
- if(!ClusterStatus.In_Maintenance.equals(getStatus())) {
+ if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
monitor();
} else {
if (log.isDebugEnabled()) {
log.debug("VMServiceClusterMonitor is suspended as the cluster is in " +
- ClusterStatus.In_Maintenance + " mode......");
+ ClusterStatus.In_Maintenance + " mode......");
}
}
} catch (Exception e) {
log.error("VMServiceClusterMonitor : Monitor failed." + this.toString(), e);
}
try {
- Thread.sleep(getMonitorInterval());
+ Thread.sleep(getMonitorIntervalMilliseconds());
} catch (InterruptedException ignore) {
}
}
@@ -105,13 +104,13 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
List<String> primaryMemberListInPartition = new ArrayList<String>();
// get active primary members in this partition context
for (MemberContext memberContext : partitionContext.getActiveMembers()) {
- if (isPrimaryMember(memberContext)){
+ if (isPrimaryMember(memberContext)) {
primaryMemberListInPartition.add(memberContext.getMemberId());
}
}
// get pending primary members in this partition context
for (MemberContext memberContext : partitionContext.getPendingMembers()) {
- if (isPrimaryMember(memberContext)){
+ if (isPrimaryMember(memberContext)) {
primaryMemberListInPartition.add(memberContext.getMemberId());
}
}
@@ -134,19 +133,19 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset();
boolean loadAverageReset = networkPartitionContext.isLoadAverageReset();
if (log.isDebugEnabled()) {
- log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset
- + " flag of loadAverageReset" + loadAverageReset);
+ log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset
+ + " flag of loadAverageReset" + loadAverageReset);
}
if (rifReset || memoryConsumptionReset || loadAverageReset) {
- getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+ getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
//scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy);
- getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", autoscalePolicy);
- getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
- getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
- getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
- getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
- getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
- getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition);
+ getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", autoscalePolicy);
+ getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+ getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+ getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+ getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
+ getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
+ getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition);
if (log.isDebugEnabled()) {
log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId()));
@@ -161,12 +160,12 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
networkPartitionContext.setLoadAverageReset(false);
} else if (log.isDebugEnabled()) {
log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " +
- "cycle for network partition %s", networkPartitionContext.getId()));
+ "cycle for network partition %s", networkPartitionContext.getId()));
}
}
}
-
- private boolean isPrimaryMember(MemberContext memberContext){
+
+ private boolean isPrimaryMember(MemberContext memberContext) {
Properties props = memberContext.getProperties();
if (log.isDebugEnabled()) {
log.debug(" Properties [" + props + "] ");
@@ -176,7 +175,7 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
if (prop.getName().equals("PRIMARY")) {
if (Boolean.parseBoolean(prop.getValue())) {
log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
- "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
+ "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
return true;
}
}
@@ -184,33 +183,33 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
}
return false;
}
-
+
@Override
- protected void readConfigurations () {
+ protected void readConfigurations() {
XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
- setMonitorInterval(monitorInterval);
+ setMonitorIntervalMilliseconds(monitorInterval);
if (log.isDebugEnabled()) {
- log.debug("VMServiceClusterMonitor task interval: " + getMonitorInterval());
+ log.debug("VMServiceClusterMonitor task interval: " + getMonitorIntervalMilliseconds());
}
}
-
- @Override
+
+ @Override
public void destroy() {
getMinCheckKnowledgeSession().dispose();
getScaleCheckKnowledgeSession().dispose();
setDestroyed(true);
- if(log.isDebugEnabled()) {
- log.debug("VMServiceClusterMonitor Drools session has been disposed. "+this.toString());
+ if (log.isDebugEnabled()) {
+ log.debug("VMServiceClusterMonitor Drools session has been disposed. " + this.toString());
}
}
@Override
public String toString() {
return "VMServiceClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() +
- ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy +
- ", lbReferenceType=" + lbReferenceType +
- ", hasPrimary=" + hasPrimary + " ]";
+ ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy +
+ ", lbReferenceType=" + lbReferenceType +
+ ", hasPrimary=" + hasPrimary + " ]";
}
public String getLbReferenceType() {
[3/7] code review changes to cluster monitors
Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
index 4f58e8d..e7c16fe 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
@@ -19,39 +19,16 @@
package org.apache.stratos.autoscaler.util;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
import org.apache.axiom.om.OMElement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.PartitionValidationException;
-import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.monitor.VMServiceClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.DockerServiceClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.VMLbClusterMonitor;
-import org.apache.stratos.autoscaler.partition.PartitionGroup;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.util.Constants;
-
-import javax.xml.namespace.QName;
-
-import java.util.*;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
/**
* This class contains utility methods used by Autoscaler.
@@ -64,302 +41,6 @@ public class AutoscalerUtil {
}
-
- /**
- * Updates ClusterContext for given cluster
- *
- * @param cluster
- * @return ClusterMonitor - Updated ClusterContext
- * @throws PolicyValidationException
- * @throws PartitionValidationException
- */
-// public static ClusterMonitor getClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
-// // FIXME fix the following code to correctly update
-// // AutoscalerContext context = AutoscalerContext.getInstance();
-// if (null == cluster) {
-// return null;
-// }
-//
-// String autoscalePolicyName = cluster.getAutoscalePolicyName();
-// String deploymentPolicyName = cluster.getDeploymentPolicyName();
-//
-// if (log.isDebugEnabled()) {
-// log.debug("Deployment policy name: " + deploymentPolicyName);
-// log.debug("Autoscaler policy name: " + autoscalePolicyName);
-// }
-//
-// AutoscalePolicy policy =
-// PolicyManager.getInstance()
-// .getAutoscalePolicy(autoscalePolicyName);
-// DeploymentPolicy deploymentPolicy =
-// PolicyManager.getInstance()
-// .getDeploymentPolicy(deploymentPolicyName);
-//
-// if (deploymentPolicy == null) {
-// String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
-// log.error(msg);
-// throw new PolicyValidationException(msg);
-// }
-//
-// Partition[] allPartitions = deploymentPolicy.getAllPartitions();
-// if (allPartitions == null) {
-// String msg =
-// "Deployment Policy's Partitions are null. Policy name: " +
-// deploymentPolicyName;
-// log.error(msg);
-// throw new PolicyValidationException(msg);
-// }
-//
-// CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
-//
-// ClusterMonitor clusterMonitor =
-// new ClusterMonitor(cluster.getClusterId(),
-// cluster.getServiceName(),
-// deploymentPolicy, policy);
-// clusterMonitor.setStatus(ClusterStatus.Created);
-//
-// for (PartitionGroup partitionGroup: deploymentPolicy.getPartitionGroups()){
-//
-// NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-// partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions());
-//
-// for(Partition partition: partitionGroup.getPartitions()){
-// PartitionContext partitionContext = new PartitionContext(partition);
-// partitionContext.setServiceName(cluster.getServiceName());
-// partitionContext.setProperties(cluster.getProperties());
-// partitionContext.setNetworkPartitionId(partitionGroup.getId());
-//
-// for (Member member: cluster.getMembers()){
-// String memberId = member.getMemberId();
-// if(member.getPartitionId().equalsIgnoreCase(partition.getId())){
-// MemberContext memberContext = new MemberContext();
-// memberContext.setClusterId(member.getClusterId());
-// memberContext.setMemberId(memberId);
-// memberContext.setPartition(partition);
-// memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
-//
-// if(MemberStatus.Activated.equals(member.getStatus())){
-// partitionContext.addActiveMember(memberContext);
-//// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-//// partitionContext.incrementCurrentActiveMemberCount(1);
-//
-// } else if(MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())){
-// partitionContext.addPendingMember(memberContext);
-//
-//// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-// } else if(MemberStatus.Suspended.equals(member.getStatus())){
-//// partitionContext.addFaultyMember(memberId);
-// }
-// partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-// if(log.isInfoEnabled()){
-// log.info(String.format("Member stat context has been added: [member] %s", memberId));
-// }
-// }
-//
-// }
-// networkPartitionContext.addPartitionContext(partitionContext);
-// if(log.isInfoEnabled()){
-// log.info(String.format("Partition context has been added: [partition] %s",
-// partitionContext.getPartitionId()));
-// }
-// }
-//
-// clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-// if(log.isInfoEnabled()){
-// log.info(String.format("Network partition context has been added: [network partition] %s",
-// networkPartitionContext.getId()));
-// }
-// }
-//
-//
-// // find lb reference type
-// java.util.Properties props = cluster.getProperties();
-//
-// if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
-// String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-// clusterMonitor.setLbReferenceType(value);
-// if(log.isDebugEnabled()) {
-// log.debug("Set the lb reference type: "+value);
-// }
-// }
-//
-// // set hasPrimary property
-// // hasPrimary is true if there are primary members available in that cluster
-// clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
-//
-// log.info("Cluster monitor created: "+clusterMonitor.toString());
-// return clusterMonitor;
-// }
-//
-// private static Properties convertMemberPropsToMemberContextProps(
-// java.util.Properties properties) {
-// Properties props = new Properties();
-// for (Map.Entry<Object, Object> e : properties.entrySet() ) {
-// Property prop = new Property();
-// prop.setName((String)e.getKey());
-// prop.setValue((String)e.getValue());
-// props.addProperties(prop);
-// }
-// return props;
-// }
-//
-//
-// public static LbClusterMonitor getLBClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
-// // FIXME fix the following code to correctly update
-// // AutoscalerContext context = AutoscalerContext.getInstance();
-// if (null == cluster) {
-// return null;
-// }
-//
-// String autoscalePolicyName = cluster.getAutoscalePolicyName();
-// String deploymentPolicyName = cluster.getDeploymentPolicyName();
-//
-// if (log.isDebugEnabled()) {
-// log.debug("Deployment policy name: " + deploymentPolicyName);
-// log.debug("Autoscaler policy name: " + autoscalePolicyName);
-// }
-//
-// AutoscalePolicy policy =
-// PolicyManager.getInstance()
-// .getAutoscalePolicy(autoscalePolicyName);
-// DeploymentPolicy deploymentPolicy =
-// PolicyManager.getInstance()
-// .getDeploymentPolicy(deploymentPolicyName);
-//
-// if (deploymentPolicy == null) {
-// String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
-// log.error(msg);
-// throw new PolicyValidationException(msg);
-// }
-//
-// String clusterId = cluster.getClusterId();
-// LbClusterMonitor clusterMonitor =
-// new LbClusterMonitor(clusterId,
-// cluster.getServiceName(),
-// deploymentPolicy, policy);
-// clusterMonitor.setStatus(ClusterStatus.Created);
-// // partition group = network partition context
-// for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
-//
-// NetworkPartitionLbHolder networkPartitionLbHolder =
-// PartitionManager.getInstance()
-// .getNetworkPartitionLbHolder(partitionGroup.getId());
-//// PartitionManager.getInstance()
-//// .getNetworkPartitionLbHolder(partitionGroup.getId());
-// // FIXME pick a random partition
-// Partition partition =
-// partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
-// PartitionContext partitionContext = new PartitionContext(partition);
-// partitionContext.setServiceName(cluster.getServiceName());
-// partitionContext.setProperties(cluster.getProperties());
-// partitionContext.setNetworkPartitionId(partitionGroup.getId());
-// partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
-//
-// NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
-// partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions()) ;
-// for (Member member : cluster.getMembers()) {
-// String memberId = member.getMemberId();
-// if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) {
-// MemberContext memberContext = new MemberContext();
-// memberContext.setClusterId(member.getClusterId());
-// memberContext.setMemberId(memberId);
-// memberContext.setPartition(partition);
-//
-// if (MemberStatus.Activated.equals(member.getStatus())) {
-// partitionContext.addActiveMember(memberContext);
-//// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-//// partitionContext.incrementCurrentActiveMemberCount(1);
-// } else if (MemberStatus.Created.equals(member.getStatus()) ||
-// MemberStatus.Starting.equals(member.getStatus())) {
-// partitionContext.addPendingMember(memberContext);
-//// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-// } else if (MemberStatus.Suspended.equals(member.getStatus())) {
-//// partitionContext.addFaultyMember(memberId);
-// }
-//
-// partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-// if(log.isInfoEnabled()){
-// log.info(String.format("Member stat context has been added: [member] %s", memberId));
-// }
-// }
-//
-// }
-// networkPartitionContext.addPartitionContext(partitionContext);
-//
-// // populate lb cluster id in network partition context.
-// java.util.Properties props = cluster.getProperties();
-//
-// // get service type of load balanced cluster
-// String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
-//
-// if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
-// String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-//
-// if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
-// networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
-//
-// } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) {
-// String serviceName = cluster.getServiceName();
-// // TODO: check if this is correct
-// networkPartitionLbHolder.addServiceLB(serviceName, clusterId);
-//
-// if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) {
-// networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
-// if (log.isDebugEnabled()) {
-// log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType);
-// }
-// }
-// }
-// }
-//
-// clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-// }
-//
-// log.info("LB Cluster monitor created: "+clusterMonitor.toString());
-// return clusterMonitor;
-// }
-//
-// public static DockerClusterMonitor getDockerClusterMonitor(Cluster cluster) {
-//
-// if (null == cluster) {
-// return null;
-// }
-//
-// String autoscalePolicyName = cluster.getAutoscalePolicyName();
-// if (log.isDebugEnabled()) {
-// log.debug("Autoscaler policy name: " + autoscalePolicyName);
-// }
-//
-// AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
-// java.util.Properties props = cluster.getProperties();
-// String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
-// KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID);
-//
-// DockerClusterMonitor dockerClusterMonitor = new DockerClusterMonitor(
-// kubernetesClusterCtxt,
-// cluster.getClusterId(),
-// cluster.getServiceName(),
-// policy);
-//
-// dockerClusterMonitor.setStatus(ClusterStatus.Created);
-//
-// // find lb reference type
-// if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
-// String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-// dockerClusterMonitor.setLbReferenceType(value);
-// if(log.isDebugEnabled()) {
-// log.debug("Set the lb reference type: "+value);
-// }
-// }
-//
-//// // set hasPrimary property
-//// // hasPrimary is true if there are primary members available in that cluster
-//// dockerClusterMonitor.setHasPrimary(Boolean.parseBoolean(props.getProperty(Constants.IS_PRIMARY)));
-//
-// log.info("Docker cluster monitor created: "+ dockerClusterMonitor.toString());
-// return dockerClusterMonitor;
-// }
-
public static Properties getProperties(final OMElement elt) {
Iterator<?> it = elt.getChildrenWithName(new QName(AutoScalerConstants.PROPERTY_ELEMENT));
@@ -400,64 +81,4 @@ public class AutoscalerUtil {
properties.setProperties(propertyArray);
return properties;
}
-
-// public static LbClusterMonitor getLbClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
-// if (null == cluster) {
-// return null;
-// }
-//
-// String autoscalePolicyName = cluster.getAutoscalePolicyName();
-// String deploymentPolicyName = cluster.getDeploymentPolicyName();
-//
-// if (log.isDebugEnabled()) {
-// log.debug("Deployment policy name: " + deploymentPolicyName);
-// log.debug("Autoscaler policy name: " + autoscalePolicyName);
-// }
-//
-// AutoscalePolicy policy =
-// PolicyManager.getInstance()
-// .getAutoscalePolicy(autoscalePolicyName);
-// DeploymentPolicy deploymentPolicy =
-// PolicyManager.getInstance()
-// .getDeploymentPolicy(deploymentPolicyName);
-//
-// if (deploymentPolicy == null) {
-// String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
-// log.error(msg);
-// throw new PolicyValidationException(msg);
-// }
-//
-// Partition[] allPartitions = deploymentPolicy.getAllPartitions();
-// if (allPartitions == null) {
-// String msg =
-// "Deployment Policy's Partitions are null. Policy name: " +
-// deploymentPolicyName;
-// log.error(msg);
-// throw new PolicyValidationException(msg);
-// }
-//
-// try {
-// validateExistenceOfPartions(allPartitions);
-// } catch (InvalidPartitionException e) {
-// String msg = "Deployment Policy is invalid. Policy name: " + deploymentPolicyName;
-// log.error(msg, e);
-// throw new PolicyValidationException(msg, e);
-// }
-//
-// CloudControllerClient.getInstance()
-// .validateDeploymentPolicy(cluster.getServiceName(),
-// allPartitions);
-//
-// LbClusterMonitor clusterMonitor =
-// new LbClusterMonitor(cluster.getClusterId(),
-// cluster.getServiceName(),
-// deploymentPolicy, policy);
-// for (PartitionGroup partitionGroup: deploymentPolicy.getPartitionGroups()){
-//
-// NetworkPartitionContext networkPartitionContext
-// = PartitionManager.getInstance().getNetworkPartitionLbHolder(partitionGroup.getNetworkPartitionId());
-// clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
-// }
-// return null;
-// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/enums/ClusterType.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/enums/ClusterType.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/enums/ClusterType.java
deleted file mode 100644
index 8842fb6..0000000
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/enums/ClusterType.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.stratos.common.enums;
-
-public enum ClusterType {
- VMServiceCluster, VMLbCluster, DockerServiceCluster, DockerLbCluster;
-}
[2/7] git commit: using executor instead of nornal threads
Posted by sa...@apache.org.
using executor instead of nornal threads
Signed-off-by: sajhak <sa...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/244030bb
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/244030bb
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/244030bb
Branch: refs/heads/container-autoscaling
Commit: 244030bb906aadfc0058c84883fa96431d8d4a79
Parents: 3105610
Author: R-Rajkumar <rr...@gmail.com>
Authored: Mon Oct 6 11:11:08 2014 +0530
Committer: sajhak <sa...@gmail.com>
Committed: Mon Oct 6 23:11:29 2014 +0530
----------------------------------------------------------------------
.../topology/AutoscalerTopologyEventReceiver.java | 8 ++++----
.../autoscaler/monitor/AbstractClusterMonitor.java | 14 ++++++++++++++
.../monitor/KubernetesServiceClusterMonitor.java | 7 +------
.../autoscaler/monitor/VMLbClusterMonitor.java | 7 +------
.../autoscaler/monitor/VMServiceClusterMonitor.java | 7 +------
5 files changed, 21 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/244030bb/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index e857eaf..de058ec 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -342,10 +342,10 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
log.error(msg);
throw new RuntimeException(msg);
}
- //TODO private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
- // scheduler.scheduleAtFixedRate(monitor, 0, getMonitorInterval(), TimeUnit.MILLISECONDS);
- Thread th = new Thread(monitor);
- th.start();
+
+// Thread th = new Thread(monitor);
+// th.start();
+ monitor.startScheduler();
AutoscalerContext.getInstance().addClusterMonitor(monitor);
if (log.isInfoEnabled()) {
log.info(String.format("Cluster monitor has been added successfully: [cluster] %s",
http://git-wip-us.apache.org/repos/asf/stratos/blob/244030bb/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
index 6061c3b..e44bd72 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
@@ -18,6 +18,10 @@
*/
package org.apache.stratos.autoscaler.monitor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
import org.apache.stratos.messaging.domain.topology.ClusterStatus;
import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
@@ -62,6 +66,8 @@ public abstract class AbstractClusterMonitor implements Runnable {
private boolean isDestroyed;
private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
protected AbstractClusterMonitor(String clusterId, String serviceId,
AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
@@ -75,6 +81,14 @@ public abstract class AbstractClusterMonitor implements Runnable {
}
protected abstract void readConfigurations();
+
+ public void startScheduler() {
+ scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+
+ protected void stopScheduler() {
+ scheduler.shutdownNow();
+ }
protected abstract void monitor();
http://git-wip-us.apache.org/repos/asf/stratos/blob/244030bb/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
index 3c81ba3..93580d9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
@@ -59,7 +59,6 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni
@Override
public void run() {
- while (!isDestroyed()) {
if (log.isDebugEnabled()) {
log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
}
@@ -76,11 +75,6 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni
log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
e);
}
- try {
- Thread.sleep(getMonitorIntervalMilliseconds());
- } catch (InterruptedException ignore) {
- }
- }
}
@Override
@@ -148,6 +142,7 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni
getMinCheckKnowledgeSession().dispose();
getScaleCheckKnowledgeSession().dispose();
setDestroyed(true);
+ stopScheduler();
if (log.isDebugEnabled()) {
log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/244030bb/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
index a0c66f0..f950f9d 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
@@ -57,7 +57,6 @@ public class VMLbClusterMonitor extends VMClusterMonitor {
@Override
public void run() {
- while (!isDestroyed()) {
if (log.isDebugEnabled()) {
log.debug("VMLbClusterMonitor is running.. " + this.toString());
}
@@ -73,11 +72,6 @@ public class VMLbClusterMonitor extends VMClusterMonitor {
} catch (Exception e) {
log.error("VMLbClusterMonitor : Monitor failed. " + this.toString(), e);
}
- try {
- Thread.sleep(getMonitorIntervalMilliseconds());
- } catch (InterruptedException ignore) {
- }
- }
}
@Override
@@ -116,6 +110,7 @@ public class VMLbClusterMonitor extends VMClusterMonitor {
getMinCheckKnowledgeSession().dispose();
getMinCheckKnowledgeSession().dispose();
setDestroyed(true);
+ stopScheduler();
if (log.isDebugEnabled()) {
log.debug("VMLbClusterMonitor Drools session has been disposed. " + this.toString());
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/244030bb/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
index 0452e32..d8c9e69 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
@@ -67,7 +67,6 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
} catch (InterruptedException ignore) {
}
- while (!isDestroyed()) {
if (log.isDebugEnabled()) {
log.debug("VMServiceClusterMonitor is running.. " + this.toString());
}
@@ -83,11 +82,6 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
} catch (Exception e) {
log.error("VMServiceClusterMonitor : Monitor failed." + this.toString(), e);
}
- try {
- Thread.sleep(getMonitorIntervalMilliseconds());
- } catch (InterruptedException ignore) {
- }
- }
}
@Override
@@ -199,6 +193,7 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
getMinCheckKnowledgeSession().dispose();
getScaleCheckKnowledgeSession().dispose();
setDestroyed(true);
+ stopScheduler();
if (log.isDebugEnabled()) {
log.debug("VMServiceClusterMonitor Drools session has been disposed. " + this.toString());
}
[5/7] code review changes to cluster monitors
Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index 1603aef..e857eaf 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -19,29 +19,16 @@
package org.apache.stratos.autoscaler.message.receiver.topology;
-import java.util.List;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
import org.apache.stratos.autoscaler.exception.PartitionValidationException;
import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.exception.TerminationException;
import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
import org.apache.stratos.autoscaler.monitor.ClusterMonitorFactory;
-import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor;
import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.event.Event;
@@ -112,7 +99,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
-
try {
TopologyManager.acquireReadLock();
for (Service service : TopologyManager.getTopology().getServices()) {
@@ -121,167 +107,108 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
}
}
} catch (Exception e) {
- log.error("Error processing event", e);
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
} finally {
TopologyManager.releaseReadLock();
}
}
-
-
});
topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
@Override
protected void onEvent(Event event) {
try {
- MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent)event;
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
+ String clusterId = memberReadyToShutdownEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
AbstractClusterMonitor monitor;
- String clusterId = memberReadyToShutdownEvent.getClusterId();
- String memberId = memberReadyToShutdownEvent.getMemberId();
-
- if(asCtx.clusterMonitorExist(clusterId)) {
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- TopologyManager.acquireReadLock();
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
-
- NetworkPartitionContext nwPartitionCtxt;
- String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
- nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
- // start a new member in the same Partition
- String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId);
- PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-
- // terminate the shutdown ready member
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- ccClient.terminate(memberId);
-
- // remove from active member list
- partitionCtxt.removeActiveMemberById(memberId);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member is terminated and removed from the active members list: "
- + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- // no need to do anything
- }
+ monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+ } catch (Exception e) {
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+ });
- } catch (TerminationException e) {
- log.error(e);
+ topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ log.info("Event received: " + event);
+ ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event;
+ TopologyManager.acquireReadLock();
+ Service service = TopologyManager.getTopology().getService(clusterCreatedEvent.getServiceName());
+ Cluster cluster = service.getCluster(clusterCreatedEvent.getClusterId());
+ startClusterMonitor(cluster);
+ } catch (Exception e) {
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
} finally {
TopologyManager.releaseReadLock();
}
}
-
});
- topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- log.info("Event received: " + event);
- ClusterCreatedEvent e = (ClusterCreatedEvent) event;
- TopologyManager.acquireReadLock();
- Service service = TopologyManager.getTopology().getService(e.getServiceName());
- Cluster cluster = service.getCluster(e.getClusterId());
- startClusterMonitor(cluster);
- } catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
-
- });
-
topologyEventReceiver.addEventListener(new ClusterMaintenanceModeEventListener() {
@Override
protected void onEvent(Event event) {
try {
log.info("Event received: " + event);
- ClusterMaintenanceModeEvent e = (ClusterMaintenanceModeEvent) event;
+ ClusterMaintenanceModeEvent clusterMaintenanceModeEvent = (ClusterMaintenanceModeEvent) event;
TopologyManager.acquireReadLock();
- Service service = TopologyManager.getTopology().getService(e.getServiceName());
- Cluster cluster = service.getCluster(e.getClusterId());
- if(AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) {
- AutoscalerContext.getInstance().getClusterMonitor(e.getClusterId()).setStatus(e.getStatus());
- } else {
+ Service service = TopologyManager.getTopology().getService(clusterMaintenanceModeEvent.getServiceName());
+ Cluster cluster = service.getCluster(clusterMaintenanceModeEvent.getClusterId());
+ AbstractClusterMonitor monitor;
+ monitor = AutoscalerContext.getInstance().getClusterMonitor(cluster.getClusterId());
+ if (null == monitor) {
log.error("cluster monitor not exists for the cluster: " + cluster.toString());
+ return;
}
+ monitor.setStatus(clusterMaintenanceModeEvent.getStatus());
} catch (Exception e) {
- log.error("Error processing event", e);
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
} finally {
TopologyManager.releaseReadLock();
}
}
-
- });
+ });
topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
@Override
protected void onEvent(Event event) {
try {
- ClusterRemovedEvent e = (ClusterRemovedEvent) event;
- TopologyManager.acquireReadLock();
-
- String clusterId = e.getClusterId();
- String deploymentPolicy = e.getDeploymentPolicy();
-
- AbstractClusterMonitor monitor = null;
-
- if (e.isLbCluster()) {
- DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
- if (depPolicy != null) {
- List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
- .getNetworkPartitionLbHolders(depPolicy);
-
- for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
- // removes lb cluster ids
- boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
- if (isRemoved) {
- log.info("Removed the lb cluster [id]:"
- + clusterId
- + " reference from Network Partition [id]: "
- + networkPartitionLbHolder
- .getNetworkPartitionId());
-
- }
- if (log.isDebugEnabled()) {
- log.debug(networkPartitionLbHolder);
- }
-
- }
+ ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+ String clusterId = clusterRemovedEvent.getClusterId();
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
}
+ return;
}
-
- monitor = AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
-
- // runTerminateAllRule(monitor);
- if (monitor != null) {
- monitor.destroy();
- log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
- clusterId));
- }
+ monitor.handleClusterRemovedEvent(clusterRemovedEvent);
+ asCtx.removeClusterMonitor(clusterId);
+ monitor.destroy();
+ log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
+ clusterId));
} catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
}
}
-
});
topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
@@ -295,70 +222,23 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
-
try {
- TopologyManager.acquireReadLock();
- MemberTerminatedEvent e = (MemberTerminatedEvent) event;
- String networkPartitionId = e.getNetworkPartitionId();
- String clusterId = e.getClusterId();
- String partitionId = e.getPartitionId();
- String memberId = e.getMemberId();
+ MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+ String clusterId = memberTerminatedEvent.getClusterId();
AbstractClusterMonitor monitor;
-
AutoscalerContext asCtx = AutoscalerContext.getInstance();
-
- if(asCtx.clusterMonitorExist(clusterId)) {
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
-
- NetworkPartitionContext networkPartitionContext =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
- PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
- partitionContext.removeMemberStatsContext(memberId);
-
- if (partitionContext.removeTerminationPendingMember(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is removed from termination pending members list: "
- + "[member] %s", memberId));
- }
- } else if (partitionContext.removePendingMember(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is removed from pending members list: "
- + "[member] %s", memberId));
- }
- } else if (partitionContext.removeActiveMemberById(memberId)) {
- log.warn(String.format("Member is in the wrong list and it is removed from "
- + "active members list", memberId));
- } else if (partitionContext.removeObsoleteMember(memberId)){
- log.warn(String.format("Member's obsolated timeout has been expired and "
- + "it is removed from obsolated members list", memberId));
- } else {
- log.warn(String.format("Member is not available in any of the list active, "
- + "pending and termination pending", memberId));
- }
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been removed successfully: "
- + "[member] %s", memberId));
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- // no need to do anything
- }
-
+ monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
} catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
}
}
@@ -367,160 +247,47 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
-
try {
- TopologyManager.acquireReadLock();
-
- MemberActivatedEvent e = (MemberActivatedEvent) event;
- String memberId = e.getMemberId();
- String partitionId = e.getPartitionId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- String clusterId = e.getClusterId();
+ MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+ String clusterId = memberActivatedEvent.getClusterId();
AbstractClusterMonitor monitor;
-
AutoscalerContext asCtx = AutoscalerContext.getInstance();
- if(asCtx.clusterMonitorExist(clusterId)) {
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if (monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
- PartitionContext partitionContext;
- partitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
- partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added successfully: "
- + "[member] %s", memberId));
- }
- partitionContext.movePendingMemberToActiveMembers(memberId);
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- KubernetesClusterContext kubernetesClusterContext;
- kubernetesClusterContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
- kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added successfully: "
- + "[member] %s", memberId));
- }
- kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
- }
-
+ monitor.handleMemberActivatedEvent(memberActivatedEvent);
} catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
}
}
});
- topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- TopologyManager.acquireReadLock();
-
- MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent)event;
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractClusterMonitor monitor;
- String clusterId = memberReadyToShutdownEvent.getClusterId();
- String memberId = memberReadyToShutdownEvent.getMemberId();
-
- if(asCtx.clusterMonitorExist(clusterId)) {
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- return;
- }
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
-
- NetworkPartitionContext nwPartitionCtxt;
- String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
- nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
- // start a new member in the same Partition
- String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId);
- PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-
- // terminate the shutdown ready member
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- ccClient.terminate(memberId);
-
- // remove from active member list
- partitionCtxt.removeActiveMemberById(memberId);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member is terminated and removed from the active members list: "
- + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- // no need to do anything
- }
-
- } catch (TerminationException e) {
- log.error(e);
- }
- }
-
- });
-
-
topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
@Override
protected void onEvent(Event event) {
-
try {
- TopologyManager.acquireReadLock();
-
- MemberMaintenanceModeEvent e = (MemberMaintenanceModeEvent) event;
- String memberId = e.getMemberId();
- String partitionId = e.getPartitionId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- String clusterId = e.getClusterId();
+ MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+ String clusterId = maintenanceModeEvent.getClusterId();
AbstractClusterMonitor monitor;
-
AutoscalerContext asCtx = AutoscalerContext.getInstance();
- if (asCtx.clusterMonitorExist(clusterId)) {
- monitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
-
- PartitionContext partitionContext;
- partitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
- partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member has been moved as pending termination: "
- + "[member] %s", memberId));
- }
- partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- // no need to do anything
- }
-
+ monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
} catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
}
}
});
@@ -529,27 +296,14 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
@Override
protected void onEvent(Event event) {
-// try {
-// TopologyManager.acquireReadLock();
-//
-// // Remove all clusters of given service from context
-// ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent)event;
-// for(Service service : TopologyManager.getTopology().getServices()) {
-// for(Cluster cluster : service.getClusters()) {
-// removeMonitor(cluster.getHostName());
-// }
-// }
-// }
-// finally {
-// TopologyManager.releaseReadLock();
-// }
+
}
});
}
private class ClusterMonitorAdder implements Runnable {
private Cluster cluster;
- private String clusterMonitorType;
+
public ClusterMonitorAdder(Cluster cluster) {
this.cluster = cluster;
}
@@ -567,38 +321,41 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
try {
monitor = ClusterMonitorFactory.getMonitor(cluster);
success = true;
- clusterMonitorType = monitor.getClusterType().name();
} catch (PolicyValidationException e) {
- String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
- log.debug(msg, e);
+ if (log.isDebugEnabled()) {
+ String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+ log.debug(msg, e);
+ }
retries--;
-
} catch (PartitionValidationException e) {
- String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
- log.debug(msg, e);
+ if (log.isDebugEnabled()) {
+ String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+ log.debug(msg, e);
+ }
retries--;
}
} while (!success && retries != 0);
if (monitor == null) {
String msg = "Cluster monitor creation failed, even after retrying for 5 times, "
- + "for cluster: " + cluster.getClusterId();
+ + "for cluster: " + cluster.getClusterId();
log.error(msg);
throw new RuntimeException(msg);
}
-
+ //TODO private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ // scheduler.scheduleAtFixedRate(monitor, 0, getMonitorInterval(), TimeUnit.MILLISECONDS);
Thread th = new Thread(monitor);
th.start();
AutoscalerContext.getInstance().addClusterMonitor(monitor);
if (log.isInfoEnabled()) {
- log.info(String.format("%s monitor has been added successfully: [cluster] %s",
- clusterMonitorType, cluster.getClusterId()));
+ log.info(String.format("Cluster monitor has been added successfully: [cluster] %s",
+ cluster.getClusterId()));
}
}
}
-
+
@SuppressWarnings("unused")
- private void runTerminateAllRule(VMClusterMonitor monitor) {
+ private void runTerminateAllRule(VMClusterMonitor monitor) {
FactHandle terminateAllFactHandle = null;
@@ -621,9 +378,13 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
protected synchronized void startClusterMonitor(Cluster cluster) {
Thread th = null;
- if (!AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) {
- th = new Thread(new ClusterMonitorAdder(cluster));
- }
+
+ AbstractClusterMonitor monitor;
+ monitor = AutoscalerContext.getInstance().getClusterMonitor(cluster.getClusterId());
+
+ if (null == monitor) {
+ th = new Thread(new ClusterMonitorAdder(cluster));
+ }
if (th != null) {
th.start();
try {
@@ -632,9 +393,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
}
if (log.isDebugEnabled()) {
- log.debug(String
- .format("Cluster monitor thread has been started successfully: [cluster] %s ",
- cluster.getClusterId()));
+ log.debug(String.format("Cluster monitor thread has been started successfully: "
+ + "[cluster] %s ", cluster.getClusterId()));
}
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
index cb60027..6061c3b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
@@ -19,130 +19,211 @@
package org.apache.stratos.autoscaler.monitor;
import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.rule.FactHandle;
/*
* Every cluster monitor, which are monitoring a cluster, should extend this class.
*/
-public abstract class AbstractClusterMonitor implements Runnable{
-
+public abstract class AbstractClusterMonitor implements Runnable {
+
private String clusterId;
private String serviceId;
- private ClusterType clusterType;
- private ClusterStatus status;
- private int monitorInterval;
-
- protected FactHandle minCheckFactHandle;
- protected FactHandle scaleCheckFactHandle;
- private StatefulKnowledgeSession minCheckKnowledgeSession;
- private StatefulKnowledgeSession scaleCheckKnowledgeSession;
- private boolean isDestroyed;
-
- private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
-
- protected AbstractClusterMonitor(String clusterId, String serviceId, ClusterType clusterType,
- AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
-
- super();
- this.clusterId = clusterId;
- this.serviceId = serviceId;
- this.clusterType = clusterType;
- this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+ private ClusterStatus status;
+ private int monitoringIntervalMilliseconds;
+
+ protected FactHandle minCheckFactHandle;
+ protected FactHandle scaleCheckFactHandle;
+ private StatefulKnowledgeSession minCheckKnowledgeSession;
+ private StatefulKnowledgeSession scaleCheckKnowledgeSession;
+ private boolean isDestroyed;
+
+ private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+
+ protected AbstractClusterMonitor(String clusterId, String serviceId,
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+
+ super();
+ this.clusterId = clusterId;
+ this.serviceId = serviceId;
+ this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
- }
+ }
+
+ protected abstract void readConfigurations();
+
+ protected abstract void monitor();
- protected abstract void readConfigurations();
- protected abstract void monitor();
public abstract void destroy();
-
- public String getClusterId() {
- return clusterId;
- }
-
- public void setClusterId(String clusterId) {
- this.clusterId = clusterId;
- }
-
- public void setStatus(ClusterStatus status) {
- this.status = status;
- }
-
- public ClusterType getClusterType() {
- return clusterType;
- }
-
- public ClusterStatus getStatus() {
- return status;
- }
-
- public String getServiceId() {
- return serviceId;
- }
-
- public void setServiceId(String serviceId) {
- this.serviceId = serviceId;
- }
-
- public int getMonitorInterval() {
- return monitorInterval;
- }
-
- public void setMonitorInterval(int monitorInterval) {
- this.monitorInterval = monitorInterval;
- }
-
- public FactHandle getMinCheckFactHandle() {
- return minCheckFactHandle;
- }
-
- public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
- this.minCheckFactHandle = minCheckFactHandle;
- }
-
- public FactHandle getScaleCheckFactHandle() {
- return scaleCheckFactHandle;
- }
-
- public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
- this.scaleCheckFactHandle = scaleCheckFactHandle;
- }
-
- public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
- return minCheckKnowledgeSession;
- }
-
- public void setMinCheckKnowledgeSession(
- StatefulKnowledgeSession minCheckKnowledgeSession) {
- this.minCheckKnowledgeSession = minCheckKnowledgeSession;
- }
-
- public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
- return scaleCheckKnowledgeSession;
- }
-
- public void setScaleCheckKnowledgeSession(
- StatefulKnowledgeSession scaleCheckKnowledgeSession) {
- this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
- }
-
- public boolean isDestroyed() {
- return isDestroyed;
- }
-
- public void setDestroyed(boolean isDestroyed) {
- this.isDestroyed = isDestroyed;
- }
-
- public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
- return autoscalerRuleEvaluator;
- }
-
- public void setAutoscalerRuleEvaluator(
- AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
- this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
- }
+
+ //handle health events
+ public abstract void handleAverageLoadAverageEvent(
+ AverageLoadAverageEvent averageLoadAverageEvent);
+
+ public abstract void handleGradientOfLoadAverageEvent(
+ GradientOfLoadAverageEvent gradientOfLoadAverageEvent);
+
+ public abstract void handleSecondDerivativeOfLoadAverageEvent(
+ SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent);
+
+ public abstract void handleAverageMemoryConsumptionEvent(
+ AverageMemoryConsumptionEvent averageMemoryConsumptionEvent);
+
+ public abstract void handleGradientOfMemoryConsumptionEvent(
+ GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent);
+
+ public abstract void handleSecondDerivativeOfMemoryConsumptionEvent(
+ SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent);
+
+ public abstract void handleAverageRequestsInFlightEvent(
+ AverageRequestsInFlightEvent averageRequestsInFlightEvent);
+
+ public abstract void handleGradientOfRequestsInFlightEvent(
+ GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent);
+
+ public abstract void handleSecondDerivativeOfRequestsInFlightEvent(
+ SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent);
+
+ public abstract void handleMemberAverageMemoryConsumptionEvent(
+ MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent);
+
+ public abstract void handleMemberGradientOfMemoryConsumptionEvent(
+ MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent);
+
+ public abstract void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+ MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent);
+
+
+ public abstract void handleMemberAverageLoadAverageEvent(
+ MemberAverageLoadAverageEvent memberAverageLoadAverageEvent);
+
+ public abstract void handleMemberGradientOfLoadAverageEvent(
+ MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent);
+
+ public abstract void handleMemberSecondDerivativeOfLoadAverageEvent(
+ MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent);
+
+ public abstract void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent);
+
+ //handle topology events
+ public abstract void handleMemberStartedEvent(MemberStartedEvent memberStartedEvent);
+
+ public abstract void handleMemberActivatedEvent(MemberActivatedEvent memberActivatedEvent);
+
+ public abstract void handleMemberMaintenanceModeEvent(
+ MemberMaintenanceModeEvent maintenanceModeEvent);
+
+ public abstract void handleMemberReadyToShutdownEvent(
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent);
+
+ public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent memberTerminatedEvent);
+
+ public abstract void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent);
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(String clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public void setStatus(ClusterStatus status) {
+ this.status = status;
+ }
+
+ public ClusterStatus getStatus() {
+ return status;
+ }
+
+ public String getServiceId() {
+ return serviceId;
+ }
+
+ public void setServiceId(String serviceId) {
+ this.serviceId = serviceId;
+ }
+
+ public int getMonitorIntervalMilliseconds() {
+ return monitoringIntervalMilliseconds;
+ }
+
+ public void setMonitorIntervalMilliseconds(int monitorIntervalMilliseconds) {
+ this.monitoringIntervalMilliseconds = monitorIntervalMilliseconds;
+ }
+
+ public FactHandle getMinCheckFactHandle() {
+ return minCheckFactHandle;
+ }
+
+ public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
+ this.minCheckFactHandle = minCheckFactHandle;
+ }
+
+ public FactHandle getScaleCheckFactHandle() {
+ return scaleCheckFactHandle;
+ }
+
+ public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
+ this.scaleCheckFactHandle = scaleCheckFactHandle;
+ }
+
+ public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
+ return minCheckKnowledgeSession;
+ }
+
+ public void setMinCheckKnowledgeSession(
+ StatefulKnowledgeSession minCheckKnowledgeSession) {
+ this.minCheckKnowledgeSession = minCheckKnowledgeSession;
+ }
+
+ public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
+ return scaleCheckKnowledgeSession;
+ }
+
+ public void setScaleCheckKnowledgeSession(
+ StatefulKnowledgeSession scaleCheckKnowledgeSession) {
+ this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
+ }
+
+ public boolean isDestroyed() {
+ return isDestroyed;
+ }
+
+ public void setDestroyed(boolean isDestroyed) {
+ this.isDestroyed = isDestroyed;
+ }
+
+ public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
+ return autoscalerRuleEvaluator;
+ }
+
+ public void setAutoscalerRuleEvaluator(
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+ this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
index bd01dc6..208e4ce 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
@@ -52,30 +52,32 @@ import org.apache.stratos.messaging.util.Constants;
* Factory class for creating cluster monitors.
*/
public class ClusterMonitorFactory {
-
- private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
-
- /**
- * @param cluster the cluster to be monitored
- * @return the created cluster monitor
- * @throws PolicyValidationException when deployment policy is not valid
- * @throws PartitionValidationException when partition is not valid
- */
- public static AbstractClusterMonitor getMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
-
- AbstractClusterMonitor clusterMonitor;
- if(cluster.isKubernetesCluster()){
- clusterMonitor = getDockerServiceClusterMonitor(cluster);
- } else if (cluster.isLbCluster()){
- clusterMonitor = getVMLbClusterMonitor(cluster);
- } else {
- clusterMonitor = getVMServiceClusterMonitor(cluster);
- }
-
- return clusterMonitor;
- }
-
- private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
+
+ private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
+
+ /**
+ * @param cluster the cluster to be monitored
+ * @return the created cluster monitor
+ * @throws PolicyValidationException when deployment policy is not valid
+ * @throws PartitionValidationException when partition is not valid
+ */
+ public static AbstractClusterMonitor getMonitor(Cluster cluster)
+ throws PolicyValidationException, PartitionValidationException {
+
+ AbstractClusterMonitor clusterMonitor;
+ if (cluster.isKubernetesCluster()) {
+ clusterMonitor = getDockerServiceClusterMonitor(cluster);
+ } else if (cluster.isLbCluster()) {
+ clusterMonitor = getVMLbClusterMonitor(cluster);
+ } else {
+ clusterMonitor = getVMServiceClusterMonitor(cluster);
+ }
+
+ return clusterMonitor;
+ }
+
+ private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster)
+ throws PolicyValidationException, PartitionValidationException {
// FIXME fix the following code to correctly update
// AutoscalerContext context = AutoscalerContext.getInstance();
if (null == cluster) {
@@ -91,11 +93,11 @@ public class ClusterMonitorFactory {
}
AutoscalePolicy policy =
- PolicyManager.getInstance()
- .getAutoscalePolicy(autoscalePolicyName);
+ PolicyManager.getInstance()
+ .getAutoscalePolicy(autoscalePolicyName);
DeploymentPolicy deploymentPolicy =
- PolicyManager.getInstance()
- .getDeploymentPolicy(deploymentPolicyName);
+ PolicyManager.getInstance()
+ .getDeploymentPolicy(deploymentPolicyName);
if (deploymentPolicy == null) {
String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
@@ -106,8 +108,8 @@ public class ClusterMonitorFactory {
Partition[] allPartitions = deploymentPolicy.getAllPartitions();
if (allPartitions == null) {
String msg =
- "Deployment Policy's Partitions are null. Policy name: " +
- deploymentPolicyName;
+ "Deployment Policy's Partitions are null. Policy name: " +
+ deploymentPolicyName;
log.error(msg);
throw new PolicyValidationException(msg);
}
@@ -115,98 +117,100 @@ public class ClusterMonitorFactory {
CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
VMServiceClusterMonitor clusterMonitor =
- new VMServiceClusterMonitor(cluster.getClusterId(),
- cluster.getServiceName(),
- deploymentPolicy, policy);
+ new VMServiceClusterMonitor(cluster.getClusterId(),
+ cluster.getServiceName(),
+ deploymentPolicy, policy);
clusterMonitor.setStatus(ClusterStatus.Created);
-
- for (PartitionGroup partitionGroup: deploymentPolicy.getPartitionGroups()){
+
+ for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
- partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions());
+ partitionGroup.getPartitionAlgo(),
+ partitionGroup.getPartitions());
- for(Partition partition: partitionGroup.getPartitions()){
+ for (Partition partition : partitionGroup.getPartitions()) {
PartitionContext partitionContext = new PartitionContext(partition);
partitionContext.setServiceName(cluster.getServiceName());
partitionContext.setProperties(cluster.getProperties());
partitionContext.setNetworkPartitionId(partitionGroup.getId());
-
- for (Member member: cluster.getMembers()){
+
+ for (Member member : cluster.getMembers()) {
String memberId = member.getMemberId();
- if(member.getPartitionId().equalsIgnoreCase(partition.getId())){
+ if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
MemberContext memberContext = new MemberContext();
memberContext.setClusterId(member.getClusterId());
memberContext.setMemberId(memberId);
memberContext.setPartition(partition);
memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
-
- if(MemberStatus.Activated.equals(member.getStatus())){
+
+ if (MemberStatus.Activated.equals(member.getStatus())) {
partitionContext.addActiveMember(memberContext);
// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
// partitionContext.incrementCurrentActiveMemberCount(1);
- } else if(MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())){
+ } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
partitionContext.addPendingMember(memberContext);
// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
- } else if(MemberStatus.Suspended.equals(member.getStatus())){
+ } else if (MemberStatus.Suspended.equals(member.getStatus())) {
// partitionContext.addFaultyMember(memberId);
}
partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if(log.isInfoEnabled()){
+ if (log.isInfoEnabled()) {
log.info(String.format("Member stat context has been added: [member] %s", memberId));
}
}
}
networkPartitionContext.addPartitionContext(partitionContext);
- if(log.isInfoEnabled()){
+ if (log.isInfoEnabled()) {
log.info(String.format("Partition context has been added: [partition] %s",
- partitionContext.getPartitionId()));
+ partitionContext.getPartitionId()));
}
}
clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
- if(log.isInfoEnabled()){
+ if (log.isInfoEnabled()) {
log.info(String.format("Network partition context has been added: [network partition] %s",
- networkPartitionContext.getId()));
+ networkPartitionContext.getId()));
}
}
-
-
+
+
// find lb reference type
java.util.Properties props = cluster.getProperties();
-
- if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+
+ if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
String value = props.getProperty(Constants.LOAD_BALANCER_REF);
clusterMonitor.setLbReferenceType(value);
- if(log.isDebugEnabled()) {
- log.debug("Set the lb reference type: "+value);
+ if (log.isDebugEnabled()) {
+ log.debug("Set the lb reference type: " + value);
}
}
-
+
// set hasPrimary property
// hasPrimary is true if there are primary members available in that cluster
clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
- log.info("VMServiceClusterMonitor created: "+clusterMonitor.toString());
+ log.info("VMServiceClusterMonitor created: " + clusterMonitor.toString());
return clusterMonitor;
}
-
+
private static Properties convertMemberPropsToMemberContextProps(
- java.util.Properties properties) {
- Properties props = new Properties();
- for (Map.Entry<Object, Object> e : properties.entrySet() ) {
- Property prop = new Property();
- prop.setName((String)e.getKey());
- prop.setValue((String)e.getValue());
- props.addProperties(prop);
- }
- return props;
- }
-
-
- private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
+ java.util.Properties properties) {
+ Properties props = new Properties();
+ for (Map.Entry<Object, Object> e : properties.entrySet()) {
+ Property prop = new Property();
+ prop.setName((String) e.getKey());
+ prop.setValue((String) e.getValue());
+ props.addProperties(prop);
+ }
+ return props;
+ }
+
+
+ private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
+ throws PolicyValidationException, PartitionValidationException {
// FIXME fix the following code to correctly update
// AutoscalerContext context = AutoscalerContext.getInstance();
if (null == cluster) {
@@ -222,11 +226,11 @@ public class ClusterMonitorFactory {
}
AutoscalePolicy policy =
- PolicyManager.getInstance()
- .getAutoscalePolicy(autoscalePolicyName);
+ PolicyManager.getInstance()
+ .getAutoscalePolicy(autoscalePolicyName);
DeploymentPolicy deploymentPolicy =
- PolicyManager.getInstance()
- .getDeploymentPolicy(deploymentPolicyName);
+ PolicyManager.getInstance()
+ .getDeploymentPolicy(deploymentPolicyName);
if (deploymentPolicy == null) {
String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
@@ -236,21 +240,21 @@ public class ClusterMonitorFactory {
String clusterId = cluster.getClusterId();
VMLbClusterMonitor clusterMonitor =
- new VMLbClusterMonitor(clusterId,
- cluster.getServiceName(),
- deploymentPolicy, policy);
+ new VMLbClusterMonitor(clusterId,
+ cluster.getServiceName(),
+ deploymentPolicy, policy);
clusterMonitor.setStatus(ClusterStatus.Created);
// partition group = network partition context
for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
NetworkPartitionLbHolder networkPartitionLbHolder =
- PartitionManager.getInstance()
- .getNetworkPartitionLbHolder(partitionGroup.getId());
+ PartitionManager.getInstance()
+ .getNetworkPartitionLbHolder(partitionGroup.getId());
// PartitionManager.getInstance()
// .getNetworkPartitionLbHolder(partitionGroup.getId());
// FIXME pick a random partition
Partition partition =
- partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
+ partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
PartitionContext partitionContext = new PartitionContext(partition);
partitionContext.setServiceName(cluster.getServiceName());
partitionContext.setProperties(cluster.getProperties());
@@ -258,7 +262,8 @@ public class ClusterMonitorFactory {
partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
- partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions()) ;
+ partitionGroup.getPartitionAlgo(),
+ partitionGroup.getPartitions());
for (Member member : cluster.getMembers()) {
String memberId = member.getMemberId();
if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) {
@@ -280,23 +285,23 @@ public class ClusterMonitorFactory {
}
partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if(log.isInfoEnabled()){
+ if (log.isInfoEnabled()) {
log.info(String.format("Member stat context has been added: [member] %s", memberId));
}
}
}
networkPartitionContext.addPartitionContext(partitionContext);
-
+
// populate lb cluster id in network partition context.
java.util.Properties props = cluster.getProperties();
// get service type of load balanced cluster
String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
-
- if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+
+ if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-
+
if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
@@ -317,13 +322,17 @@ public class ClusterMonitorFactory {
clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
}
- log.info("VMLbClusterMonitor created: "+clusterMonitor.toString());
+ log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
return clusterMonitor;
}
-
- private static DockerServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) {
- if (null == cluster) {
+ /**
+ * @param cluster - the cluster which needs to be monitored
+ * @return - the cluster monitor
+ */
+ private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) {
+
+ if (null == cluster) {
return null;
}
@@ -335,42 +344,43 @@ public class ClusterMonitorFactory {
AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
java.util.Properties props = cluster.getProperties();
String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
- KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
- cluster.getClusterId());
-
- DockerServiceClusterMonitor dockerClusterMonitor = new DockerServiceClusterMonitor(
- kubernetesClusterCtxt,
- cluster.getClusterId(),
- cluster.getServiceName(),
- policy);
-
+ KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
+ cluster.getClusterId());
+
+ KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor(
+ kubernetesClusterCtxt,
+ cluster.getClusterId(),
+ cluster.getServiceName(),
+ policy);
+
dockerClusterMonitor.setStatus(ClusterStatus.Created);
-
- for (Member member : cluster.getMembers()) {
- String memberId = member.getMemberId();
- String clusterId = member.getClusterId();
- MemberContext memberContext = new MemberContext();
- memberContext.setMemberId(memberId);
- memberContext.setClusterId(clusterId);
-
- if (MemberStatus.Activated.equals(member.getStatus())) {
- dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
- } else if (MemberStatus.Created.equals(member.getStatus())
- || MemberStatus.Starting.equals(member.getStatus())) {
- dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
- }
- }
+
+ //populate the members after restarting
+ for (Member member : cluster.getMembers()) {
+ String memberId = member.getMemberId();
+ String clusterId = member.getClusterId();
+ MemberContext memberContext = new MemberContext();
+ memberContext.setMemberId(memberId);
+ memberContext.setClusterId(clusterId);
+
+ if (MemberStatus.Activated.equals(member.getStatus())) {
+ dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
+ } else if (MemberStatus.Created.equals(member.getStatus())
+ || MemberStatus.Starting.equals(member.getStatus())) {
+ dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
+ }
+ }
// find lb reference type
- if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+ if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
String value = props.getProperty(Constants.LOAD_BALANCER_REF);
dockerClusterMonitor.setLbReferenceType(value);
- if(log.isDebugEnabled()) {
- log.debug("Set the lb reference type: "+value);
+ if (log.isDebugEnabled()) {
+ log.debug("Set the lb reference type: " + value);
}
}
-
- log.info("KubernetesServiceClusterMonitor created: "+ dockerClusterMonitor.toString());
+
+ log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
return dockerClusterMonitor;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
deleted file mode 100644
index 2621690..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
-
-/*
- * Every container cluster monitor should extend this class
- */
-public abstract class ContainerClusterMonitor extends AbstractClusterMonitor {
-
- private KubernetesClusterContext kubernetesClusterCtxt;
- protected AutoscalePolicy autoscalePolicy;
-
- protected ContainerClusterMonitor(String clusterId, String serviceId, ClusterType clusterType,
- KubernetesClusterContext kubernetesClusterContext,
- AutoscalerRuleEvaluator autoscalerRuleEvaluator, AutoscalePolicy autoscalePolicy){
-
- super(clusterId, serviceId, clusterType, autoscalerRuleEvaluator);
- this.kubernetesClusterCtxt = kubernetesClusterContext;
- this.autoscalePolicy = autoscalePolicy;
- }
-
- public KubernetesClusterContext getKubernetesClusterCtxt() {
- return kubernetesClusterCtxt;
- }
-
- public void setKubernetesClusterCtxt(
- KubernetesClusterContext kubernetesClusterCtxt) {
- this.kubernetesClusterCtxt = kubernetesClusterCtxt;
- }
-
- public AutoscalePolicy getAutoscalePolicy() {
- return autoscalePolicy;
- }
-
- public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
- this.autoscalePolicy = autoscalePolicy;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
deleted file mode 100644
index 850a295..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.Properties;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.common.enums.ClusterType;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-/*
- * It is monitoring a kubernetes service cluster periodically.
- */
-public final class DockerServiceClusterMonitor extends ContainerClusterMonitor{
-
- private static final Log log = LogFactory.getLog(DockerServiceClusterMonitor.class);
-
- private String lbReferenceType;
- private int numberOfReplicasInServiceCluster = 0;
- int retryInterval = 60000;
-
- public DockerServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt,
- String serviceClusterID, String serviceId, AutoscalePolicy autoscalePolicy) {
- super(serviceClusterID, serviceId, ClusterType.DockerServiceCluster, kubernetesClusterCtxt,
- new AutoscalerRuleEvaluator(), autoscalePolicy);
- readConfigurations();
- }
-
- @Override
- public void run() {
- try {
- // TODO make this configurable,
- // this is the delay the min check of normal cluster monitor to wait
- // until LB monitor is added
- Thread.sleep(60000);
- } catch (InterruptedException ignore) {
- }
-
- while (!isDestroyed()) {
- if (log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
- }
- try {
- if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
- + ClusterStatus.In_Maintenance + " mode......");
- }
- }
- } catch (Exception e) {
- log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
- e);
- }
- try {
- Thread.sleep(getMonitorInterval());
- } catch (InterruptedException ignore) {
- }
- }
- }
-
- @Override
- protected void monitor() {
-
- // is container created successfully?
- boolean success = false;
- String kubernetesClusterId = getKubernetesClusterCtxt().getKubernetesClusterID();
-
- try {
- TopologyManager.acquireReadLock();
- Properties props = TopologyManager.getTopology().getService(getServiceId()).getCluster(getClusterId()).getProperties();
- int minReplicas = Integer.parseInt(props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS));
-
- int nonTerminatedMembers = getKubernetesClusterCtxt().getActiveMembers().size() + getKubernetesClusterCtxt().getPendingMembers().size();
-
- if (nonTerminatedMembers == 0) {
-
- while (!success) {
- try {
-
- MemberContext memberContext = CloudControllerClient.getInstance().createContainer(kubernetesClusterId, getClusterId());
- if(null != memberContext) {
- getKubernetesClusterCtxt().addPendingMember(memberContext);
- success = true;
- numberOfReplicasInServiceCluster = minReplicas;
- if(log.isDebugEnabled()){
- log.debug(String.format("Pending member added, [member] %s [kub cluster] %s",
- memberContext.getMemberId(), getKubernetesClusterCtxt().getKubernetesClusterID()));
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Returned member context is null, did not add to pending members");
- }
- }
- } catch (Throwable e) {
- if (log.isDebugEnabled()) {
- String message = "Cannot create a container, will retry in "+(retryInterval/1000)+"s";
- log.debug(message, e);
- }
- }
-
- try {
- Thread.sleep(retryInterval);
- } catch (InterruptedException e1) {
- }
- }
- }
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
-
- @Override
- public void destroy() {
- getMinCheckKnowledgeSession().dispose();
- getScaleCheckKnowledgeSession().dispose();
- setDestroyed(true);
- if(log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. "+this.toString());
- }
- }
-
- @Override
- protected void readConfigurations () {
- XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
- int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
- setMonitorInterval(monitorInterval);
- if (log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor task interval: " + getMonitorInterval());
- }
- }
-
- @Override
- public String toString() {
- return "KubernetesServiceClusterMonitor "
- + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
- + ", clusterId=" + getClusterId()
- + ", serviceId=" + getServiceId() + "]";
- }
-
- public String getLbReferenceType() {
- return lbReferenceType;
- }
-
- public void setLbReferenceType(String lbReferenceType) {
- this.lbReferenceType = lbReferenceType;
- }
-}
\ No newline at end of file