You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/10/31 05:04:05 UTC
[2/5] Adding autoscaler topology event listeners introduced by
service grouping
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
new file mode 100644
index 0000000..e286187
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.PartitionValidationException;
+import org.apache.stratos.autoscaler.exception.PolicyValidationException;
+import org.apache.stratos.autoscaler.partition.PartitionGroup;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.util.Constants;
+
+/*
+ * Factory class for creating cluster monitors.
+ */
+public class ClusterMonitorFactory {
+
+ private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
+
+    /**
+     * Creates the cluster monitor matching the type of the given cluster: a
+     * Kubernetes (docker) service monitor, a VM load balancer monitor, or a
+     * VM service monitor when the cluster is neither of the former two.
+     *
+     * @param cluster the cluster to be monitored
+     * @return the created cluster monitor, or {@code null} if {@code cluster} is {@code null}
+     * @throws PolicyValidationException    when a required policy is missing or not valid
+     * @throws PartitionValidationException when partition is not valid
+     */
+    public static AbstractClusterMonitor getMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+
+        // The type specific builders answer null for a null cluster; guard here so
+        // that a null cluster does not fail on the type checks below.
+        if (cluster == null) {
+            return null;
+        }
+
+        if (cluster.isKubernetesCluster()) {
+            return getDockerServiceClusterMonitor(cluster);
+        }
+        if (cluster.isLbCluster()) {
+            return getVMLbClusterMonitor(cluster);
+        }
+        return getVMServiceClusterMonitor(cluster);
+    }
+
+    /**
+     * Builds the monitor for a VM based service cluster.
+     * <p>
+     * One {@link NetworkPartitionContext} is created per partition group of the
+     * deployment policy and one {@link PartitionContext} per partition. Members
+     * already present in the topology are attached to the partition context whose
+     * partition id matches theirs (case-insensitively).
+     *
+     * @param cluster the cluster to be monitored
+     * @return the populated monitor, or {@code null} if {@code cluster} is {@code null}
+     * @throws PolicyValidationException    if the deployment policy or its partitions are null
+     * @throws PartitionValidationException declared for partition validation failures; presumably
+     *                                      raised by the cloud controller validation call (confirm)
+     */
+    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+        // FIXME fix the following code to correctly update
+        // AutoscalerContext context = AutoscalerContext.getInstance();
+        if (null == cluster) {
+            return null;
+        }
+
+        String autoscalePolicyName = cluster.getAutoscalePolicyName();
+        String deploymentPolicyName = cluster.getDeploymentPolicyName();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Deployment policy name: " + deploymentPolicyName);
+            log.debug("Autoscaler policy name: " + autoscalePolicyName);
+        }
+
+        AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
+        DeploymentPolicy deploymentPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicyName);
+
+        if (deploymentPolicy == null) {
+            String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+
+        if (deploymentPolicy.getAllPartitions() == null) {
+            String msg = "Deployment Policy's Partitions are null. Policy name: " + deploymentPolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+
+        CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
+
+        VMServiceClusterMonitor clusterMonitor =
+                new VMServiceClusterMonitor(cluster.getClusterId(),
+                                            cluster.getServiceName(),
+                                            deploymentPolicy, policy);
+        clusterMonitor.setStatus(ClusterStatus.Created);
+
+        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
+
+            NetworkPartitionContext networkPartitionContext =
+                    new NetworkPartitionContext(partitionGroup.getId(),
+                                                partitionGroup.getPartitionAlgo(),
+                                                partitionGroup.getPartitions());
+
+            for (Partition partition : partitionGroup.getPartitions()) {
+                PartitionContext partitionContext = new PartitionContext(partition);
+                partitionContext.setServiceName(cluster.getServiceName());
+                partitionContext.setProperties(cluster.getProperties());
+                partitionContext.setNetworkPartitionId(partitionGroup.getId());
+
+                for (Member member : cluster.getMembers()) {
+                    // A member without a partition id cannot belong to this partition;
+                    // skip it instead of failing with a NullPointerException.
+                    String memberPartitionId = member.getPartitionId();
+                    if (memberPartitionId == null || !memberPartitionId.equalsIgnoreCase(partition.getId())) {
+                        continue;
+                    }
+
+                    String memberId = member.getMemberId();
+                    MemberContext memberContext = new MemberContext();
+                    memberContext.setClusterId(member.getClusterId());
+                    memberContext.setMemberId(memberId);
+                    memberContext.setInitTime(member.getInitTime());
+                    memberContext.setPartition(partition);
+                    memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
+
+                    // Only active and pending members are tracked in the partition context lists;
+                    // members in any other state (e.g. Suspended) just get a stats context below.
+                    MemberStatus status = member.getStatus();
+                    if (MemberStatus.Activated.equals(status)) {
+                        if (log.isDebugEnabled()) {
+                            String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
+                            log.debug(msg);
+                        }
+                        partitionContext.addActiveMember(memberContext);
+                    } else if (MemberStatus.Created.equals(status) || MemberStatus.Starting.equals(status)) {
+                        if (log.isDebugEnabled()) {
+                            String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
+                            log.debug(msg);
+                        }
+                        partitionContext.addPendingMember(memberContext);
+                    }
+
+                    partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+                    if (log.isInfoEnabled()) {
+                        log.info(String.format("Member stat context has been added: [member] %s", memberId));
+                    }
+                }
+
+                networkPartitionContext.addPartitionContext(partitionContext);
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Partition context has been added: [partition] %s",
+                                           partitionContext.getPartitionId()));
+                }
+            }
+
+            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Network partition context has been added: [network partition] %s",
+                                       networkPartitionContext.getId()));
+            }
+        }
+
+        // find lb reference type
+        java.util.Properties props = cluster.getProperties();
+
+        if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
+            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+            clusterMonitor.setLbReferenceType(value);
+            if (log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: " + value);
+            }
+        }
+
+        // set hasPrimary property
+        // hasPrimary is true if there are primary members available in that cluster
+        clusterMonitor.setHasPrimary(Boolean.parseBoolean(props.getProperty(Constants.IS_PRIMARY)));
+
+        log.info("VMServiceClusterMonitor created: " + clusterMonitor.toString());
+        return clusterMonitor;
+    }
+
+    /**
+     * Copies the entries of a {@link java.util.Properties} instance into the
+     * cloud controller stub's {@link Properties} holder, one {@link Property}
+     * per entry.
+     * <p>
+     * Keys and values are cast to {@link String}; an entry holding any other
+     * type raises a {@link ClassCastException}.
+     *
+     * @param properties the member properties to convert; may be {@code null}
+     * @return a new holder with the converted entries; empty when
+     *         {@code properties} is {@code null} or has no entries
+     */
+    private static Properties convertMemberPropsToMemberContextProps(
+            java.util.Properties properties) {
+        Properties props = new Properties();
+        if (properties == null) {
+            // A member without properties yields an empty holder rather than an NPE.
+            return props;
+        }
+        for (Map.Entry<Object, Object> e : properties.entrySet()) {
+            Property prop = new Property();
+            prop.setName((String) e.getKey());
+            prop.setValue((String) e.getValue());
+            props.addProperties(prop);
+        }
+        return props;
+    }
+
+
+    /**
+     * Builds the monitor for a VM based load balancer cluster.
+     * <p>
+     * For every partition group of the deployment policy a single, randomly
+     * picked partition hosts the load balancer; its {@link PartitionContext} is
+     * created with a minimum member count of one. Members already present in the
+     * topology are attached when their network partition id matches the group.
+     * The cluster id is also registered on the group's
+     * {@link NetworkPartitionLbHolder} according to the cluster's load balancer
+     * reference property.
+     *
+     * @param cluster the load balancer cluster to be monitored
+     * @return the populated monitor, or {@code null} if {@code cluster} is {@code null}
+     * @throws PolicyValidationException    if the deployment policy is null or one of its
+     *                                      partition groups has no partitions
+     * @throws PartitionValidationException declared by the signature; not thrown directly here
+     */
+    private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
+            throws PolicyValidationException, PartitionValidationException {
+        // FIXME fix the following code to correctly update
+        // AutoscalerContext context = AutoscalerContext.getInstance();
+        if (null == cluster) {
+            return null;
+        }
+
+        String autoscalePolicyName = cluster.getAutoscalePolicyName();
+        String deploymentPolicyName = cluster.getDeploymentPolicyName();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Deployment policy name: " + deploymentPolicyName);
+            log.debug("Autoscaler policy name: " + autoscalePolicyName);
+        }
+
+        AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
+        DeploymentPolicy deploymentPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicyName);
+
+        if (deploymentPolicy == null) {
+            String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
+            log.error(msg);
+            throw new PolicyValidationException(msg);
+        }
+
+        String clusterId = cluster.getClusterId();
+        VMLbClusterMonitor clusterMonitor =
+                new VMLbClusterMonitor(clusterId,
+                                       cluster.getServiceName(),
+                                       deploymentPolicy, policy);
+        clusterMonitor.setStatus(ClusterStatus.Created);
+
+        // One generator for the whole method instead of a new one per partition group.
+        Random random = new Random();
+
+        // partition group = network partition context
+        for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
+
+            NetworkPartitionLbHolder networkPartitionLbHolder =
+                    PartitionManager.getInstance()
+                                    .getNetworkPartitionLbHolder(partitionGroup.getId());
+
+            Partition[] partitions = partitionGroup.getPartitions();
+            if (partitions == null || partitions.length == 0) {
+                // Random.nextInt(0) would otherwise fail with an unhelpful IllegalArgumentException.
+                String msg = "Partition group has no partitions. Partition group id: " +
+                             partitionGroup.getId() + ", policy name: " + deploymentPolicyName;
+                log.error(msg);
+                throw new PolicyValidationException(msg);
+            }
+
+            // FIXME pick a random partition
+            Partition partition = partitions[random.nextInt(partitions.length)];
+            PartitionContext partitionContext = new PartitionContext(partition);
+            partitionContext.setServiceName(cluster.getServiceName());
+            partitionContext.setProperties(cluster.getProperties());
+            partitionContext.setNetworkPartitionId(partitionGroup.getId());
+            // The minimum member count is hard coded to one for LB cartridge partitions.
+            partitionContext.setMinimumMemberCount(1);
+
+            NetworkPartitionContext networkPartitionContext =
+                    new NetworkPartitionContext(partitionGroup.getId(),
+                                                partitionGroup.getPartitionAlgo(),
+                                                partitions);
+
+            for (Member member : cluster.getMembers()) {
+                // A member without a network partition id cannot belong to this group;
+                // skip it instead of failing with a NullPointerException.
+                String memberNetworkPartitionId = member.getNetworkPartitionId();
+                if (memberNetworkPartitionId == null ||
+                    !memberNetworkPartitionId.equalsIgnoreCase(networkPartitionContext.getId())) {
+                    continue;
+                }
+
+                String memberId = member.getMemberId();
+                MemberContext memberContext = new MemberContext();
+                memberContext.setClusterId(member.getClusterId());
+                memberContext.setMemberId(memberId);
+                memberContext.setPartition(partition);
+                memberContext.setInitTime(member.getInitTime());
+
+                // Only active and pending members are tracked in the partition context lists;
+                // members in any other state (e.g. Suspended) just get a stats context below.
+                MemberStatus status = member.getStatus();
+                if (MemberStatus.Activated.equals(status)) {
+                    if (log.isDebugEnabled()) {
+                        String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
+                        log.debug(msg);
+                    }
+                    partitionContext.addActiveMember(memberContext);
+                } else if (MemberStatus.Created.equals(status) || MemberStatus.Starting.equals(status)) {
+                    if (log.isDebugEnabled()) {
+                        String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
+                        log.debug(msg);
+                    }
+                    partitionContext.addPendingMember(memberContext);
+                }
+
+                partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Member stat context has been added: [member] %s", memberId));
+                }
+            }
+            networkPartitionContext.addPartitionContext(partitionContext);
+
+            // populate lb cluster id in network partition context.
+            java.util.Properties props = cluster.getProperties();
+
+            // get service type of load balanced cluster
+            String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
+
+            if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
+                String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+
+                // Constant-first comparison: getProperty() answers null for a non-String value.
+                if (Constants.DEFAULT_LOAD_BALANCER.equals(value)) {
+                    networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
+
+                } else if (Constants.SERVICE_AWARE_LOAD_BALANCER.equals(value)) {
+                    String serviceName = cluster.getServiceName();
+                    // TODO: check if this is correct
+                    networkPartitionLbHolder.addServiceLB(serviceName, clusterId);
+
+                    if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) {
+                        networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType);
+                        }
+                    }
+                }
+            }
+
+            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
+        }
+
+        log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
+        return clusterMonitor;
+    }
+
+ /**
+ * @param cluster - the cluster which needs to be monitored
+ * @return - the cluster monitor
+ */
+ private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster)
+ throws PolicyValidationException {
+
+ if (null == cluster) {
+ return null;
+ }
+
+ String autoscalePolicyName = cluster.getAutoscalePolicyName();
+ if (log.isDebugEnabled()) {
+ log.debug("Autoscaler policy name: " + autoscalePolicyName);
+ }
+
+ AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
+
+ if (policy == null) {
+ String msg = "Autoscale Policy is null. Policy name: " + autoscalePolicyName;
+ log.error(msg);
+ throw new PolicyValidationException(msg);
+ }
+
+ java.util.Properties props = cluster.getProperties();
+ String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
+ KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
+ cluster.getClusterId());
+
+ String minReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS);
+ if (minReplicasProperty != null && !minReplicasProperty.isEmpty()) {
+ int minReplicas = Integer.parseInt(minReplicasProperty);
+ kubernetesClusterCtxt.setMinReplicas(minReplicas);
+ }
+
+ String maxReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MAX_REPLICAS);
+ if (maxReplicasProperty != null && !maxReplicasProperty.isEmpty()) {
+ int maxReplicas = Integer.parseInt(maxReplicasProperty);
+ kubernetesClusterCtxt.setMaxReplicas(maxReplicas);
+ }
+
+ KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor(
+ kubernetesClusterCtxt,
+ cluster.getClusterId(),
+ cluster.getServiceName(),
+ policy.getId());
+
+ dockerClusterMonitor.setStatus(ClusterStatus.Created);
+
+ //populate the members after restarting
+ for (Member member : cluster.getMembers()) {
+ String memberId = member.getMemberId();
+ String clusterId = member.getClusterId();
+ MemberContext memberContext = new MemberContext();
+ memberContext.setMemberId(memberId);
+ memberContext.setClusterId(clusterId);
+ memberContext.setInitTime(member.getInitTime());
+
+ // if there is at least one member in the topology, that means service has been created already
+ // this is to avoid calling startContainer() method again
+ kubernetesClusterCtxt.setServiceClusterCreated(true);
+
+ if (MemberStatus.Activated.equals(member.getStatus())) {
+ if (log.isDebugEnabled()) {
+ String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
+ log.debug(msg);
+ }
+ dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
+ } else if (MemberStatus.Created.equals(member.getStatus())
+ || MemberStatus.Starting.equals(member.getStatus())) {
+ if (log.isDebugEnabled()) {
+ String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
+ log.debug(msg);
+ }
+ dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
+ }
+
+ kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been added: [member] %s", memberId));
+ }
+ }
+
+ // find lb reference type
+ if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
+ String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+ dockerClusterMonitor.setLbReferenceType(value);
+ if (log.isDebugEnabled()) {
+ log.debug("Set the lb reference type: " + value);
+ }
+ }
+
+ log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
+ return dockerClusterMonitor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
new file mode 100644
index 0000000..39fbd46
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/*
+ * Every kubernetes cluster monitor should extend this class
+ */
+public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
+
+ private static final Log log = LogFactory.getLog(KubernetesClusterMonitor.class);
+
+ private KubernetesClusterContext kubernetesClusterCtxt;
+ protected String autoscalePolicyId;
+
+ /**
+ * Initialises the state kept by this base class for Kubernetes cluster monitors.
+ *
+ * @param clusterId passed on to the super constructor
+ * @param serviceId passed on to the super constructor
+ * @param kubernetesClusterContext the context stored on this monitor
+ * @param autoscalerRuleEvaluator passed on to the super constructor
+ * @param autoscalePolicyId stored in the {@code autoscalePolicyId} field
+ */
+ protected KubernetesClusterMonitor(String clusterId, String serviceId,
+ KubernetesClusterContext kubernetesClusterContext,
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator,
+ String autoscalePolicyId) {
+
+ super(clusterId, serviceId, autoscalerRuleEvaluator);
+ this.kubernetesClusterCtxt = kubernetesClusterContext;
+ this.autoscalePolicyId = autoscalePolicyId;
+ }
+
+    /**
+     * Stores the average load average carried by the event on the Kubernetes
+     * cluster context; only logs (at debug level) when no context is available.
+     *
+     * @param averageLoadAverageEvent the received health statistics event
+     */
+    @Override
+    public void handleAverageLoadAverageEvent(
+            AverageLoadAverageEvent averageLoadAverageEvent) {
+
+        final String eventClusterId = averageLoadAverageEvent.getClusterId();
+        final float loadAverage = averageLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg load avg event: [cluster] %s [value] %s",
+                                    eventClusterId, loadAverage));
+        }
+
+        final KubernetesClusterContext ctxt = getKubernetesClusterCtxt();
+        if (ctxt == null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
+                                        eventClusterId));
+            }
+            return;
+        }
+        ctxt.setAverageLoadAverage(loadAverage);
+    }
+
+    /**
+     * Stores the load average gradient carried by the event on the Kubernetes
+     * cluster context; only logs (at debug level) when no context is available.
+     *
+     * @param gradientOfLoadAverageEvent the received health statistics event
+     */
+    @Override
+    public void handleGradientOfLoadAverageEvent(
+            GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+        final String eventClusterId = gradientOfLoadAverageEvent.getClusterId();
+        final float gradient = gradientOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s",
+                                    eventClusterId, gradient));
+        }
+
+        final KubernetesClusterContext ctxt = getKubernetesClusterCtxt();
+        if (ctxt == null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
+                                        eventClusterId));
+            }
+            return;
+        }
+        ctxt.setLoadAverageGradient(gradient);
+    }
+
+    /**
+     * Stores the second derivative of the load average carried by the event on
+     * the Kubernetes cluster context; only logs (at debug level) when no
+     * context is available.
+     *
+     * @param secondDerivativeOfLoadAverageEvent the received health statistics event
+     */
+    @Override
+    public void handleSecondDerivativeOfLoadAverageEvent(
+            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
+
+        final String eventClusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+        final float secondDerivative = secondDerivativeOfLoadAverageEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of load avg event: [cluster] %s [value] %s",
+                                    eventClusterId, secondDerivative));
+        }
+
+        final KubernetesClusterContext ctxt = getKubernetesClusterCtxt();
+        if (ctxt == null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
+                                        eventClusterId));
+            }
+            return;
+        }
+        ctxt.setLoadAverageSecondDerivative(secondDerivative);
+    }
+
+    /**
+     * Stores the average memory consumption carried by the event on the
+     * Kubernetes cluster context; only logs (at debug level) when no context
+     * is available.
+     *
+     * @param averageMemoryConsumptionEvent the received health statistics event
+     */
+    @Override
+    public void handleAverageMemoryConsumptionEvent(
+            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+        final String eventClusterId = averageMemoryConsumptionEvent.getClusterId();
+        final float memoryConsumption = averageMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Avg Memory Consumption event: [cluster] %s [value] %s",
+                                    eventClusterId, memoryConsumption));
+        }
+
+        final KubernetesClusterContext ctxt = getKubernetesClusterCtxt();
+        if (ctxt == null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
+                                        eventClusterId));
+            }
+            return;
+        }
+        ctxt.setAverageMemoryConsumption(memoryConsumption);
+    }
+
+    /**
+     * Stores the memory consumption gradient carried by the event on the
+     * Kubernetes cluster context; only logs (at debug level) when no context
+     * is available.
+     *
+     * @param gradientOfMemoryConsumptionEvent the received health statistics event
+     */
+    @Override
+    public void handleGradientOfMemoryConsumptionEvent(
+            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
+
+        final String eventClusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+        final float gradient = gradientOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Grad of Memory Consumption event: [cluster] %s [value] %s",
+                                    eventClusterId, gradient));
+        }
+
+        final KubernetesClusterContext ctxt = getKubernetesClusterCtxt();
+        if (ctxt == null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
+                                        eventClusterId));
+            }
+            return;
+        }
+        ctxt.setMemoryConsumptionGradient(gradient);
+    }
+
+    /**
+     * Stores the second derivative of the memory consumption carried by the
+     * event on the Kubernetes cluster context; only logs (at debug level) when
+     * no context is available.
+     *
+     * @param secondDerivativeOfMemoryConsumptionEvent the received health statistics event
+     */
+    @Override
+    public void handleSecondDerivativeOfMemoryConsumptionEvent(
+            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
+
+        final String eventClusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+        final float secondDerivative = secondDerivativeOfMemoryConsumptionEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s [value] %s",
+                                    eventClusterId, secondDerivative));
+        }
+
+        final KubernetesClusterContext ctxt = getKubernetesClusterCtxt();
+        if (ctxt == null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
+                                        eventClusterId));
+            }
+            return;
+        }
+        ctxt.setMemoryConsumptionSecondDerivative(secondDerivative);
+    }
+
+    /**
+     * Stores the average requests-in-flight value carried by the event on the
+     * Kubernetes cluster context; only logs (at debug level) when no context
+     * is available.
+     *
+     * @param averageRequestsInFlightEvent the received health statistics event
+     */
+    @Override
+    public void handleAverageRequestsInFlightEvent(
+            AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+        final float requestsInFlight = averageRequestsInFlightEvent.getValue();
+        final String eventClusterId = averageRequestsInFlightEvent.getClusterId();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Average Rif event: [cluster] %s [value] %s",
+                                    eventClusterId, requestsInFlight));
+        }
+
+        final KubernetesClusterContext ctxt = getKubernetesClusterCtxt();
+        if (ctxt == null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
+                                        eventClusterId));
+            }
+            return;
+        }
+        ctxt.setAverageRequestsInFlight(requestsInFlight);
+    }
+
+    /**
+     * Stores the requests-in-flight gradient carried by the event on the
+     * Kubernetes cluster context; only logs (at debug level) when no context
+     * is available.
+     *
+     * @param gradientOfRequestsInFlightEvent the received health statistics event
+     */
+    @Override
+    public void handleGradientOfRequestsInFlightEvent(
+            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+        final String eventClusterId = gradientOfRequestsInFlightEvent.getClusterId();
+        final float gradient = gradientOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s",
+                                    eventClusterId, gradient));
+        }
+
+        final KubernetesClusterContext ctxt = getKubernetesClusterCtxt();
+        if (ctxt == null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
+                                        eventClusterId));
+            }
+            return;
+        }
+        ctxt.setRequestsInFlightGradient(gradient);
+    }
+
+    /**
+     * Stores the second derivative of requests in flight carried by the event
+     * on the Kubernetes cluster context; only logs (at debug level) when no
+     * context is available.
+     *
+     * @param secondDerivativeOfRequestsInFlightEvent the received health statistics event
+     */
+    @Override
+    public void handleSecondDerivativeOfRequestsInFlightEvent(
+            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
+
+        final String eventClusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
+        final float secondDerivative = secondDerivativeOfRequestsInFlightEvent.getValue();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Second derivative of Rif event: [cluster] %s [value] %s",
+                                    eventClusterId, secondDerivative));
+        }
+
+        final KubernetesClusterContext ctxt = getKubernetesClusterCtxt();
+        if (ctxt == null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for : [cluster] %s",
+                                        eventClusterId));
+            }
+            return;
+        }
+        ctxt.setRequestsInFlightSecondDerivative(secondDerivative);
+    }
+
+    /**
+     * Stores the member's average memory consumption on its
+     * {@link MemberStatsContext}. The event is dropped, with a debug log, when
+     * either the Kubernetes cluster context or the member's stats context is
+     * not available.
+     *
+     * @param memberAverageMemoryConsumptionEvent the received health statistics event
+     */
+    @Override
+    public void handleMemberAverageMemoryConsumptionEvent(
+            MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
+
+        String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+        KubernetesClusterContext clusterContext = getKubernetesClusterCtxt();
+        // The cluster level handlers treat the context as possibly absent; do the same here.
+        if (null == clusterContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        MemberStatsContext memberStatsContext = clusterContext.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberAverageMemoryConsumptionEvent.getValue();
+        memberStatsContext.setAverageMemoryConsumption(value);
+    }
+
+    /**
+     * Stores the member's memory consumption gradient on its
+     * {@link MemberStatsContext}. The event is dropped, with a debug log, when
+     * either the Kubernetes cluster context or the member's stats context is
+     * not available.
+     *
+     * @param memberGradientOfMemoryConsumptionEvent the received health statistics event
+     */
+    @Override
+    public void handleMemberGradientOfMemoryConsumptionEvent(
+            MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
+
+        String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+        KubernetesClusterContext clusterContext = getKubernetesClusterCtxt();
+        // The cluster level handlers treat the context as possibly absent; do the same here.
+        if (null == clusterContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Kubernetes cluster context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        MemberStatsContext memberStatsContext = clusterContext.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return;
+        }
+        float value = memberGradientOfMemoryConsumptionEvent.getValue();
+        memberStatsContext.setGradientOfMemoryConsumption(value);
+    }
+
+ /**
+  * No-op: the second derivative of a member's memory consumption is currently ignored.
+  * NOTE(review): the average and gradient memory consumption handlers in this class record
+  * their values in the member's MemberStatsContext - confirm whether this value should be
+  * recorded as well.
+  */
+ @Override
+ public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+ MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
+
+ }
+
+ /**
+  * Stores the average load average reported for a member in that member's
+  * {@link MemberStatsContext}. The event is ignored (with a debug log) when either the
+  * Kubernetes cluster context or the member's stats context is not available.
+  *
+  * @param memberAverageLoadAverageEvent event carrying the member id and the reported value
+  */
+ @Override
+ public void handleMemberAverageLoadAverageEvent(
+         MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+     KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+     String memberId = memberAverageLoadAverageEvent.getMemberId();
+     float value = memberAverageLoadAverageEvent.getValue();
+     // Guard against a missing context, as the requests-in-flight second-derivative
+     // handler in this class does, instead of failing with a NullPointerException.
+     if (null == kubernetesClusterCtxt) {
+         if (log.isDebugEnabled()) {
+             log.debug(String.format("Kubernetes cluster context is not available for : [member] %s", memberId));
+         }
+         return;
+     }
+     MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+     if (null == memberStatsContext) {
+         if (log.isDebugEnabled()) {
+             log.debug(String.format("Member context is not available for : [member] %s", memberId));
+         }
+         return;
+     }
+     memberStatsContext.setAverageLoadAverage(value);
+ }
+
+ /**
+  * Stores the gradient of load average reported for a member in that member's
+  * {@link MemberStatsContext}. The event is ignored (with a debug log) when either the
+  * Kubernetes cluster context or the member's stats context is not available.
+  *
+  * @param memberGradientOfLoadAverageEvent event carrying the member id and the reported value
+  */
+ @Override
+ public void handleMemberGradientOfLoadAverageEvent(
+         MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
+
+     String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+     KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+     // Guard against a missing context, as the requests-in-flight second-derivative
+     // handler in this class does, instead of failing with a NullPointerException.
+     if (null == kubernetesClusterCtxt) {
+         if (log.isDebugEnabled()) {
+             log.debug(String.format("Kubernetes cluster context is not available for : [member] %s", memberId));
+         }
+         return;
+     }
+     MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+     if (null == memberStatsContext) {
+         if (log.isDebugEnabled()) {
+             log.debug(String.format("Member context is not available for : [member] %s", memberId));
+         }
+         return;
+     }
+     float value = memberGradientOfLoadAverageEvent.getValue();
+     memberStatsContext.setGradientOfLoadAverage(value);
+ }
+
+ /**
+  * Stores the second derivative of load average reported for a member in that member's
+  * {@link MemberStatsContext}. The event is ignored (with a debug log) when either the
+  * Kubernetes cluster context or the member's stats context is not available.
+  *
+  * @param memberSecondDerivativeOfLoadAverageEvent event carrying the member id and the reported value
+  */
+ @Override
+ public void handleMemberSecondDerivativeOfLoadAverageEvent(
+         MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
+
+     String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+     KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+     // Guard against a missing context, as the requests-in-flight second-derivative
+     // handler in this class does, instead of failing with a NullPointerException.
+     if (null == kubernetesClusterCtxt) {
+         if (log.isDebugEnabled()) {
+             log.debug(String.format("Kubernetes cluster context is not available for : [member] %s", memberId));
+         }
+         return;
+     }
+     MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+     if (null == memberStatsContext) {
+         if (log.isDebugEnabled()) {
+             log.debug(String.format("Member context is not available for : [member] %s", memberId));
+         }
+         return;
+     }
+     float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+     memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+ // kill the container
+ String memberId = memberFaultEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ if (null == member) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+ }
+ return;
+ }
+ if (!member.isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member activated event has not received for the member %s. "
+ + "Therefore ignoring" + " the member fault health stat", memberId));
+ }
+ return;
+ }
+
+ if (!getKubernetesClusterCtxt().activeMemberExist(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Could not find the active member in kubernetes cluster context, "
+ + "[member] %s ", memberId));
+ }
+ return;
+ }
+ // terminate the faulty member
+ CloudControllerClient ccClient = CloudControllerClient.getInstance();
+ try {
+ ccClient.terminateContainer(memberId);
+ // remove from active member list
+ getKubernetesClusterCtxt().removeActiveMemberById(memberId);
+ if (log.isInfoEnabled()) {
+ String clusterId = memberFaultEvent.getClusterId();
+ String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
+ log.info(String.format("Faulty member is terminated and removed from the active members list: "
+ + "[member] %s [kub-cluster] %s [cluster] %s ", memberId, kubernetesClusterID, clusterId));
+ }
+ } catch (TerminationException e) {
+ String msg = "Cannot delete a container " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+
+ /**
+  * No-op: member started events are not acted upon by this monitor.
+  */
+ @Override
+ public void handleMemberStartedEvent(
+ MemberStartedEvent memberStartedEvent) {
+
+ }
+
+ /**
+  * Registers a fresh {@link MemberStatsContext} for the activated member and moves the
+  * member from the pending list to the active list of the Kubernetes cluster context.
+  *
+  * @param memberActivatedEvent event identifying the activated member
+  */
+ @Override
+ public void handleMemberActivatedEvent(
+         MemberActivatedEvent memberActivatedEvent) {
+
+     KubernetesClusterContext clusterCtxt = getKubernetesClusterCtxt();
+     String activatedMemberId = memberActivatedEvent.getMemberId();
+
+     MemberStatsContext statsContext = new MemberStatsContext(activatedMemberId);
+     clusterCtxt.addMemberStatsContext(statsContext);
+     if (log.isInfoEnabled()) {
+         log.info(String.format("Member stat context has been added successfully: [member] %s",
+                 activatedMemberId));
+     }
+
+     clusterCtxt.movePendingMemberToActiveMembers(activatedMemberId);
+ }
+
+ /**
+  * No-op: maintenance mode events are not acted upon by this monitor.
+  */
+ @Override
+ public void handleMemberMaintenanceModeEvent(
+ MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+ // Nothing to do here: this event is not expected for containers;
+ // only the member terminated event is expected to be received.
+ }
+
+ /**
+  * No-op: ready-to-shutdown events are not acted upon by this monitor.
+  */
+ @Override
+ public void handleMemberReadyToShutdownEvent(
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+ // Nothing to do here: this event is not expected for containers;
+ // only the member terminated event is expected to be received.
+ }
+
+ /**
+  * Removes a terminated member from whichever member list of the Kubernetes cluster
+  * context currently holds it. The lists are tried in this order: termination pending,
+  * pending, active, obsolete. Only the first successful removal is performed.
+  *
+  * @param memberTerminatedEvent event identifying the terminated member
+  */
+ @Override
+ public void handleMemberTerminatedEvent(
+         MemberTerminatedEvent memberTerminatedEvent) {
+
+     String memberId = memberTerminatedEvent.getMemberId();
+     KubernetesClusterContext clusterCtxt = getKubernetesClusterCtxt();
+     if (clusterCtxt.removeTerminationPendingMember(memberId)) {
+         if (log.isDebugEnabled()) {
+             log.debug(String.format("Member is removed from termination pending members list: "
+                     + "[member] %s", memberId));
+         }
+     } else if (clusterCtxt.removePendingMember(memberId)) {
+         if (log.isDebugEnabled()) {
+             log.debug(String.format("Member is removed from pending members list: "
+                     + "[member] %s", memberId));
+         }
+     } else if (clusterCtxt.removeActiveMemberById(memberId)) {
+         // The format strings below previously had no placeholder, so the member id
+         // passed to String.format was silently dropped from the warnings.
+         log.warn(String.format("Member is in the wrong list and it is removed from "
+                 + "active members list: [member] %s", memberId));
+     } else if (clusterCtxt.removeObsoleteMember(memberId)) {
+         log.warn(String.format("Member's obsolated timeout has been expired and "
+                 + "it is removed from obsolated members list: [member] %s", memberId));
+     } else {
+         log.warn(String.format("Member is not available in any of the list active, "
+                 + "pending and termination pending: [member] %s", memberId));
+     }
+
+     // NOTE(review): nothing in this method removes the member's stats context, yet the
+     // message below reports it as removed - confirm whether a removal call is missing.
+     if (log.isInfoEnabled()) {
+         log.info(String.format("Member stat context has been removed successfully: "
+                 + "[member] %s", memberId));
+     }
+ }
+
+ @Override
+ public void handleClusterRemovedEvent(
+ ClusterRemovedEvent clusterRemovedEvent) {
+ getKubernetesClusterCtxt().getPendingMembers().clear();
+ getKubernetesClusterCtxt().getActiveMembers().clear();
+ getKubernetesClusterCtxt().getTerminationPendingMembers().clear();
+ getKubernetesClusterCtxt().getObsoletedMembers().clear();
+ }
+
+ /**
+  * Returns the Kubernetes cluster context held by this monitor.
+  *
+  * @return the current context; may be null if none has been set
+  */
+ public KubernetesClusterContext getKubernetesClusterCtxt() {
+ return kubernetesClusterCtxt;
+ }
+
+ /**
+  * Replaces the Kubernetes cluster context held by this monitor.
+  *
+  * @param kubernetesClusterCtxt the context to use from now on
+  */
+ public void setKubernetesClusterCtxt(
+ KubernetesClusterContext kubernetesClusterCtxt) {
+ this.kubernetesClusterCtxt = kubernetesClusterCtxt;
+ }
+
+ public AutoscalePolicy getAutoscalePolicy() {
+ return PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyId);
+ }
+
+ /**
+  * Searches every cluster of every service in the topology for a member with the given
+  * id. The topology read lock is held for the duration of the search.
+  *
+  * @param memberId id of the member to look up
+  * @return the matching member, or {@code null} when no cluster contains it
+  */
+ private Member getMemberByMemberId(String memberId) {
+     // Acquire before entering the try block so that the finally clause only ever
+     // releases a lock that was actually obtained.
+     TopologyManager.acquireReadLock();
+     try {
+         for (Service service : TopologyManager.getTopology().getServices()) {
+             for (Cluster cluster : service.getClusters()) {
+                 if (cluster.memberExists(memberId)) {
+                     return cluster.getMember(memberId);
+                 }
+             }
+         }
+         return null;
+     } finally {
+         TopologyManager.releaseReadLock();
+     }
+ }
+
+ @Override
+ public void terminateAllMembers() {
+ try {
+ CloudControllerClient.getInstance().terminateAllContainers(getKubernetesClusterCtxt().getClusterId());
+ } catch (TerminationException e) {
+ log.error(String.format("Could not terminate containers: [cluster-id] %s",
+ getKubernetesClusterCtxt().getClusterId()), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
new file mode 100644
index 0000000..2615651
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.List;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+
+/*
+ * It is monitoring a kubernetes service cluster periodically.
+ */
+public final class KubernetesServiceClusterMonitor extends KubernetesClusterMonitor {
+
+ private static final Log log = LogFactory.getLog(KubernetesServiceClusterMonitor.class);
+
+ private String lbReferenceType;
+
+ public KubernetesServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt,
+ String serviceClusterID, String serviceId,
+ String autoscalePolicyId) {
+ super(serviceClusterID, serviceId, kubernetesClusterCtxt,
+ new AutoscalerRuleEvaluator(
+ StratosConstants.CONTAINER_MIN_CHECK_DROOL_FILE,
+ StratosConstants.CONTAINER_SCALE_CHECK_DROOL_FILE),
+ autoscalePolicyId);
+ readConfigurations();
+ }
+
+ @Override
+ public void run() {
+
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor is running..." + this.toString());
+ }
+ try {
+ if (!ClusterStatus.Active.getNextStates().contains(getStatus())) {
+ monitor();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
+ + getStatus() + "state");
+ }
+ }
+ } catch (Exception e) {
+ log.error("KubernetesServiceClusterMonitor: Monitor failed." + this.toString(),
+ e);
+ }
+ }
+
+ @Override
+ protected void monitor() {
+ minCheck();
+ scaleCheck();
+ }
+
+ private void scaleCheck() {
+ boolean rifReset = getKubernetesClusterCtxt().isRifReset();
+ boolean memoryConsumptionReset = getKubernetesClusterCtxt().isMemoryConsumptionReset();
+ boolean loadAverageReset = getKubernetesClusterCtxt().isLoadAverageReset();
+ if (log.isDebugEnabled()) {
+ log.debug("flag of rifReset : " + rifReset
+ + " flag of memoryConsumptionReset : "
+ + memoryConsumptionReset + " flag of loadAverageReset : "
+ + loadAverageReset);
+ }
+ String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
+ String clusterId = getClusterId();
+ if (rifReset || memoryConsumptionReset || loadAverageReset) {
+ getScaleCheckKnowledgeSession().setGlobal("clusterId", clusterId);
+ getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", getAutoscalePolicy());
+ getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+ getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+ getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Running scale check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
+ }
+ scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(
+ getScaleCheckKnowledgeSession(), scaleCheckFactHandle, getKubernetesClusterCtxt());
+ getKubernetesClusterCtxt().setRifReset(false);
+ getKubernetesClusterCtxt().setMemoryConsumptionReset(false);
+ getKubernetesClusterCtxt().setLoadAverageReset(false);
+ } else if (log.isDebugEnabled()) {
+ log.debug(String.format("Scale check will not run since none of the statistics have not received yet for "
+ + "[kub-cluster] : %s [cluster] : %s", kubernetesClusterID, clusterId));
+ }
+ }
+
+ private void minCheck() {
+ getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+ String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Running min check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
+ }
+ minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(
+ getMinCheckKnowledgeSession(), minCheckFactHandle,
+ getKubernetesClusterCtxt());
+ }
+
+ @Override
+ public void destroy() {
+ getMinCheckKnowledgeSession().dispose();
+ getScaleCheckKnowledgeSession().dispose();
+ setDestroyed(true);
+ stopScheduler();
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
+ }
+ }
+
+ @Override
+ protected void readConfigurations() {
+ XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+ int monitorInterval = conf.getInt(AutoScalerConstants.KubernetesService_Cluster_MONITOR_INTERVAL, 60000);
+ setMonitorIntervalMilliseconds(monitorInterval);
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "KubernetesServiceClusterMonitor "
+ + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
+ + ", clusterId=" + getClusterId()
+ + ", serviceId=" + getServiceId() + "]";
+ }
+
+ public String getLbReferenceType() {
+ return lbReferenceType;
+ }
+
+ public void setLbReferenceType(String lbReferenceType) {
+ this.lbReferenceType = lbReferenceType;
+ }
+
+ @Override
+ public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
+
+ if (properties != null) {
+ Property[] propertyArray = properties.getProperties();
+ if (propertyArray == null) {
+ return;
+ }
+ List<Property> propertyList = Arrays.asList(propertyArray);
+
+ for (Property property : propertyList) {
+ String key = property.getName();
+ String value = property.getValue();
+
+ if (StratosConstants.KUBERNETES_MIN_REPLICAS.equals(key)) {
+ int min = Integer.parseInt(value);
+ int max = getKubernetesClusterCtxt().getMaxReplicas();
+ if (min > max) {
+ String msg = String.format("%s should be less than %s . But %s is not less than %s.",
+ StratosConstants.KUBERNETES_MIN_REPLICAS, StratosConstants.KUBERNETES_MAX_REPLICAS, min, max);
+ log.error(msg);
+ throw new InvalidArgumentException(msg);
+ }
+ getKubernetesClusterCtxt().setMinReplicas(min);
+ break;
+ }
+ }
+
+ }
+ }
+}
\ No newline at end of file