You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by sa...@apache.org on 2014/10/06 19:43:11 UTC
[5/7] code review changes to cluster monitors
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index 1603aef..e857eaf 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -19,29 +19,16 @@
package org.apache.stratos.autoscaler.message.receiver.topology;
-import java.util.List;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
import org.apache.stratos.autoscaler.exception.PartitionValidationException;
import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.exception.TerminationException;
import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
import org.apache.stratos.autoscaler.monitor.ClusterMonitorFactory;
-import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor;
import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.event.Event;
@@ -112,7 +99,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
-
try {
TopologyManager.acquireReadLock();
for (Service service : TopologyManager.getTopology().getServices()) {
@@ -121,167 +107,108 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
}
}
} catch (Exception e) {
- log.error("Error processing event", e);
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
} finally {
TopologyManager.releaseReadLock();
}
}
-
-
});
topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
@Override
protected void onEvent(Event event) {
try {
- MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent)event;
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
+ String clusterId = memberReadyToShutdownEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
AbstractClusterMonitor monitor;
- String clusterId = memberReadyToShutdownEvent.getClusterId();
- String memberId = memberReadyToShutdownEvent.getMemberId();
-
- if(asCtx.clusterMonitorExist(clusterId)) {
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- TopologyManager.acquireReadLock();
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
-
- NetworkPartitionContext nwPartitionCtxt;
- String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
- nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
- // start a new member in the same Partition
- String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId);
- PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-
- // terminate the shutdown ready member
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- ccClient.terminate(memberId);
-
- // remove from active member list
- partitionCtxt.removeActiveMemberById(memberId);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member is terminated and removed from the active members list: "
- + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- // no need to do anything
- }
+ monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+ } catch (Exception e) {
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+ });
- } catch (TerminationException e) {
- log.error(e);
+ topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ log.info("Event received: " + event);
+ ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event;
+ TopologyManager.acquireReadLock();
+ Service service = TopologyManager.getTopology().getService(clusterCreatedEvent.getServiceName());
+ Cluster cluster = service.getCluster(clusterCreatedEvent.getClusterId());
+ startClusterMonitor(cluster);
+ } catch (Exception e) {
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
} finally {
TopologyManager.releaseReadLock();
}
}
-
});
- topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- log.info("Event received: " + event);
- ClusterCreatedEvent e = (ClusterCreatedEvent) event;
- TopologyManager.acquireReadLock();
- Service service = TopologyManager.getTopology().getService(e.getServiceName());
- Cluster cluster = service.getCluster(e.getClusterId());
- startClusterMonitor(cluster);
- } catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
-
- });
-
topologyEventReceiver.addEventListener(new ClusterMaintenanceModeEventListener() {
@Override
protected void onEvent(Event event) {
try {
log.info("Event received: " + event);
- ClusterMaintenanceModeEvent e = (ClusterMaintenanceModeEvent) event;
+ ClusterMaintenanceModeEvent clusterMaintenanceModeEvent = (ClusterMaintenanceModeEvent) event;
TopologyManager.acquireReadLock();
- Service service = TopologyManager.getTopology().getService(e.getServiceName());
- Cluster cluster = service.getCluster(e.getClusterId());
- if(AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) {
- AutoscalerContext.getInstance().getClusterMonitor(e.getClusterId()).setStatus(e.getStatus());
- } else {
+ Service service = TopologyManager.getTopology().getService(clusterMaintenanceModeEvent.getServiceName());
+ Cluster cluster = service.getCluster(clusterMaintenanceModeEvent.getClusterId());
+ AbstractClusterMonitor monitor;
+ monitor = AutoscalerContext.getInstance().getClusterMonitor(cluster.getClusterId());
+ if (null == monitor) {
log.error("cluster monitor not exists for the cluster: " + cluster.toString());
+ return;
}
+ monitor.setStatus(clusterMaintenanceModeEvent.getStatus());
} catch (Exception e) {
- log.error("Error processing event", e);
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
} finally {
TopologyManager.releaseReadLock();
}
}
-
- });
+ });
topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
@Override
protected void onEvent(Event event) {
try {
- ClusterRemovedEvent e = (ClusterRemovedEvent) event;
- TopologyManager.acquireReadLock();
-
- String clusterId = e.getClusterId();
- String deploymentPolicy = e.getDeploymentPolicy();
-
- AbstractClusterMonitor monitor = null;
-
- if (e.isLbCluster()) {
- DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
- if (depPolicy != null) {
- List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
- .getNetworkPartitionLbHolders(depPolicy);
-
- for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
- // removes lb cluster ids
- boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
- if (isRemoved) {
- log.info("Removed the lb cluster [id]:"
- + clusterId
- + " reference from Network Partition [id]: "
- + networkPartitionLbHolder
- .getNetworkPartitionId());
-
- }
- if (log.isDebugEnabled()) {
- log.debug(networkPartitionLbHolder);
- }
-
- }
+ ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+ String clusterId = clusterRemovedEvent.getClusterId();
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
}
+ return;
}
-
- monitor = AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
-
- // runTerminateAllRule(monitor);
- if (monitor != null) {
- monitor.destroy();
- log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
- clusterId));
- }
+ monitor.handleClusterRemovedEvent(clusterRemovedEvent);
+ asCtx.removeClusterMonitor(clusterId);
+ monitor.destroy();
+ log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
+ clusterId));
} catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
}
}
-
});
topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
@@ -295,70 +222,23 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
-
try {
- TopologyManager.acquireReadLock();
- MemberTerminatedEvent e = (MemberTerminatedEvent) event;
- String networkPartitionId = e.getNetworkPartitionId();
- String clusterId = e.getClusterId();
- String partitionId = e.getPartitionId();
- String memberId = e.getMemberId();
+ MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+ String clusterId = memberTerminatedEvent.getClusterId();
AbstractClusterMonitor monitor;
-
AutoscalerContext asCtx = AutoscalerContext.getInstance();
-
- if(asCtx.clusterMonitorExist(clusterId)) {
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
-
- NetworkPartitionContext networkPartitionContext =
- ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
- PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
- partitionContext.removeMemberStatsContext(memberId);
-
- if (partitionContext.removeTerminationPendingMember(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is removed from termination pending members list: "
- + "[member] %s", memberId));
- }
- } else if (partitionContext.removePendingMember(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is removed from pending members list: "
- + "[member] %s", memberId));
- }
- } else if (partitionContext.removeActiveMemberById(memberId)) {
- log.warn(String.format("Member is in the wrong list and it is removed from "
- + "active members list", memberId));
- } else if (partitionContext.removeObsoleteMember(memberId)){
- log.warn(String.format("Member's obsolated timeout has been expired and "
- + "it is removed from obsolated members list", memberId));
- } else {
- log.warn(String.format("Member is not available in any of the list active, "
- + "pending and termination pending", memberId));
- }
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been removed successfully: "
- + "[member] %s", memberId));
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- // no need to do anything
- }
-
+ monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
} catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
}
}
@@ -367,160 +247,47 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
-
try {
- TopologyManager.acquireReadLock();
-
- MemberActivatedEvent e = (MemberActivatedEvent) event;
- String memberId = e.getMemberId();
- String partitionId = e.getPartitionId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- String clusterId = e.getClusterId();
+ MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+ String clusterId = memberActivatedEvent.getClusterId();
AbstractClusterMonitor monitor;
-
AutoscalerContext asCtx = AutoscalerContext.getInstance();
- if(asCtx.clusterMonitorExist(clusterId)) {
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if (monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
- PartitionContext partitionContext;
- partitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
- partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added successfully: "
- + "[member] %s", memberId));
- }
- partitionContext.movePendingMemberToActiveMembers(memberId);
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- KubernetesClusterContext kubernetesClusterContext;
- kubernetesClusterContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
- kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added successfully: "
- + "[member] %s", memberId));
- }
- kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
- }
-
+ monitor.handleMemberActivatedEvent(memberActivatedEvent);
} catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
}
}
});
- topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- TopologyManager.acquireReadLock();
-
- MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent)event;
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractClusterMonitor monitor;
- String clusterId = memberReadyToShutdownEvent.getClusterId();
- String memberId = memberReadyToShutdownEvent.getMemberId();
-
- if(asCtx.clusterMonitorExist(clusterId)) {
- monitor = asCtx.getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- return;
- }
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
-
- NetworkPartitionContext nwPartitionCtxt;
- String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
- nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
-
- // start a new member in the same Partition
- String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId);
- PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-
- // terminate the shutdown ready member
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- ccClient.terminate(memberId);
-
- // remove from active member list
- partitionCtxt.removeActiveMemberById(memberId);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member is terminated and removed from the active members list: "
- + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
- }
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- // no need to do anything
- }
-
- } catch (TerminationException e) {
- log.error(e);
- }
- }
-
- });
-
-
topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
@Override
protected void onEvent(Event event) {
-
try {
- TopologyManager.acquireReadLock();
-
- MemberMaintenanceModeEvent e = (MemberMaintenanceModeEvent) event;
- String memberId = e.getMemberId();
- String partitionId = e.getPartitionId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- String clusterId = e.getClusterId();
+ MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+ String clusterId = maintenanceModeEvent.getClusterId();
AbstractClusterMonitor monitor;
-
AutoscalerContext asCtx = AutoscalerContext.getInstance();
- if (asCtx.clusterMonitorExist(clusterId)) {
- monitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
- } else {
- if(log.isDebugEnabled()){
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
-
- if(monitor.getClusterType() == ClusterType.VMServiceCluster
- || monitor.getClusterType() == ClusterType.VMLbCluster) {
-
- PartitionContext partitionContext;
- partitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
- partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member has been moved as pending termination: "
- + "[member] %s", memberId));
- }
- partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
- } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) {
- // no need to do anything
- }
-
+ monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
} catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- TopologyManager.releaseReadLock();
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
}
}
});
@@ -529,27 +296,14 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
@Override
protected void onEvent(Event event) {
-// try {
-// TopologyManager.acquireReadLock();
-//
-// // Remove all clusters of given service from context
-// ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent)event;
-// for(Service service : TopologyManager.getTopology().getServices()) {
-// for(Cluster cluster : service.getClusters()) {
-// removeMonitor(cluster.getHostName());
-// }
-// }
-// }
-// finally {
-// TopologyManager.releaseReadLock();
-// }
+
}
});
}
private class ClusterMonitorAdder implements Runnable {
private Cluster cluster;
- private String clusterMonitorType;
+
public ClusterMonitorAdder(Cluster cluster) {
this.cluster = cluster;
}
@@ -567,38 +321,41 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
try {
monitor = ClusterMonitorFactory.getMonitor(cluster);
success = true;
- clusterMonitorType = monitor.getClusterType().name();
} catch (PolicyValidationException e) {
- String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
- log.debug(msg, e);
+ if (log.isDebugEnabled()) {
+ String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+ log.debug(msg, e);
+ }
retries--;
-
} catch (PartitionValidationException e) {
- String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
- log.debug(msg, e);
+ if (log.isDebugEnabled()) {
+ String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+ log.debug(msg, e);
+ }
retries--;
}
} while (!success && retries != 0);
if (monitor == null) {
String msg = "Cluster monitor creation failed, even after retrying for 5 times, "
- + "for cluster: " + cluster.getClusterId();
+ + "for cluster: " + cluster.getClusterId();
log.error(msg);
throw new RuntimeException(msg);
}
-
+ //TODO private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ // scheduler.scheduleAtFixedRate(monitor, 0, getMonitorInterval(), TimeUnit.MILLISECONDS);
Thread th = new Thread(monitor);
th.start();
AutoscalerContext.getInstance().addClusterMonitor(monitor);
if (log.isInfoEnabled()) {
- log.info(String.format("%s monitor has been added successfully: [cluster] %s",
- clusterMonitorType, cluster.getClusterId()));
+ log.info(String.format("Cluster monitor has been added successfully: [cluster] %s",
+ cluster.getClusterId()));
}
}
}
-
+
@SuppressWarnings("unused")
- private void runTerminateAllRule(VMClusterMonitor monitor) {
+ private void runTerminateAllRule(VMClusterMonitor monitor) {
FactHandle terminateAllFactHandle = null;
@@ -621,9 +378,13 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
protected synchronized void startClusterMonitor(Cluster cluster) {
Thread th = null;
- if (!AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) {
- th = new Thread(new ClusterMonitorAdder(cluster));
- }
+
+ AbstractClusterMonitor monitor;
+ monitor = AutoscalerContext.getInstance().getClusterMonitor(cluster.getClusterId());
+
+ if (null == monitor) {
+ th = new Thread(new ClusterMonitorAdder(cluster));
+ }
if (th != null) {
th.start();
try {
@@ -632,9 +393,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
}
if (log.isDebugEnabled()) {
- log.debug(String
- .format("Cluster monitor thread has been started successfully: [cluster] %s ",
- cluster.getClusterId()));
+ log.debug(String.format("Cluster monitor thread has been started successfully: "
+ + "[cluster] %s ", cluster.getClusterId()));
}
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
index cb60027..6061c3b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
@@ -19,130 +19,211 @@
package org.apache.stratos.autoscaler.monitor;
import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.rule.FactHandle;
/*
* Every cluster monitor, which are monitoring a cluster, should extend this class.
*/
-public abstract class AbstractClusterMonitor implements Runnable{
-
+public abstract class AbstractClusterMonitor implements Runnable {
+
private String clusterId;
private String serviceId;
- private ClusterType clusterType;
- private ClusterStatus status;
- private int monitorInterval;
-
- protected FactHandle minCheckFactHandle;
- protected FactHandle scaleCheckFactHandle;
- private StatefulKnowledgeSession minCheckKnowledgeSession;
- private StatefulKnowledgeSession scaleCheckKnowledgeSession;
- private boolean isDestroyed;
-
- private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
-
- protected AbstractClusterMonitor(String clusterId, String serviceId, ClusterType clusterType,
- AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
-
- super();
- this.clusterId = clusterId;
- this.serviceId = serviceId;
- this.clusterType = clusterType;
- this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+ private ClusterStatus status;
+ private int monitoringIntervalMilliseconds;
+
+ protected FactHandle minCheckFactHandle;
+ protected FactHandle scaleCheckFactHandle;
+ private StatefulKnowledgeSession minCheckKnowledgeSession;
+ private StatefulKnowledgeSession scaleCheckKnowledgeSession;
+ private boolean isDestroyed;
+
+ private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+
+ protected AbstractClusterMonitor(String clusterId, String serviceId,
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+
+ super();
+ this.clusterId = clusterId;
+ this.serviceId = serviceId;
+ this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
- }
+ }
+
+ protected abstract void readConfigurations();
+
+ protected abstract void monitor();
- protected abstract void readConfigurations();
- protected abstract void monitor();
public abstract void destroy();
-
- public String getClusterId() {
- return clusterId;
- }
-
- public void setClusterId(String clusterId) {
- this.clusterId = clusterId;
- }
-
- public void setStatus(ClusterStatus status) {
- this.status = status;
- }
-
- public ClusterType getClusterType() {
- return clusterType;
- }
-
- public ClusterStatus getStatus() {
- return status;
- }
-
- public String getServiceId() {
- return serviceId;
- }
-
- public void setServiceId(String serviceId) {
- this.serviceId = serviceId;
- }
-
- public int getMonitorInterval() {
- return monitorInterval;
- }
-
- public void setMonitorInterval(int monitorInterval) {
- this.monitorInterval = monitorInterval;
- }
-
- public FactHandle getMinCheckFactHandle() {
- return minCheckFactHandle;
- }
-
- public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
- this.minCheckFactHandle = minCheckFactHandle;
- }
-
- public FactHandle getScaleCheckFactHandle() {
- return scaleCheckFactHandle;
- }
-
- public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
- this.scaleCheckFactHandle = scaleCheckFactHandle;
- }
-
- public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
- return minCheckKnowledgeSession;
- }
-
- public void setMinCheckKnowledgeSession(
- StatefulKnowledgeSession minCheckKnowledgeSession) {
- this.minCheckKnowledgeSession = minCheckKnowledgeSession;
- }
-
- public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
- return scaleCheckKnowledgeSession;
- }
-
- public void setScaleCheckKnowledgeSession(
- StatefulKnowledgeSession scaleCheckKnowledgeSession) {
- this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
- }
-
- public boolean isDestroyed() {
- return isDestroyed;
- }
-
- public void setDestroyed(boolean isDestroyed) {
- this.isDestroyed = isDestroyed;
- }
-
- public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
- return autoscalerRuleEvaluator;
- }
-
- public void setAutoscalerRuleEvaluator(
- AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
- this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
- }
+
+ //handle health events
+ public abstract void handleAverageLoadAverageEvent(
+ AverageLoadAverageEvent averageLoadAverageEvent);
+
+ public abstract void handleGradientOfLoadAverageEvent(
+ GradientOfLoadAverageEvent gradientOfLoadAverageEvent);
+
+ public abstract void handleSecondDerivativeOfLoadAverageEvent(
+ SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent);
+
+ public abstract void handleAverageMemoryConsumptionEvent(
+ AverageMemoryConsumptionEvent averageMemoryConsumptionEvent);
+
+ public abstract void handleGradientOfMemoryConsumptionEvent(
+ GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent);
+
+ public abstract void handleSecondDerivativeOfMemoryConsumptionEvent(
+ SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent);
+
+ public abstract void handleAverageRequestsInFlightEvent(
+ AverageRequestsInFlightEvent averageRequestsInFlightEvent);
+
+ public abstract void handleGradientOfRequestsInFlightEvent(
+ GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent);
+
+ public abstract void handleSecondDerivativeOfRequestsInFlightEvent(
+ SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent);
+
+ public abstract void handleMemberAverageMemoryConsumptionEvent(
+ MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent);
+
+ public abstract void handleMemberGradientOfMemoryConsumptionEvent(
+ MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent);
+
+ public abstract void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+ MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent);
+
+
+ public abstract void handleMemberAverageLoadAverageEvent(
+ MemberAverageLoadAverageEvent memberAverageLoadAverageEvent);
+
+ public abstract void handleMemberGradientOfLoadAverageEvent(
+ MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent);
+
+ public abstract void handleMemberSecondDerivativeOfLoadAverageEvent(
+ MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent);
+
+ public abstract void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent);
+
+ //handle topology events
+ public abstract void handleMemberStartedEvent(MemberStartedEvent memberStartedEvent);
+
+ public abstract void handleMemberActivatedEvent(MemberActivatedEvent memberActivatedEvent);
+
+ public abstract void handleMemberMaintenanceModeEvent(
+ MemberMaintenanceModeEvent maintenanceModeEvent);
+
+ public abstract void handleMemberReadyToShutdownEvent(
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent);
+
+ public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent memberTerminatedEvent);
+
+ public abstract void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent);
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(String clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public void setStatus(ClusterStatus status) {
+ this.status = status;
+ }
+
+ public ClusterStatus getStatus() {
+ return status;
+ }
+
+ public String getServiceId() {
+ return serviceId;
+ }
+
+ public void setServiceId(String serviceId) {
+ this.serviceId = serviceId;
+ }
+
+ public int getMonitorIntervalMilliseconds() {
+ return monitoringIntervalMilliseconds;
+ }
+
+ public void setMonitorIntervalMilliseconds(int monitorIntervalMilliseconds) {
+ this.monitoringIntervalMilliseconds = monitorIntervalMilliseconds;
+ }
+
+ public FactHandle getMinCheckFactHandle() {
+ return minCheckFactHandle;
+ }
+
+ public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
+ this.minCheckFactHandle = minCheckFactHandle;
+ }
+
+ public FactHandle getScaleCheckFactHandle() {
+ return scaleCheckFactHandle;
+ }
+
+ public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
+ this.scaleCheckFactHandle = scaleCheckFactHandle;
+ }
+
+ public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
+ return minCheckKnowledgeSession;
+ }
+
+ public void setMinCheckKnowledgeSession(
+ StatefulKnowledgeSession minCheckKnowledgeSession) {
+ this.minCheckKnowledgeSession = minCheckKnowledgeSession;
+ }
+
+ public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
+ return scaleCheckKnowledgeSession;
+ }
+
+ public void setScaleCheckKnowledgeSession(
+ StatefulKnowledgeSession scaleCheckKnowledgeSession) {
+ this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
+ }
+
+ public boolean isDestroyed() {
+ return isDestroyed;
+ }
+
+ public void setDestroyed(boolean isDestroyed) {
+ this.isDestroyed = isDestroyed;
+ }
+
+ public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
+ return autoscalerRuleEvaluator;
+ }
+
+ public void setAutoscalerRuleEvaluator(
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+ this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
index bd01dc6..208e4ce 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
@@ -52,30 +52,32 @@ import org.apache.stratos.messaging.util.Constants;
* Factory class for creating cluster monitors.
*/
public class ClusterMonitorFactory {
-
- private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
-
- /**
- * @param cluster the cluster to be monitored
- * @return the created cluster monitor
- * @throws PolicyValidationException when deployment policy is not valid
- * @throws PartitionValidationException when partition is not valid
- */
- public static AbstractClusterMonitor getMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
-
- AbstractClusterMonitor clusterMonitor;
- if(cluster.isKubernetesCluster()){
- clusterMonitor = getDockerServiceClusterMonitor(cluster);
- } else if (cluster.isLbCluster()){
- clusterMonitor = getVMLbClusterMonitor(cluster);
- } else {
- clusterMonitor = getVMServiceClusterMonitor(cluster);
- }
-
- return clusterMonitor;
- }
-
- private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
+
+ private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
+
+ /**
+ * @param cluster the cluster to be monitored
+ * @return the created cluster monitor
+ * @throws PolicyValidationException when deployment policy is not valid
+ * @throws PartitionValidationException when partition is not valid
+ */
+ public static AbstractClusterMonitor getMonitor(Cluster cluster)
+ throws PolicyValidationException, PartitionValidationException {
+
+ AbstractClusterMonitor clusterMonitor;
+ if (cluster.isKubernetesCluster()) {
+ clusterMonitor = getDockerServiceClusterMonitor(cluster);
+ } else if (cluster.isLbCluster()) {
+ clusterMonitor = getVMLbClusterMonitor(cluster);
+ } else {
+ clusterMonitor = getVMServiceClusterMonitor(cluster);
+ }
+
+ return clusterMonitor;
+ }
+
+ private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster)
+ throws PolicyValidationException, PartitionValidationException {
// FIXME fix the following code to correctly update
// AutoscalerContext context = AutoscalerContext.getInstance();
if (null == cluster) {
@@ -91,11 +93,11 @@ public class ClusterMonitorFactory {
}
AutoscalePolicy policy =
- PolicyManager.getInstance()
- .getAutoscalePolicy(autoscalePolicyName);
+ PolicyManager.getInstance()
+ .getAutoscalePolicy(autoscalePolicyName);
DeploymentPolicy deploymentPolicy =
- PolicyManager.getInstance()
- .getDeploymentPolicy(deploymentPolicyName);
+ PolicyManager.getInstance()
+ .getDeploymentPolicy(deploymentPolicyName);
if (deploymentPolicy == null) {
String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
@@ -106,8 +108,8 @@ public class ClusterMonitorFactory {
Partition[] allPartitions = deploymentPolicy.getAllPartitions();
if (allPartitions == null) {
String msg =
- "Deployment Policy's Partitions are null. Policy name: " +
- deploymentPolicyName;
+ "Deployment Policy's Partitions are null. Policy name: " +
+ deploymentPolicyName;
log.error(msg);
throw new PolicyValidationException(msg);
}
@@ -115,98 +117,100 @@ public class ClusterMonitorFactory {
CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
VMServiceClusterMonitor clusterMonitor =
- new VMServiceClusterMonitor(cluster.getClusterId(),
- cluster.getServiceName(),
- deploymentPolicy, policy);
+ new VMServiceClusterMonitor(cluster.getClusterId(),
+ cluster.getServiceName(),
+ deploymentPolicy, policy);
clusterMonitor.setStatus(ClusterStatus.Created);
-
- for (PartitionGroup partitionGroup: deploymentPolicy.getPartitionGroups()){
+
+ for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
- partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions());
+ partitionGroup.getPartitionAlgo(),
+ partitionGroup.getPartitions());
- for(Partition partition: partitionGroup.getPartitions()){
+ for (Partition partition : partitionGroup.getPartitions()) {
PartitionContext partitionContext = new PartitionContext(partition);
partitionContext.setServiceName(cluster.getServiceName());
partitionContext.setProperties(cluster.getProperties());
partitionContext.setNetworkPartitionId(partitionGroup.getId());
-
- for (Member member: cluster.getMembers()){
+
+ for (Member member : cluster.getMembers()) {
String memberId = member.getMemberId();
- if(member.getPartitionId().equalsIgnoreCase(partition.getId())){
+ if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
MemberContext memberContext = new MemberContext();
memberContext.setClusterId(member.getClusterId());
memberContext.setMemberId(memberId);
memberContext.setPartition(partition);
memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
-
- if(MemberStatus.Activated.equals(member.getStatus())){
+
+ if (MemberStatus.Activated.equals(member.getStatus())) {
partitionContext.addActiveMember(memberContext);
// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
// partitionContext.incrementCurrentActiveMemberCount(1);
- } else if(MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())){
+ } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
partitionContext.addPendingMember(memberContext);
// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
- } else if(MemberStatus.Suspended.equals(member.getStatus())){
+ } else if (MemberStatus.Suspended.equals(member.getStatus())) {
// partitionContext.addFaultyMember(memberId);
}
partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if(log.isInfoEnabled()){
+ if (log.isInfoEnabled()) {
log.info(String.format("Member stat context has been added: [member] %s", memberId));
}
}
}
networkPartitionContext.addPartitionContext(partitionContext);
- if(log.isInfoEnabled()){
+ if (log.isInfoEnabled()) {
log.info(String.format("Partition context has been added: [partition] %s",
- partitionContext.getPartitionId()));
+ partitionContext.getPartitionId()));
}
}
clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
- if(log.isInfoEnabled()){
+ if (log.isInfoEnabled()) {
log.info(String.format("Network partition context has been added: [network partition] %s",
- networkPartitionContext.getId()));
+ networkPartitionContext.getId()));
}
}
-
-
+
+
// find lb reference type
java.util.Properties props = cluster.getProperties();
-
- if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+
+ if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
String value = props.getProperty(Constants.LOAD_BALANCER_REF);
clusterMonitor.setLbReferenceType(value);
- if(log.isDebugEnabled()) {
- log.debug("Set the lb reference type: "+value);
+ if (log.isDebugEnabled()) {
+ log.debug("Set the lb reference type: " + value);
}
}
-
+
// set hasPrimary property
// hasPrimary is true if there are primary members available in that cluster
clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
- log.info("VMServiceClusterMonitor created: "+clusterMonitor.toString());
+ log.info("VMServiceClusterMonitor created: " + clusterMonitor.toString());
return clusterMonitor;
}
-
+
private static Properties convertMemberPropsToMemberContextProps(
- java.util.Properties properties) {
- Properties props = new Properties();
- for (Map.Entry<Object, Object> e : properties.entrySet() ) {
- Property prop = new Property();
- prop.setName((String)e.getKey());
- prop.setValue((String)e.getValue());
- props.addProperties(prop);
- }
- return props;
- }
-
-
- private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException {
+ java.util.Properties properties) {
+ Properties props = new Properties();
+ for (Map.Entry<Object, Object> e : properties.entrySet()) {
+ Property prop = new Property();
+ prop.setName((String) e.getKey());
+ prop.setValue((String) e.getValue());
+ props.addProperties(prop);
+ }
+ return props;
+ }
+
+
+ private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
+ throws PolicyValidationException, PartitionValidationException {
// FIXME fix the following code to correctly update
// AutoscalerContext context = AutoscalerContext.getInstance();
if (null == cluster) {
@@ -222,11 +226,11 @@ public class ClusterMonitorFactory {
}
AutoscalePolicy policy =
- PolicyManager.getInstance()
- .getAutoscalePolicy(autoscalePolicyName);
+ PolicyManager.getInstance()
+ .getAutoscalePolicy(autoscalePolicyName);
DeploymentPolicy deploymentPolicy =
- PolicyManager.getInstance()
- .getDeploymentPolicy(deploymentPolicyName);
+ PolicyManager.getInstance()
+ .getDeploymentPolicy(deploymentPolicyName);
if (deploymentPolicy == null) {
String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
@@ -236,21 +240,21 @@ public class ClusterMonitorFactory {
String clusterId = cluster.getClusterId();
VMLbClusterMonitor clusterMonitor =
- new VMLbClusterMonitor(clusterId,
- cluster.getServiceName(),
- deploymentPolicy, policy);
+ new VMLbClusterMonitor(clusterId,
+ cluster.getServiceName(),
+ deploymentPolicy, policy);
clusterMonitor.setStatus(ClusterStatus.Created);
// partition group = network partition context
for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
NetworkPartitionLbHolder networkPartitionLbHolder =
- PartitionManager.getInstance()
- .getNetworkPartitionLbHolder(partitionGroup.getId());
+ PartitionManager.getInstance()
+ .getNetworkPartitionLbHolder(partitionGroup.getId());
// PartitionManager.getInstance()
// .getNetworkPartitionLbHolder(partitionGroup.getId());
// FIXME pick a random partition
Partition partition =
- partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
+ partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
PartitionContext partitionContext = new PartitionContext(partition);
partitionContext.setServiceName(cluster.getServiceName());
partitionContext.setProperties(cluster.getProperties());
@@ -258,7 +262,8 @@ public class ClusterMonitorFactory {
partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
- partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions()) ;
+ partitionGroup.getPartitionAlgo(),
+ partitionGroup.getPartitions());
for (Member member : cluster.getMembers()) {
String memberId = member.getMemberId();
if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) {
@@ -280,23 +285,23 @@ public class ClusterMonitorFactory {
}
partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if(log.isInfoEnabled()){
+ if (log.isInfoEnabled()) {
log.info(String.format("Member stat context has been added: [member] %s", memberId));
}
}
}
networkPartitionContext.addPartitionContext(partitionContext);
-
+
// populate lb cluster id in network partition context.
java.util.Properties props = cluster.getProperties();
// get service type of load balanced cluster
String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
-
- if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+
+ if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-
+
if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
@@ -317,13 +322,17 @@ public class ClusterMonitorFactory {
clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
}
- log.info("VMLbClusterMonitor created: "+clusterMonitor.toString());
+ log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
return clusterMonitor;
}
-
- private static DockerServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) {
- if (null == cluster) {
+ /**
+ * @param cluster - the cluster which needs to be monitored
+ * @return - the cluster monitor
+ */
+ private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) {
+
+ if (null == cluster) {
return null;
}
@@ -335,42 +344,43 @@ public class ClusterMonitorFactory {
AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
java.util.Properties props = cluster.getProperties();
String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
- KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
- cluster.getClusterId());
-
- DockerServiceClusterMonitor dockerClusterMonitor = new DockerServiceClusterMonitor(
- kubernetesClusterCtxt,
- cluster.getClusterId(),
- cluster.getServiceName(),
- policy);
-
+ KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
+ cluster.getClusterId());
+
+ KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor(
+ kubernetesClusterCtxt,
+ cluster.getClusterId(),
+ cluster.getServiceName(),
+ policy);
+
dockerClusterMonitor.setStatus(ClusterStatus.Created);
-
- for (Member member : cluster.getMembers()) {
- String memberId = member.getMemberId();
- String clusterId = member.getClusterId();
- MemberContext memberContext = new MemberContext();
- memberContext.setMemberId(memberId);
- memberContext.setClusterId(clusterId);
-
- if (MemberStatus.Activated.equals(member.getStatus())) {
- dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
- } else if (MemberStatus.Created.equals(member.getStatus())
- || MemberStatus.Starting.equals(member.getStatus())) {
- dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
- }
- }
+
+ //populate the members after restarting
+ for (Member member : cluster.getMembers()) {
+ String memberId = member.getMemberId();
+ String clusterId = member.getClusterId();
+ MemberContext memberContext = new MemberContext();
+ memberContext.setMemberId(memberId);
+ memberContext.setClusterId(clusterId);
+
+ if (MemberStatus.Activated.equals(member.getStatus())) {
+ dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
+ } else if (MemberStatus.Created.equals(member.getStatus())
+ || MemberStatus.Starting.equals(member.getStatus())) {
+ dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
+ }
+ }
// find lb reference type
- if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+ if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
String value = props.getProperty(Constants.LOAD_BALANCER_REF);
dockerClusterMonitor.setLbReferenceType(value);
- if(log.isDebugEnabled()) {
- log.debug("Set the lb reference type: "+value);
+ if (log.isDebugEnabled()) {
+ log.debug("Set the lb reference type: " + value);
}
}
-
- log.info("KubernetesServiceClusterMonitor created: "+ dockerClusterMonitor.toString());
+
+ log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
return dockerClusterMonitor;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
deleted file mode 100644
index 2621690..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.enums.ClusterType;
-
-/*
- * Every container cluster monitor should extend this class
- */
-public abstract class ContainerClusterMonitor extends AbstractClusterMonitor {
-
- private KubernetesClusterContext kubernetesClusterCtxt;
- protected AutoscalePolicy autoscalePolicy;
-
- protected ContainerClusterMonitor(String clusterId, String serviceId, ClusterType clusterType,
- KubernetesClusterContext kubernetesClusterContext,
- AutoscalerRuleEvaluator autoscalerRuleEvaluator, AutoscalePolicy autoscalePolicy){
-
- super(clusterId, serviceId, clusterType, autoscalerRuleEvaluator);
- this.kubernetesClusterCtxt = kubernetesClusterContext;
- this.autoscalePolicy = autoscalePolicy;
- }
-
- public KubernetesClusterContext getKubernetesClusterCtxt() {
- return kubernetesClusterCtxt;
- }
-
- public void setKubernetesClusterCtxt(
- KubernetesClusterContext kubernetesClusterCtxt) {
- this.kubernetesClusterCtxt = kubernetesClusterCtxt;
- }
-
- public AutoscalePolicy getAutoscalePolicy() {
- return autoscalePolicy;
- }
-
- public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
- this.autoscalePolicy = autoscalePolicy;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
deleted file mode 100644
index 850a295..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.Properties;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.common.enums.ClusterType;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-/**
- * Periodically monitors a Docker service cluster. On each cycle, when the
- * Kubernetes cluster context reports neither active nor pending members, it
- * asks the cloud controller to create a container and registers the returned
- * member as pending.
- *
- * The monitoring thread stops when {@link #destroy()} has been called or when
- * the thread is interrupted.
- */
-public final class DockerServiceClusterMonitor extends ContainerClusterMonitor {
-
-    private static final Log log = LogFactory.getLog(DockerServiceClusterMonitor.class);
-
-    /** Delay before the first monitoring cycle, in milliseconds. */
-    private static final long INITIAL_DELAY_MILLIS = 60000L;
-
-    /** Monitor interval used when the configuration does not define one, in milliseconds. */
-    private static final int DEFAULT_MONITOR_INTERVAL_MILLIS = 90000;
-
-    private String lbReferenceType;
-    private int numberOfReplicasInServiceCluster = 0;
-
-    /** Pause between two container creation attempts, in milliseconds. */
-    int retryInterval = 60000;
-
-    /**
-     * Creates a monitor of type {@link ClusterType#DockerServiceCluster} with a
-     * fresh {@link AutoscalerRuleEvaluator} and reads the monitor interval from
-     * the configuration.
-     */
-    public DockerServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt,
-            String serviceClusterID, String serviceId, AutoscalePolicy autoscalePolicy) {
-        super(serviceClusterID, serviceId, ClusterType.DockerServiceCluster, kubernetesClusterCtxt,
-                new AutoscalerRuleEvaluator(), autoscalePolicy);
-        readConfigurations();
-    }
-
-    /**
-     * Waits for the initial delay, then calls {@link #monitor()} once per
-     * monitor interval until the monitor is destroyed or the thread is
-     * interrupted. Cycles are skipped while the cluster is in maintenance mode.
-     */
-    @Override
-    public void run() {
-        // TODO make this configurable,
-        // this is the delay the min check of normal cluster monitor to wait
-        // until LB monitor is added
-        if (!pause(INITIAL_DELAY_MILLIS)) {
-            return;
-        }
-
-        while (!isDestroyed()) {
-            if (log.isDebugEnabled()) {
-                log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
-            }
-            try {
-                if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
-                    monitor();
-                } else if (log.isDebugEnabled()) {
-                    log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
-                            + ClusterStatus.In_Maintenance + " mode......");
-                }
-            } catch (Exception e) {
-                // A failed cycle must not end the monitoring loop.
-                log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
-                        e);
-            }
-            if (!pause(getMonitorInterval())) {
-                return;
-            }
-        }
-    }
-
-    /**
-     * Creates a container when the cluster has no active or pending members,
-     * retrying every {@link #retryInterval} milliseconds until one is created,
-     * the monitor is destroyed or the thread is interrupted.
-     *
-     * @throws NumberFormatException if the minimum replicas cluster property is
-     *         missing or not an integer
-     */
-    @Override
-    protected void monitor() {
-
-        String kubernetesClusterId = getKubernetesClusterCtxt().getKubernetesClusterID();
-        int minReplicas = readMinReplicas();
-
-        int nonTerminatedMembers = getKubernetesClusterCtxt().getActiveMembers().size()
-                + getKubernetesClusterCtxt().getPendingMembers().size();
-        if (nonTerminatedMembers != 0) {
-            return;
-        }
-
-        // The retry loop runs without the topology read lock, so the lock is
-        // never held across a retry pause.
-        while (!isDestroyed()) {
-            if (tryCreateContainer(kubernetesClusterId, minReplicas)) {
-                return;
-            }
-            if (!pause(retryInterval)) {
-                return;
-            }
-        }
-    }
-
-    /**
-     * Reads the minimum replicas property of this cluster from the topology,
-     * holding the topology read lock only for the duration of the lookup.
-     */
-    private int readMinReplicas() {
-        // Acquire before the try block so the lock is released only if it was taken.
-        TopologyManager.acquireReadLock();
-        try {
-            Properties props = TopologyManager.getTopology().getService(getServiceId())
-                    .getCluster(getClusterId()).getProperties();
-            return Integer.parseInt(props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS));
-        } finally {
-            TopologyManager.releaseReadLock();
-        }
-    }
-
-    /**
-     * Makes one container creation attempt.
-     *
-     * @return true if a member context was returned and added to the pending members
-     */
-    private boolean tryCreateContainer(String kubernetesClusterId, int minReplicas) {
-        MemberContext memberContext;
-        try {
-            memberContext = CloudControllerClient.getInstance()
-                    .createContainer(kubernetesClusterId, getClusterId());
-        } catch (Exception e) {
-            // Logged as a warning so repeated failures are visible without debug logging.
-            log.warn("Cannot create a container, will retry in " + (retryInterval / 1000) + "s", e);
-            return false;
-        }
-
-        if (null == memberContext) {
-            if (log.isDebugEnabled()) {
-                log.debug("Returned member context is null, did not add to pending members");
-            }
-            return false;
-        }
-
-        getKubernetesClusterCtxt().addPendingMember(memberContext);
-        numberOfReplicasInServiceCluster = minReplicas;
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Pending member added, [member] %s [kub cluster] %s",
-                    memberContext.getMemberId(), getKubernetesClusterCtxt().getKubernetesClusterID()));
-        }
-        return true;
-    }
-
-    /**
-     * Sleeps for the given time.
-     *
-     * @return false if the thread was interrupted; the interrupt status is
-     *         restored so callers can stop monitoring
-     */
-    private boolean pause(long millis) {
-        try {
-            Thread.sleep(millis);
-            return true;
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            if (log.isDebugEnabled()) {
-                log.debug("Monitor thread interrupted, stopping. " + this.toString());
-            }
-            return false;
-        }
-    }
-
-    /**
-     * Disposes both knowledge sessions and marks this monitor as destroyed.
-     * The destroyed flag is set even if disposing a session fails.
-     */
-    @Override
-    public void destroy() {
-        try {
-            getMinCheckKnowledgeSession().dispose();
-            getScaleCheckKnowledgeSession().dispose();
-        } finally {
-            setDestroyed(true);
-        }
-        if (log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
-        }
-    }
-
-    /** Reads the monitor interval from the autoscaler configuration, defaulting to 90 seconds. */
-    @Override
-    protected void readConfigurations() {
-        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
-        int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL,
-                DEFAULT_MONITOR_INTERVAL_MILLIS);
-        setMonitorInterval(monitorInterval);
-        if (log.isDebugEnabled()) {
-            log.debug("KubernetesServiceClusterMonitor task interval: " + getMonitorInterval());
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "KubernetesServiceClusterMonitor "
-                + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
-                + ", clusterId=" + getClusterId()
-                + ", serviceId=" + getServiceId() + "]";
-    }
-
-    public String getLbReferenceType() {
-        return lbReferenceType;
-    }
-
-    public void setLbReferenceType(String lbReferenceType) {
-        this.lbReferenceType = lbReferenceType;
-    }
-}
\ No newline at end of file