You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/12/19 12:23:27 UTC
[1/5] stratos git commit: Removed kubernetes cluster
monitors/contexts and renamed vm cluster monitor to cluster monitor
Repository: stratos
Updated Branches:
refs/heads/master 2578fda78 -> d9c323a2c
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
index 6b51ea6..d4328ab 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
@@ -1,4 +1,3 @@
-package org.apache.stratos.autoscaler.rule;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,33 +19,26 @@ package org.apache.stratos.autoscaler.rule;
*
*/
+package org.apache.stratos.autoscaler.rule;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.*;
+import org.apache.stratos.autoscaler.Constants;
import org.apache.stratos.autoscaler.algorithm.AutoscaleAlgorithm;
import org.apache.stratos.autoscaler.algorithm.OneAfterAnother;
import org.apache.stratos.autoscaler.algorithm.RoundRobin;
import org.apache.stratos.autoscaler.client.CloudControllerClient;
-import org.apache.stratos.autoscaler.client.InstanceNotificationClient;
+import org.apache.stratos.autoscaler.context.AutoscalerContext;
+import org.apache.stratos.autoscaler.context.cluster.ClusterContext;
import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
-import org.apache.stratos.autoscaler.context.cluster.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
import org.apache.stratos.autoscaler.context.member.MemberStatsContext;
-import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext;
+import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
import org.apache.stratos.autoscaler.exception.cartridge.TerminationException;
import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
-//import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.PartitionManager;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.autoscaler.context.AutoscalerContext;
/**
* This will have utility methods that need to be executed from rule file...
@@ -190,8 +182,8 @@ public class RuleTasksDelegator {
// String lbClusterId = getLbClusterId(lbRefType, clusterMonitorPartitionContext, lbHolder);
//Calculate accumulation of minimum counts of all the partition of current network partition
int minimumCountOfNetworkPartition = 0;
- VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
- VMClusterContext clusterContext = (VMClusterContext) vmClusterMonitor.getClusterContext();
+ ClusterMonitor vmClusterMonitor = (ClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+ ClusterContext clusterContext = (ClusterContext) vmClusterMonitor.getClusterContext();
ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = clusterContext.getNetworkPartitionCtxt(nwPartitionId);
ClusterInstanceContext clusterInstanceContext =
(ClusterInstanceContext) clusterLevelNetworkPartitionContext.
@@ -245,8 +237,8 @@ public class RuleTasksDelegator {
// String lbClusterId = getLbClusterId(lbRefType, clusterMonitorPartitionContext, lbHolder);
//Calculate accumulation of minimum counts of all the partition of current network partition
int minimumCountOfNetworkPartition = 0;
- VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
- VMClusterContext clusterContext = (VMClusterContext) vmClusterMonitor.getClusterContext();
+ ClusterMonitor vmClusterMonitor = (ClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+ ClusterContext clusterContext = (ClusterContext) vmClusterMonitor.getClusterContext();
ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = clusterContext.getNetworkPartitionCtxt(nwPartitionId);
ClusterInstanceContext clusterInstanceContext =
(ClusterInstanceContext) clusterLevelNetworkPartitionContext.
@@ -302,7 +294,6 @@ public class RuleTasksDelegator {
log.debug("Returned member context is null, did not add to pending members");
}
}
-
} catch (Throwable e) {
String message = "Cannot spawn an instance";
log.error(message, e);
@@ -310,7 +301,6 @@ public class RuleTasksDelegator {
}
}
-
public void delegateScalingDependencyNotification(String clusterId, String networkPartitionId, String instanceId,
int requiredInstanceCount, int minimumInstanceCount) {
@@ -318,122 +308,29 @@ public class RuleTasksDelegator {
log.debug("Scaling dependent notification is going to the [parentInstance] " + instanceId);
}
//Notify parent for checking scaling dependencies
- AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+ AbstractClusterMonitor abstractClusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
float fMinimumInstanceCount = minimumInstanceCount;
float factor = requiredInstanceCount / fMinimumInstanceCount;
- if (clusterMonitor instanceof VMClusterMonitor) {
-
- VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) clusterMonitor;
- vmClusterMonitor.sendClusterScalingEvent(networkPartitionId, instanceId, factor);
+ if (abstractClusterMonitor instanceof ClusterMonitor) {
+ ClusterMonitor clusterMonitor = (ClusterMonitor) abstractClusterMonitor;
+ clusterMonitor.sendClusterScalingEvent(networkPartitionId, instanceId, factor);
}
-
}
public void delegateScalingOverMaxNotification(String clusterId, String networkPartitionId, String instanceId) {
-
if(log.isDebugEnabled()) {
log.debug("Scaling max out notification is going to the [parentInstance] " + instanceId);
}
//Notify parent for checking scaling dependencies
- AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
- if (clusterMonitor instanceof VMClusterMonitor) {
+ AbstractClusterMonitor abstractClusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+ if (abstractClusterMonitor instanceof ClusterMonitor) {
- VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) clusterMonitor;
- vmClusterMonitor.sendScalingOverMaxEvent(networkPartitionId, instanceId);
+ ClusterMonitor clusterMonitor = (ClusterMonitor) abstractClusterMonitor;
+ clusterMonitor.sendScalingOverMaxEvent(networkPartitionId, instanceId);
}
-
}
- // Original method. Assume this is invoked from mincheck.drl
-
- /* public void delegateSpawn(PartitionContext partitionContext, String clusterId, String lbRefType) {
- try {
-
- String nwPartitionId = partitionContext.getNetworkPartitionId();
- .getNetworkPartitionLbHolder(nwPartitionId);
- NetworkPartitionLbHolder lbHolder =
- PartitionManager.getInstance()
- .getNetworkPartitionLbHolder(nwPartitionId);
-
-
- String lbClusterId = getLbClusterId(lbRefType, partitionContext, lbHolder);
-
- MemberContext memberContext =
- CloudControllerClient.getInstance()
- .spawnAnInstance(partitionContext.getPartition(),
- clusterId,
- lbClusterId, partitionContext.getNetworkPartitionId());
- if (memberContext != null) {
- partitionContext.addPendingMember(memberContext);
- if(log.isDebugEnabled()){
- log.debug(String.format("Pending member added, [member] %s [partition] %s", memberContext.getMemberId(),
- memberContext.getPartition().getPartitionId()));
- }
- } else if(log.isDebugEnabled()){
- log.debug("Returned member context is null, did not add to pending members");
- }
-
- } catch (Throwable e) {
- String message = "Cannot spawn an instance";
- log.error(message, e);
- throw new RuntimeException(message, e);
- }
- }*/
-
-//
-// public static String getLbClusterId(String lbRefType, ClusterLevelPartitionContext partitionCtxt,
-// NetworkPartitionLbHolder networkPartitionLbHolder) {
-//
-// String lbClusterId = null;
-//
-// if (lbRefType != null) {
-// if (lbRefType.equals(StratosConstants.DEFAULT_LOAD_BALANCER)) {
-// lbClusterId = networkPartitionLbHolder.getDefaultLbClusterId();
-//// lbClusterId = nwPartitionCtxt.getDefaultLbClusterId();
-// } else if (lbRefType.equals(StratosConstants.SERVICE_AWARE_LOAD_BALANCER)) {
-// String serviceName = partitionCtxt.getServiceName();
-// lbClusterId = networkPartitionLbHolder.getLBClusterIdOfService(serviceName);
-//// lbClusterId = nwPartitionCtxt.getLBClusterIdOfService(serviceName);
-// } else {
-// log.warn("Invalid LB reference type defined: [value] " + lbRefType);
-// }
-// }
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Getting LB id for spawning instance [lb reference] %s ," +
-// " [partition] %s [network partition] %s [Lb id] %s ", lbRefType, partitionCtxt.getPartitionId(),
-// networkPartitionLbHolder.getNetworkPartitionId(), lbClusterId));
-// }
-// return lbClusterId;
-// }
-
-// public static String getLbClusterId(String lbRefType, ClusterLevelPartitionContext partitionCtxt,
-// NetworkPartitionLbHolder networkPartitionLbHolder) {
-//
-// String lbClusterId = null;
-//
-// if (lbRefType != null) {
-// if (lbRefType.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
-// lbClusterId = networkPartitionLbHolder.getDefaultLbClusterId();
-//// lbClusterId = nwPartitionCtxt.getDefaultLbClusterId();
-// } else if (lbRefType.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) {
-// String serviceName = partitionCtxt.getServiceName();
-// lbClusterId = networkPartitionLbHolder.getLBClusterIdOfService(serviceName);
-//// lbClusterId = nwPartitionCtxt.getLBClusterIdOfService(serviceName);
-// } else {
-// log.warn("Invalid LB reference type defined: [value] " + lbRefType);
-// }
-// }
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Getting LB id for spawning instance [lb reference] %s ," +
-// " [partition] %s [network partition] %s [Lb id] %s ", lbRefType, partitionCtxt.getPartitionId(),
-// networkPartitionLbHolder.getNetworkPartitionId(), lbClusterId));
-// }
-// return lbClusterId;
-// }
-
-
public void delegateTerminate(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) {
-
log.info("Starting to terminate Member [ " + memberId + " ], in Partition [ " +
clusterMonitorPartitionContext.getPartitionId() + " ], NW Partition [ " +
clusterMonitorPartitionContext.getNetworkPartitionId() + " ]");
@@ -478,7 +375,7 @@ public class RuleTasksDelegator {
if (log.isDebugEnabled()) {
log.debug("delegateTerminateAll - begin");
}
- CloudControllerClient.getInstance().terminateAllInstances(clusterId);
+ CloudControllerClient.getInstance().terminateInstances(clusterId);
if (log.isDebugEnabled()) {
log.debug("delegateTerminateAll - done");
}
@@ -487,94 +384,6 @@ public class RuleTasksDelegator {
}
}
- public void delegateStartContainers(KubernetesClusterContext kubernetesClusterContext) {
- try {
- String kubernetesClusterId = kubernetesClusterContext.getKubernetesClusterID();
- String clusterId = kubernetesClusterContext.getClusterId();
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
-// MemberContext[] memberContexts = ccClient.startContainers(kubernetesClusterId, clusterId);
-// if (null != memberContexts) {
-// for (MemberContext memberContext : memberContexts) {
-// if (null != memberContext) {
-// kubernetesClusterContext.addPendingMember(memberContext);
-// kubernetesClusterContext.setServiceClusterCreated(true);
-// if (log.isDebugEnabled()) {
-// log.debug(String.format(
-// "Pending member added, [member] %s [kub cluster] %s",
-// memberContext.getMemberId(), kubernetesClusterId));
-// }
-// } else {
-// if (log.isDebugEnabled()) {
-// log.debug("Returned member context is null, did not add any pending members");
-// }
-// }
-// }
-// } else {
-// if (log.isDebugEnabled()) {
-// log.debug("Returned member context is null, did not add to pending members");
-// }
-// }
- } catch (Exception e) {
- log.error("Cannot create containers ", e);
- }
- }
-
- public void delegateScaleUpContainers(KubernetesClusterContext kubernetesClusterContext,
- int newReplicasCount) {
- String clusterId = kubernetesClusterContext.getClusterId();
- try {
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- // getting newly created pods' member contexts
- MemberContext[] memberContexts = ccClient.updateContainers(clusterId, newReplicasCount);
- if (null != memberContexts) {
- for (MemberContext memberContext : memberContexts) {
- if (null != memberContext) {
- kubernetesClusterContext.addPendingMember(memberContext);
- if (log.isDebugEnabled()) {
- String kubernetesClusterID = kubernetesClusterContext.getKubernetesClusterID();
- log.debug(String.format(
- "Pending member added, [member] %s [kub cluster] %s",
- memberContext.getMemberId(), kubernetesClusterID));
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Returned member context is null, did not add any pending members");
- }
- }
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Returned array of member context is null, did not add to pending members");
- }
- }
- } catch (Exception e) {
- log.error("Scaling up failed, couldn't update kubernetes controller ", e);
- }
- }
-
- public void delegateScaleDownContainers(KubernetesClusterContext kubernetesClusterContext,
- int newReplicasCount) {
- String clusterId = kubernetesClusterContext.getClusterId();
- try {
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- // getting terminated pods's member contexts
- MemberContext[] memberContexts = ccClient.updateContainers(clusterId, newReplicasCount);
- if (null != memberContexts) {
- for (MemberContext memberContext : memberContexts) {
- if (null != memberContext) {
- // we are not removing from active/pending list, it will be handled in AS event receiver
- if (log.isDebugEnabled()) {
- log.debug(String.format("Scaling down, terminated the member with id %s in cluster %s",
- memberContext.getMemberId(), memberContext.getClusterId()));
- }
- }
- }
- }
- } catch (Exception e) {
- log.error("Scaling down failed, couldn't update kubernetes controller ", e);
- }
- }
-
public void delegateTerminateContainer(String memberId) {
try {
CloudControllerClient ccClient = CloudControllerClient.getInstance();
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusActiveProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusActiveProcessor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusActiveProcessor.java
index 4888828..7051290 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusActiveProcessor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusActiveProcessor.java
@@ -24,7 +24,7 @@ import org.apache.stratos.autoscaler.context.AutoscalerContext;
import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
import org.apache.stratos.autoscaler.status.processor.StatusProcessor;
/**
@@ -61,7 +61,7 @@ public class ClusterStatusActiveProcessor extends ClusterStatusProcessor {
}
private boolean doProcess(String clusterId, String instanceId) {
- VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().
+ ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().
getClusterMonitor(clusterId);
boolean clusterActive = false;
for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext : monitor.getNetworkPartitionCtxts()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusInactiveProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusInactiveProcessor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusInactiveProcessor.java
index 039aae6..f590f05 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusInactiveProcessor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusInactiveProcessor.java
@@ -24,7 +24,7 @@ import org.apache.stratos.autoscaler.context.AutoscalerContext;
import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
import org.apache.stratos.autoscaler.status.processor.StatusProcessor;
/**
@@ -61,7 +61,7 @@ public class ClusterStatusInactiveProcessor extends ClusterStatusProcessor {
}
private boolean doProcess(String clusterId, String instanceId) {
- VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().
+ ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().
getClusterMonitor(clusterId);
boolean clusterInactive;
@@ -82,7 +82,7 @@ public class ClusterStatusInactiveProcessor extends ClusterStatusProcessor {
return clusterInactive;
}
- private boolean getClusterInactive(String instanceId, VMClusterMonitor monitor) {
+ private boolean getClusterInactive(String instanceId, ClusterMonitor monitor) {
boolean clusterInactive = false;
for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext :
monitor.getAllNetworkPartitionCtxts().values()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusTerminatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusTerminatedProcessor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusTerminatedProcessor.java
index 015d5b8..a16d833 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusTerminatedProcessor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusTerminatedProcessor.java
@@ -26,7 +26,7 @@ import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext;
import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
import org.apache.stratos.autoscaler.status.processor.StatusProcessor;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
@@ -67,7 +67,7 @@ public class ClusterStatusTerminatedProcessor extends ClusterStatusProcessor {
}
private boolean doProcess(String clusterId, String instanceId) {
- VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().
+ ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().
getClusterMonitor(clusterId);
boolean clusterMonitorHasMembers = clusterInstanceHasMembers(monitor, instanceId);
boolean clusterTerminated = false;
@@ -116,7 +116,7 @@ public class ClusterStatusTerminatedProcessor extends ClusterStatusProcessor {
* @param monitor the cluster monitor
* @return whether has members or not
*/
- private boolean clusterInstanceHasMembers(VMClusterMonitor monitor, String instanceId) {
+ private boolean clusterInstanceHasMembers(ClusterMonitor monitor, String instanceId) {
boolean hasMember = false;
for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext :
monitor.getAllNetworkPartitionCtxts().values()) {
[4/5] stratos git commit: Removed kubernetes cluster
monitors/contexts and renamed vm cluster monitor to cluster monitor
Posted by im...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
index 75f7a2d..a8c90b3 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.context.AutoscalerContext;
import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.Service;
@@ -172,8 +172,8 @@ public class AutoscalerHealthStatEventReceiver {
}
return;
}
- if(monitor instanceof VMClusterMonitor) {
- VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) monitor;
+ if(monitor instanceof ClusterMonitor) {
+ ClusterMonitor vmClusterMonitor = (ClusterMonitor) monitor;
vmClusterMonitor.handleAverageRequestsServingCapabilityEvent(averageRequestsServingCapabilityEvent);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 2a6f945..f5875ce 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.applications.ApplicationHolder;
import org.apache.stratos.autoscaler.context.AutoscalerContext;
import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
-import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
+import org.apache.stratos.autoscaler.context.cluster.ClusterContext;
import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
import org.apache.stratos.autoscaler.exception.application.DependencyBuilderException;
@@ -33,7 +33,7 @@ import org.apache.stratos.autoscaler.exception.partition.PartitionValidationExce
import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
import org.apache.stratos.autoscaler.monitor.MonitorFactory;
import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent;
import org.apache.stratos.autoscaler.pojo.policy.PolicyManager;
@@ -304,7 +304,7 @@ public class AutoscalerTopologyEventReceiver {
monitor.notifyParentMonitor(ClusterStatus.Terminated, instanceId);
//Removing the instance and instanceContext
ClusterInstance instance = (ClusterInstance) monitor.getInstance(instanceId);
- ((VMClusterContext)monitor.getClusterContext()).
+ ((ClusterContext)monitor.getClusterContext()).
getNetworkPartitionCtxt(instance.getNetworkPartitionId()).
removeInstanceContext(instanceId);
monitor.removeInstance(instanceId);
@@ -446,8 +446,8 @@ public class AutoscalerTopologyEventReceiver {
Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
if (cluster != null) {
try {
- VMClusterContext clusterContext =
- (VMClusterContext) clusterMonitor.getClusterContext();
+ ClusterContext clusterContext =
+ (ClusterContext) clusterMonitor.getClusterContext();
if (clusterContext == null) {
clusterContext = ClusterContextFactory.getVMClusterContext(instanceId, cluster,
clusterMonitor.hasScalingDependents());
@@ -470,7 +470,7 @@ public class AutoscalerTopologyEventReceiver {
+ clusterInstanceCreatedEvent.getClusterId() + " started successfully");
} else {
//monitor already started. Invoking it directly to speed up the process
- ((VMClusterMonitor)clusterMonitor).monitor();
+ ((ClusterMonitor)clusterMonitor).monitor();
}
} catch (PolicyValidationException e) {
log.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
index a8721a6..b5b1614 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java
@@ -31,7 +31,7 @@ import org.apache.stratos.autoscaler.exception.partition.PartitionValidationExce
import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitorFactory;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
import org.apache.stratos.autoscaler.monitor.component.GroupMonitor;
import org.apache.stratos.autoscaler.monitor.component.ParentComponentMonitor;
@@ -270,7 +270,7 @@ public class MonitorFactory {
}
//Creating the instance of the cluster
- ((VMClusterMonitor) clusterMonitor).createClusterInstance(parentInstanceIds, cluster);
+ ((ClusterMonitor) clusterMonitor).createClusterInstance(parentInstanceIds, cluster);
//add it to autoscaler context
AutoscalerContext.getInstance().addClusterMonitor(clusterMonitor);
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
new file mode 100644
index 0000000..24b8f3a
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
@@ -0,0 +1,1244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.client.CloudControllerClient;
+import org.apache.stratos.autoscaler.context.InstanceContext;
+import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
+import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
+import org.apache.stratos.autoscaler.context.cluster.ClusterContext;
+import org.apache.stratos.autoscaler.context.member.MemberStatsContext;
+import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext;
+import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
+import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
+import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.exception.cartridge.TerminationException;
+import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
+import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
+import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import org.apache.stratos.autoscaler.monitor.events.ScalingEvent;
+import org.apache.stratos.autoscaler.monitor.events.ScalingOverMaxEvent;
+import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiveProcessor;
+import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInactiveProcessor;
+import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.AutoscalerUtil;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
+import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
+import org.apache.stratos.common.Properties;
+import org.apache.stratos.common.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
+import org.apache.stratos.messaging.domain.instance.ClusterInstance;
+import org.apache.stratos.messaging.domain.instance.GroupInstance;
+import org.apache.stratos.messaging.domain.instance.Instance;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.*;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+import java.util.*;
+
+/**
+ * Is responsible for monitoring a service cluster. This runs periodically
+ * and perform minimum instance check and scaling check using the underlying
+ * rules engine.
+ */
+public class ClusterMonitor extends AbstractClusterMonitor {
+
+ private static final Log log = LogFactory.getLog(ClusterMonitor.class);
+ private Map<String, ClusterLevelNetworkPartitionContext> networkPartitionIdToClusterLevelNetworkPartitionCtxts;
+ private boolean hasPrimary;
+ private float scalingFactorBasedOnDependencies = 1.0f;
+
+
+ protected ClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree) {
+ super(cluster, hasScalingDependents, groupScalingEnabledSubtree);
+ this.networkPartitionIdToClusterLevelNetworkPartitionCtxts = new HashMap<String, ClusterLevelNetworkPartitionContext>();
+ readConfigurations();
+ autoscalerRuleEvaluator = new AutoscalerRuleEvaluator();
+ autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE);
+ autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.VM_SCALE_CHECK_DROOL_FILE);
+ autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.VM_MIN_CHECK_DROOL_FILE);
+ autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.DEPENDENT_SCALE_CHECK_DROOL_FILE);
+
+ this.obsoleteCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
+ StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE);
+ this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
+ StratosConstants.VM_SCALE_CHECK_DROOL_FILE);
+ this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
+ StratosConstants.VM_MIN_CHECK_DROOL_FILE);
+ this.dependentScaleCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
+ StratosConstants.DEPENDENT_SCALE_CHECK_DROOL_FILE);
+ }
+
+ private static void terminateMember(String memberId) {
+ try {
+ CloudControllerClient.getInstance().terminate(memberId);
+
+ } catch (TerminationException e) {
+ log.error("Unable to terminate member [member id ] " + memberId, e);
+ }
+ }
+
+ private static void createClusterInstance(String serviceType, String clusterId, String alias,
+ String instanceId, String partitionId, String networkPartitionId) {
+ CloudControllerClient.getInstance().createClusterInstance(serviceType, clusterId, alias,
+ instanceId, partitionId, networkPartitionId);
+ }
+
+ public void addClusterLevelNWPartitionContext(ClusterLevelNetworkPartitionContext clusterLevelNWPartitionCtxt) {
+ networkPartitionIdToClusterLevelNetworkPartitionCtxts.put(clusterLevelNWPartitionCtxt.getId(), clusterLevelNWPartitionCtxt);
+ }
+
+ public ClusterLevelNetworkPartitionContext getClusterLevelNWPartitionContext(String nwPartitionId) {
+ return networkPartitionIdToClusterLevelNetworkPartitionCtxts.get(nwPartitionId);
+ }
+
+ @Override
+ public void handleAverageLoadAverageEvent(
+ AverageLoadAverageEvent averageLoadAverageEvent) {
+
+ String networkPartitionId = averageLoadAverageEvent.getNetworkPartitionId();
+ String clusterId = averageLoadAverageEvent.getClusterId();
+ String instanceId = averageLoadAverageEvent.getInstanceId();
+ float value = averageLoadAverageEvent.getValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, value));
+ }
+
+ ClusterInstanceContext clusterInstanceContext = getClusterInstanceContext(networkPartitionId,
+ instanceId);
+ if (null != clusterInstanceContext) {
+ clusterInstanceContext.setAverageLoadAverage(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ while (!isDestroyed()) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Cluster monitor is running.. " + this.toString());
+ }
+ monitor();
+ } catch (Exception e) {
+ log.error("Cluster monitor: Monitor failed." + this.toString(), e);
+ }
+ try {
+ Thread.sleep(getMonitorIntervalMilliseconds());
+ } catch (InterruptedException ignore) {
+ }
+ }
+
+
+ }
+
+ private boolean isPrimaryMember(MemberContext memberContext) {
+ Properties props = AutoscalerUtil.toCommonProperties(memberContext.getProperties());
+ if (log.isDebugEnabled()) {
+ log.debug(" Properties [" + props + "] ");
+ }
+ if (props != null && props.getProperties() != null) {
+ for (Property prop : props.getProperties()) {
+ if (prop.getName().equals("PRIMARY")) {
+ if (Boolean.parseBoolean(prop.getValue())) {
+ log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
+ "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ public synchronized void monitor() {
+
+ for (ClusterLevelNetworkPartitionContext networkPartitionContext : getNetworkPartitionCtxts()) {
+
+ final Collection<InstanceContext> clusterInstanceContexts = networkPartitionContext.
+ getInstanceIdToInstanceContextMap().values();
+
+ for (final InstanceContext pInstanceContext : clusterInstanceContexts) {
+ final ClusterInstanceContext instanceContext = (ClusterInstanceContext) pInstanceContext;
+ ClusterInstance instance = (ClusterInstance) this.instanceIdToInstanceMap.
+ get(instanceContext.getId());
+
+ if ((instance.getStatus().getCode() <= ClusterStatus.Active.getCode()) ||
+ (instance.getStatus() == ClusterStatus.Inactive && !hasStartupDependents)
+ && !this.hasFaultyMember) {
+
+ Runnable monitoringRunnable = new Runnable() {
+ @Override
+ public void run() {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Monitor is running for [cluster] : " + getClusterId());
+ }
+ // store primary members in the cluster instance context
+ List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
+
+ for (ClusterLevelPartitionContext partitionContext :
+ instanceContext.getPartitionCtxts()) {
+
+ // get active primary members in this cluster instance context
+ for (MemberContext memberContext : partitionContext.getActiveMembers()) {
+ if (isPrimaryMember(memberContext)) {
+ primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+ }
+ }
+
+ // get pending primary members in this cluster instance context
+ for (MemberContext memberContext : partitionContext.getPendingMembers()) {
+ if (isPrimaryMember(memberContext)) {
+ primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+ }
+ }
+
+ obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
+ getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, partitionContext);
+
+ }
+
+ getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInClusterInstance);
+ getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+ getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+ //FIXME when parent chosen the partition
+ String paritionAlgo = instanceContext.getPartitionAlgorithm();
+
+ getMinCheckKnowledgeSession().setGlobal("algorithmName",
+ paritionAlgo);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Running minimum check for cluster instance %s ",
+ instanceContext.getId() + " for the cluster: " + clusterId));
+ }
+
+ minCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getMinCheckKnowledgeSession(),
+ minCheckFactHandle, instanceContext);
+
+
+ //checking the status of the cluster
+ boolean rifReset = instanceContext.isRifReset();
+ boolean memoryConsumptionReset = instanceContext.isMemoryConsumptionReset();
+ boolean loadAverageReset = instanceContext.isLoadAverageReset();
+ boolean averageRequestServedPerInstanceReset
+ = instanceContext.isAverageRequestServedPerInstanceReset();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Execution point of scaling Rule, [Is rif Reset] : " + rifReset
+ + " [Is memoryConsumption Reset] : " + memoryConsumptionReset
+ + " [Is loadAverage Reset] : " + loadAverageReset);
+ }
+
+ if (rifReset || memoryConsumptionReset || loadAverageReset) {
+
+ log.info("Executing scaling rule as statistics have been reset");
+ ClusterContext vmClusterContext = (ClusterContext) clusterContext;
+
+ getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+ getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+ getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+ getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+ getScaleCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+ getScaleCheckKnowledgeSession().setGlobal("algorithmName", paritionAlgo);
+ getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy",
+ vmClusterContext.getAutoscalePolicy());
+ getScaleCheckKnowledgeSession().setGlobal("arspiReset",
+ averageRequestServedPerInstanceReset);
+ getScaleCheckKnowledgeSession().setGlobal("primaryMembers",
+ primaryMemberListInClusterInstance);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Running scale check for [cluster instance context] %s ",
+ instanceContext.getId()));
+ log.debug(" Primary members : " + primaryMemberListInClusterInstance);
+ }
+
+ scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getScaleCheckKnowledgeSession()
+ , scaleCheckFactHandle, instanceContext);
+
+ instanceContext.setRifReset(false);
+ instanceContext.setMemoryConsumptionReset(false);
+ instanceContext.setLoadAverageReset(false);
+ } else if (log.isDebugEnabled()) {
+ log.debug(String.format("Scale rule will not run since the LB statistics have not " +
+ "received before this cycle for [cluster instance context] %s [cluster] %s",
+ instanceContext.getId(), clusterId));
+ }
+
+ }
+ };
+ monitoringRunnable.run();
+ }
+
+ for (final ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
+ Runnable monitoringRunnable = new Runnable() {
+ @Override
+ public void run() {
+ obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
+ getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, partitionContext);
+ }
+ };
+
+ monitoringRunnable.run();
+
+ }
+
+ }
+ }
+ }
+
+ @Override
+ protected void readConfigurations() {
+ XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+ int monitorInterval = conf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000);
+ setMonitorIntervalMilliseconds(monitorInterval);
+ if (log.isDebugEnabled()) {
+ log.debug("ClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
+ }
+ }
+
+ @Override
+ public void destroy() {
+ getMinCheckKnowledgeSession().dispose();
+ getObsoleteCheckKnowledgeSession().dispose();
+ getScaleCheckKnowledgeSession().dispose();
+ setDestroyed(true);
+ stopScheduler();
+ if (log.isDebugEnabled()) {
+ log.debug("ClusterMonitor Drools session has been disposed. " + this.toString());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterMonitor [clusterId=" + getClusterId() +
+ ", hasPrimary=" + hasPrimary + " ]";
+ }
+
+ public boolean isHasPrimary() {
+ return hasPrimary;
+ }
+
+ public void setHasPrimary(boolean hasPrimary) {
+ this.hasPrimary = hasPrimary;
+ }
+
+ @Override
+ public void onChildStatusEvent(MonitorStatusEvent statusEvent) {
+
+ }
+
+ @Override
+ public void onParentStatusEvent(MonitorStatusEvent statusEvent) {
+ String instanceId = statusEvent.getInstanceId();
+ // send the ClusterTerminating event
+ if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() ==
+ ApplicationStatus.Terminating) {
+ if (log.isInfoEnabled()) {
+ log.info("Publishing Cluster terminating event for [application] " + appId +
+ " [cluster] " + this.getClusterId() + " [instance] " + instanceId);
+ }
+ ClusterStatusEventPublisher.sendClusterTerminatingEvent(getAppId(), getServiceId(), getClusterId(), instanceId);
+ }
+ }
+
+ @Override
+ public void onChildScalingEvent(ScalingEvent scalingEvent) {
+
+ }
+
+ @Override
+ public void onChildScalingOverMaxEvent(ScalingOverMaxEvent scalingOverMaxEvent) {
+
+ }
+
+ @Override
+ public void onParentScalingEvent(ScalingEvent scalingEvent) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Parent scaling event received to [cluster]: " + this.getClusterId()
+ + ", [network partition]: " + scalingEvent.getNetworkPartitionId()
+ + ", [event] " + scalingEvent.getId() + ", [group instance] " + scalingEvent.getInstanceId());
+ }
+
+ this.scalingFactorBasedOnDependencies = scalingEvent.getFactor();
+ ClusterContext vmClusterContext = (ClusterContext) clusterContext;
+ String instanceId = scalingEvent.getInstanceId();
+
+ ClusterInstanceContext clusterInstanceContext =
+ getClusterInstanceContext(scalingEvent.getNetworkPartitionId(), instanceId);
+
+
+ // store primary members in the cluster instance context
+ List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
+
+ for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) {
+
+ // get active primary members in this cluster instance context
+ for (MemberContext memberContext : partitionContext.getActiveMembers()) {
+ if (isPrimaryMember(memberContext)) {
+ primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+ }
+ }
+
+ // get pending primary members in this cluster instance context
+ for (MemberContext memberContext : partitionContext.getPendingMembers()) {
+ if (isPrimaryMember(memberContext)) {
+ primaryMemberListInClusterInstance.add(memberContext.getMemberId());
+ }
+ }
+ }
+
+
+ //TODO get min instance count from instance context
+ float requiredInstanceCount = clusterInstanceContext.getMinInstanceCount() * scalingFactorBasedOnDependencies;
+ int roundedRequiredInstanceCount = getRoundedInstanceCount(requiredInstanceCount,
+ vmClusterContext.getAutoscalePolicy().getInstanceRoundingFactor());
+ clusterInstanceContext.setRequiredInstanceCountBasedOnDependencies(roundedRequiredInstanceCount);
+
+ getDependentScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+ getDependentScaleCheckKnowledgeSession().setGlobal("roundedRequiredInstanceCount", roundedRequiredInstanceCount);
+ getDependentScaleCheckKnowledgeSession().setGlobal("algorithmName", clusterInstanceContext.getPartitionAlgorithm());
+ getDependentScaleCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+
+ dependentScaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getDependentScaleCheckKnowledgeSession()
+ , dependentScaleCheckFactHandle, clusterInstanceContext);
+
+ }
+
+ public void sendClusterScalingEvent(String networkPartitionId, String instanceId, float factor) {
+
+ MonitorStatusEventBuilder.handleClusterScalingEvent(this.parent, networkPartitionId, instanceId, factor, this.id);
+ }
+
+ public void sendScalingOverMaxEvent(String networkPartitionId, String instanceId) {
+
+ MonitorStatusEventBuilder.handleScalingOverMaxEvent(this.parent, networkPartitionId, instanceId,
+ this.id);
+ }
+
+ @Override
+ public void handleGradientOfLoadAverageEvent(
+ GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+ String networkPartitionId = gradientOfLoadAverageEvent.getNetworkPartitionId();
+ String clusterId = gradientOfLoadAverageEvent.getClusterId();
+ String instanceId = gradientOfLoadAverageEvent.getInstanceId();
+ float value = gradientOfLoadAverageEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, value));
+ }
+ ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+ networkPartitionId, instanceId);
+ if (null != clusterLevelNetworkPartitionContext) {
+ clusterLevelNetworkPartitionContext.setLoadAverageGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfLoadAverageEvent(
+ SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
+
+ String networkPartitionId = secondDerivativeOfLoadAverageEvent.getNetworkPartitionId();
+ String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+ String instanceId = secondDerivativeOfLoadAverageEvent.getInstanceId();
+ float value = secondDerivativeOfLoadAverageEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
+ + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+ }
+ ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+ networkPartitionId, instanceId);
+ if (null != clusterLevelNetworkPartitionContext) {
+ clusterLevelNetworkPartitionContext.setLoadAverageSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleAverageMemoryConsumptionEvent(
+ AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+ String networkPartitionId = averageMemoryConsumptionEvent.getNetworkPartitionId();
+ String clusterId = averageMemoryConsumptionEvent.getClusterId();
+ String instanceId = averageMemoryConsumptionEvent.getInstanceId();
+ float value = averageMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s "
+ + "[value] %s", clusterId, networkPartitionId, value));
+ }
+ ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+ networkPartitionId, instanceId);
+ if (null != clusterLevelNetworkPartitionContext) {
+ clusterLevelNetworkPartitionContext.setAverageMemoryConsumption(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String
+ .format("Network partition context is not available for :"
+ + " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleGradientOfMemoryConsumptionEvent(
+ GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
+
+ String networkPartitionId = gradientOfMemoryConsumptionEvent.getNetworkPartitionId();
+ String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+ String instanceId = gradientOfMemoryConsumptionEvent.getInstanceId();
+ float value = gradientOfMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
+ + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+ }
+ ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+ networkPartitionId, instanceId);
+ if (null != clusterLevelNetworkPartitionContext) {
+ clusterLevelNetworkPartitionContext.setMemoryConsumptionGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfMemoryConsumptionEvent(
+ SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
+
+ String networkPartitionId = secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId();
+ String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+ String instanceId = secondDerivativeOfMemoryConsumptionEvent.getInstanceId();
+ float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
+ + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+ }
+ ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+ networkPartitionId, instanceId);
+ if (null != clusterLevelNetworkPartitionContext) {
+ clusterLevelNetworkPartitionContext.setMemoryConsumptionSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ public void handleAverageRequestsServingCapabilityEvent(
+ AverageRequestsServingCapabilityEvent averageRequestsServingCapabilityEvent) {
+
+ String clusterId = averageRequestsServingCapabilityEvent.getClusterId();
+ String instanceId = averageRequestsServingCapabilityEvent.getInstanceId();
+ String networkPartitionId = averageRequestsServingCapabilityEvent.getNetworkPartitionId();
+ Float floatValue = averageRequestsServingCapabilityEvent.getValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, floatValue));
+ }
+
+ ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+ networkPartitionId, instanceId);
+ if (null != clusterLevelNetworkPartitionContext) {
+ clusterLevelNetworkPartitionContext.setAverageRequestsServedPerInstance(floatValue);
+
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+
+ }
+
+ @Override
+ public void handleAverageRequestsInFlightEvent(
+ AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+ String networkPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId();
+ String clusterId = averageRequestsInFlightEvent.getClusterId();
+ String instanceId = averageRequestsInFlightEvent.getInstanceId();
+ Float servedCount = averageRequestsInFlightEvent.getServedCount();
+ Float activeInstances = averageRequestsInFlightEvent.getActiveInstances();
+ Float requestsServedPerInstance = servedCount / activeInstances;
+ if (requestsServedPerInstance.isInfinite()) {
+ requestsServedPerInstance = 0f;
+ }
+ float value = averageRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, value));
+ }
+ ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+ networkPartitionId, instanceId);
+ if (null != clusterLevelNetworkPartitionContext) {
+ clusterLevelNetworkPartitionContext.setAverageRequestsInFlight(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleGradientOfRequestsInFlightEvent(
+ GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+ String networkPartitionId = gradientOfRequestsInFlightEvent.getNetworkPartitionId();
+ String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
+ String instanceId = gradientOfRequestsInFlightEvent.getInstanceId();
+ float value = gradientOfRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, value));
+ }
+ ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+ networkPartitionId, instanceId);
+ if (null != clusterLevelNetworkPartitionContext) {
+ clusterLevelNetworkPartitionContext.setRequestsInFlightGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfRequestsInFlightEvent(
+ SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
+
+ String networkPartitionId = secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId();
+ String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
+ String instanceId = secondDerivativeOfRequestsInFlightEvent.getInstanceId();
+ float value = secondDerivativeOfRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second derivative of Rif event: [cluster] %s "
+ + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+ }
+ ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+ networkPartitionId, instanceId);
+ if (null != clusterLevelNetworkPartitionContext) {
+ clusterLevelNetworkPartitionContext.setRequestsInFlightSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleMemberAverageMemoryConsumptionEvent(
+ MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
+
+ String instanceId = memberAverageMemoryConsumptionEvent.getInstanceId();
+ String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
+ instanceId);
+ ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
+ member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberAverageMemoryConsumptionEvent.getValue();
+ memberStatsContext.setAverageMemoryConsumption(value);
+ }
+
+ @Override
+ public void handleMemberGradientOfMemoryConsumptionEvent(
+ MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
+
+ String instanceId = memberGradientOfMemoryConsumptionEvent.getInstanceId();
+ String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
+ instanceId);
+ ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
+ member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberGradientOfMemoryConsumptionEvent.getValue();
+ memberStatsContext.setGradientOfMemoryConsumption(value);
+ }
+
+ @Override
+ public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+ MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
+
+ }
+
+ @Override
+ public void handleMemberAverageLoadAverageEvent(
+ MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+ String instanceId = memberAverageLoadAverageEvent.getInstanceId();
+ String memberId = memberAverageLoadAverageEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
+ instanceId);
+ ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
+ member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberAverageLoadAverageEvent.getValue();
+ memberStatsContext.setAverageLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberGradientOfLoadAverageEvent(
+ MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
+
+ String instanceId = memberGradientOfLoadAverageEvent.getInstanceId();
+ String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
+ instanceId);
+ ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
+ member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberGradientOfLoadAverageEvent.getValue();
+ memberStatsContext.setGradientOfLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberSecondDerivativeOfLoadAverageEvent(
+ MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
+
+ String instanceId = memberSecondDerivativeOfLoadAverageEvent.getInstanceId();
+ String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+
+ ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
+ instanceId);
+ ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
+ member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+ memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+
+ String memberId = memberFaultEvent.getMemberId();
+ String clusterId = memberFaultEvent.getClusterId();
+ Member member = getMemberByMemberId(memberId);
+ String instanceId = memberFaultEvent.getInstanceId();
+ String networkPartitionId = memberFaultEvent.getNetworkPartitionId();
+ if (null == member) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+ }
+ return;
+ }
+ if (!member.isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member activated event has not received for the member %s. "
+ + "Therefore ignoring" + " the member fault health stat", memberId));
+ }
+ return;
+ }
+
+ ClusterInstanceContext nwPartitionCtxt;
+ nwPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId);
+ String partitionId = getPartitionOfMember(memberId);
+ ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+ if (!partitionCtxt.activeMemberExist(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Could not find the active member in partition context, "
+ + "[member] %s ", memberId));
+ }
+ return;
+ }
+
+ // move member to obsolete list
+ synchronized (this) {
+ partitionCtxt.moveMemberToObsoleteList(memberId);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Faulty member is added to obsolete list and removed from the active members list: "
+ + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+ }
+
+ ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process(
+ ClusterStatusInactiveProcessor.class.getName(), clusterId, instanceId);
+ }
+
+ @Override
+ public void handleMemberStartedEvent(
+ MemberStartedEvent memberStartedEvent) {
+
+ }
+
+ @Override
+ public void handleMemberActivatedEvent(
+ MemberActivatedEvent memberActivatedEvent) {
+
+ String instanceId = memberActivatedEvent.getInstanceId();
+ String clusterId = memberActivatedEvent.getClusterId();
+ String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
+ String partitionId = memberActivatedEvent.getPartitionId();
+ String memberId = memberActivatedEvent.getMemberId();
+ ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId);
+ ClusterLevelPartitionContext clusterLevelPartitionContext;
+ clusterLevelPartitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
+ clusterLevelPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member stat context has been added successfully: "
+ + "[member] %s", memberId));
+ }
+ clusterLevelPartitionContext.movePendingMemberToActiveMembers(memberId);
+ ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process(
+ ClusterStatusActiveProcessor.class.getName(), clusterId, instanceId);
+ }
+
+ @Override
+ public void handleMemberMaintenanceModeEvent(
+ MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+ String networkPartitionId = maintenanceModeEvent.getNetworkPartitionId();
+ String partitionId = maintenanceModeEvent.getPartitionId();
+ String memberId = maintenanceModeEvent.getMemberId();
+ String instanceId = maintenanceModeEvent.getInstanceId();
+ ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
+ instanceId);
+ ClusterLevelPartitionContext clusterMonitorPartitionContext = networkPartitionCtxt.
+ getPartitionCtxt(partitionId);
+ clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member has been moved as pending termination: "
+ + "[member] %s", memberId));
+ }
+ clusterMonitorPartitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+ }
+
+ @Override
+ public void handleMemberReadyToShutdownEvent(MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+ ClusterInstanceContext nwPartitionCtxt;
+ String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
+ String instanceId = memberReadyToShutdownEvent.getInstanceId();
+ nwPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId);
+
+ // start a new member in the same Partition
+ String memberId = memberReadyToShutdownEvent.getMemberId();
+ String partitionId = getPartitionOfMember(memberId);
+ ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+
+ try {
+ String clusterId = memberReadyToShutdownEvent.getClusterId();
+ //move member to pending termination list
+ if (partitionCtxt.getPendingTerminationMember(memberId) != null) {
+ partitionCtxt.movePendingTerminationMemberToObsoleteMembers(memberId);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from the pending termination members " +
+ "and moved to obsolete list: [member] %s " +
+ "[partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+ }
+ } else if (partitionCtxt.getObsoleteMember(memberId) != null) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is in obsolete list: [member] %s " +
+ "[partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+ }
+ } //TODO else part
+
+ //when no more members are there to terminate Invoking it monitor directly
+ // to speed up the termination process
+ if (partitionCtxt.getTotalMemberCount() == 0) {
+ this.monitor();
+ }
+
+
+ } catch (Exception e) {
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+
+ @Override
+ public void handleMemberTerminatedEvent(
+ MemberTerminatedEvent memberTerminatedEvent) {
+
+ String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
+ String memberId = memberTerminatedEvent.getMemberId();
+ String clusterId = memberTerminatedEvent.getClusterId();
+ String instanceId = memberTerminatedEvent.getInstanceId();
+ String partitionId = memberTerminatedEvent.getPartitionId();
+ ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
+ networkPartitionId, instanceId);
+ ClusterLevelPartitionContext clusterMonitorPartitionContext =
+ clusterLevelNetworkPartitionContext.getPartitionCtxt(partitionId);
+ clusterMonitorPartitionContext.removeMemberStatsContext(memberId);
+
+ if (clusterMonitorPartitionContext.removeTerminationPendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from termination pending members list: "
+ + "[member] %s", memberId));
+ }
+ } else if (clusterMonitorPartitionContext.removePendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from pending members list: "
+ + "[member] %s", memberId));
+ }
+ } else if (clusterMonitorPartitionContext.removeActiveMemberById(memberId)) {
+ log.warn(String.format("Member is in the wrong list and it is removed from "
+ + "active members list: %s", memberId));
+ } else if (clusterMonitorPartitionContext.removeObsoleteMember(memberId)) {
+ log.warn(String.format("Obsolete member has either been terminated or its obsolete time out has expired and"
+ + " it is removed from obsolete members list: %s", memberId));
+ } else {
+ log.warn(String.format("Member is not available in any of the list active, "
+ + "pending and termination pending: %s", memberId));
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member stat context has been removed successfully: "
+ + "[member] %s", memberId));
+ }
+ //Checking whether the cluster state can be changed either from in_active to created/terminating to terminated
+ ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process(
+ ClusterStatusTerminatedProcessor.class.getName(), clusterId, instanceId);
+ }
+
+ @Override
+ public void handleClusterRemovedEvent(
+ ClusterRemovedEvent clusterRemovedEvent) {
+
+ }
+
+ @Override
+ public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
+
+ }
+
+ private String getNetworkPartitionIdByMemberId(String memberId) {
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.memberExists(memberId)) {
+ return cluster.getMember(memberId).getNetworkPartitionId();
+ }
+ }
+ }
+ return null;
+ }
+
+ private Member getMemberByMemberId(String memberId) {
+ try {
+ TopologyManager.acquireReadLock();
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.memberExists(memberId)) {
+ return cluster.getMember(memberId);
+ }
+ }
+ }
+ return null;
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ public String getPartitionOfMember(String memberId) {
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.memberExists(memberId)) {
+ return cluster.getMember(memberId).getPartitionId();
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void terminateAllMembers(final String instanceId, final String networkPartitionId) {
+ final ClusterMonitor monitor = this;
+ Thread memberTerminator = new Thread(new Runnable() {
+ public void run() {
+
+ ClusterInstanceContext instanceContext =
+ (ClusterInstanceContext) getAllNetworkPartitionCtxts().get(networkPartitionId)
+ .getInstanceContext(instanceId);
+ boolean allMovedToObsolete = true;
+ for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
+ if (log.isInfoEnabled()) {
+ log.info("Starting to terminate all members in cluster [" + getClusterId() + "] " +
+ "Network Partition [" + instanceContext.getNetworkPartitionId() + "], Partition [" +
+ partitionContext.getPartitionId() + "]");
+ }
+ // need to terminate active, pending and obsolete members
+ //FIXME to traverse concurrent
+ // active members
+ List<String> activeMembers = new ArrayList<String>();
+ Iterator<MemberContext> iterator = partitionContext.getActiveMembers().listIterator();
+ while (iterator.hasNext()) {
+ MemberContext activeMemberCtxt = iterator.next();
+ activeMembers.add(activeMemberCtxt.getMemberId());
+
+ }
+ for (String memberId : activeMembers) {
+ log.info("Sending instance cleanup event for the active member: [member-id] " + memberId);
+ partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+ InstanceNotificationPublisher.getInstance().
+ sendInstanceCleanupEventForMember(memberId);
+ }
+ Iterator<MemberContext> pendingIterator = partitionContext.getPendingMembers().listIterator();
+
+ List<String> pendingMembers = new ArrayList<String>();
+ while (pendingIterator.hasNext()) {
+ MemberContext activeMemberCtxt = pendingIterator.next();
+ pendingMembers.add(activeMemberCtxt.getMemberId());
+
+ }
+ for (String memberId : pendingMembers) {
+ MemberContext pendingMemberCtxt = pendingIterator.next();
+ // pending members
+ String memeberId = pendingMemberCtxt.getMemberId();
+ if (log.isDebugEnabled()) {
+ log.debug("Moving pending member [member id] " + memeberId + " to obsolete list");
+ }
+ partitionContext.movePendingMemberToObsoleteMembers(memeberId);
+ }
+ if(partitionContext.getTotalMemberCount() == 0) {
+ allMovedToObsolete = allMovedToObsolete && true;
+ } else {
+ allMovedToObsolete = false;
+ }
+ }
+
+ if(allMovedToObsolete) {
+ monitor.monitor();
+ }
+ }
+ }, "Member Terminator - [cluster id] " + getClusterId());
+
+ memberTerminator.start();
+ }
+
+ public Map<String, ClusterLevelNetworkPartitionContext> getAllNetworkPartitionCtxts() {
+ return ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts();
+ }
+
+ public ClusterInstanceContext getClusterInstanceContext(String networkPartitionId, String instanceId) {
+ Map<String, ClusterLevelNetworkPartitionContext> clusterLevelNetworkPartitionContextMap =
+ ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts();
+ ClusterLevelNetworkPartitionContext networkPartitionContext =
+ clusterLevelNetworkPartitionContextMap.get(networkPartitionId);
+ ClusterInstanceContext instanceContext = (ClusterInstanceContext) networkPartitionContext.
+ getInstanceContext(instanceId);
+ return instanceContext;
+ }
+
+ public Collection<ClusterLevelNetworkPartitionContext> getNetworkPartitionCtxts() {
+ return ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts().values();
+ }
+
+ public void createClusterInstance(List<String> parentInstanceIds, Cluster cluster)
+ throws PolicyValidationException, PartitionValidationException {
+ for (String parentInstanceId : parentInstanceIds) {
+ createInstance(parentInstanceId, cluster);
+ }
+
+ }
+
+ public boolean createInstanceOnDemand(String instanceId) {
+ Cluster cluster = TopologyManager.getTopology().getService(this.serviceType).
+ getCluster(this.clusterId);
+ try {
+ return createInstance(instanceId, cluster);
+ //TODO exception
+ } catch (PolicyValidationException e) {
+ log.error("Error while creating the cluster instance", e);
+ } catch (PartitionValidationException e) {
+ log.error("Error while creating the cluster instance", e);
+
+ }
+ return false;
+
+ }
+
+ private boolean createInstance(String parentInstanceId, Cluster cluster)
+ throws PolicyValidationException, PartitionValidationException {
+ Instance parentMonitorInstance = this.parent.getInstance(parentInstanceId);
+ String partitionId = null;
+ if (parentMonitorInstance instanceof GroupInstance) {
+ partitionId = parentMonitorInstance.getPartitionId();
+ }
+ if (parentMonitorInstance != null) {
+
+ ClusterInstance clusterInstance = cluster.getInstanceContexts(parentInstanceId);
+ if (clusterInstance != null) {
+
+ // Cluster instance is already there. No need to create one.
+ ClusterContext clusterContext = (ClusterContext) this.getClusterContext();
+ if (clusterContext == null) {
+
+ clusterContext = ClusterContextFactory.getVMClusterContext(clusterInstance.getInstanceId(), cluster,
+ hasScalingDependents());
+ this.setClusterContext(clusterContext);
+ }
+
+ // create VMClusterContext and then add all the instanceContexts
+ clusterContext.addInstanceContext(parentInstanceId, cluster, hasScalingDependents(),
+ groupScalingEnabledSubtree());
+ if (this.getInstance(clusterInstance.getInstanceId()) == null) {
+ this.addInstance(clusterInstance);
+ }
+ // Checking the current status of the cluster instance
+ boolean stateChanged =
+ ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain()
+ .process("", cluster.getClusterId(), clusterInstance.getInstanceId());
+ if (!stateChanged && clusterInstance.getStatus() != ClusterStatus.Created) {
+ this.notifyParentMonitor(clusterInstance.getStatus(),
+ clusterInstance.getInstanceId());
+
+ if (this.hasMonitoringStarted().compareAndSet(false, true)) {
+ this.startScheduler();
+ log.info("Monitoring task for Cluster Monitor with cluster id " +
+ cluster.getClusterId() + " started successfully");
+ }
+ }
+ } else {
+ createClusterInstance(cluster.getServiceName(), cluster.getClusterId(), null, parentInstanceId, partitionId,
+ parentMonitorInstance.getNetworkPartitionId());
+
+ }
+ return true;
+
+ } else {
+ return false;
+
+ }
+
+ }
+
+ /**
+ * Move all the members of the cluster instance to termiantion pending
+ *
+ * @param instanceId
+ */
+ public void moveMembersFromActiveToPendingTermination(String instanceId) {
+
+ //TODO take read lock for network partition context
+ //FIXME to iterate properly
+ for (ClusterLevelNetworkPartitionContext networkPartitionContext :
+ ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts().values()) {
+ ClusterInstanceContext clusterInstanceContext =
+ (ClusterInstanceContext) networkPartitionContext.getInstanceContext(instanceId);
+ if (clusterInstanceContext != null) {
+ for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) {
+ List<String> members = new ArrayList<String>();
+
+ Iterator<MemberContext> iterator = partitionContext.getActiveMembers().listIterator();
+ while (iterator.hasNext()) {
+ MemberContext activeMember = iterator.next();
+ members.add(activeMember.getMemberId());
+ }
+
+ for (String memberId : members) {
+ partitionContext.moveActiveMemberToTerminationPendingMembers(
+ memberId);
+ }
+ List<String> pendingMembers = new ArrayList<String>();
+
+ Iterator<MemberContext> pendingIterator = partitionContext.getPendingMembers().listIterator();
+ while (pendingIterator.hasNext()) {
+ MemberContext activeMember = pendingIterator.next();
+ pendingMembers.add(activeMember.getMemberId());
+ }
+ for (String memberId : members) {
+ // pending members
+ if (log.isDebugEnabled()) {
+ log.debug("Moving pending member [member id] " + memberId + " the obsolete list");
+ }
+ partitionContext.movePendingMemberToObsoleteMembers(memberId);
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
index e5fa765..f870e11 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
@@ -21,14 +21,9 @@ package org.apache.stratos.autoscaler.monitor.cluster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
-import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
-import org.apache.stratos.common.constants.StratosConstants;
import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
/*
* Factory class for creating cluster monitors.
@@ -47,19 +42,12 @@ public class ClusterMonitorFactory {
boolean groupScalingEnabledSubtree)
throws PolicyValidationException, PartitionValidationException {
- AbstractClusterMonitor clusterMonitor;
-// if (cluster.isKubernetesCluster()) {
-// clusterMonitor = getDockerServiceClusterMonitor(cluster);
-////// } else if (cluster.isLbCluster()) {
-////// clusterMonitor = getVMLbClusterMonitor(cluster);
-// } else {
- clusterMonitor = getVMClusterMonitor(cluster, hasScalingDependents, groupScalingEnabledSubtree);
-// }
-
+ AbstractClusterMonitor clusterMonitor =
+ getVMClusterMonitor(cluster, hasScalingDependents, groupScalingEnabledSubtree);
return clusterMonitor;
}
- private static VMClusterMonitor getVMClusterMonitor(Cluster cluster, boolean hasScalingDependents,
+ private static ClusterMonitor getVMClusterMonitor(Cluster cluster, boolean hasScalingDependents,
boolean groupScalingEnabledSubtree)
throws PolicyValidationException, PartitionValidationException {
@@ -67,7 +55,7 @@ public class ClusterMonitorFactory {
return null;
}
- VMClusterMonitor clusterMonitor = new VMClusterMonitor(cluster, hasScalingDependents, groupScalingEnabledSubtree);
+ ClusterMonitor clusterMonitor = new ClusterMonitor(cluster, hasScalingDependents, groupScalingEnabledSubtree);
// find lb reference type
java.util.Properties props = cluster.getProperties();
@@ -90,121 +78,4 @@ public class ClusterMonitorFactory {
log.info("VMClusterMonitor created: " + clusterMonitor.toString());
return clusterMonitor;
}
-//
-// private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
-// throws PolicyValidationException, PartitionValidationException {
-//
-// if (null == cluster) {
-// return null;
-// }
-//
-// VMLbClusterMonitor clusterMonitor =
-// new VMLbClusterMonitor(cluster.getServiceName(), cluster.getClusterId());
-// clusterMonitor.notifyParentMonitor(ClusterStatus.Created);
-//
-// log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
-// return clusterMonitor;
-// }
-
- /**
- * @param cluster - the cluster which needs to be monitored
- * @return - the cluster monitor
- */
- private static KubernetesClusterMonitor getDockerServiceClusterMonitor(Cluster cluster)
- throws PolicyValidationException {
-
- if (null == cluster) {
- return null;
- }
-
-// String autoscalePolicyName = cluster.getAutoscalePolicyName();
-//
-// AutoscalePolicy autoscalePolicy =
-// PolicyManager.getInstance()
-// .getAutoscalePolicy(autoscalePolicyName);
-// if (log.isDebugEnabled()) {
-// log.debug("Autoscaling policy name: " + autoscalePolicyName);
-// }
-//
-// AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
-//
-// if (policy == null) {
-// String msg = String.format("Autoscaling policy is null: [policy-name] %s", autoscalePolicyName);
-// log.error(msg);
-// throw new PolicyValidationException(msg);
-// }
-//
- java.util.Properties properties = cluster.getProperties();
- if (properties == null) {
- String message = String.format("Properties not found in kubernetes cluster: [cluster-id] %s",
- cluster.getClusterId());
- log.error(message);
- throw new RuntimeException(message);
- }
-// String minReplicasProperty = properties.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS);
-// int minReplicas = 0;
-// if (minReplicasProperty != null && !minReplicasProperty.isEmpty()) {
-// minReplicas = Integer.parseInt(minReplicasProperty);
-// }
-//
-// int maxReplicas = 0;
-// String maxReplicasProperty = properties.getProperty(StratosConstants.KUBERNETES_MAX_REPLICAS);
-// if (maxReplicasProperty != null && !maxReplicasProperty.isEmpty()) {
-// maxReplicas = Integer.parseInt(maxReplicasProperty);
-// }
-//
-// String kubernetesHostClusterID = properties.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
-// KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
-// cluster.getClusterId(), cluster.getServiceName(), autoscalePolicy, minReplicas, maxReplicas);
-
-
-// KubernetesClusterMonitor dockerClusterMonitor = new KubernetesClusterMonitor(cluster);
-
- //populate the members after restarting
-// for (Member member : cluster.getMembers()) {
-// String memberId = member.getMemberId();
-// String clusterId = member.getClusterId();
-// MemberContext memberContext = new MemberContext();
-// memberContext.setMemberId(memberId);
-// memberContext.setClusterId(clusterId);
-// memberContext.setInitTime(member.getInitTime());
-//
-// // if there is at least one member in the topology, that means service has been created already
-// // this is to avoid calling startContainer() method again
-// //kubernetesClusterCtxt.setServiceClusterCreated(true);
-//
-// if (MemberStatus.Activated.equals(member.getStatus())) {
-// if (log.isDebugEnabled()) {
-// String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
-// log.debug(msg);
-// }
-// ((VMClusterContext) dockerClusterMonitor.getClusterContext()).addActiveMember(memberContext);
-// } else if (MemberStatus.Created.equals(member.getStatus())
-// || MemberStatus.Starting.equals(member.getStatus())) {
-// if (log.isDebugEnabled()) {
-// String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
-// log.debug(msg);
-// }
-// dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
-// }
-//
-// //kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId));
-// if (log.isInfoEnabled()) {
-// log.info(String.format("Member stat context has been added: [member] %s", memberId));
-// }
-// }
-//
-// // find lb reference type
-// if (properties.containsKey(StratosConstants.LOAD_BALANCER_REF)) {
-// String value = properties.getProperty(StratosConstants.LOAD_BALANCER_REF);
-// dockerClusterMonitor.setLbReferenceType(value);
-// if (log.isDebugEnabled()) {
-// log.debug("Set the lb reference type: " + value);
-// }
-// }
-
-// log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
-// return dockerClusterMonitor;
- return null;
- }
}
[2/5] stratos git commit: Removed kubernetes cluster
monitors/contexts and renamed vm cluster monitor to cluster monitor
Posted by im...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
deleted file mode 100644
index 1ef3a58..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
+++ /dev/null
@@ -1,1246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor.cluster;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.client.CloudControllerClient;
-import org.apache.stratos.autoscaler.context.InstanceContext;
-import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
-import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
-import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
-import org.apache.stratos.autoscaler.context.member.MemberStatsContext;
-import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext;
-import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
-import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
-import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
-import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
-import org.apache.stratos.autoscaler.exception.cartridge.TerminationException;
-import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
-import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
-import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
-import org.apache.stratos.autoscaler.monitor.events.ScalingEvent;
-import org.apache.stratos.autoscaler.monitor.events.ScalingOverMaxEvent;
-import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiveProcessor;
-import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInactiveProcessor;
-import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.AutoscalerUtil;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
-import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
-import org.apache.stratos.common.Properties;
-import org.apache.stratos.common.Property;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
-import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.instance.ClusterInstance;
-import org.apache.stratos.messaging.domain.instance.GroupInstance;
-import org.apache.stratos.messaging.domain.instance.Instance;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.health.stat.*;
-import org.apache.stratos.messaging.event.topology.*;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-import java.util.*;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-public class VMClusterMonitor extends AbstractClusterMonitor {
-
- private static final Log log = LogFactory.getLog(VMClusterMonitor.class);
- private Map<String, ClusterLevelNetworkPartitionContext> networkPartitionIdToClusterLevelNetworkPartitionCtxts;
- private boolean hasPrimary;
- private float scalingFactorBasedOnDependencies = 1.0f;
-
-
- protected VMClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree) {
- super(cluster, hasScalingDependents, groupScalingEnabledSubtree);
- this.networkPartitionIdToClusterLevelNetworkPartitionCtxts = new HashMap<String, ClusterLevelNetworkPartitionContext>();
- readConfigurations();
- autoscalerRuleEvaluator = new AutoscalerRuleEvaluator();
- autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE);
- autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.VM_SCALE_CHECK_DROOL_FILE);
- autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.VM_MIN_CHECK_DROOL_FILE);
- autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.DEPENDENT_SCALE_CHECK_DROOL_FILE);
-
- this.obsoleteCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
- StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE);
- this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
- StratosConstants.VM_SCALE_CHECK_DROOL_FILE);
- this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
- StratosConstants.VM_MIN_CHECK_DROOL_FILE);
- this.dependentScaleCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
- StratosConstants.DEPENDENT_SCALE_CHECK_DROOL_FILE);
- }
-
- private static void terminateMember(String memberId) {
- try {
- CloudControllerClient.getInstance().terminate(memberId);
-
- } catch (TerminationException e) {
- log.error("Unable to terminate member [member id ] " + memberId, e);
- }
- }
-
- private static void createClusterInstance(String serviceType, String clusterId, String alias,
- String instanceId, String partitionId, String networkPartitionId) {
- CloudControllerClient.getInstance().createClusterInstance(serviceType, clusterId, alias,
- instanceId, partitionId, networkPartitionId);
- }
-
- public void addClusterLevelNWPartitionContext(ClusterLevelNetworkPartitionContext clusterLevelNWPartitionCtxt) {
- networkPartitionIdToClusterLevelNetworkPartitionCtxts.put(clusterLevelNWPartitionCtxt.getId(), clusterLevelNWPartitionCtxt);
- }
-
- public ClusterLevelNetworkPartitionContext getClusterLevelNWPartitionContext(String nwPartitionId) {
- return networkPartitionIdToClusterLevelNetworkPartitionCtxts.get(nwPartitionId);
- }
-
- @Override
- public void handleAverageLoadAverageEvent(
- AverageLoadAverageEvent averageLoadAverageEvent) {
-
- String networkPartitionId = averageLoadAverageEvent.getNetworkPartitionId();
- String clusterId = averageLoadAverageEvent.getClusterId();
- String instanceId = averageLoadAverageEvent.getInstanceId();
- float value = averageLoadAverageEvent.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, value));
- }
-
- ClusterInstanceContext clusterInstanceContext = getClusterInstanceContext(networkPartitionId,
- instanceId);
- if (null != clusterInstanceContext) {
- clusterInstanceContext.setAverageLoadAverage(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void run() {
- while (!isDestroyed()) {
- try {
- if (log.isDebugEnabled()) {
- log.debug("Cluster monitor is running.. " + this.toString());
- }
- monitor();
- } catch (Exception e) {
- log.error("Cluster monitor: Monitor failed." + this.toString(), e);
- }
- try {
- Thread.sleep(getMonitorIntervalMilliseconds());
- } catch (InterruptedException ignore) {
- }
- }
-
-
- }
-
- private boolean isPrimaryMember(MemberContext memberContext) {
- Properties props = AutoscalerUtil.toCommonProperties(memberContext.getProperties());
- if (log.isDebugEnabled()) {
- log.debug(" Properties [" + props + "] ");
- }
- if (props != null && props.getProperties() != null) {
- for (Property prop : props.getProperties()) {
- if (prop.getName().equals("PRIMARY")) {
- if (Boolean.parseBoolean(prop.getValue())) {
- log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
- "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
- return true;
- }
- }
- }
- }
- return false;
- }
-
- public synchronized void monitor() {
-
- for (ClusterLevelNetworkPartitionContext networkPartitionContext : getNetworkPartitionCtxts()) {
-
- final Collection<InstanceContext> clusterInstanceContexts = networkPartitionContext.
- getInstanceIdToInstanceContextMap().values();
-
- for (final InstanceContext pInstanceContext : clusterInstanceContexts) {
- final ClusterInstanceContext instanceContext = (ClusterInstanceContext) pInstanceContext;
- ClusterInstance instance = (ClusterInstance) this.instanceIdToInstanceMap.
- get(instanceContext.getId());
-
- if ((instance.getStatus().getCode() <= ClusterStatus.Active.getCode()) ||
- (instance.getStatus() == ClusterStatus.Inactive && !hasStartupDependents)
- && !this.hasFaultyMember) {
-
- Runnable monitoringRunnable = new Runnable() {
- @Override
- public void run() {
-
- if (log.isDebugEnabled()) {
- log.debug("Monitor is running for [cluster] : " + getClusterId());
- }
- // store primary members in the cluster instance context
- List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
-
- for (ClusterLevelPartitionContext partitionContext :
- instanceContext.getPartitionCtxts()) {
-
- // get active primary members in this cluster instance context
- for (MemberContext memberContext : partitionContext.getActiveMembers()) {
- if (isPrimaryMember(memberContext)) {
- primaryMemberListInClusterInstance.add(memberContext.getMemberId());
- }
- }
-
- // get pending primary members in this cluster instance context
- for (MemberContext memberContext : partitionContext.getPendingMembers()) {
- if (isPrimaryMember(memberContext)) {
- primaryMemberListInClusterInstance.add(memberContext.getMemberId());
- }
- }
-
- obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
- getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, partitionContext);
-
- }
-
- getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInClusterInstance);
- getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
- //FIXME when parent chosen the partition
- String paritionAlgo = instanceContext.getPartitionAlgorithm();
-
- getMinCheckKnowledgeSession().setGlobal("algorithmName",
- paritionAlgo);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running minimum check for cluster instance %s ",
- instanceContext.getId() + " for the cluster: " + clusterId));
- }
-
- minCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getMinCheckKnowledgeSession(),
- minCheckFactHandle, instanceContext);
-
-
- //checking the status of the cluster
- boolean rifReset = instanceContext.isRifReset();
- boolean memoryConsumptionReset = instanceContext.isMemoryConsumptionReset();
- boolean loadAverageReset = instanceContext.isLoadAverageReset();
- boolean averageRequestServedPerInstanceReset
- = instanceContext.isAverageRequestServedPerInstanceReset();
-
- if (log.isDebugEnabled()) {
- log.debug("Execution point of scaling Rule, [Is rif Reset] : " + rifReset
- + " [Is memoryConsumption Reset] : " + memoryConsumptionReset
- + " [Is loadAverage Reset] : " + loadAverageReset);
- }
-
- if (rifReset || memoryConsumptionReset || loadAverageReset) {
-
- log.info("Executing scaling rule as statistics have been reset");
- VMClusterContext vmClusterContext = (VMClusterContext) clusterContext;
-
- getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
- getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
- getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
- getScaleCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
- getScaleCheckKnowledgeSession().setGlobal("algorithmName", paritionAlgo);
- getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy",
- vmClusterContext.getAutoscalePolicy());
- getScaleCheckKnowledgeSession().setGlobal("arspiReset",
- averageRequestServedPerInstanceReset);
- getScaleCheckKnowledgeSession().setGlobal("primaryMembers",
- primaryMemberListInClusterInstance);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running scale check for [cluster instance context] %s ",
- instanceContext.getId()));
- log.debug(" Primary members : " + primaryMemberListInClusterInstance);
- }
-
- scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getScaleCheckKnowledgeSession()
- , scaleCheckFactHandle, instanceContext);
-
- instanceContext.setRifReset(false);
- instanceContext.setMemoryConsumptionReset(false);
- instanceContext.setLoadAverageReset(false);
- } else if (log.isDebugEnabled()) {
- log.debug(String.format("Scale rule will not run since the LB statistics have not " +
- "received before this cycle for [cluster instance context] %s [cluster] %s",
- instanceContext.getId(), clusterId));
- }
-
- }
- };
- monitoringRunnable.run();
- }
-
- for (final ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
- Runnable monitoringRunnable = new Runnable() {
- @Override
- public void run() {
- obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
- getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, partitionContext);
- }
- };
-
- monitoringRunnable.run();
-
- }
-
- }
- }
- }
-
- @Override
- protected void readConfigurations() {
- XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
- int monitorInterval = conf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000);
- setMonitorIntervalMilliseconds(monitorInterval);
- if (log.isDebugEnabled()) {
- log.debug("VMClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
- }
- }
-
- @Override
- public void destroy() {
- getMinCheckKnowledgeSession().dispose();
- getObsoleteCheckKnowledgeSession().dispose();
- getScaleCheckKnowledgeSession().dispose();
- setDestroyed(true);
- stopScheduler();
- if (log.isDebugEnabled()) {
- log.debug("VMClusterMonitor Drools session has been disposed. " + this.toString());
- }
- }
-
- @Override
- public String toString() {
- return "VMClusterMonitor [clusterId=" + getClusterId() +
- ", hasPrimary=" + hasPrimary + " ]";
- }
-
- public boolean isHasPrimary() {
- return hasPrimary;
- }
-
- public void setHasPrimary(boolean hasPrimary) {
- this.hasPrimary = hasPrimary;
- }
-
- @Override
- public void onChildStatusEvent(MonitorStatusEvent statusEvent) {
-
- }
-
- @Override
- public void onParentStatusEvent(MonitorStatusEvent statusEvent) {
- String instanceId = statusEvent.getInstanceId();
- // send the ClusterTerminating event
- if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() ==
- ApplicationStatus.Terminating) {
- if (log.isInfoEnabled()) {
- log.info("Publishing Cluster terminating event for [application] " + appId +
- " [cluster] " + this.getClusterId() + " [instance] " + instanceId);
- }
- ClusterStatusEventPublisher.sendClusterTerminatingEvent(getAppId(), getServiceId(), getClusterId(), instanceId);
- }
- }
-
- @Override
- public void onChildScalingEvent(ScalingEvent scalingEvent) {
-
- }
-
- @Override
- public void onChildScalingOverMaxEvent(ScalingOverMaxEvent scalingOverMaxEvent) {
-
- }
-
- @Override
- public void onParentScalingEvent(ScalingEvent scalingEvent) {
-
- if (log.isDebugEnabled()) {
- log.debug("Parent scaling event received to [cluster]: " + this.getClusterId()
- + ", [network partition]: " + scalingEvent.getNetworkPartitionId()
- + ", [event] " + scalingEvent.getId() + ", [group instance] " + scalingEvent.getInstanceId());
- }
-
- this.scalingFactorBasedOnDependencies = scalingEvent.getFactor();
- VMClusterContext vmClusterContext = (VMClusterContext) clusterContext;
- String instanceId = scalingEvent.getInstanceId();
-
- ClusterInstanceContext clusterInstanceContext =
- getClusterInstanceContext(scalingEvent.getNetworkPartitionId(), instanceId);
-
-
- // store primary members in the cluster instance context
- List<String> primaryMemberListInClusterInstance = new ArrayList<String>();
-
- for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) {
-
- // get active primary members in this cluster instance context
- for (MemberContext memberContext : partitionContext.getActiveMembers()) {
- if (isPrimaryMember(memberContext)) {
- primaryMemberListInClusterInstance.add(memberContext.getMemberId());
- }
- }
-
- // get pending primary members in this cluster instance context
- for (MemberContext memberContext : partitionContext.getPendingMembers()) {
- if (isPrimaryMember(memberContext)) {
- primaryMemberListInClusterInstance.add(memberContext.getMemberId());
- }
- }
- }
-
-
- //TODO get min instance count from instance context
- float requiredInstanceCount = clusterInstanceContext.getMinInstanceCount() * scalingFactorBasedOnDependencies;
- int roundedRequiredInstanceCount = getRoundedInstanceCount(requiredInstanceCount,
- vmClusterContext.getAutoscalePolicy().getInstanceRoundingFactor());
- clusterInstanceContext.setRequiredInstanceCountBasedOnDependencies(roundedRequiredInstanceCount);
-
- getDependentScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- getDependentScaleCheckKnowledgeSession().setGlobal("roundedRequiredInstanceCount", roundedRequiredInstanceCount);
- getDependentScaleCheckKnowledgeSession().setGlobal("algorithmName", clusterInstanceContext.getPartitionAlgorithm());
- getDependentScaleCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
-
- dependentScaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getDependentScaleCheckKnowledgeSession()
- , dependentScaleCheckFactHandle, clusterInstanceContext);
-
- }
-
- public void sendClusterScalingEvent(String networkPartitionId, String instanceId, float factor) {
-
- MonitorStatusEventBuilder.handleClusterScalingEvent(this.parent, networkPartitionId, instanceId, factor, this.id);
- }
-
- public void sendScalingOverMaxEvent(String networkPartitionId, String instanceId) {
-
- MonitorStatusEventBuilder.handleScalingOverMaxEvent(this.parent, networkPartitionId, instanceId,
- this.id);
- }
-
- @Override
- public void handleGradientOfLoadAverageEvent(
- GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
-
- String networkPartitionId = gradientOfLoadAverageEvent.getNetworkPartitionId();
- String clusterId = gradientOfLoadAverageEvent.getClusterId();
- String instanceId = gradientOfLoadAverageEvent.getInstanceId();
- float value = gradientOfLoadAverageEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, value));
- }
- ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
- networkPartitionId, instanceId);
- if (null != clusterLevelNetworkPartitionContext) {
- clusterLevelNetworkPartitionContext.setLoadAverageGradient(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleSecondDerivativeOfLoadAverageEvent(
- SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
-
- String networkPartitionId = secondDerivativeOfLoadAverageEvent.getNetworkPartitionId();
- String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
- String instanceId = secondDerivativeOfLoadAverageEvent.getInstanceId();
- float value = secondDerivativeOfLoadAverageEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
- + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
- }
- ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
- networkPartitionId, instanceId);
- if (null != clusterLevelNetworkPartitionContext) {
- clusterLevelNetworkPartitionContext.setLoadAverageSecondDerivative(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleAverageMemoryConsumptionEvent(
- AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
-
- String networkPartitionId = averageMemoryConsumptionEvent.getNetworkPartitionId();
- String clusterId = averageMemoryConsumptionEvent.getClusterId();
- String instanceId = averageMemoryConsumptionEvent.getInstanceId();
- float value = averageMemoryConsumptionEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s "
- + "[value] %s", clusterId, networkPartitionId, value));
- }
- ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
- networkPartitionId, instanceId);
- if (null != clusterLevelNetworkPartitionContext) {
- clusterLevelNetworkPartitionContext.setAverageMemoryConsumption(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String
- .format("Network partition context is not available for :"
- + " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleGradientOfMemoryConsumptionEvent(
- GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
-
- String networkPartitionId = gradientOfMemoryConsumptionEvent.getNetworkPartitionId();
- String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
- String instanceId = gradientOfMemoryConsumptionEvent.getInstanceId();
- float value = gradientOfMemoryConsumptionEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
- + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
- }
- ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
- networkPartitionId, instanceId);
- if (null != clusterLevelNetworkPartitionContext) {
- clusterLevelNetworkPartitionContext.setMemoryConsumptionGradient(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleSecondDerivativeOfMemoryConsumptionEvent(
- SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
-
- String networkPartitionId = secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId();
- String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
- String instanceId = secondDerivativeOfMemoryConsumptionEvent.getInstanceId();
- float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
- + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
- }
- ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
- networkPartitionId, instanceId);
- if (null != clusterLevelNetworkPartitionContext) {
- clusterLevelNetworkPartitionContext.setMemoryConsumptionSecondDerivative(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- public void handleAverageRequestsServingCapabilityEvent(
- AverageRequestsServingCapabilityEvent averageRequestsServingCapabilityEvent) {
-
- String clusterId = averageRequestsServingCapabilityEvent.getClusterId();
- String instanceId = averageRequestsServingCapabilityEvent.getInstanceId();
- String networkPartitionId = averageRequestsServingCapabilityEvent.getNetworkPartitionId();
- Float floatValue = averageRequestsServingCapabilityEvent.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
-
- ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
- networkPartitionId, instanceId);
- if (null != clusterLevelNetworkPartitionContext) {
- clusterLevelNetworkPartitionContext.setAverageRequestsServedPerInstance(floatValue);
-
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
-
- }
-
- @Override
- public void handleAverageRequestsInFlightEvent(
- AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
-
- String networkPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId();
- String clusterId = averageRequestsInFlightEvent.getClusterId();
- String instanceId = averageRequestsInFlightEvent.getInstanceId();
- Float servedCount = averageRequestsInFlightEvent.getServedCount();
- Float activeInstances = averageRequestsInFlightEvent.getActiveInstances();
- Float requestsServedPerInstance = servedCount / activeInstances;
- if (requestsServedPerInstance.isInfinite()) {
- requestsServedPerInstance = 0f;
- }
- float value = averageRequestsInFlightEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, value));
- }
- ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
- networkPartitionId, instanceId);
- if (null != clusterLevelNetworkPartitionContext) {
- clusterLevelNetworkPartitionContext.setAverageRequestsInFlight(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleGradientOfRequestsInFlightEvent(
- GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
-
- String networkPartitionId = gradientOfRequestsInFlightEvent.getNetworkPartitionId();
- String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
- String instanceId = gradientOfRequestsInFlightEvent.getInstanceId();
- float value = gradientOfRequestsInFlightEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, value));
- }
- ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
- networkPartitionId, instanceId);
- if (null != clusterLevelNetworkPartitionContext) {
- clusterLevelNetworkPartitionContext.setRequestsInFlightGradient(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleSecondDerivativeOfRequestsInFlightEvent(
- SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
-
- String networkPartitionId = secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId();
- String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
- String instanceId = secondDerivativeOfRequestsInFlightEvent.getInstanceId();
- float value = secondDerivativeOfRequestsInFlightEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second derivative of Rif event: [cluster] %s "
- + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
- }
- ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
- networkPartitionId, instanceId);
- if (null != clusterLevelNetworkPartitionContext) {
- clusterLevelNetworkPartitionContext.setRequestsInFlightSecondDerivative(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleMemberAverageMemoryConsumptionEvent(
- MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
-
- String instanceId = memberAverageMemoryConsumptionEvent.getInstanceId();
- String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
- ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
- instanceId);
- ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
- member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberAverageMemoryConsumptionEvent.getValue();
- memberStatsContext.setAverageMemoryConsumption(value);
- }
-
- @Override
- public void handleMemberGradientOfMemoryConsumptionEvent(
- MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
-
- String instanceId = memberGradientOfMemoryConsumptionEvent.getInstanceId();
- String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
- ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
- instanceId);
- ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
- member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberGradientOfMemoryConsumptionEvent.getValue();
- memberStatsContext.setGradientOfMemoryConsumption(value);
- }
-
- @Override
- public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
- MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
-
- }
-
- @Override
- public void handleMemberAverageLoadAverageEvent(
- MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
-
- String instanceId = memberAverageLoadAverageEvent.getInstanceId();
- String memberId = memberAverageLoadAverageEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
- ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
- instanceId);
- ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
- member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberAverageLoadAverageEvent.getValue();
- memberStatsContext.setAverageLoadAverage(value);
- }
-
- @Override
- public void handleMemberGradientOfLoadAverageEvent(
- MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
-
- String instanceId = memberGradientOfLoadAverageEvent.getInstanceId();
- String memberId = memberGradientOfLoadAverageEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
- ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
- instanceId);
- ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
- member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberGradientOfLoadAverageEvent.getValue();
- memberStatsContext.setGradientOfLoadAverage(value);
- }
-
- @Override
- public void handleMemberSecondDerivativeOfLoadAverageEvent(
- MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
-
- String instanceId = memberSecondDerivativeOfLoadAverageEvent.getInstanceId();
- String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
-
- ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
- instanceId);
- ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(
- member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
- memberStatsContext.setSecondDerivativeOfLoadAverage(value);
- }
-
- @Override
- public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
-
- String memberId = memberFaultEvent.getMemberId();
- String clusterId = memberFaultEvent.getClusterId();
- Member member = getMemberByMemberId(memberId);
- String instanceId = memberFaultEvent.getInstanceId();
- String networkPartitionId = memberFaultEvent.getNetworkPartitionId();
- if (null == member) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
- }
- return;
- }
- if (!member.isActive()) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member activated event has not received for the member %s. "
- + "Therefore ignoring" + " the member fault health stat", memberId));
- }
- return;
- }
-
- ClusterInstanceContext nwPartitionCtxt;
- nwPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId);
- String partitionId = getPartitionOfMember(memberId);
- ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
- if (!partitionCtxt.activeMemberExist(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Could not find the active member in partition context, "
- + "[member] %s ", memberId));
- }
- return;
- }
-
- // move member to obsolete list
- synchronized (this) {
- partitionCtxt.moveMemberToObsoleteList(memberId);
- }
- if (log.isDebugEnabled()) {
- log.debug(String.format("Faulty member is added to obsolete list and removed from the active members list: "
- + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
- }
-
- ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process(
- ClusterStatusInactiveProcessor.class.getName(), clusterId, instanceId);
- }
-
- @Override
- public void handleMemberStartedEvent(
- MemberStartedEvent memberStartedEvent) {
-
- }
-
- @Override
- public void handleMemberActivatedEvent(
- MemberActivatedEvent memberActivatedEvent) {
-
- String instanceId = memberActivatedEvent.getInstanceId();
- String clusterId = memberActivatedEvent.getClusterId();
- String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
- String partitionId = memberActivatedEvent.getPartitionId();
- String memberId = memberActivatedEvent.getMemberId();
- ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId);
- ClusterLevelPartitionContext clusterLevelPartitionContext;
- clusterLevelPartitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
- clusterLevelPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member stat context has been added successfully: "
- + "[member] %s", memberId));
- }
- clusterLevelPartitionContext.movePendingMemberToActiveMembers(memberId);
- ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process(
- ClusterStatusActiveProcessor.class.getName(), clusterId, instanceId);
- }
-
- @Override
- public void handleMemberMaintenanceModeEvent(
- MemberMaintenanceModeEvent maintenanceModeEvent) {
-
- String networkPartitionId = maintenanceModeEvent.getNetworkPartitionId();
- String partitionId = maintenanceModeEvent.getPartitionId();
- String memberId = maintenanceModeEvent.getMemberId();
- String instanceId = maintenanceModeEvent.getInstanceId();
- ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId,
- instanceId);
- ClusterLevelPartitionContext clusterMonitorPartitionContext = networkPartitionCtxt.
- getPartitionCtxt(partitionId);
- clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member has been moved as pending termination: "
- + "[member] %s", memberId));
- }
- clusterMonitorPartitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
- }
-
- @Override
- public void handleMemberReadyToShutdownEvent(MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
-
- ClusterInstanceContext nwPartitionCtxt;
- String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
- String instanceId = memberReadyToShutdownEvent.getInstanceId();
- nwPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId);
-
- // start a new member in the same Partition
- String memberId = memberReadyToShutdownEvent.getMemberId();
- String partitionId = getPartitionOfMember(memberId);
- ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
- try {
- String clusterId = memberReadyToShutdownEvent.getClusterId();
- //move member to pending termination list
- if (partitionCtxt.getPendingTerminationMember(memberId) != null) {
- partitionCtxt.movePendingTerminationMemberToObsoleteMembers(memberId);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is removed from the pending termination members " +
- "and moved to obsolete list: [member] %s " +
- "[partition] %s [cluster] %s ", memberId, partitionId, clusterId));
- }
- } else if (partitionCtxt.getObsoleteMember(memberId) != null) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is in obsolete list: [member] %s " +
- "[partition] %s [cluster] %s ", memberId, partitionId, clusterId));
- }
- } //TODO else part
-
- //when no more members are there to terminate Invoking it monitor directly
- // to speed up the termination process
- if (partitionCtxt.getTotalMemberCount() == 0) {
- this.monitor();
- }
-
-
- } catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
- log.error(msg, e);
- }
- }
-
- @Override
- public void handleMemberTerminatedEvent(
- MemberTerminatedEvent memberTerminatedEvent) {
-
- String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
- String memberId = memberTerminatedEvent.getMemberId();
- String clusterId = memberTerminatedEvent.getClusterId();
- String instanceId = memberTerminatedEvent.getInstanceId();
- String partitionId = memberTerminatedEvent.getPartitionId();
- ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(
- networkPartitionId, instanceId);
- ClusterLevelPartitionContext clusterMonitorPartitionContext =
- clusterLevelNetworkPartitionContext.getPartitionCtxt(partitionId);
- clusterMonitorPartitionContext.removeMemberStatsContext(memberId);
-
- if (clusterMonitorPartitionContext.removeTerminationPendingMember(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is removed from termination pending members list: "
- + "[member] %s", memberId));
- }
- } else if (clusterMonitorPartitionContext.removePendingMember(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is removed from pending members list: "
- + "[member] %s", memberId));
- }
- } else if (clusterMonitorPartitionContext.removeActiveMemberById(memberId)) {
- log.warn(String.format("Member is in the wrong list and it is removed from "
- + "active members list: %s", memberId));
- } else if (clusterMonitorPartitionContext.removeObsoleteMember(memberId)) {
- log.warn(String.format("Obsolete member has either been terminated or its obsolete time out has expired and"
- + " it is removed from obsolete members list: %s", memberId));
- } else {
- log.warn(String.format("Member is not available in any of the list active, "
- + "pending and termination pending: %s", memberId));
- }
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member stat context has been removed successfully: "
- + "[member] %s", memberId));
- }
- //Checking whether the cluster state can be changed either from in_active to created/terminating to terminated
- ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process(
- ClusterStatusTerminatedProcessor.class.getName(), clusterId, instanceId);
- }
-
- @Override
- public void handleClusterRemovedEvent(
- ClusterRemovedEvent clusterRemovedEvent) {
-
- }
-
- @Override
- public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
-
- }
-
- private String getNetworkPartitionIdByMemberId(String memberId) {
- for (Service service : TopologyManager.getTopology().getServices()) {
- for (Cluster cluster : service.getClusters()) {
- if (cluster.memberExists(memberId)) {
- return cluster.getMember(memberId).getNetworkPartitionId();
- }
- }
- }
- return null;
- }
-
- private Member getMemberByMemberId(String memberId) {
- try {
- TopologyManager.acquireReadLock();
- for (Service service : TopologyManager.getTopology().getServices()) {
- for (Cluster cluster : service.getClusters()) {
- if (cluster.memberExists(memberId)) {
- return cluster.getMember(memberId);
- }
- }
- }
- return null;
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
-
- public String getPartitionOfMember(String memberId) {
- for (Service service : TopologyManager.getTopology().getServices()) {
- for (Cluster cluster : service.getClusters()) {
- if (cluster.memberExists(memberId)) {
- return cluster.getMember(memberId).getPartitionId();
- }
- }
- }
- return null;
- }
-
- @Override
- public void terminateAllMembers(final String instanceId, final String networkPartitionId) {
- final VMClusterMonitor monitor = this;
- Thread memberTerminator = new Thread(new Runnable() {
- public void run() {
-
- ClusterInstanceContext instanceContext =
- (ClusterInstanceContext) getAllNetworkPartitionCtxts().get(networkPartitionId)
- .getInstanceContext(instanceId);
- boolean allMovedToObsolete = true;
- for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) {
- if (log.isInfoEnabled()) {
- log.info("Starting to terminate all members in cluster [" + getClusterId() + "] " +
- "Network Partition [" + instanceContext.getNetworkPartitionId() + "], Partition [" +
- partitionContext.getPartitionId() + "]");
- }
- // need to terminate active, pending and obsolete members
- //FIXME to traverse concurrent
- // active members
- List<String> activeMembers = new ArrayList<String>();
- Iterator<MemberContext> iterator = partitionContext.getActiveMembers().listIterator();
- while (iterator.hasNext()) {
- MemberContext activeMemberCtxt = iterator.next();
- activeMembers.add(activeMemberCtxt.getMemberId());
-
- }
- for (String memberId : activeMembers) {
- log.info("Sending instance cleanup event for the active member: [member-id] " + memberId);
- partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
- InstanceNotificationPublisher.getInstance().
- sendInstanceCleanupEventForMember(memberId);
- }
- Iterator<MemberContext> pendingIterator = partitionContext.getPendingMembers().listIterator();
-
- List<String> pendingMembers = new ArrayList<String>();
- while (pendingIterator.hasNext()) {
- MemberContext activeMemberCtxt = pendingIterator.next();
- pendingMembers.add(activeMemberCtxt.getMemberId());
-
- }
- for (String memberId : pendingMembers) {
- MemberContext pendingMemberCtxt = pendingIterator.next();
- // pending members
- String memeberId = pendingMemberCtxt.getMemberId();
- if (log.isDebugEnabled()) {
- log.debug("Moving pending member [member id] " + memeberId + " to obsolete list");
- }
- partitionContext.movePendingMemberToObsoleteMembers(memeberId);
- }
- if(partitionContext.getTotalMemberCount() == 0) {
- allMovedToObsolete = allMovedToObsolete && true;
- } else {
- allMovedToObsolete = false;
- }
- }
-
- if(allMovedToObsolete) {
- monitor.monitor();
- }
- }
- }, "Member Terminator - [cluster id] " + getClusterId());
-
- memberTerminator.start();
- }
-
- public Map<String, ClusterLevelNetworkPartitionContext> getAllNetworkPartitionCtxts() {
- return ((VMClusterContext) this.clusterContext).getNetworkPartitionCtxts();
- }
-
- public ClusterInstanceContext getClusterInstanceContext(String networkPartitionId, String instanceId) {
- Map<String, ClusterLevelNetworkPartitionContext> clusterLevelNetworkPartitionContextMap =
- ((VMClusterContext) this.clusterContext).getNetworkPartitionCtxts();
- ClusterLevelNetworkPartitionContext networkPartitionContext =
- clusterLevelNetworkPartitionContextMap.get(networkPartitionId);
- ClusterInstanceContext instanceContext = (ClusterInstanceContext) networkPartitionContext.
- getInstanceContext(instanceId);
- return instanceContext;
- }
-
- public Collection<ClusterLevelNetworkPartitionContext> getNetworkPartitionCtxts() {
- return ((VMClusterContext) this.clusterContext).getNetworkPartitionCtxts().values();
- }
-
- public void createClusterInstance(List<String> parentInstanceIds, Cluster cluster)
- throws PolicyValidationException, PartitionValidationException {
- for (String parentInstanceId : parentInstanceIds) {
- createInstance(parentInstanceId, cluster);
- }
-
- }
-
- public boolean createInstanceOnDemand(String instanceId) {
- Cluster cluster = TopologyManager.getTopology().getService(this.serviceType).
- getCluster(this.clusterId);
- try {
- return createInstance(instanceId, cluster);
- //TODO exception
- } catch (PolicyValidationException e) {
- log.error("Error while creating the cluster instance", e);
- } catch (PartitionValidationException e) {
- log.error("Error while creating the cluster instance", e);
-
- }
- return false;
-
- }
-
- private boolean createInstance(String parentInstanceId, Cluster cluster)
- throws PolicyValidationException, PartitionValidationException {
- Instance parentMonitorInstance = this.parent.getInstance(parentInstanceId);
- String partitionId = null;
- if (parentMonitorInstance instanceof GroupInstance) {
- partitionId = parentMonitorInstance.getPartitionId();
- }
- if (parentMonitorInstance != null) {
-
- ClusterInstance clusterInstance = cluster.getInstanceContexts(parentInstanceId);
- if (clusterInstance != null) {
-
- // Cluster instance is already there. No need to create one.
- VMClusterContext clusterContext = (VMClusterContext) this.getClusterContext();
- if (clusterContext == null) {
-
- clusterContext = ClusterContextFactory.getVMClusterContext(clusterInstance.getInstanceId(), cluster,
- hasScalingDependents());
- this.setClusterContext(clusterContext);
- }
-
- // create VMClusterContext and then add all the instanceContexts
- clusterContext.addInstanceContext(parentInstanceId, cluster, hasScalingDependents(),
- groupScalingEnabledSubtree());
- if (this.getInstance(clusterInstance.getInstanceId()) == null) {
- this.addInstance(clusterInstance);
- }
- // Checking the current status of the cluster instance
- boolean stateChanged =
- ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain()
- .process("", cluster.getClusterId(), clusterInstance.getInstanceId());
- if (!stateChanged && clusterInstance.getStatus() != ClusterStatus.Created) {
- this.notifyParentMonitor(clusterInstance.getStatus(),
- clusterInstance.getInstanceId());
-
- if (this.hasMonitoringStarted().compareAndSet(false, true)) {
- this.startScheduler();
- log.info("Monitoring task for Cluster Monitor with cluster id " +
- cluster.getClusterId() + " started successfully");
- }
- }
- } else {
- createClusterInstance(cluster.getServiceName(), cluster.getClusterId(), null, parentInstanceId, partitionId,
- parentMonitorInstance.getNetworkPartitionId());
-
- }
- return true;
-
- } else {
- return false;
-
- }
-
- }
-
- /**
- * Move all the members of the cluster instance to termiantion pending
- *
- * @param instanceId
- */
- public void moveMembersFromActiveToPendingTermination(String instanceId) {
-
- //TODO take read lock for network partition context
- //FIXME to iterate properly
- for (ClusterLevelNetworkPartitionContext networkPartitionContext :
- ((VMClusterContext) this.clusterContext).getNetworkPartitionCtxts().values()) {
- ClusterInstanceContext clusterInstanceContext =
- (ClusterInstanceContext) networkPartitionContext.getInstanceContext(instanceId);
- if (clusterInstanceContext != null) {
- for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) {
- List<String> members = new ArrayList<String>();
-
- Iterator<MemberContext> iterator = partitionContext.getActiveMembers().listIterator();
- while (iterator.hasNext()) {
- MemberContext activeMember = iterator.next();
- members.add(activeMember.getMemberId());
- }
-
- for (String memberId : members) {
- partitionContext.moveActiveMemberToTerminationPendingMembers(
- memberId);
- }
- List<String> pendingMembers = new ArrayList<String>();
-
- Iterator<MemberContext> pendingIterator = partitionContext.getPendingMembers().listIterator();
- while (pendingIterator.hasNext()) {
- MemberContext activeMember = pendingIterator.next();
- pendingMembers.add(activeMember.getMemberId());
- }
- for (String memberId : members) {
- // pending members
- if (log.isDebugEnabled()) {
- log.debug("Moving pending member [member id] " + memberId + " the obsolete list");
- }
- partitionContext.movePendingMemberToObsoleteMembers(memberId);
- }
- }
- }
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java
deleted file mode 100644
index a69d3bd..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java
+++ /dev/null
@@ -1,194 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied. See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// */
-//package org.apache.stratos.autoscaler.monitor.cluster;
-//
-//import java.util.List;
-//import java.util.Map;
-//import java.util.Set;
-//
-//import org.apache.commons.configuration.XMLConfiguration;
-//import org.apache.commons.logging.Log;
-//import org.apache.commons.logging.LogFactory;
-//import org.apache.stratos.autoscaler.*;
-//import org.apache.stratos.autoscaler.context.cluster.AbstractClusterContext;
-//import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
-//import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext;
-//import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent;
-//import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.PartitionManager;
-//import org.apache.stratos.autoscaler.pojo.policy.PolicyManager;
-//import org.apache.stratos.autoscaler.pojo.policy.deployment.DeploymentPolicy;
-//import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-//import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-//import org.apache.stratos.autoscaler.util.ConfUtil;
-//import org.apache.stratos.common.constants.StratosConstants;
-//import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-//
-///**
-// * Is responsible for monitoring a service cluster. This runs periodically
-// * and perform minimum instance check and scaling check using the underlying
-// * rules engine.
-// */
-//public class VMLbClusterMonitor extends VMClusterMonitor {
-//
-// private static final Log log = LogFactory.getLog(VMLbClusterMonitor.class);
-//
-// public VMLbClusterMonitor(String serviceType, String clusterId) {
-// super(serviceType, clusterId, new AutoscalerRuleEvaluator( StratosConstants.VM_MIN_CHECK_DROOL_FILE,
-// StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE, StratosConstants.VM_SCALE_CHECK_DROOL_FILE));
-// readConfigurations();
-// }
-//
-// @Override
-// public void run() {
-//
-// while (!isDestroyed()) {
-// if (log.isDebugEnabled()) {
-// log.debug("Cluster monitor is running.. " + this.toString());
-// }
-// try {
-// //TODO ******** if (!ClusterStatus.Inactive.equals(getStatus())) {
-// monitor();
-// /*} else {
-// if (log.isDebugEnabled()) {
-// log.debug("LB Cluster monitor is suspended as the cluster is in " +
-// ClusterStatus.Inactive + " mode......");
-// }
-// }*/
-// } catch (Exception e) {
-// log.error("Cluster monitor: Monitor failed. " + this.toString(), e);
-// }
-// try {
-// Thread.sleep(getMonitorIntervalMilliseconds());
-// } catch (InterruptedException ignore) {
-// }
-// }
-// }
-//
-// @Override
-// protected void monitor() {
-//
-// Set<Map.Entry<String, AbstractClusterContext>> instanceIdToClusterCtxtEntries = instanceIdToClusterContextMap.entrySet();
-// for (final Map.Entry<String, AbstractClusterContext> instanceIdToClusterCtxtEntry : instanceIdToClusterCtxtEntries) {
-// Runnable monitoringRunnable = new Runnable() {
-//
-// @Override
-// public void run() {
-// for (ClusterLevelNetworkPartitionContext networkPartitionContext : getNetworkPartitionCtxts(instanceIdToClusterCtxtEntry.getKey()).values()) {
-//
-// // minimum check per partition
-// for (ClusterLevelPartitionContext partitionContext : networkPartitionContext.getPartitionCtxts()
-// .values()) {
-//
-// if (partitionContext != null) {
-// getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-// getMinCheckKnowledgeSession().setGlobal("isPrimary", false);
-//
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Running minimum check for partition %s ",
-// partitionContext.getPartitionId()));
-// }
-//
-// minCheckFactHandle =
-// AutoscalerRuleEvaluator.evaluate(getMinCheckKnowledgeSession(),
-// minCheckFactHandle,
-// partitionContext);
-// obsoleteCheckFactHandle =
-// AutoscalerRuleEvaluator.evaluateObsoleteCheck(getObsoleteCheckKnowledgeSession(),
-// obsoleteCheckFactHandle, partitionContext);
-// // start only in the first partition context
-// break;
-// }
-//
-// }
-//
-// }
-// }
-// };
-//
-// monitoringRunnable.run();
-// }
-// }
-//
-// @Override
-// public void destroy() {
-// getMinCheckKnowledgeSession().dispose();
-// getObsoleteCheckKnowledgeSession().dispose();
-// getMinCheckKnowledgeSession().dispose();
-// setDestroyed(true);
-// stopScheduler();
-// if (log.isDebugEnabled()) {
-// log.debug("VMLbClusterMonitor Drools session has been disposed. " + this.toString());
-// }
-// }
-//
-// @Override
-// protected void readConfigurations() {
-// XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
-// int monitorInterval = conf.getInt(AutoScalerConstants.VMLb_Cluster_MONITOR_INTERVAL, 90000);
-// setMonitorIntervalMilliseconds(monitorInterval);
-// if (log.isDebugEnabled()) {
-// log.debug("VMLbClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
-// }
-// }
-//
-// @Override
-// public void handleClusterRemovedEvent(
-// ClusterRemovedEvent clusterRemovedEvent) {
-//
-// String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy();
-// String clusterId = clusterRemovedEvent.getClusterId();
-// DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
-// if (depPolicy != null) {
-// List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
-// .getNetworkPartitionLbHolders(depPolicy);
-//
-// for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
-// // removes lb cluster ids
-// boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
-// if (isRemoved) {
-// log.info("Removed the lb cluster [id]:"
-// + clusterId
-// + " reference from Network Partition [id]: "
-// + networkPartitionLbHolder
-// .getNetworkPartitionId());
-//
-// }
-// if (log.isDebugEnabled()) {
-// log.debug(networkPartitionLbHolder);
-// }
-//
-// }
-// }
-// }
-//
-// @Override
-// public String toString() {
-// return "VMLbClusterMonitor [clusterId=" + getClusterId() + "]";
-// }
-//
-// @Override
-// public void onChildScalingEvent(MonitorScalingEvent scalingEvent) {
-//
-// }
-//
-// @Override
-// public void onParentScalingEvent(MonitorScalingEvent scalingEvent) {
-//
-// }
-//}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java
index 668aac4..957e0d3 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java
@@ -33,7 +33,7 @@ import org.apache.stratos.autoscaler.exception.application.DependencyBuilderExce
import org.apache.stratos.autoscaler.exception.application.MonitorNotFoundException;
import org.apache.stratos.autoscaler.exception.application.TopologyInConsistentException;
import org.apache.stratos.autoscaler.monitor.Monitor;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
import org.apache.stratos.autoscaler.monitor.events.GroupStatusEvent;
import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
import org.apache.stratos.autoscaler.monitor.events.ScalingEvent;
@@ -327,7 +327,7 @@ public class GroupMonitor extends ParentComponentMonitor {
Monitor monitor = aliasToActiveMonitorsMap.get(
scalingDependentListComponent);
if (monitor instanceof GroupMonitor ||
- monitor instanceof VMClusterMonitor) {
+ monitor instanceof ClusterMonitor) {
ScalingEvent childScalingEvent = new ScalingEvent(monitor.getId(),
monitor.getId(),
scalingEvent.getInstanceId(),
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
index d927223..6b635f6 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
@@ -30,7 +30,6 @@ import org.apache.stratos.autoscaler.applications.dependency.DependencyTree;
import org.apache.stratos.autoscaler.applications.dependency.context.ApplicationChildContext;
import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder;
import org.apache.stratos.autoscaler.context.InstanceContext;
-import org.apache.stratos.autoscaler.context.partition.network.ApplicationLevelNetworkPartitionContext;
import org.apache.stratos.autoscaler.context.partition.network.NetworkPartitionContext;
import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
import org.apache.stratos.autoscaler.exception.application.DependencyBuilderException;
@@ -41,7 +40,7 @@ import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
import org.apache.stratos.autoscaler.monitor.Monitor;
import org.apache.stratos.autoscaler.monitor.MonitorFactory;
import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
import org.apache.stratos.autoscaler.monitor.events.ScalingEvent;
import org.apache.stratos.autoscaler.monitor.events.ScalingOverMaxEvent;
import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
@@ -611,7 +610,7 @@ public abstract class ParentComponentMonitor extends Monitor implements Runnable
Monitor monitor = aliasToActiveMonitorsMap.get(
scalingDependentListComponent);
if (monitor instanceof GroupMonitor ||
- monitor instanceof VMClusterMonitor) {
+ monitor instanceof ClusterMonitor) {
ScalingEvent scalingEvent = new ScalingEvent(monitor.getId(),
networkPartitionContext.getId(),
instanceContext.getId(),
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
index 512ed41..1374358 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
@@ -47,65 +47,21 @@ public class AutoscalerRuleEvaluator {
private static final Log log = LogFactory.getLog(AutoscalerRuleEvaluator.class);
-// //vm drool files as default
-// private String minCheckDroolFileName = StratosConstants.CONTAINER_MIN_CHECK_DROOL_FILE;
-// private String obsoleteCheckDroolFileName = StratosConstants.CONTAINER_OBSOLETE_CHECK_DROOL_FILE;
-// private String scaleCheckDroolFileName = StratosConstants.CONTAINER_SCALE_CHECK_DROOL_FILE;
-//
-// private static KnowledgeBase minCheckKbase;
-// private static KnowledgeBase obsoleteCheckKbase;
-// private static KnowledgeBase scaleCheckKbase;
-
private static Map<String, KnowledgeBase> knowledgeBases;
public AutoscalerRuleEvaluator(){
-
knowledgeBases = new HashMap<String, KnowledgeBase>();
-// if (minCheckDroolFileName != null && !minCheckDroolFileName.isEmpty()) {
-// this.minCheckDroolFileName = minCheckDroolFileName;
-// }
-//
-// if (obsoleteCheckDroolFileName != null && !obsoleteCheckDroolFileName.isEmpty()) {
-// this.obsoleteCheckDroolFileName = obsoleteCheckDroolFileName;
-// }
-//
-// if (scaleCheckDroolFileName != null && !scaleCheckDroolFileName.isEmpty()) {
-// this.scaleCheckDroolFileName = scaleCheckDroolFileName;
-// }
-//
-// minCheckKbase = readKnowledgeBase(this.minCheckDroolFileName);
-//
-// if (log.isDebugEnabled()) {
-// log.debug("Minimum check rule is parsed successfully : " + this.minCheckDroolFileName);
-// }
-//
-// obsoleteCheckKbase = readKnowledgeBase(this.obsoleteCheckDroolFileName);
-//
-// if (log.isDebugEnabled()) {
-// log.debug("Obsolete check rule is parsed successfully : " + this.obsoleteCheckDroolFileName);
-// }
-//
-// scaleCheckKbase = readKnowledgeBase(this.scaleCheckDroolFileName);
-//
-// if (log.isDebugEnabled()) {
-// log.debug("Scale check rule is parsed successfully : " + this.scaleCheckDroolFileName);
-// }
-//
}
public void parseAndBuildKnowledgeBaseForDroolsFile(String drlFileName){
-
knowledgeBases.put(drlFileName, readKnowledgeBase(drlFileName));
if (log.isDebugEnabled()) {
log.debug("Drools file is parsed successfully : " + drlFileName);
}
-
}
public static FactHandle evaluate(StatefulKnowledgeSession ksession, FactHandle handle, Object obj) {
-
-
if (handle == null) {
ksession.setGlobal("delegator", new RuleTasksDelegator());
handle = ksession.insert(obj);
@@ -118,82 +74,6 @@ public class AutoscalerRuleEvaluator {
}
return handle;
}
-//
-// public static FactHandle evaluateObsoleteCheck(StatefulKnowledgeSession ksession, FactHandle handle, Object obj) {
-// if (handle == null) {
-// ksession.setGlobal("delegator", new RuleTasksDelegator());
-// handle = ksession.insert(obj);
-// } else {
-// ksession.update(handle, obj);
-// }
-// ksession.fireAllRules();
-// if(log.isDebugEnabled()){
-// log.debug(String.format("Obsolete check executed for : %s ", obj));
-// }
-// return handle;
-// }
-//
-// public static FactHandle evaluateScaleCheck(StatefulKnowledgeSession ksession, FactHandle handle, Object obj) {
-// if (handle == null) {
-// ksession.setGlobal("delegator", new RuleTasksDelegator());
-// handle = ksession.insert(obj);
-// } else {
-// ksession.update(handle, obj);
-// }
-// ksession.fireAllRules();
-// if(log.isDebugEnabled()){
-// log.debug(String.format("Scale check executed for : %s ", obj));
-// }
-// return handle;
-// }
-//
-// public static FactHandle evaluateDependentScaleCheck(StatefulKnowledgeSession ksession, FactHandle handle, Object obj) {
-// if (handle == null) {
-// ksession.setGlobal("delegator", new RuleTasksDelegator());
-// handle = ksession.insert(obj);
-// } else {
-// ksession.update(handle, obj);
-// }
-// ksession.fireAllRules();
-// if(log.isDebugEnabled()){
-// log.debug(String.format("Dependent scale check executed for : %s ", obj));
-// }
-// return handle;
-// }
-//
-// public static FactHandle evaluateTerminateAll(StatefulKnowledgeSession ksession, FactHandle handle, Object obj) {
-// if (handle == null) {
-// ksession.setGlobal("delegator", new RuleTasksDelegator());
-// handle = ksession.insert(obj);
-// } else {
-// ksession.update(handle, obj);
-// }
-// ksession.fireAllRules();
-// if(log.isDebugEnabled()){
-// log.debug(String.format("Terminate all check executed for : %s ", obj));
-// }
-// return handle;
-// }
-//
-// public static FactHandle evaluateTerminateDependency(StatefulKnowledgeSession ksession, FactHandle handle, Object obj) {
-// if(log.isDebugEnabled()){
-// log.debug(String.format("Terminate dependency check executing for : %s ", obj));
-// }
-// if (handle == null) {
-// ksession.setGlobal("delegator", new RuleTasksDelegator());
-// handle = ksession.insert(obj);
-// } else {
-// ksession.update(handle, obj);
-// }
-// if(log.isDebugEnabled()){
-// log.debug(String.format("Terminate dependency check firing rules for : %s ", ksession));
-// }
-// ksession.fireAllRules();
-// if(log.isDebugEnabled()){
-// log.debug(String.format("Terminate dependency check executed for : %s ", obj));
-// }
-// return handle;
-// }
public StatefulKnowledgeSession getStatefulSession(String drlFileName) {
StatefulKnowledgeSession ksession;
@@ -201,48 +81,6 @@ public class AutoscalerRuleEvaluator {
ksession.setGlobal("log", RuleLog.getInstance());
return ksession;
}
-
-// public StatefulKnowledgeSession getObsoleteCheckStatefulSession() {
-// StatefulKnowledgeSession ksession;
-// ksession = obsoleteCheckKbase.newStatefulKnowledgeSession();
-// ksession.setGlobal("log", RuleLog.getInstance());
-// return ksession;
-// }
-//
-// public StatefulKnowledgeSession getScaleCheckStatefulSession() {
-// StatefulKnowledgeSession ksession;
-// ksession = scaleCheckKbase.newStatefulKnowledgeSession();
-// ksession.setGlobal("log", RuleLog.getInstance());
-// return ksession;
-// }
-//
-// public StatefulKnowledgeSession getTerminateAllStatefulSession() {
-// StatefulKnowledgeSession ksession;
-// ksession = scaleCheckKbase.newStatefulKnowledgeSession();
-// ksession.setGlobal("log", RuleLog.getInstance());
-// return ksession;
-// }
-
-// public static String getLbClusterId(ClusterLevelPartitionContext clusterMonitorPartitionContext, String nwpartitionId) {
-// Properties props = clusterMonitorPartitionContext.getProperties();
-// String value =
-// (String) props.get(org.apache.stratos.messaging.util.Constants.LOAD_BALANCER_REF);
-//
-// if (value == null){
-// return null;
-// }
-//
-// String lbClusterId = null;
-//
-// NetworkPartitionLbHolder networkPartitionLbHolder = PartitionManager.getInstance().getNetworkPartitionLbHolder(nwpartitionId);
-// if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
-// lbClusterId = networkPartitionLbHolder.getDefaultLbClusterId();
-// } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) {
-// String serviceName = clusterMonitorPartitionContext.getServiceName();
-// lbClusterId = networkPartitionLbHolder.getLBClusterIdOfService(serviceName);
-// }
-// return lbClusterId;
-// }
private static KnowledgeBase readKnowledgeBase(String drlFileName) {
KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
[5/5] stratos git commit: Removed kubernetes cluster
monitors/contexts and renamed vm cluster monitor to cluster monitor
Posted by im...@apache.org.
Removed kubernetes cluster monitors/contexts and renamed vm cluster monitor to cluster monitor
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d9c323a2
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d9c323a2
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d9c323a2
Branch: refs/heads/master
Commit: d9c323a2c3c1fc1a3eefe8d9c65be5aad33dc909
Parents: 2578fda
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Dec 19 16:53:17 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Dec 19 16:53:17 2014 +0530
----------------------------------------------------------------------
.../client/CloudControllerClient.java | 10 +-
.../context/cluster/ClusterContext.java | 351 +++++
.../context/cluster/ClusterContextFactory.java | 264 +---
.../cluster/KubernetesClusterContext.java | 771 -----------
.../context/cluster/VMClusterContext.java | 351 -----
.../AutoscalerHealthStatEventReceiver.java | 6 +-
.../AutoscalerTopologyEventReceiver.java | 12 +-
.../autoscaler/monitor/MonitorFactory.java | 4 +-
.../monitor/cluster/ClusterMonitor.java | 1244 +++++++++++++++++
.../monitor/cluster/ClusterMonitorFactory.java | 137 +-
.../cluster/KubernetesClusterMonitor.java | 516 --------
.../KubernetesServiceClusterMonitor.java | 219 ---
.../monitor/cluster/VMClusterMonitor.java | 1246 ------------------
.../monitor/cluster/VMLbClusterMonitor.java | 194 ---
.../monitor/component/GroupMonitor.java | 4 +-
.../component/ParentComponentMonitor.java | 5 +-
.../rule/AutoscalerRuleEvaluator.java | 162 ---
.../autoscaler/rule/RuleTasksDelegator.java | 231 +---
.../cluster/ClusterStatusActiveProcessor.java | 4 +-
.../cluster/ClusterStatusInactiveProcessor.java | 6 +-
.../ClusterStatusTerminatedProcessor.java | 6 +-
21 files changed, 1649 insertions(+), 4094 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/CloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/CloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/CloudControllerClient.java
index e902dfb..210f5e1 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/CloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/CloudControllerClient.java
@@ -226,16 +226,16 @@ public class CloudControllerClient {
}
}
- public synchronized void terminateAllInstances(String clusterId) throws TerminationException {
+ public synchronized void terminateInstances(String clusterId) throws TerminationException {
try {
if (log.isInfoEnabled()) {
log.info(String.format("Terminating all instances of cluster via cloud controller: [cluster] %s", clusterId));
}
long startTime = System.currentTimeMillis();
- stub.terminateAllInstances(clusterId);
+ stub.terminateInstances(clusterId);
if (log.isDebugEnabled()) {
long endTime = System.currentTimeMillis();
- log.debug(String.format("Service call terminateAllInstances() returned in %dms", (endTime - startTime)));
+ log.debug(String.format("Service call terminateInstances() returned in %dms", (endTime - startTime)));
}
} catch (RemoteException e) {
String msg = e.getMessage();
@@ -422,13 +422,13 @@ public class CloudControllerClient {
// }
}
- public synchronized void terminateAllContainers(String clusterId) throws TerminationException {
+ public synchronized void terminateContainers(String clusterId) throws TerminationException {
try {
if (log.isInfoEnabled()) {
log.info(String.format("Terminating containers via cloud controller: [cluster] %s", clusterId));
}
long startTime = System.currentTimeMillis();
- stub.terminateAllContainers(clusterId);
+ stub.terminateContainers(clusterId);
if (log.isDebugEnabled()) {
long endTime = System.currentTimeMillis();
log.debug(String.format("Service call terminateContainer() returned in %dms", (endTime - startTime)));
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContext.java
new file mode 100644
index 0000000..b680fd1
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContext.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.context.cluster;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.ApplicationHolder;
+import org.apache.stratos.autoscaler.context.member.MemberStatsContext;
+import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext;
+import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
+import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
+import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
+import org.apache.stratos.autoscaler.pojo.policy.autoscale.AutoscalePolicy;
+import org.apache.stratos.autoscaler.pojo.policy.deployment.ChildPolicy;
+import org.apache.stratos.autoscaler.pojo.policy.deployment.DeploymentPolicy;
+import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.ChildLevelNetworkPartition;
+import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.ChildLevelPartition;
+import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.Partition;
+import org.apache.stratos.autoscaler.util.AutoscalerUtil;
+import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
+import org.apache.stratos.messaging.domain.applications.Application;
+import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
+import org.apache.stratos.messaging.domain.instance.ClusterInstance;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+
+/*
+ * It holds the runtime data of a VM cluster
+ */
+public class ClusterContext extends AbstractClusterContext {
+
+ private static final long serialVersionUID = 17570842529682141L;
+
+ private static final Log log = LogFactory.getLog(ClusterContext.class);
+
+ // Map<NetworkpartitionId, Network Partition Context>
+ protected Map<String, ClusterLevelNetworkPartitionContext> networkPartitionCtxts;
+
+ protected DeploymentPolicy deploymentPolicy;
+ protected AutoscalePolicy autoscalePolicy;
+
+ public ClusterContext(String clusterId, String serviceId, AutoscalePolicy autoscalePolicy,
+ DeploymentPolicy deploymentPolicy, boolean hasScalingDependents) {
+
+ super(clusterId, serviceId);
+ this.deploymentPolicy = deploymentPolicy;
+ this.networkPartitionCtxts = new ConcurrentHashMap<String, ClusterLevelNetworkPartitionContext>();
+ this.autoscalePolicy = autoscalePolicy;
+
+ }
+
+ public Map<String, ClusterLevelNetworkPartitionContext> getNetworkPartitionCtxts() {
+ return networkPartitionCtxts;
+ }
+
+ public DeploymentPolicy getDeploymentPolicy() {
+ return deploymentPolicy;
+ }
+
+ public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) {
+ this.deploymentPolicy = deploymentPolicy;
+ }
+
+ public AutoscalePolicy getAutoscalePolicy() {
+ return autoscalePolicy;
+ }
+
+ public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
+ this.autoscalePolicy = autoscalePolicy;
+ }
+
+ public ClusterLevelNetworkPartitionContext getNetworkPartitionCtxt(String networkPartitionId) {
+ return networkPartitionCtxts.get(networkPartitionId);
+ }
+
+ public void setPartitionCtxt(Map<String, ClusterLevelNetworkPartitionContext> partitionCtxt) {
+ this.networkPartitionCtxts = partitionCtxt;
+ }
+
+ public boolean partitionCtxtAvailable(String partitionId) {
+ return networkPartitionCtxts.containsKey(partitionId);
+ }
+
+ public void addNetworkPartitionCtxt(ClusterLevelNetworkPartitionContext ctxt) {
+ this.networkPartitionCtxts.put(ctxt.getId(), ctxt);
+ }
+
+ public ClusterLevelNetworkPartitionContext getPartitionCtxt(String id) {
+ return this.networkPartitionCtxts.get(id);
+ }
+
+ public ClusterLevelNetworkPartitionContext getNetworkPartitionCtxt(Member member) {
+ log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId());
+ String networkPartitionId = member.getNetworkPartitionId();
+ if (networkPartitionCtxts.containsKey(networkPartitionId)) {
+ log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId));
+ return networkPartitionCtxts.get(networkPartitionId);
+ }
+
+ log.info("returning null getNetworkPartitionCtxt");
+ return null;
+ }
+
+ public void addInstanceContext(String instanceId, Cluster cluster, boolean hasScalingDependents,
+ boolean groupScalingEnabledSubtree)
+ throws PolicyValidationException, PartitionValidationException {
+ ClusterLevelNetworkPartitionContext networkPartitionContext = null;
+ ClusterInstance clusterInstance = cluster.getInstanceContexts(instanceId);
+ ChildPolicy policy = this.deploymentPolicy.
+ getChildPolicy(
+ AutoscalerUtil.getAliasFromClusterId(clusterId));
+ if (networkPartitionCtxts.containsKey(clusterInstance.getNetworkPartitionId())) {
+ networkPartitionContext = this.networkPartitionCtxts.get(
+ clusterInstance.getNetworkPartitionId());
+ } else {
+ if (policy != null) {
+ ChildLevelNetworkPartition networkPartition = policy.
+ getChildLevelNetworkPartition(clusterInstance.getNetworkPartitionId());
+ networkPartitionContext = new ClusterLevelNetworkPartitionContext(networkPartition.getId(),
+ networkPartition.getPartitionAlgo(), 0);
+ } else {
+ //Parent should have the partition specified
+ networkPartitionContext = new ClusterLevelNetworkPartitionContext(
+ clusterInstance.getNetworkPartitionId());
+ }
+
+ }
+
+ if (clusterInstance.getPartitionId() != null) {
+ //Need to add partition Context based on the given one from the parent
+ networkPartitionContext = addPartition(clusterInstance, cluster,
+ networkPartitionContext, null, hasScalingDependents, groupScalingEnabledSubtree);
+ } else {
+ networkPartitionContext = parseDeploymentPolicy(clusterInstance, cluster,
+ policy, networkPartitionContext, hasScalingDependents, groupScalingEnabledSubtree);
+ }
+ if (!networkPartitionCtxts.containsKey(clusterInstance.getNetworkPartitionId())) {
+ this.networkPartitionCtxts.put(clusterInstance.getNetworkPartitionId(),
+ networkPartitionContext);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Cluster instance context has been added to network partition, [cluster instance]" +
+ " %s [network partition] %s", clusterInstance.getInstanceId(),
+ clusterInstance.getNetworkPartitionId()));
+ }
+ }
+
+ }
+
+ private ClusterLevelNetworkPartitionContext parseDeploymentPolicy(
+ ClusterInstance clusterInstance,
+ Cluster cluster,
+ ChildPolicy childPolicy,
+ ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext,
+ boolean hasGroupScalingDependent, boolean groupScalingEnabledSubtree)
+ throws PolicyValidationException, PartitionValidationException {
+
+ if (childPolicy == null) {
+ String msg = "Deployment policy is null";
+ log.error(msg);
+ throw new PolicyValidationException(msg);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Child policy alias: " + childPolicy.getAlias());
+ }
+
+ ChildLevelPartition[] childLevelPartitions = childPolicy.
+ getChildLevelNetworkPartition(
+ clusterLevelNetworkPartitionContext.getId()).
+ getChildLevelPartitions();
+ if (childLevelPartitions == null) {
+ String msg = "Partitions are null in child policy: [alias]: " +
+ childPolicy.getAlias();
+ log.error(msg);
+ throw new PolicyValidationException(msg);
+ }
+
+ //Retrieving the ChildLevelNetworkPartition and create NP Context
+ ChildLevelNetworkPartition networkPartition;
+ networkPartition = childPolicy.
+ getChildLevelNetworkPartition(clusterInstance.getNetworkPartitionId());
+
+ //Fill cluster instance context with child level partitions
+ for (ChildLevelPartition childLevelPartition : networkPartition.getChildLevelPartitions()) {
+ addPartition(clusterInstance, cluster, clusterLevelNetworkPartitionContext, childLevelPartition,
+ hasGroupScalingDependent, groupScalingEnabledSubtree);
+ }
+ return clusterLevelNetworkPartitionContext;
+ }
+
+ private ClusterLevelNetworkPartitionContext addPartition(
+ ClusterInstance clusterInstance,
+ Cluster cluster,
+ ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext,
+ ChildLevelPartition childLevelPartition,
+ boolean hasScalingDependents, boolean groupScalingEnabledSubtree)
+ throws PolicyValidationException, PartitionValidationException {
+ if (clusterLevelNetworkPartitionContext == null) {
+ String msg =
+ "Network Partition is null in deployment policy: [application-id]: " +
+ deploymentPolicy.getApplicationId();
+ log.error(msg);
+ throw new PolicyValidationException(msg);
+ }
+
+ String nPartitionId = clusterLevelNetworkPartitionContext.getId();
+
+ //Getting the associated partition
+ if (clusterInstance.getPartitionId() == null && childLevelPartition == null) {
+ String msg =
+ "[Partition] " + clusterInstance.getPartitionId() + " for [networkPartition] " +
+ clusterInstance.getNetworkPartitionId() + "is null " +
+ "in deployment policy: [application-id]: " + deploymentPolicy.getApplicationId();
+ log.error(msg);
+ throw new PolicyValidationException(msg);
+ }
+
+ ClusterInstanceContext clusterInstanceContext = (ClusterInstanceContext) clusterLevelNetworkPartitionContext.
+ getInstanceContext(clusterInstance.getInstanceId());
+ int maxInstances = 1;
+ if (clusterInstanceContext == null) {
+ int minInstances = 1;
+ try {
+ ApplicationHolder.acquireReadLock();
+ Application application = ApplicationHolder.getApplications().
+ getApplication(cluster.getAppId());
+ ClusterDataHolder dataHolder = application.
+ getClusterDataHolderRecursivelyByAlias(
+ AutoscalerUtil.getAliasFromClusterId(clusterId));
+ minInstances = dataHolder.getMinInstances();
+ maxInstances = dataHolder.getMaxInstances();
+ } finally {
+ ApplicationHolder.releaseReadLock();
+ }
+ clusterInstanceContext = new ClusterInstanceContext(clusterInstance.getInstanceId(),
+ clusterLevelNetworkPartitionContext.getPartitionAlgorithm(),
+ minInstances, maxInstances, nPartitionId, clusterId, hasScalingDependents, groupScalingEnabledSubtree);
+ }
+ String partitionId;
+ if (childLevelPartition != null) {
+ //use it own defined partition
+ partitionId = childLevelPartition.getPartitionId();
+ maxInstances = childLevelPartition.getMax();
+ } else {
+ //handling the partition given by the parent
+ partitionId = clusterInstance.getPartitionId();
+ }
+ //Retrieving the actual partition from application
+ Partition appPartition = deploymentPolicy.getApplicationLevelNetworkPartition(nPartitionId).
+ getPartition(partitionId);
+ org.apache.stratos.cloud.controller.stub.domain.Partition partition =
+ convertTOCCPartition(appPartition);
+
+ //Validate the partition
+ //TODO validate partition removal
+ //CloudControllerClient.getInstance().validatePartition(partition);
+
+ //Creating cluster level partition context
+ ClusterLevelPartitionContext clusterLevelPartitionContext = new ClusterLevelPartitionContext(
+ maxInstances,
+ partition,
+ clusterInstance.getNetworkPartitionId(), clusterId);
+ clusterLevelPartitionContext.setServiceName(cluster.getServiceName());
+ clusterLevelPartitionContext.setProperties(cluster.getProperties());
+
+ //add members to partition Context
+ addMembersFromTopology(cluster, partition, clusterLevelPartitionContext);
+
+ //adding it to the monitors context
+ clusterInstanceContext.addPartitionCtxt(clusterLevelPartitionContext);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Partition context has been added: [partition] %s",
+ clusterLevelPartitionContext.getPartitionId()));
+ }
+
+ clusterLevelNetworkPartitionContext.addInstanceContext(clusterInstanceContext);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Cluster Instance context has been added: " +
+ "[ClusterInstanceContext] %s", clusterInstanceContext.getId()));
+ }
+
+
+ return clusterLevelNetworkPartitionContext;
+ }
+
+ private void addMembersFromTopology(Cluster cluster,
+ org.apache.stratos.cloud.controller.stub.domain.Partition partition,
+ ClusterLevelPartitionContext clusterLevelPartitionContext) {
+ for (Member member : cluster.getMembers()) {
+ String memberId = member.getMemberId();
+ if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
+ MemberContext memberContext = new MemberContext();
+ memberContext.setClusterId(member.getClusterId());
+ memberContext.setMemberId(memberId);
+ memberContext.setInitTime(member.getInitTime());
+ memberContext.setPartition(partition);
+ memberContext.setProperties(AutoscalerUtil.toStubProperties(member.getProperties()));
+
+ if (MemberStatus.Activated.equals(member.getStatus())) {
+ clusterLevelPartitionContext.addActiveMember(memberContext);
+ if (log.isDebugEnabled()) {
+ String msg = String.format("Active member read from topology and added to active member list: %s", member.toString());
+ log.debug(msg);
+ }
+ } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
+ clusterLevelPartitionContext.addPendingMember(memberContext);
+ if (log.isDebugEnabled()) {
+ String msg = String.format("Pending member read from topology and added to pending member list: %s", member.toString());
+ log.debug(msg);
+ }
+ }
+ clusterLevelPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been added: [member-id] %s", memberId));
+ }
+ }
+ }
+ }
+
+ private org.apache.stratos.cloud.controller.stub.domain.Partition convertTOCCPartition(Partition partition) {
+ org.apache.stratos.cloud.controller.stub.domain.Partition partition1 = new
+ org.apache.stratos.cloud.controller.stub.domain.Partition();
+
+ partition1.setId(partition.getId());
+ partition1.setProvider(partition.getProvider());
+ partition1.setProperties(AutoscalerUtil.toStubProperties(partition.getProperties()));
+
+ return partition1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContextFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContextFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContextFactory.java
index 0a7cbf5..2fceb02 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContextFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContextFactory.java
@@ -21,24 +21,14 @@ package org.apache.stratos.autoscaler.context.cluster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.applications.ApplicationHolder;
-import org.apache.stratos.autoscaler.context.member.MemberStatsContext;
import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
-//import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.PartitionManager;
import org.apache.stratos.autoscaler.pojo.policy.PolicyManager;
import org.apache.stratos.autoscaler.pojo.policy.autoscale.AutoscalePolicy;
-import org.apache.stratos.autoscaler.pojo.policy.deployment.ChildPolicy;
import org.apache.stratos.autoscaler.pojo.policy.deployment.DeploymentPolicy;
-import org.apache.stratos.autoscaler.util.AutoscalerUtil;
-import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
import org.apache.stratos.common.Properties;
import org.apache.stratos.common.Property;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
import java.util.Map;
@@ -48,7 +38,7 @@ public class ClusterContextFactory {
private static final Log log = LogFactory.getLog(ClusterContextFactory.class);
- public static VMClusterContext getVMClusterContext(String instanceId, Cluster cluster, boolean hasScalingDependents)
+ public static ClusterContext getVMClusterContext(String instanceId, Cluster cluster, boolean hasScalingDependents)
throws PolicyValidationException, PartitionValidationException {
if (null == cluster) {
@@ -66,260 +56,10 @@ public class ClusterContextFactory {
deploymentPolicy = PolicyManager.getInstance().
getDeploymentPolicyByApplication(cluster.getAppId());
- return new VMClusterContext(cluster.getClusterId(), cluster.getServiceName(), autoscalePolicy,
+ return new ClusterContext(cluster.getClusterId(), cluster.getServiceName(), autoscalePolicy,
deploymentPolicy, hasScalingDependents);
}
- /* public static VMClusterContext getVMLBClusterContext(Cluster cluster) throws PolicyValidationException {
-
- // FIXME fix the following code to correctly update
- // AutoscalerContext context = AutoscalerContext.getInstance();
- if (null == cluster) {
- return null;
- }
-
- String autoscalePolicyName = cluster.getAutoscalePolicyName();
- String deploymentPolicyName = cluster.getDeploymentPolicyName();
-
- if (log.isDebugEnabled()) {
- log.debug("Deployment policy name: " + deploymentPolicyName);
- log.debug("Autoscaler policy name: " + autoscalePolicyName);
- }
-
- AutoscalePolicy autoscalePolicy =
- PolicyManager.getInstance()
- .getAutoscalePolicy(autoscalePolicyName);
- DeploymentPolicy deploymentPolicy =
- PolicyManager.getInstance()
- .getDeploymentPolicy(deploymentPolicyName);
-
- if (deploymentPolicy == null) {
- String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
- log.error(msg);
- throw new PolicyValidationException(msg);
- }
-
- String clusterId = cluster.getClusterId();
-
- Map<String, ClusterLevelNetworkPartitionContext> networkPartitionContextMap = new HashMap<String, ClusterLevelNetworkPartitionContext>();
-
- // partition group = network partition context
- for (ChildLevelNetworkPartition networkPartition : deploymentPolicy.getChildLevelNetworkPartitions()) {
-
- String networkPartitionId = networkPartition.getApplicationId();
- // NetworkPartitionLbHolder networkPartitionLbHolder =
- // PartitionManager.getInstance()
- // .getNetworkPartitionLbHolder(networkPartitionId);
- // PartitionManager.getInstance()
- // .getNetworkPartitionLbHolder(partitionGroup.getPartitionId());
- // FIXME pick a random partition
- Partition partition =
- networkPartition.getPartitions()[new Random().nextInt(networkPartition.getPartitions().length)];
- ClusterLevelPartitionContext clusterMonitorPartitionContext = new ClusterLevelPartitionContext(partition);
- clusterMonitorPartitionContext.setServiceName(cluster.getServiceName());
- clusterMonitorPartitionContext.setProperties(cluster.getProperties());
- clusterMonitorPartitionContext.setNetworkPartitionId(networkPartitionId);
- clusterMonitorPartitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
-
- ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = new ClusterLevelNetworkPartitionContext(networkPartitionId,
- networkPartition.getPartitionAlgo(),
- networkPartition.getPartitions());
- for (Member member : cluster.getMembers()) {
- String memberId = member.getMemberId();
- if (member.getNetworkPartitionId().equalsIgnoreCase(clusterLevelNetworkPartitionContext.getApplicationId())) {
- MemberContext memberContext = new MemberContext();
- memberContext.setClusterId(member.getClusterId());
- memberContext.setMemberId(memberId);
- memberContext.setPartition(partition);
- memberContext.setInitTime(member.getInitTime());
-
- if (MemberStatus.Activated.equals(member.getStatus())) {
- if (log.isDebugEnabled()) {
- String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
- log.debug(msg);
- }
- clusterMonitorPartitionContext.addActiveMember(memberContext);
- // networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
- // partitionContext.incrementCurrentActiveMemberCount(1);
- } else if (MemberStatus.Created.equals(member.getStatus()) ||
- MemberStatus.Starting.equals(member.getStatus())) {
- if (log.isDebugEnabled()) {
- String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
- log.debug(msg);
- }
- clusterMonitorPartitionContext.addPendingMember(memberContext);
- // networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
- } else if (MemberStatus.Suspended.equals(member.getStatus())) {
- // partitionContext.addFaultyMember(memberId);
- }
-
- clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added: [member] %s", memberId));
- }
- }
-
- }
- clusterLevelNetworkPartitionContext.addPartitionContext(clusterMonitorPartitionContext);
-
-
- // // populate lb cluster id in network partition context.
- // java.util.Properties props = cluster.getProperties();
- //
- // // get service type of load balanced cluster
- // String loadBalancedServiceType = props.getProperty(StratosConstants.LOAD_BALANCED_SERVICE_TYPE);
- //
- // if (props.containsKey(StratosConstants.LOAD_BALANCER_REF)) {
- // String value = props.getProperty(StratosConstants.LOAD_BALANCER_REF);
- //
- // if (value.equals(StratosConstants.DEFAULT_LOAD_BALANCER)) {
- // networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
- //
- // } else if (value.equals(StratosConstants.SERVICE_AWARE_LOAD_BALANCER)) {
- // String serviceName = cluster.getServiceName();
- // // TODO: check if this is correct
- // networkPartitionLbHolder.addServiceLB(serviceName, clusterId);
- //
- // if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) {
- // networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
- // if (log.isDebugEnabled()) {
- // log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType);
- // }
- // }
- // }
- // }
-
- // // populate lb cluster id in network partition context.
- // java.util.Properties props = cluster.getProperties();
- //
- // // get service type of load balanced cluster
- // String loadBalancedServiceType = props.getProperty(org.apache.stratos.messaging.util.Constants.LOAD_BALANCED_SERVICE_TYPE);
- //
- // if (props.containsKey(org.apache.stratos.messaging.util.Constants.LOAD_BALANCER_REF)) {
- // String value = props.getProperty(org.apache.stratos.messaging.util.Constants.LOAD_BALANCER_REF);
- //
- // if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
- // networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
- //
- // } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) {
- // String serviceName = cluster.getServiceName();
- // // TODO: check if this is correct
- // networkPartitionLbHolder.addServiceLB(serviceName, clusterId);
- //
- // if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) {
- // networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
- // if (log.isDebugEnabled()) {
- // log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType);
- // }
- // }
- // }
- // }
-
-
- networkPartitionContextMap.put(networkPartitionId, clusterLevelNetworkPartitionContext);
- }
-
- return new VMClusterContext(clusterId, cluster.getServiceName(), autoscalePolicy,
- deploymentPolicy, networkPartitionContextMap);
- }
- */
- public static KubernetesClusterContext getKubernetesClusterContext(String instanceId,
- Cluster cluster)
- throws PolicyValidationException {
-
- if (null == cluster) {
- return null;
- }
-
- String autoscalePolicyName = cluster.getAutoscalePolicyName();
-
- AutoscalePolicy autoscalePolicy =
- PolicyManager.getInstance()
- .getAutoscalePolicy(autoscalePolicyName);
- if (log.isDebugEnabled()) {
- log.debug("Autoscaling policy name: " + autoscalePolicyName);
- }
-
- AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
-
- if (policy == null) {
- String msg = String.format("Autoscaling policy is null: [policy-name] %s", autoscalePolicyName);
- log.error(msg);
- throw new PolicyValidationException(msg);
- }
-
- java.util.Properties properties = cluster.getProperties();
- if (properties == null) {
- String message = String.format("Properties not found in kubernetes cluster: [cluster-id] %s",
- cluster.getClusterId());
- log.error(message);
- throw new RuntimeException(message);
- }
- String minReplicasProperty = properties.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS);
- int minReplicas = 0;
- if (minReplicasProperty != null && !minReplicasProperty.isEmpty()) {
- minReplicas = Integer.parseInt(minReplicasProperty);
- }
-
- int maxReplicas = 0;
- String maxReplicasProperty = properties.getProperty(StratosConstants.KUBERNETES_MAX_REPLICAS);
- if (maxReplicasProperty != null && !maxReplicasProperty.isEmpty()) {
- maxReplicas = Integer.parseInt(maxReplicasProperty);
- }
-
- String kubernetesHostClusterID = properties.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
- KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
- cluster.getClusterId(), cluster.getServiceName(), autoscalePolicy, minReplicas, maxReplicas);
-
- //populate the members after restarting
- for (Member member : cluster.getMembers()) {
- String memberId = member.getMemberId();
- String clusterId = member.getClusterId();
- MemberContext memberContext = new MemberContext();
- memberContext.setMemberId(memberId);
- memberContext.setClusterId(clusterId);
- memberContext.setInitTime(member.getInitTime());
-
- // if there is at least one member in the topology, that means service has been created already
- // this is to avoid calling startContainer() method again
- kubernetesClusterCtxt.setServiceClusterCreated(true);
-
- if (MemberStatus.Activated.equals(member.getStatus())) {
- if (log.isDebugEnabled()) {
- String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
- log.debug(msg);
- }
- //dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
- } else if (MemberStatus.Created.equals(member.getStatus())
- || MemberStatus.Starting.equals(member.getStatus())) {
- if (log.isDebugEnabled()) {
- String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
- log.debug(msg);
- }
- //dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
- }
-
- kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId));
- kubernetesClusterCtxt.setInstanceId(instanceId);
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added: [member] %s", memberId));
- }
- }
-
- // find lb reference type
-
-// if (properties.containsKey(org.apache.stratos.messaging.util.Constants.LOAD_BALANCER_REF)) {
-// String value = properties.getProperty(Constants.LOAD_BALANCER_REF);
-// //dockerClusterMonitor.setLbReferenceType(value);
-// if (log.isDebugEnabled()) {
-// log.debug("Set the lb reference type: " + value);
-// }
-// }
-
-
- return kubernetesClusterCtxt;
- }
-
private static Properties convertMemberPropsToMemberContextProps(
java.util.Properties properties) {
Properties props = new Properties();
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/KubernetesClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/KubernetesClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/KubernetesClusterContext.java
deleted file mode 100644
index 44517dd..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/KubernetesClusterContext.java
+++ /dev/null
@@ -1,771 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.context.cluster;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.context.member.MemberStatsContext;
-import org.apache.stratos.autoscaler.pojo.policy.autoscale.AutoscalePolicy;
-import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadAverage;
-import org.apache.stratos.autoscaler.pojo.policy.autoscale.MemoryConsumption;
-import org.apache.stratos.autoscaler.pojo.policy.autoscale.RequestsInFlight;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
-import org.apache.stratos.common.constants.StratosConstants;
-
-/*
- * It holds the runtime data of a kubernetes service cluster
- */
-public class KubernetesClusterContext extends AbstractClusterContext {
-
- private static final long serialVersionUID = 808741789615481596L;
- private static final Log log = LogFactory.getLog(KubernetesClusterContext.class);
-
- private String instanceId;
-
- private String kubernetesClusterId;
- private String serviceName;
-
- private int minReplicas;
- private int maxReplicas;
- private int currentReplicas;
- private float RequiredReplicas;
-
- private AutoscalePolicy autoscalePolicy;
-
- // it will tell whether the startContainers() method succeed or not for the 1st time
- // we should call startContainers() only once
- private boolean isServiceClusterCreated = false;
-
- // properties
- private Properties properties;
-
- // 15 mints as the default
- private long pendingMemberExpiryTime;
- // pending members
- private List<MemberContext> pendingMembers;
-
- // active members
- private List<MemberContext> activeMembers;
-
- // 1 day as default
- private long obsoltedMemberExpiryTime = 1*24*60*60*1000;
-
- // members to be terminated
- private Map<String, MemberContext> obsoletedMembers;
-
- // termination pending members, member is added to this when Autoscaler send grace fully shut down event
- private List<MemberContext> terminationPendingMembers;
-
- //Keep statistics come from CEP
- private Map<String, MemberStatsContext> memberStatsContexts;
-
- //Following information will keep events details
- private RequestsInFlight requestsInFlight;
- private MemoryConsumption memoryConsumption;
- private LoadAverage loadAverage;
-
- //boolean values to keep whether the requests in flight parameters are reset or not
- private boolean rifReset = false, averageRifReset = false,
- gradientRifReset = false, secondDerivativeRifRest = false;
- //boolean values to keep whether the memory consumption parameters are reset or not
- private boolean memoryConsumptionReset = false, averageMemoryConsumptionReset = false,
- gradientMemoryConsumptionReset = false, secondDerivativeMemoryConsumptionRest = false;
- //boolean values to keep whether the load average parameters are reset or not
- private boolean loadAverageReset = false, averageLoadAverageReset = false,
- gradientLoadAverageReset = false, secondDerivativeLoadAverageRest = false;
-
- public KubernetesClusterContext(String kubernetesClusterId, String clusterId, String serviceId, AutoscalePolicy autoscalePolicy,
- int minCount, int maxCount) {
-
- super(clusterId, serviceId);
- this.kubernetesClusterId = kubernetesClusterId;
- this.minReplicas = minCount;
- this.maxReplicas = maxCount;
- this.pendingMembers = new ArrayList<MemberContext>();
- this.activeMembers = new ArrayList<MemberContext>();
- this.terminationPendingMembers = new ArrayList<MemberContext>();
- this.obsoletedMembers = new ConcurrentHashMap<String, MemberContext>();
- this.memberStatsContexts = new ConcurrentHashMap<String, MemberStatsContext>();
- this.requestsInFlight = new RequestsInFlight();
- this.loadAverage = new LoadAverage();
- this.memoryConsumption = new MemoryConsumption();
- this.autoscalePolicy = autoscalePolicy;
-
- // check if a different value has been set for expiryTime
- XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
- pendingMemberExpiryTime = conf.getLong(StratosConstants.PENDING_CONTAINER_MEMBER_EXPIRY_TIMEOUT, 300000);
- obsoltedMemberExpiryTime = conf.getLong(StratosConstants.OBSOLETED_CONTAINER_MEMBER_EXPIRY_TIMEOUT, 3600000);
- if (log.isDebugEnabled()) {
- log.debug("Member expiry time is set to: " + pendingMemberExpiryTime);
- log.debug("Member obsoleted expiry time is set to: " + obsoltedMemberExpiryTime);
- }
-
- Thread th = new Thread(new PendingMemberWatcher(this));
- th.start();
- Thread th2 = new Thread(new ObsoletedMemberWatcher(this));
- th2.start();
- }
-
- public String getKubernetesClusterID() {
- return kubernetesClusterId;
- }
-
- public void setKubernetesClusterID(String kubernetesClusterId) {
- this.kubernetesClusterId = kubernetesClusterId;
- }
-
- public List<MemberContext> getPendingMembers() {
- return pendingMembers;
- }
-
- public void setPendingMembers(List<MemberContext> pendingMembers) {
- this.pendingMembers = pendingMembers;
- }
-
- public int getActiveMemberCount() {
- return activeMembers.size();
- }
-
- public void setActiveMembers(List<MemberContext> activeMembers) {
- this.activeMembers = activeMembers;
- }
-
- public int getMinReplicas() {
- return minReplicas;
- }
-
- public void setMinReplicas(int minReplicas) {
- this.minReplicas = minReplicas;
- }
-
- public int getMaxReplicas() {
- return maxReplicas;
- }
-
- public void setMaxReplicas(int maxReplicas) {
- this.maxReplicas = maxReplicas;
- }
-
- public int getCurrentReplicas() {
- return currentReplicas;
- }
-
- public void setCurrentReplicas(int currentReplicas) {
- this.currentReplicas = currentReplicas;
- }
-
- public void addPendingMember(MemberContext ctxt) {
- this.pendingMembers.add(ctxt);
- }
-
- public boolean removePendingMember(String id) {
- if (id == null) {
- return false;
- }
- for (Iterator<MemberContext> iterator = pendingMembers.iterator(); iterator.hasNext(); ) {
- MemberContext pendingMember = (MemberContext) iterator.next();
- if (id.equals(pendingMember.getMemberId())) {
- iterator.remove();
- return true;
- }
-
- }
-
- return false;
- }
-
- public void movePendingMemberToActiveMembers(String memberId) {
- if (memberId == null) {
- return;
- }
- Iterator<MemberContext> iterator = pendingMembers.listIterator();
- while (iterator.hasNext()) {
- MemberContext pendingMember = iterator.next();
- if (pendingMember == null) {
- iterator.remove();
- continue;
- }
- if (memberId.equals(pendingMember.getMemberId())) {
- // member is activated
- // remove from pending list
- iterator.remove();
- // add to the activated list
- this.activeMembers.add(pendingMember);
- if (log.isDebugEnabled()) {
- log.debug(String.format(
- "Pending member is removed and added to the "
- + "activated member list. [Member Id] %s",
- memberId));
- }
- break;
- }
- }
- }
-
- public void addActiveMember(MemberContext ctxt) {
- this.activeMembers.add(ctxt);
- }
-
- public void removeActiveMember(MemberContext ctxt) {
- this.activeMembers.remove(ctxt);
- }
-
- public long getPendingMemberExpiryTime() {
- return pendingMemberExpiryTime;
- }
-
- public void setPendingMemberExpiryTime(long pendingMemberExpiryTime) {
- this.pendingMemberExpiryTime = pendingMemberExpiryTime;
- }
-
- public Map<String, MemberStatsContext> getMemberStatsContexts() {
- return memberStatsContexts;
- }
-
- public MemberStatsContext getMemberStatsContext(String memberId) {
- return memberStatsContexts.get(memberId);
- }
-
- public void addMemberStatsContext(MemberStatsContext ctxt) {
- this.memberStatsContexts.put(ctxt.getMemberId(), ctxt);
- }
-
- public void removeMemberStatsContext(String memberId) {
- this.memberStatsContexts.remove(memberId);
- }
-
- public Properties getProperties() {
- return properties;
- }
-
- public void setProperties(Properties properties) {
- this.properties = properties;
- }
-
- public String getServiceName() {
- return serviceName;
- }
-
- public void setServiceName(String serviceName) {
- this.serviceName = serviceName;
- }
-
- public List<MemberContext> getActiveMembers() {
- return activeMembers;
- }
-
- public boolean removeActiveMemberById(String memberId) {
- boolean removeActiveMember = false;
- synchronized (activeMembers) {
- Iterator<MemberContext> iterator = activeMembers.listIterator();
- while (iterator.hasNext()) {
- MemberContext memberContext = iterator.next();
- if (memberId.equals(memberContext.getMemberId())) {
- iterator.remove();
- removeActiveMember = true;
-
- break;
- }
- }
- }
- return removeActiveMember;
- }
-
- public boolean activeMemberExist(String memberId) {
-
- for (MemberContext memberContext : activeMembers) {
- if (memberId.equals(memberContext.getMemberId())) {
- return true;
- }
- }
- return false;
- }
-
- public AutoscalePolicy getAutoscalePolicy() {
- return autoscalePolicy;
- }
-
- public float getRequiredReplicas() {
- return RequiredReplicas;
- }
-
- public void setRequiredReplicas(float requiredReplicas) {
- RequiredReplicas = requiredReplicas;
- }
-
- /**
- * Check the member lists for the provided member ID and move the member to the obsolete list
- *
- * @param memberId The member ID of the member to search
- */
- public void moveMemberToObsoleteList(String memberId) {
- if (memberId == null) {
- return;
- }
-
- // check active member list
- Iterator<MemberContext> activeMemberIterator = activeMembers.listIterator();
- MemberContext removedMember = this.removeMemberFrom(activeMemberIterator, memberId);
- if (removedMember != null) {
- this.addObsoleteMember(removedMember);
- removedMember.setObsoleteInitTime(System.currentTimeMillis());
- if (log.isDebugEnabled()) {
- log.debug(String.format("Active member is removed and added to the " +
- "obsolete member list. [Member Id] %s", memberId));
- }
-
- return;
- }
-
- // check pending member list
- Iterator<MemberContext> pendingMemberIterator = pendingMembers.listIterator();
- removedMember = this.removeMemberFrom(pendingMemberIterator, memberId);
- if (removedMember != null) {
- this.addObsoleteMember(removedMember);
- removedMember.setObsoleteInitTime(System.currentTimeMillis());
- if (log.isDebugEnabled()) {
- log.debug(String.format("Pending member is removed and added to the " +
- "obsolete member list. [Member Id] %s", memberId));
- }
-
- return;
- }
-
- // check termination pending member list
- Iterator<MemberContext> terminationPendingMembersIterator = terminationPendingMembers.listIterator();
- removedMember = this.removeMemberFrom(terminationPendingMembersIterator, memberId);
- if (removedMember != null) {
- this.addObsoleteMember(removedMember);
- removedMember.setObsoleteInitTime(System.currentTimeMillis());
- if (log.isDebugEnabled()) {
- log.debug(String.format("Termination Pending member is removed and added to the " +
- "obsolete member list. [Member Id] %s", memberId));
- }
- }
- }
-
- /**
- * Removes the {@link org.apache.stratos.cloud.controller.stub.domain.MemberContext} object mapping
- * to the specified member id from the specified MemberContext collection
- *
- * @param iterator The {@link java.util.Iterator} for the collection containing {@link org.apache.stratos.cloud.controller.stub.domain.MemberContext}
- * objects
- * @param memberId Member Id {@link String} for the {@link org.apache.stratos.cloud.controller.stub.domain.MemberContext}
- * to be removed
- * @return {@link org.apache.stratos.cloud.controller.stub.domain.MemberContext} object if
- * object found and removed, null if otherwise.
- */
- private MemberContext removeMemberFrom(Iterator<MemberContext> iterator, String memberId) {
- while (iterator.hasNext()) {
- MemberContext activeMember = iterator.next();
- if (activeMember == null) {
- iterator.remove();
- continue;
- }
- if (memberId.equals(activeMember.getMemberId())) {
- iterator.remove();
- return activeMember;
- }
- }
-
- return null;
- }
-
- public String getInstanceId() {
- return instanceId;
- }
-
- public void setInstanceId(String instanceId) {
- this.instanceId = instanceId;
- }
-
- private class PendingMemberWatcher implements Runnable {
- private KubernetesClusterContext ctxt;
-
- public PendingMemberWatcher(KubernetesClusterContext ctxt) {
- this.ctxt = ctxt;
- }
-
- @Override
- public void run() {
-
- while (true) {
- long expiryTime = ctxt.getPendingMemberExpiryTime();
- List<MemberContext> pendingMembers = ctxt.getPendingMembers();
-
- synchronized (pendingMembers) {
- Iterator<MemberContext> iterator = pendingMembers
- .listIterator();
- while (iterator.hasNext()) {
- MemberContext pendingMember = iterator.next();
-
- if (pendingMember == null) {
- continue;
- }
- long pendingTime = System.currentTimeMillis()
- - pendingMember.getInitTime();
- if (pendingTime >= expiryTime) {
- iterator.remove();
- log.info("Pending state of member: " + pendingMember.getMemberId() +
- " is expired. " + "Adding as an obsoleted member.");
- ctxt.addObsoleteMember(pendingMember);
- }
- }
- }
- try {
- // TODO find a constant
- Thread.sleep(15000);
- } catch (InterruptedException ignore) {
- }
- }
- }
-
- }
-
- private class ObsoletedMemberWatcher implements Runnable {
- private KubernetesClusterContext ctxt;
-
- public ObsoletedMemberWatcher(KubernetesClusterContext ctxt) {
- this.ctxt = ctxt;
- }
-
- @Override
- public void run() {
- while (true) {
-
- long obsoltedMemberExpiryTime = ctxt.getObsoltedMemberExpiryTime();
- Map<String, MemberContext> obsoletedMembers = ctxt.getObsoletedMembers();
- Iterator<Entry<String, MemberContext>> iterator = obsoletedMembers.entrySet().iterator();
-
- while (iterator.hasNext()) {
- Map.Entry<String, MemberContext> pairs = iterator.next();
- MemberContext obsoleteMember = (MemberContext) pairs.getValue();
- if (obsoleteMember == null) {
- continue;
- }
- long obsoleteTime = System.currentTimeMillis() - obsoleteMember.getInitTime();
- if (obsoleteTime >= obsoltedMemberExpiryTime) {
- iterator.remove();
- log.info("Obsolete state of member: " + obsoleteMember.getMemberId() +
- " is expired. " + "Removing from obsolete member list");
- }
- }
- try {
- // TODO find a constant
- Thread.sleep(15000);
- } catch (InterruptedException ignore) {
- }
- }
- }
- }
-
- public float getAverageRequestsInFlight() {
- return requestsInFlight.getAverage();
- }
-
- public void setAverageRequestsInFlight(float averageRequestsInFlight) {
- requestsInFlight.setAverage(averageRequestsInFlight);
- averageRifReset = true;
- if (secondDerivativeRifRest && gradientRifReset) {
- rifReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Requests in flights stats are reset, "
- + "ready to do scale check [kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public float getRequestsInFlightSecondDerivative() {
- return requestsInFlight.getSecondDerivative();
- }
-
- public void setRequestsInFlightSecondDerivative(
- float requestsInFlightSecondDerivative) {
- requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative);
- secondDerivativeRifRest = true;
- if (averageRifReset && gradientRifReset) {
- rifReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public float getRequestsInFlightGradient() {
- return requestsInFlight.getGradient();
- }
-
- public void setRequestsInFlightGradient(float requestsInFlightGradient) {
- requestsInFlight.setGradient(requestsInFlightGradient);
- gradientRifReset = true;
- if (secondDerivativeRifRest && averageRifReset) {
- rifReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Requests in flights stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public boolean isRifReset() {
- return rifReset;
- }
-
- public void setRifReset(boolean rifReset) {
- this.rifReset = rifReset;
- this.averageRifReset = rifReset;
- this.gradientRifReset = rifReset;
- this.secondDerivativeRifRest = rifReset;
- }
-
- public float getAverageMemoryConsumption() {
- return memoryConsumption.getAverage();
- }
-
- public void setAverageMemoryConsumption(float averageMemoryConsumption) {
- memoryConsumption.setAverage(averageMemoryConsumption);
- averageMemoryConsumptionReset = true;
- if (secondDerivativeMemoryConsumptionRest
- && gradientMemoryConsumptionReset) {
- memoryConsumptionReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public float getMemoryConsumptionSecondDerivative() {
- return memoryConsumption.getSecondDerivative();
- }
-
- public void setMemoryConsumptionSecondDerivative(
- float memoryConsumptionSecondDerivative) {
- memoryConsumption
- .setSecondDerivative(memoryConsumptionSecondDerivative);
- secondDerivativeMemoryConsumptionRest = true;
- if (averageMemoryConsumptionReset && gradientMemoryConsumptionReset) {
- memoryConsumptionReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public float getMemoryConsumptionGradient() {
- return memoryConsumption.getGradient();
- }
-
- public void setMemoryConsumptionGradient(float memoryConsumptionGradient) {
- memoryConsumption.setGradient(memoryConsumptionGradient);
- gradientMemoryConsumptionReset = true;
- if (secondDerivativeMemoryConsumptionRest
- && averageMemoryConsumptionReset) {
- memoryConsumptionReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Memory consumption stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public boolean isMemoryConsumptionReset() {
- return memoryConsumptionReset;
- }
-
- public void setMemoryConsumptionReset(boolean memoryConsumptionReset) {
- this.memoryConsumptionReset = memoryConsumptionReset;
- this.averageMemoryConsumptionReset = memoryConsumptionReset;
- this.gradientMemoryConsumptionReset = memoryConsumptionReset;
- this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset;
- }
-
-
- public float getAverageLoadAverage() {
- return loadAverage.getAverage();
- }
-
- public void setAverageLoadAverage(float averageLoadAverage) {
- loadAverage.setAverage(averageLoadAverage);
- averageLoadAverageReset = true;
- if (secondDerivativeLoadAverageRest && gradientLoadAverageReset) {
- loadAverageReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Load average stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public float getLoadAverageSecondDerivative() {
- return loadAverage.getSecondDerivative();
- }
-
- public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) {
- loadAverage.setSecondDerivative(loadAverageSecondDerivative);
- secondDerivativeLoadAverageRest = true;
- if (averageLoadAverageReset && gradientLoadAverageReset) {
- loadAverageReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Load average stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public float getLoadAverageGradient() {
- return loadAverage.getGradient();
- }
-
- public void setLoadAverageGradient(float loadAverageGradient) {
- loadAverage.setGradient(loadAverageGradient);
- gradientLoadAverageReset = true;
- if (secondDerivativeLoadAverageRest && averageLoadAverageReset) {
- loadAverageReset = true;
- if (log.isDebugEnabled()) {
- log.debug(String.format("Load average stats are reset, ready to do scale check "
- + "[kub cluster] %s", this.kubernetesClusterId));
- }
- }
- }
-
- public boolean isLoadAverageReset() {
- return loadAverageReset;
- }
-
- public void setLoadAverageReset(boolean loadAverageReset) {
- this.loadAverageReset = loadAverageReset;
- this.averageLoadAverageReset = loadAverageReset;
- this.gradientLoadAverageReset = loadAverageReset;
- this.secondDerivativeLoadAverageRest = loadAverageReset;
- }
-
- public void moveActiveMemberToTerminationPendingMembers(String memberId) {
- if (memberId == null) {
- return;
- }
- Iterator<MemberContext> iterator = activeMembers.listIterator();
- while ( iterator.hasNext()) {
- MemberContext activeMember = iterator.next();
- if(activeMember == null) {
- iterator.remove();
- continue;
- }
- if(memberId.equals(activeMember.getMemberId())){
- // member is activated
- // remove from pending list
- iterator.remove();
- // add to the activated list
- this.terminationPendingMembers.add(activeMember);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Active member is removed and added to the " +
- "termination pending member list. [Member Id] %s", memberId));
- }
- break;
- }
- }
- }
-
- public boolean removeTerminationPendingMember(String memberId) {
- boolean terminationPendingMemberAvailable = false;
- for (MemberContext memberContext: terminationPendingMembers){
- if(memberContext.getMemberId().equals(memberId)){
- terminationPendingMemberAvailable = true;
- terminationPendingMembers.remove(memberContext);
- break;
- }
- }
- return terminationPendingMemberAvailable;
- }
-
- public long getObsoltedMemberExpiryTime() {
- return obsoltedMemberExpiryTime;
- }
-
- public void setObsoltedMemberExpiryTime(long obsoltedMemberExpiryTime) {
- this.obsoltedMemberExpiryTime = obsoltedMemberExpiryTime;
- }
-
- public void addObsoleteMember(MemberContext ctxt) {
- this.obsoletedMembers.put(ctxt.getMemberId(), ctxt);
- }
-
- public boolean removeObsoleteMember(String memberId) {
- if(this.obsoletedMembers.remove(memberId) == null) {
- return false;
- }
- return true;
- }
-
- public Map<String, MemberContext> getObsoletedMembers() {
- return obsoletedMembers;
- }
-
- public void setObsoletedMembers(Map<String, MemberContext> obsoletedMembers) {
- this.obsoletedMembers = obsoletedMembers;
- }
-
- public MemberStatsContext getPartitionCtxt(String id) {
- return this.memberStatsContexts.get(id);
- }
-
- public List<MemberContext> getTerminationPendingMembers() {
- return terminationPendingMembers;
- }
-
- public void setTerminationPendingMembers(List<MemberContext> terminationPendingMembers) {
- this.terminationPendingMembers = terminationPendingMembers;
- }
-
- public int getTotalMemberCount() {
- return activeMembers.size() + pendingMembers.size() + terminationPendingMembers.size();
- }
-
- public int getNonTerminatedMemberCount() {
- return activeMembers.size() + pendingMembers.size() + terminationPendingMembers.size();
- }
-
- public String getClusterId() {
- return clusterId;
- }
-
- public void setClusterId(String clusterId) {
- this.clusterId = clusterId;
- }
-
- public boolean isServiceClusterCreated() {
- return isServiceClusterCreated;
- }
-
- public void setServiceClusterCreated(boolean isServiceClusterCreated) {
- this.isServiceClusterCreated = isServiceClusterCreated;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java
deleted file mode 100644
index b371901..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.context.cluster;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.applications.ApplicationHolder;
-import org.apache.stratos.autoscaler.context.member.MemberStatsContext;
-import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext;
-import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
-import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
-import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
-import org.apache.stratos.autoscaler.pojo.policy.autoscale.AutoscalePolicy;
-import org.apache.stratos.autoscaler.pojo.policy.deployment.ChildPolicy;
-import org.apache.stratos.autoscaler.pojo.policy.deployment.DeploymentPolicy;
-import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.ChildLevelNetworkPartition;
-import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.ChildLevelPartition;
-import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.Partition;
-import org.apache.stratos.autoscaler.util.AutoscalerUtil;
-import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
-import org.apache.stratos.messaging.domain.applications.Application;
-import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
-import org.apache.stratos.messaging.domain.instance.ClusterInstance;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-
-/*
- * It holds the runtime data of a VM cluster
- */
-public class VMClusterContext extends AbstractClusterContext {
-
- private static final long serialVersionUID = 17570842529682141L;
-
- private static final Log log = LogFactory.getLog(VMClusterContext.class);
-
- // Map<NetworkpartitionId, Network Partition Context>
- protected Map<String, ClusterLevelNetworkPartitionContext> networkPartitionCtxts;
-
- protected DeploymentPolicy deploymentPolicy;
- protected AutoscalePolicy autoscalePolicy;
-
- public VMClusterContext(String clusterId, String serviceId, AutoscalePolicy autoscalePolicy,
- DeploymentPolicy deploymentPolicy, boolean hasScalingDependents) {
-
- super(clusterId, serviceId);
- this.deploymentPolicy = deploymentPolicy;
- this.networkPartitionCtxts = new ConcurrentHashMap<String, ClusterLevelNetworkPartitionContext>();
- this.autoscalePolicy = autoscalePolicy;
-
- }
-
- public Map<String, ClusterLevelNetworkPartitionContext> getNetworkPartitionCtxts() {
- return networkPartitionCtxts;
- }
-
- public DeploymentPolicy getDeploymentPolicy() {
- return deploymentPolicy;
- }
-
- public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) {
- this.deploymentPolicy = deploymentPolicy;
- }
-
- public AutoscalePolicy getAutoscalePolicy() {
- return autoscalePolicy;
- }
-
- public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
- this.autoscalePolicy = autoscalePolicy;
- }
-
- public ClusterLevelNetworkPartitionContext getNetworkPartitionCtxt(String networkPartitionId) {
- return networkPartitionCtxts.get(networkPartitionId);
- }
-
- public void setPartitionCtxt(Map<String, ClusterLevelNetworkPartitionContext> partitionCtxt) {
- this.networkPartitionCtxts = partitionCtxt;
- }
-
- public boolean partitionCtxtAvailable(String partitionId) {
- return networkPartitionCtxts.containsKey(partitionId);
- }
-
- public void addNetworkPartitionCtxt(ClusterLevelNetworkPartitionContext ctxt) {
- this.networkPartitionCtxts.put(ctxt.getId(), ctxt);
- }
-
- public ClusterLevelNetworkPartitionContext getPartitionCtxt(String id) {
- return this.networkPartitionCtxts.get(id);
- }
-
- public ClusterLevelNetworkPartitionContext getNetworkPartitionCtxt(Member member) {
- log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId());
- String networkPartitionId = member.getNetworkPartitionId();
- if (networkPartitionCtxts.containsKey(networkPartitionId)) {
- log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId));
- return networkPartitionCtxts.get(networkPartitionId);
- }
-
- log.info("returning null getNetworkPartitionCtxt");
- return null;
- }
-
- public void addInstanceContext(String instanceId, Cluster cluster, boolean hasScalingDependents,
- boolean groupScalingEnabledSubtree)
- throws PolicyValidationException, PartitionValidationException {
- ClusterLevelNetworkPartitionContext networkPartitionContext = null;
- ClusterInstance clusterInstance = cluster.getInstanceContexts(instanceId);
- ChildPolicy policy = this.deploymentPolicy.
- getChildPolicy(
- AutoscalerUtil.getAliasFromClusterId(clusterId));
- if (networkPartitionCtxts.containsKey(clusterInstance.getNetworkPartitionId())) {
- networkPartitionContext = this.networkPartitionCtxts.get(
- clusterInstance.getNetworkPartitionId());
- } else {
- if (policy != null) {
- ChildLevelNetworkPartition networkPartition = policy.
- getChildLevelNetworkPartition(clusterInstance.getNetworkPartitionId());
- networkPartitionContext = new ClusterLevelNetworkPartitionContext(networkPartition.getId(),
- networkPartition.getPartitionAlgo(), 0);
- } else {
- //Parent should have the partition specified
- networkPartitionContext = new ClusterLevelNetworkPartitionContext(
- clusterInstance.getNetworkPartitionId());
- }
-
- }
-
- if (clusterInstance.getPartitionId() != null) {
- //Need to add partition Context based on the given one from the parent
- networkPartitionContext = addPartition(clusterInstance, cluster,
- networkPartitionContext, null, hasScalingDependents, groupScalingEnabledSubtree);
- } else {
- networkPartitionContext = parseDeploymentPolicy(clusterInstance, cluster,
- policy, networkPartitionContext, hasScalingDependents, groupScalingEnabledSubtree);
- }
- if (!networkPartitionCtxts.containsKey(clusterInstance.getNetworkPartitionId())) {
- this.networkPartitionCtxts.put(clusterInstance.getNetworkPartitionId(),
- networkPartitionContext);
- if (log.isInfoEnabled()) {
- log.info(String.format("Cluster instance context has been added to network partition, [cluster instance]" +
- " %s [network partition] %s", clusterInstance.getInstanceId(),
- clusterInstance.getNetworkPartitionId()));
- }
- }
-
- }
-
- private ClusterLevelNetworkPartitionContext parseDeploymentPolicy(
- ClusterInstance clusterInstance,
- Cluster cluster,
- ChildPolicy childPolicy,
- ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext,
- boolean hasGroupScalingDependent, boolean groupScalingEnabledSubtree)
- throws PolicyValidationException, PartitionValidationException {
-
- if (childPolicy == null) {
- String msg = "Deployment policy is null";
- log.error(msg);
- throw new PolicyValidationException(msg);
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Child policy alias: " + childPolicy.getAlias());
- }
-
- ChildLevelPartition[] childLevelPartitions = childPolicy.
- getChildLevelNetworkPartition(
- clusterLevelNetworkPartitionContext.getId()).
- getChildLevelPartitions();
- if (childLevelPartitions == null) {
- String msg = "Partitions are null in child policy: [alias]: " +
- childPolicy.getAlias();
- log.error(msg);
- throw new PolicyValidationException(msg);
- }
-
- //Retrieving the ChildLevelNetworkPartition and create NP Context
- ChildLevelNetworkPartition networkPartition;
- networkPartition = childPolicy.
- getChildLevelNetworkPartition(clusterInstance.getNetworkPartitionId());
-
- //Fill cluster instance context with child level partitions
- for (ChildLevelPartition childLevelPartition : networkPartition.getChildLevelPartitions()) {
- addPartition(clusterInstance, cluster, clusterLevelNetworkPartitionContext, childLevelPartition,
- hasGroupScalingDependent, groupScalingEnabledSubtree);
- }
- return clusterLevelNetworkPartitionContext;
- }
-
- private ClusterLevelNetworkPartitionContext addPartition(
- ClusterInstance clusterInstance,
- Cluster cluster,
- ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext,
- ChildLevelPartition childLevelPartition,
- boolean hasScalingDependents, boolean groupScalingEnabledSubtree)
- throws PolicyValidationException, PartitionValidationException {
- if (clusterLevelNetworkPartitionContext == null) {
- String msg =
- "Network Partition is null in deployment policy: [application-id]: " +
- deploymentPolicy.getApplicationId();
- log.error(msg);
- throw new PolicyValidationException(msg);
- }
-
- String nPartitionId = clusterLevelNetworkPartitionContext.getId();
-
- //Getting the associated partition
- if (clusterInstance.getPartitionId() == null && childLevelPartition == null) {
- String msg =
- "[Partition] " + clusterInstance.getPartitionId() + " for [networkPartition] " +
- clusterInstance.getNetworkPartitionId() + "is null " +
- "in deployment policy: [application-id]: " + deploymentPolicy.getApplicationId();
- log.error(msg);
- throw new PolicyValidationException(msg);
- }
-
- ClusterInstanceContext clusterInstanceContext = (ClusterInstanceContext) clusterLevelNetworkPartitionContext.
- getInstanceContext(clusterInstance.getInstanceId());
- int maxInstances = 1;
- if (clusterInstanceContext == null) {
- int minInstances = 1;
- try {
- ApplicationHolder.acquireReadLock();
- Application application = ApplicationHolder.getApplications().
- getApplication(cluster.getAppId());
- ClusterDataHolder dataHolder = application.
- getClusterDataHolderRecursivelyByAlias(
- AutoscalerUtil.getAliasFromClusterId(clusterId));
- minInstances = dataHolder.getMinInstances();
- maxInstances = dataHolder.getMaxInstances();
- } finally {
- ApplicationHolder.releaseReadLock();
- }
- clusterInstanceContext = new ClusterInstanceContext(clusterInstance.getInstanceId(),
- clusterLevelNetworkPartitionContext.getPartitionAlgorithm(),
- minInstances, maxInstances, nPartitionId, clusterId, hasScalingDependents, groupScalingEnabledSubtree);
- }
- String partitionId;
- if (childLevelPartition != null) {
- //use it own defined partition
- partitionId = childLevelPartition.getPartitionId();
- maxInstances = childLevelPartition.getMax();
- } else {
- //handling the partition given by the parent
- partitionId = clusterInstance.getPartitionId();
- }
- //Retrieving the actual partition from application
- Partition appPartition = deploymentPolicy.getApplicationLevelNetworkPartition(nPartitionId).
- getPartition(partitionId);
- org.apache.stratos.cloud.controller.stub.domain.Partition partition =
- convertTOCCPartition(appPartition);
-
- //Validate the partition
- //TODO validate partition removal
- //CloudControllerClient.getInstance().validatePartition(partition);
-
- //Creating cluster level partition context
- ClusterLevelPartitionContext clusterLevelPartitionContext = new ClusterLevelPartitionContext(
- maxInstances,
- partition,
- clusterInstance.getNetworkPartitionId(), clusterId);
- clusterLevelPartitionContext.setServiceName(cluster.getServiceName());
- clusterLevelPartitionContext.setProperties(cluster.getProperties());
-
- //add members to partition Context
- addMembersFromTopology(cluster, partition, clusterLevelPartitionContext);
-
- //adding it to the monitors context
- clusterInstanceContext.addPartitionCtxt(clusterLevelPartitionContext);
- if (log.isInfoEnabled()) {
- log.info(String.format("Partition context has been added: [partition] %s",
- clusterLevelPartitionContext.getPartitionId()));
- }
-
- clusterLevelNetworkPartitionContext.addInstanceContext(clusterInstanceContext);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Cluster Instance context has been added: " +
- "[ClusterInstanceContext] %s", clusterInstanceContext.getId()));
- }
-
-
- return clusterLevelNetworkPartitionContext;
- }
-
- private void addMembersFromTopology(Cluster cluster,
- org.apache.stratos.cloud.controller.stub.domain.Partition partition,
- ClusterLevelPartitionContext clusterLevelPartitionContext) {
- for (Member member : cluster.getMembers()) {
- String memberId = member.getMemberId();
- if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
- MemberContext memberContext = new MemberContext();
- memberContext.setClusterId(member.getClusterId());
- memberContext.setMemberId(memberId);
- memberContext.setInitTime(member.getInitTime());
- memberContext.setPartition(partition);
- memberContext.setProperties(AutoscalerUtil.toStubProperties(member.getProperties()));
-
- if (MemberStatus.Activated.equals(member.getStatus())) {
- clusterLevelPartitionContext.addActiveMember(memberContext);
- if (log.isDebugEnabled()) {
- String msg = String.format("Active member read from topology and added to active member list: %s", member.toString());
- log.debug(msg);
- }
- } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
- clusterLevelPartitionContext.addPendingMember(memberContext);
- if (log.isDebugEnabled()) {
- String msg = String.format("Pending member read from topology and added to pending member list: %s", member.toString());
- log.debug(msg);
- }
- }
- clusterLevelPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added: [member-id] %s", memberId));
- }
- }
- }
- }
-
- private org.apache.stratos.cloud.controller.stub.domain.Partition convertTOCCPartition(Partition partition) {
- org.apache.stratos.cloud.controller.stub.domain.Partition partition1 = new
- org.apache.stratos.cloud.controller.stub.domain.Partition();
-
- partition1.setId(partition.getId());
- partition1.setProvider(partition.getProvider());
- partition1.setProperties(AutoscalerUtil.toStubProperties(partition.getProperties()));
-
- return partition1;
- }
-}
[3/5] stratos git commit: Removed kubernetes cluster
monitors/contexts and renamed vm cluster monitor to cluster monitor
Posted by im...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
deleted file mode 100644
index 659751f..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
+++ /dev/null
@@ -1,516 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor.cluster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.context.cluster.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.context.member.MemberStatsContext;
-import org.apache.stratos.autoscaler.client.CloudControllerClient;
-import org.apache.stratos.autoscaler.exception.cartridge.TerminationException;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.drools.runtime.StatefulKnowledgeSession;
-
-/*
- * Every kubernetes cluster monitor should extend this class
- */
-public class KubernetesClusterMonitor extends VMClusterMonitor {
-
- private static final Log log = LogFactory.getLog(KubernetesClusterMonitor.class);
-
-// private StatefulKnowledgeSession dependentScaleCheckKnowledgeSession;
-
- protected KubernetesClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree) {
-
- super(cluster, hasScalingDependents, groupScalingEnabledSubtree);
-
- autoscalerRuleEvaluator = new AutoscalerRuleEvaluator();
- autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.CONTAINER_OBSOLETE_CHECK_DROOL_FILE);
- autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.CONTAINER_SCALE_CHECK_DROOL_FILE);
- autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.CONTAINER_MIN_CHECK_DROOL_FILE);
- autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.DEPENDENT_SCALE_CHECK_DROOL_FILE);
-
- this.obsoleteCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
- StratosConstants.CONTAINER_OBSOLETE_CHECK_DROOL_FILE);
- this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
- StratosConstants.CONTAINER_SCALE_CHECK_DROOL_FILE);
- this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
- StratosConstants.CONTAINER_MIN_CHECK_DROOL_FILE);
- this.dependentScaleCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession(
- StratosConstants.DEPENDENT_SCALE_CHECK_DROOL_FILE);
-
- //this.kubernetesClusterCtxt = kubernetesClusterContext;
- }
-
-// @Override
-// public void handleAverageLoadAverageEvent(
-// AverageLoadAverageEvent averageLoadAverageEvent) {
-//
-// String clusterId = averageLoadAverageEvent.getClusterId();
-// float value = averageLoadAverageEvent.getValue();
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Avg load avg event: [cluster] %s [value] %s",
-// clusterId, value));
-// }
-// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-// if (null != kubernetesClusterContext) {
-// kubernetesClusterContext.setAverageLoadAverage(value);
-// } else {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Kubernetes cluster context is not available for :" +
-// " [cluster] %s", clusterId));
-// }
-// }
-//
-// }
-//
-// @Override
-// public void handleGradientOfLoadAverageEvent(
-// GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
-//
-// String clusterId = gradientOfLoadAverageEvent.getClusterId();
-// float value = gradientOfLoadAverageEvent.getValue();
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s",
-// clusterId, value));
-// }
-// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-// if (null != kubernetesClusterContext) {
-// kubernetesClusterContext.setLoadAverageGradient(value);
-// } else {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Kubernetes cluster context is not available for :" +
-// " [cluster] %s", clusterId));
-// }
-// }
-// }
-//
-// @Override
-// public void handleSecondDerivativeOfLoadAverageEvent(
-// SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
-//
-// String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
-// float value = secondDerivativeOfLoadAverageEvent.getValue();
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
-// + "[value] %s", clusterId, value));
-// }
-// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-// if (null != kubernetesClusterContext) {
-// kubernetesClusterContext.setLoadAverageSecondDerivative(value);
-// } else {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Kubernetes cluster context is not available for :" +
-// " [cluster] %s", clusterId));
-// }
-// }
-// }
-//
-// @Override
-// public void handleAverageMemoryConsumptionEvent(
-// AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
-//
-// String clusterId = averageMemoryConsumptionEvent.getClusterId();
-// float value = averageMemoryConsumptionEvent.getValue();
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Avg Memory Consumption event: [cluster] %s "
-// + "[value] %s", averageMemoryConsumptionEvent.getClusterId(), value));
-// }
-// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-// if (null != kubernetesClusterContext) {
-// kubernetesClusterContext.setAverageMemoryConsumption(value);
-// } else {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Kubernetes cluster context is not available for :" +
-// " [cluster] %s", clusterId));
-// }
-// }
-// }
-//
-// @Override
-// public void handleGradientOfMemoryConsumptionEvent(
-// GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
-//
-// String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
-// float value = gradientOfMemoryConsumptionEvent.getValue();
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
-// + "[value] %s", clusterId, value));
-// }
-// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-// if (null != kubernetesClusterContext) {
-// kubernetesClusterContext.setMemoryConsumptionGradient(value);
-// } else {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Kubernetes cluster context is not available for :" +
-// " [cluster] %s", clusterId));
-// }
-// }
-// }
-//
-// @Override
-// public void handleSecondDerivativeOfMemoryConsumptionEvent(
-// SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
-//
-// String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
-// float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
-// + "[value] %s", clusterId, value));
-// }
-// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-// if (null != kubernetesClusterContext) {
-// kubernetesClusterContext.setMemoryConsumptionSecondDerivative(value);
-// } else {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Kubernetes cluster context is not available for :" +
-// " [cluster] %s", clusterId));
-// }
-// }
-// }
-//
-// @Override
-// public void handleAverageRequestsInFlightEvent(
-// AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
-//
-// float value = averageRequestsInFlightEvent.getValue();
-// String clusterId = averageRequestsInFlightEvent.getClusterId();
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Average Rif event: [cluster] %s [value] %s",
-// clusterId, value));
-// }
-// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-// if (null != kubernetesClusterContext) {
-// kubernetesClusterContext.setAverageRequestsInFlight(value);
-// } else {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Kubernetes cluster context is not available for :" +
-// " [cluster] %s", clusterId));
-// }
-// }
-// }
-//
-// @Override
-// public void handleGradientOfRequestsInFlightEvent(
-// GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
-//
-// String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
-// float value = gradientOfRequestsInFlightEvent.getValue();
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s",
-// clusterId, value));
-// }
-// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-// if (null != kubernetesClusterContext) {
-// kubernetesClusterContext.setRequestsInFlightGradient(value);
-// } else {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Kubernetes cluster context is not available for :" +
-// " [cluster] %s", clusterId));
-// }
-// }
-// }
-//
-// @Override
-// public void handleSecondDerivativeOfRequestsInFlightEvent(
-// SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
-//
-// String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
-// float value = secondDerivativeOfRequestsInFlightEvent.getValue();
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Second derivative of Rif event: [cluster] %s "
-// + "[value] %s", clusterId, value));
-// }
-// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
-// if (null != kubernetesClusterContext) {
-// kubernetesClusterContext.setRequestsInFlightSecondDerivative(value);
-// } else {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Kubernetes cluster context is not available for :" +
-// " [cluster] %s", clusterId));
-// }
-// }
-// }
-//
-// @Override
-// public void handleMemberAverageMemoryConsumptionEvent(
-// MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
-//
-// String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
-// KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
-// MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
-// if (null == memberStatsContext) {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Member context is not available for : [member] %s", memberId));
-// }
-// return;
-// }
-// float value = memberAverageMemoryConsumptionEvent.getValue();
-// memberStatsContext.setAverageMemoryConsumption(value);
-// }
-//
-// @Override
-// public void handleMemberGradientOfMemoryConsumptionEvent(
-// MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
-//
-// String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
-// KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
-// MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
-// if (null == memberStatsContext) {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Member context is not available for : [member] %s", memberId));
-// }
-// return;
-// }
-// float value = memberGradientOfMemoryConsumptionEvent.getValue();
-// memberStatsContext.setGradientOfMemoryConsumption(value);
-// }
-//
-// @Override
-// public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
-// MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
-//
-// }
-//
-// @Override
-// public void handleMemberAverageLoadAverageEvent(
-// MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
-//
-// KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
-// String memberId = memberAverageLoadAverageEvent.getMemberId();
-// float value = memberAverageLoadAverageEvent.getValue();
-// MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
-// if (null == memberStatsContext) {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Member context is not available for : [member] %s", memberId));
-// }
-// return;
-// }
-// memberStatsContext.setAverageLoadAverage(value);
-// }
-//
-// @Override
-// public void handleMemberGradientOfLoadAverageEvent(
-// MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
-//
-// String memberId = memberGradientOfLoadAverageEvent.getMemberId();
-// KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
-// MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
-// if (null == memberStatsContext) {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Member context is not available for : [member] %s", memberId));
-// }
-// return;
-// }
-// float value = memberGradientOfLoadAverageEvent.getValue();
-// memberStatsContext.setGradientOfLoadAverage(value);
-// }
-//
-// @Override
-// public void handleMemberSecondDerivativeOfLoadAverageEvent(
-// MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
-//
-// String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
-// KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
-// MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
-// if (null == memberStatsContext) {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Member context is not available for : [member] %s", memberId));
-// }
-// return;
-// }
-// float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
-// memberStatsContext.setSecondDerivativeOfLoadAverage(value);
-// }
-//
-// @Override
-// public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
-// // kill the container
-// String memberId = memberFaultEvent.getMemberId();
-// Member member = getMemberByMemberId(memberId);
-// if (null == member) {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
-// }
-// return;
-// }
-// if (!member.isActive()) {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Member activated event has not received for the member %s. "
-// + "Therefore ignoring" + " the member fault health stat", memberId));
-// }
-// return;
-// }
-//
-// if (!getKubernetesClusterCtxt().activeMemberExist(memberId)) {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Could not find the active member in kubernetes cluster context, "
-// + "[member] %s ", memberId));
-// }
-// return;
-// }
-//
-// // move member to obsolete list
-// getKubernetesClusterCtxt().moveMemberToObsoleteList(memberId);
-// if (log.isInfoEnabled()) {
-// String clusterId = memberFaultEvent.getClusterId();
-// String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
-// log.info(String.format("Faulty member is moved to obsolete list and removed from the active members list: "
-// + "[member] %s [kub-cluster] %s [cluster] %s ", memberId, kubernetesClusterID, clusterId));
-// }
-// }
-//
-// @Override
-// public void handleMemberStartedEvent(
-// MemberStartedEvent memberStartedEvent) {
-//
-// }
-//
-// @Override
-// public void handleMemberActivatedEvent(
-// MemberActivatedEvent memberActivatedEvent) {
-//
-// KubernetesClusterContext kubernetesClusterContext;
-// kubernetesClusterContext = getKubernetesClusterCtxt();
-// String memberId = memberActivatedEvent.getMemberId();
-// kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId));
-// if (log.isInfoEnabled()) {
-// log.info(String.format("Member stat context has been added successfully: "
-// + "[member] %s", memberId));
-// }
-// kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
-// }
-//
-// @Override
-// public void handleMemberMaintenanceModeEvent(
-// MemberMaintenanceModeEvent maintenanceModeEvent) {
-//
-// // no need to do anything here
-// // we will not be receiving this event for containers
-// // we will only receive member terminated event
-// }
-//
-// @Override
-// public void handleMemberReadyToShutdownEvent(
-// MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
-//
-// // no need to do anything here
-// // we will not be receiving this event for containers
-// // we will only receive member terminated event
-// }
-//
-// @Override
-// public void handleMemberTerminatedEvent(
-// MemberTerminatedEvent memberTerminatedEvent) {
-//
-// String memberId = memberTerminatedEvent.getMemberId();
-// if (getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Member is removed from termination pending members list: "
-// + "[member] %s", memberId));
-// }
-// } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) {
-// if (log.isDebugEnabled()) {
-// log.debug(String.format("Member is removed from pending members list: "
-// + "[member] %s", memberId));
-// }
-// } else if (getKubernetesClusterCtxt().removeActiveMemberById(memberId)) {
-// log.warn(String.format("Member is in the wrong list and it is removed from "
-// + "active members list: %s", memberId));
-// } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) {
-// log.warn(String.format("Obsolete member has either been terminated or its obsolete time out has expired and"
-// + " it is removed from obsolete members list: %s", memberId));
-// } else {
-// log.warn(String.format("Member is not available in any of the list active, "
-// + "pending and termination pending: %s", memberId));
-// }
-//
-// if (log.isInfoEnabled()) {
-// log.info(String.format("Member stat context has been removed successfully: "
-// + "[member] %s", memberId));
-// }
-// }
-//
-// @Override
-// public void handleClusterRemovedEvent(
-// ClusterRemovedEvent clusterRemovedEvent) {
-// getKubernetesClusterCtxt().getPendingMembers().clear();
-// getKubernetesClusterCtxt().getActiveMembers().clear();
-// getKubernetesClusterCtxt().getTerminationPendingMembers().clear();
-// getKubernetesClusterCtxt().getObsoletedMembers().clear();
-// }
-//
-// public KubernetesClusterContext getKubernetesClusterCtxt() {
-// return (KubernetesClusterContext) getClusterContext();
-// }
-//
-// private Member getMemberByMemberId(String memberId) {
-// try {
-// TopologyManager.acquireReadLock();
-// for (Service service : TopologyManager.getTopology().getServices()) {
-// for (Cluster cluster : service.getClusters()) {
-// if (cluster.memberExists(memberId)) {
-// return cluster.getMember(memberId);
-// }
-// }
-// }
-// return null;
-// } finally {
-// TopologyManager.releaseReadLock();
-// }
-// }
-
- @Override
- public void terminateAllMembers(String instanceId, String networkPartitionId) {
- try {
- CloudControllerClient.getInstance().terminateAllContainers(getClusterId());
- } catch (TerminationException e) {
- log.error(String.format("Could not terminate containers: [cluster-id] %s",
- getClusterId()), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
deleted file mode 100644
index f09f440..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor.cluster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-
-/*
- * It is monitoring a kubernetes service cluster periodically.
- */
-public final class KubernetesServiceClusterMonitor extends KubernetesClusterMonitor {
-
- private static final Log log = LogFactory.getLog(KubernetesServiceClusterMonitor.class);
-
- private String lbReferenceType;
-
- public KubernetesServiceClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree) {
- super(cluster, hasScalingDependents, groupScalingEnabledSubtree);
- readConfigurations();
- }
-
- @Override
- public void run() {
-
- if (log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor is running..." + this.toString());
- }
- try {
-
- //TODO to get status from correct instance if (!ClusterStatus.Active.getNextStates().contains(getStatus())) {
- monitor();
- /*} else {
- if (log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
- + getStatus() + "state");
- }
- }*/
- } catch (Exception e) {
- log.error("KubernetesServiceClusterMonitor: Monitor failed." + this.toString(),
- e);
- }
- }
-
-// @Override
-// public void monitor() {
-// final String instanceId = this.getKubernetesClusterCtxt().getInstanceId();
-// Runnable monitoringRunnable = new Runnable() {
-//
-// @Override
-// public void run() {
-// obsoleteCheck();
-// minCheck();
-// scaleCheck(instanceId);
-// }
-// };
-// monitoringRunnable.run();
-// }
-//
-//
-// private void scaleCheck(String instanceId) {
-// boolean rifReset = getKubernetesClusterCtxt().isRifReset();
-// boolean memoryConsumptionReset = getKubernetesClusterCtxt().isMemoryConsumptionReset();
-// boolean loadAverageReset = getKubernetesClusterCtxt().isLoadAverageReset();
-// if (log.isDebugEnabled()) {
-// log.debug("flag of rifReset : " + rifReset
-// + " flag of memoryConsumptionReset : "
-// + memoryConsumptionReset + " flag of loadAverageReset : "
-// + loadAverageReset);
-// }
-// String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
-// String clusterId = getClusterId();
-// if (rifReset || memoryConsumptionReset || loadAverageReset) {
-// getScaleCheckKnowledgeSession().setGlobal("clusterId", clusterId);
-// getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", getAutoscalePolicy(instanceId));
-// getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
-// getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
-// getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
-// if (log.isDebugEnabled()) {
-// log.debug(String.format(
-// "Running scale check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
-// }
-// scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
-// getScaleCheckKnowledgeSession(), scaleCheckFactHandle, getKubernetesClusterCtxt());
-// getKubernetesClusterCtxt().setRifReset(false);
-// getKubernetesClusterCtxt().setMemoryConsumptionReset(false);
-// getKubernetesClusterCtxt().setLoadAverageReset(false);
-// } else if (log.isDebugEnabled()) {
-// log.debug(String.format("Scale check will not run since none of the statistics have not received yet for "
-// + "[kub-cluster] : %s [cluster] : %s", kubernetesClusterID, clusterId));
-// }
-// }
-//
-// private AutoscalePolicy getAutoscalePolicy(String instanceId) {
-// KubernetesClusterContext kubernetesClusterContext = (KubernetesClusterContext) this.clusterContext;
-// return kubernetesClusterContext.getAutoscalePolicy();
-// }
-//
-// private void minCheck() {
-// getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-// String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
-// if (log.isDebugEnabled()) {
-// log.debug(String.format(
-// "Running min check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
-// }
-// minCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
-// getMinCheckKnowledgeSession(), minCheckFactHandle,
-// getKubernetesClusterCtxt());
-// }
-//
-// private void obsoleteCheck() {
-// getObsoleteCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
-// String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
-// if (log.isDebugEnabled()) {
-// log.debug(String.format(
-// "Running obsolete check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
-// }
-// obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate(
-// getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle,
-// getKubernetesClusterCtxt());
-// }
-//
-// @Override
-// public void destroy() {
-// getMinCheckKnowledgeSession().dispose();
-// getObsoleteCheckKnowledgeSession().dispose();
-// getScaleCheckKnowledgeSession().dispose();
-// setDestroyed(true);
-// stopScheduler();
-// if (log.isDebugEnabled()) {
-// log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
-// }
-// }
-//
-// @Override
-// protected void readConfigurations() {
-// XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
-// int monitorInterval = conf.getInt(AutoScalerConstants.KubernetesService_Cluster_MONITOR_INTERVAL, 60000);
-// setMonitorIntervalMilliseconds(monitorInterval);
-// if (log.isDebugEnabled()) {
-// log.debug("KubernetesServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
-// }
-// }
-//
-// @Override
-// public String toString() {
-// return "KubernetesServiceClusterMonitor for " + "[ clusterId=" + getClusterId() + "]";
-// }
-//
-// public String getLbReferenceType() {
-// return lbReferenceType;
-// }
-//
-// public void setLbReferenceType(String lbReferenceType) {
-// this.lbReferenceType = lbReferenceType;
-// }
-//
-// @Override
-// public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
-//
-// if (properties != null) {
-// Property[] propertyArray = properties.getProperties();
-// if (propertyArray == null) {
-// return;
-// }
-// List<Property> propertyList = Arrays.asList(propertyArray);
-//
-// for (Property property : propertyList) {
-// String key = property.getName();
-// String value = property.getValue();
-//
-// if (StratosConstants.KUBERNETES_MIN_REPLICAS.equals(key)) {
-// int min = Integer.parseInt(value);
-// int max = getKubernetesClusterCtxt().getMaxReplicas();
-// if (min > max) {
-// String msg = String.format("%s should be less than %s . But %s is not less than %s.",
-// StratosConstants.KUBERNETES_MIN_REPLICAS, StratosConstants.KUBERNETES_MAX_REPLICAS, min, max);
-// log.error(msg);
-// throw new InvalidArgumentException(msg);
-// }
-// getKubernetesClusterCtxt().setMinReplicas(min);
-// break;
-// }
-// }
-//
-// }
-// }
-//
-// @Override
-// public void terminateAllMembers(String instanceId, String networkPartitionId) {
-//
-// }
-//
-// @Override
-// public void onChildScalingEvent(MonitorScalingEvent scalingEvent) {
-//
-// }
-//
-// @Override
-// public void onParentScalingEvent(MonitorScalingEvent scalingEvent) {
-//
-// }
-}