You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/10/31 05:04:04 UTC
[1/5] Adding autoscaler topology event listeners introduced by
service grouping
Repository: stratos
Updated Branches:
refs/heads/docker-grouping-merge a5dcbaa7d -> e1f37d63f
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
new file mode 100644
index 0000000..d4b6a25
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
@@ -0,0 +1,692 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * Is responsible for monitoring a service cluster. This runs periodically
+ * and perform minimum instance check and scaling check using the underlying
+ * rules engine.
+ */
+abstract public class VMClusterMonitor extends AbstractClusterMonitor {
+
+ private static final Log log = LogFactory.getLog(VMClusterMonitor.class);
+ // Map<NetworkpartitionId, Network Partition Context>
+ protected Map<String, NetworkPartitionContext> networkPartitionCtxts;
+ protected DeploymentPolicy deploymentPolicy;
+ protected AutoscalePolicy autoscalePolicy;
+
+ protected VMClusterMonitor(String clusterId, String serviceId,
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator,
+ DeploymentPolicy deploymentPolicy, AutoscalePolicy autoscalePolicy,
+ Map<String, NetworkPartitionContext> networkPartitionCtxts) {
+ super(clusterId, serviceId, autoscalerRuleEvaluator);
+ this.deploymentPolicy = deploymentPolicy;
+ this.autoscalePolicy = autoscalePolicy;
+ this.networkPartitionCtxts = networkPartitionCtxts;
+ }
+
+ @Override
+ public void handleAverageLoadAverageEvent(
+ AverageLoadAverageEvent averageLoadAverageEvent) {
+
+ String networkPartitionId = averageLoadAverageEvent.getNetworkPartitionId();
+ String clusterId = averageLoadAverageEvent.getClusterId();
+ float value = averageLoadAverageEvent.getValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, value));
+ }
+
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setAverageLoadAverage(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+
+ }
+
+ @Override
+ public void handleGradientOfLoadAverageEvent(
+ GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+ String networkPartitionId = gradientOfLoadAverageEvent.getNetworkPartitionId();
+ String clusterId = gradientOfLoadAverageEvent.getClusterId();
+ float value = gradientOfLoadAverageEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setLoadAverageGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfLoadAverageEvent(
+ SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
+
+ String networkPartitionId = secondDerivativeOfLoadAverageEvent.getNetworkPartitionId();
+ String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+ float value = secondDerivativeOfLoadAverageEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
+ + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setLoadAverageSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleAverageMemoryConsumptionEvent(
+ AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+ String networkPartitionId = averageMemoryConsumptionEvent.getNetworkPartitionId();
+ String clusterId = averageMemoryConsumptionEvent.getClusterId();
+ float value = averageMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s "
+ + "[value] %s", clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setAverageMemoryConsumption(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String
+ .format("Network partition context is not available for :"
+ + " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleGradientOfMemoryConsumptionEvent(
+ GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
+
+ String networkPartitionId = gradientOfMemoryConsumptionEvent.getNetworkPartitionId();
+ String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+ float value = gradientOfMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
+ + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setMemoryConsumptionGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfMemoryConsumptionEvent(
+ SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
+
+ String networkPartitionId = secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId();
+ String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+ float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
+ + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setMemoryConsumptionSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleAverageRequestsInFlightEvent(
+ AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+ String networkPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId();
+ String clusterId = averageRequestsInFlightEvent.getClusterId();
+ float value = averageRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setAverageRequestsInFlight(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleGradientOfRequestsInFlightEvent(
+ GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+ String networkPartitionId = gradientOfRequestsInFlightEvent.getNetworkPartitionId();
+ String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
+ float value = gradientOfRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setRequestsInFlightGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfRequestsInFlightEvent(
+ SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
+
+ String networkPartitionId = secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId();
+ String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
+ float value = secondDerivativeOfRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second derivative of Rif event: [cluster] %s "
+ + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
+ }
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ if (null != networkPartitionContext) {
+ networkPartitionContext.setRequestsInFlightSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ }
+
+ @Override
+ public void handleMemberAverageMemoryConsumptionEvent(
+ MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
+
+ String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberAverageMemoryConsumptionEvent.getValue();
+ memberStatsContext.setAverageMemoryConsumption(value);
+ }
+
+ @Override
+ public void handleMemberGradientOfMemoryConsumptionEvent(
+ MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
+
+ String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberGradientOfMemoryConsumptionEvent.getValue();
+ memberStatsContext.setGradientOfMemoryConsumption(value);
+ }
+
+ @Override
+ public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+ MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
+
+ }
+
+ @Override
+ public void handleMemberAverageLoadAverageEvent(
+ MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+ String memberId = memberAverageLoadAverageEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberAverageLoadAverageEvent.getValue();
+ memberStatsContext.setAverageLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberGradientOfLoadAverageEvent(
+ MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
+
+ String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberGradientOfLoadAverageEvent.getValue();
+ memberStatsContext.setGradientOfLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberSecondDerivativeOfLoadAverageEvent(
+ MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
+
+ String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
+ MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+ memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+
+ String memberId = memberFaultEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ if (null == member) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+ }
+ return;
+ }
+ if (!member.isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member activated event has not received for the member %s. "
+ + "Therefore ignoring" + " the member fault health stat", memberId));
+ }
+ return;
+ }
+
+ NetworkPartitionContext nwPartitionCtxt;
+ nwPartitionCtxt = getNetworkPartitionCtxt(member);
+ String partitionId = getPartitionOfMember(memberId);
+ PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+ if (!partitionCtxt.activeMemberExist(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Could not find the active member in partition context, "
+ + "[member] %s ", memberId));
+ }
+ return;
+ }
+ // terminate the faulty member
+ CloudControllerClient ccClient = CloudControllerClient.getInstance();
+ try {
+ ccClient.terminate(memberId);
+ } catch (TerminationException e) {
+ String msg = "TerminationException " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ // remove from active member list
+ partitionCtxt.removeActiveMemberById(memberId);
+ if (log.isInfoEnabled()) {
+ String clusterId = memberFaultEvent.getClusterId();
+ log.info(String.format("Faulty member is terminated and removed from the active members list: "
+ + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+ }
+ }
+
+ @Override
+ public void handleMemberStartedEvent(
+ MemberStartedEvent memberStartedEvent) {
+
+ }
+
+ @Override
+ public void handleMemberActivatedEvent(
+ MemberActivatedEvent memberActivatedEvent) {
+
+ String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
+ String partitionId = memberActivatedEvent.getPartitionId();
+ String memberId = memberActivatedEvent.getMemberId();
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionContext;
+ partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
+ partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been added successfully: "
+ + "[member] %s", memberId));
+ }
+ partitionContext.movePendingMemberToActiveMembers(memberId);
+ }
+
+ @Override
+ public void handleMemberMaintenanceModeEvent(
+ MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+ String networkPartitionId = maintenanceModeEvent.getNetworkPartitionId();
+ String partitionId = maintenanceModeEvent.getPartitionId();
+ String memberId = maintenanceModeEvent.getMemberId();
+ NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
+ partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member has been moved as pending termination: "
+ + "[member] %s", memberId));
+ }
+ partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+ }
+
+ @Override
+ public void handleMemberReadyToShutdownEvent(
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+ NetworkPartitionContext nwPartitionCtxt;
+ String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
+ nwPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
+
+ // start a new member in the same Partition
+ String memberId = memberReadyToShutdownEvent.getMemberId();
+ String partitionId = getPartitionOfMember(memberId);
+ PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+ // terminate the shutdown ready member
+ CloudControllerClient ccClient = CloudControllerClient.getInstance();
+ try {
+ ccClient.terminate(memberId);
+ // remove from active member list
+ partitionCtxt.removeActiveMemberById(memberId);
+
+ String clusterId = memberReadyToShutdownEvent.getClusterId();
+ log.info(String.format("Member is terminated and removed from the active members list: "
+ + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
+ } catch (TerminationException e) {
+ String msg = "TerminationException" + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+
+ @Override
+ public void handleMemberTerminatedEvent(
+ MemberTerminatedEvent memberTerminatedEvent) {
+
+ String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
+ String memberId = memberTerminatedEvent.getMemberId();
+ String partitionId = memberTerminatedEvent.getPartitionId();
+ NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
+ partitionContext.removeMemberStatsContext(memberId);
+
+ if (partitionContext.removeTerminationPendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from termination pending members list: "
+ + "[member] %s", memberId));
+ }
+ } else if (partitionContext.removePendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from pending members list: "
+ + "[member] %s", memberId));
+ }
+ } else if (partitionContext.removeActiveMemberById(memberId)) {
+ log.warn(String.format("Member is in the wrong list and it is removed from "
+ + "active members list", memberId));
+ } else if (partitionContext.removeObsoleteMember(memberId)) {
+ log.warn(String.format("Member's obsolated timeout has been expired and "
+ + "it is removed from obsolated members list", memberId));
+ } else {
+ log.warn(String.format("Member is not available in any of the list active, "
+ + "pending and termination pending", memberId));
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been removed successfully: "
+ + "[member] %s", memberId));
+ }
+ }
+
+ @Override
+ public void handleClusterRemovedEvent(
+ ClusterRemovedEvent clusterRemovedEvent) {
+
+ }
+
+ private String getNetworkPartitionIdByMemberId(String memberId) {
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.memberExists(memberId)) {
+ return cluster.getMember(memberId).getNetworkPartitionId();
+ }
+ }
+ }
+ return null;
+ }
+
+ private Member getMemberByMemberId(String memberId) {
+ try {
+ TopologyManager.acquireReadLock();
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.memberExists(memberId)) {
+ return cluster.getMember(memberId);
+ }
+ }
+ }
+ return null;
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ public NetworkPartitionContext getNetworkPartitionCtxt(Member member) {
+ log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId());
+ String networkPartitionId = member.getNetworkPartitionId();
+ if (networkPartitionCtxts.containsKey(networkPartitionId)) {
+ log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId));
+ return networkPartitionCtxts.get(networkPartitionId);
+ }
+ log.info("returning null getNetworkPartitionCtxt");
+ return null;
+ }
+
+ public String getPartitionOfMember(String memberId) {
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.memberExists(memberId)) {
+ return cluster.getMember(memberId).getPartitionId();
+ }
+ }
+ }
+ return null;
+ }
+
+ public DeploymentPolicy getDeploymentPolicy() {
+ return deploymentPolicy;
+ }
+
+ public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) {
+ this.deploymentPolicy = deploymentPolicy;
+ }
+
+ public AutoscalePolicy getAutoscalePolicy() {
+ return autoscalePolicy;
+ }
+
+ public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
+ this.autoscalePolicy = autoscalePolicy;
+ }
+
+ public Map<String, NetworkPartitionContext> getNetworkPartitionCtxts() {
+ return networkPartitionCtxts;
+ }
+
+ public NetworkPartitionContext getNetworkPartitionCtxt(String networkPartitionId) {
+ return networkPartitionCtxts.get(networkPartitionId);
+ }
+
+ public void setPartitionCtxt(Map<String, NetworkPartitionContext> partitionCtxt) {
+ this.networkPartitionCtxts = partitionCtxt;
+ }
+
+ public boolean partitionCtxtAvailable(String partitionId) {
+ return networkPartitionCtxts.containsKey(partitionId);
+ }
+
+ public void addNetworkPartitionCtxt(NetworkPartitionContext ctxt) {
+ this.networkPartitionCtxts.put(ctxt.getId(), ctxt);
+ }
+
+ public NetworkPartitionContext getPartitionCtxt(String id) {
+ return this.networkPartitionCtxts.get(id);
+ }
+
+ @Override
+ public void terminateAllMembers() {
+
+ Thread memberTerminator = new Thread(new Runnable(){
+ public void run(){
+
+ for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
+ for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) {
+ //if (log.isDebugEnabled()) {
+ log.info("Starting to terminate all members in Network Partition [ " +
+ networkPartitionContext.getId() + " ], Partition [ " +
+ partitionContext.getPartitionId() + " ]");
+ // }
+ // need to terminate active, pending and obsolete members
+
+ // active members
+ for (MemberContext activeMemberCtxt : partitionContext.getActiveMembers()) {
+ log.info("Terminating active member [member id] " + activeMemberCtxt.getMemberId());
+ terminateMember(activeMemberCtxt.getMemberId());
+ }
+
+ // pending members
+ for (MemberContext pendingMemberCtxt : partitionContext.getPendingMembers()) {
+ log.info("Terminating pending member [member id] " + pendingMemberCtxt.getMemberId());
+ terminateMember(pendingMemberCtxt.getMemberId());
+ }
+
+ // obsolete members
+ for (String obsoleteMemberId : partitionContext.getObsoletedMembers().keySet()) {
+ log.info("Terminating obsolete member [member id] " + obsoleteMemberId);
+ terminateMember(obsoleteMemberId);
+ }
+
+// terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll
+// (terminateAllKnowledgeSession, terminateAllFactHandle, partitionContext);
+ }
+ }
+ }
+ }, "Member Terminator - [cluster id] " + getClusterId());
+
+ memberTerminator.start();
+ }
+
+ private static void terminateMember (String memberId) {
+ try {
+ CloudControllerClient.getInstance().terminate(memberId);
+
+ } catch (TerminationException e) {
+ log.error("Unable to terminate member [member id ] " + memberId, e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java
new file mode 100644
index 0000000..8a0959c
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+
+/**
+ * Is responsible for monitoring a service cluster. This runs periodically
+ * and perform minimum instance check and scaling check using the underlying
+ * rules engine.
+ */
+public class VMLbClusterMonitor extends VMClusterMonitor {
+
+ private static final Log log = LogFactory.getLog(VMLbClusterMonitor.class);
+
+ public VMLbClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
+ AutoscalePolicy autoscalePolicy) {
+ super(clusterId, serviceId,
+ new AutoscalerRuleEvaluator(
+ StratosConstants.VM_MIN_CHECK_DROOL_FILE,
+ StratosConstants.VM_SCALE_CHECK_DROOL_FILE),
+ deploymentPolicy, autoscalePolicy,
+ new ConcurrentHashMap<String, NetworkPartitionContext>());
+ readConfigurations();
+ }
+
+ @Override
+ public void run() {
+
+ while (!isDestroyed()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Cluster monitor is running.. " + this.toString());
+ }
+ try {
+ if (!ClusterStatus.Inactive.equals(status)) {
+ monitor();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("LB Cluster monitor is suspended as the cluster is in " +
+ ClusterStatus.Inactive + " mode......");
+ }
+ }
+ } catch (Exception e) {
+ log.error("Cluster monitor: Monitor failed. " + this.toString(), e);
+ }
+ try {
+ Thread.sleep(getMonitorIntervalMilliseconds());
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+
+ @Override
+ protected void monitor() {
+ // TODO make this concurrent
+ for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
+
+ // minimum check per partition
+ for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts()
+ .values()) {
+
+ if (partitionContext != null) {
+ getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+ getMinCheckKnowledgeSession().setGlobal("isPrimary", false);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Running minimum check for partition %s ",
+ partitionContext.getPartitionId()));
+ }
+
+ minCheckFactHandle =
+ AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession(),
+ minCheckFactHandle,
+ partitionContext);
+ // start only in the first partition context
+ break;
+ }
+
+ }
+
+ }
+ }
+
+ @Override
+ public void destroy() {
+ getMinCheckKnowledgeSession().dispose();
+ getMinCheckKnowledgeSession().dispose();
+ setDestroyed(true);
+ stopScheduler();
+ if (log.isDebugEnabled()) {
+ log.debug("VMLbClusterMonitor Drools session has been disposed. " + this.toString());
+ }
+ }
+
+ @Override
+ protected void readConfigurations() {
+ XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+ int monitorInterval = conf.getInt(AutoScalerConstants.VMLb_Cluster_MONITOR_INTERVAL, 90000);
+ setMonitorIntervalMilliseconds(monitorInterval);
+ if (log.isDebugEnabled()) {
+ log.debug("VMLbClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
+ }
+ }
+
+ @Override
+ public void handleClusterRemovedEvent(
+ ClusterRemovedEvent clusterRemovedEvent) {
+
+ String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy();
+ String clusterId = clusterRemovedEvent.getClusterId();
+ DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
+ if (depPolicy != null) {
+ List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
+ .getNetworkPartitionLbHolders(depPolicy);
+
+ for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
+ // removes lb cluster ids
+ boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
+ if (isRemoved) {
+ log.info("Removed the lb cluster [id]:"
+ + clusterId
+ + " reference from Network Partition [id]: "
+ + networkPartitionLbHolder
+ .getNetworkPartitionId());
+
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(networkPartitionLbHolder);
+ }
+
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "VMLbClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() + "]";
+ }
+
+ @Override
+ public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
+ // TODO
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java
new file mode 100644
index 0000000..9d0f134
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+
+/**
+ * Is responsible for monitoring a service cluster. This runs periodically
+ * and perform minimum instance check and scaling check using the underlying
+ * rules engine.
+ */
+public class VMServiceClusterMonitor extends VMClusterMonitor {
+
+ private static final Log log = LogFactory.getLog(VMServiceClusterMonitor.class);
+ private String lbReferenceType;
+ private boolean hasPrimary;
+
+ public VMServiceClusterMonitor(String clusterId, String serviceId,
+ DeploymentPolicy deploymentPolicy,
+ AutoscalePolicy autoscalePolicy) {
+ super(clusterId, serviceId,
+ new AutoscalerRuleEvaluator(StratosConstants.VM_MIN_CHECK_DROOL_FILE,
+ StratosConstants.VM_SCALE_CHECK_DROOL_FILE),
+ deploymentPolicy, autoscalePolicy,
+ new ConcurrentHashMap<String, NetworkPartitionContext>());
+ readConfigurations();
+ }
+
+ @Override
+ public void run() {
+ while (!isDestroyed()) {
+ try {
+ if ((this.status.getCode() <= ClusterStatus.Active.getCode()) ||
+ (this.status == ClusterStatus.Inactive && !hasDependent) ||
+ !this.hasFaultyMember) {
+ if (log.isDebugEnabled()) {
+ log.debug("Cluster monitor is running.. " + this.toString());
+ }
+ monitor();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Cluster monitor is suspended as the cluster is in " +
+ ClusterStatus.Inactive + " mode......");
+ }
+ }
+ } catch (Exception e) {
+ log.error("Cluster monitor: Monitor failed." + this.toString(), e);
+ }
+ try {
+ Thread.sleep(getMonitorIntervalMilliseconds());
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+
+ @Override
+ protected void monitor() {
+
+ //TODO make this concurrent
+ for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
+ // store primary members in the network partition context
+ List<String> primaryMemberListInNetworkPartition = new ArrayList<String>();
+
+ //minimum check per partition
+ for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) {
+ // store primary members in the partition context
+ List<String> primaryMemberListInPartition = new ArrayList<String>();
+ // get active primary members in this partition context
+ for (MemberContext memberContext : partitionContext.getActiveMembers()) {
+ if (isPrimaryMember(memberContext)) {
+ primaryMemberListInPartition.add(memberContext.getMemberId());
+ }
+ }
+ // get pending primary members in this partition context
+ for (MemberContext memberContext : partitionContext.getPendingMembers()) {
+ if (isPrimaryMember(memberContext)) {
+ primaryMemberListInPartition.add(memberContext.getMemberId());
+ }
+ }
+ primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition);
+ getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+ getMinCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
+ getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+ getMinCheckKnowledgeSession().setGlobal("primaryMemberCount", primaryMemberListInPartition.size());
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Running minimum check for partition %s ", partitionContext.getPartitionId()));
+ }
+
+ minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession()
+ , minCheckFactHandle, partitionContext);
+
+ }
+
+ boolean rifReset = networkPartitionContext.isRifReset();
+ boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset();
+ boolean loadAverageReset = networkPartitionContext.isLoadAverageReset();
+ if (log.isDebugEnabled()) {
+ log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset
+ + " flag of loadAverageReset" + loadAverageReset);
+ }
+ if (rifReset || memoryConsumptionReset || loadAverageReset) {
+ getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+ //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy);
+ getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", autoscalePolicy);
+ getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+ getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+ getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+ getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
+ getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
+ getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId()));
+ log.debug(" Primary members : " + primaryMemberListInNetworkPartition);
+ }
+
+ scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(getScaleCheckKnowledgeSession()
+ , scaleCheckFactHandle, networkPartitionContext);
+
+ networkPartitionContext.setRifReset(false);
+ networkPartitionContext.setMemoryConsumptionReset(false);
+ networkPartitionContext.setLoadAverageReset(false);
+ } else if (log.isDebugEnabled()) {
+ log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " +
+ "cycle for network partition %s", networkPartitionContext.getId()));
+ }
+ }
+ }
+
+ private boolean isPrimaryMember(MemberContext memberContext) {
+ Properties props = memberContext.getProperties();
+ if (log.isDebugEnabled()) {
+ log.debug(" Properties [" + props + "] ");
+ }
+ if (props != null && props.getProperties() != null) {
+ for (Property prop : props.getProperties()) {
+ if (prop.getName().equals("PRIMARY")) {
+ if (Boolean.parseBoolean(prop.getValue())) {
+ log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
+ "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ protected void readConfigurations() {
+ XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+ int monitorInterval = conf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000);
+ setMonitorIntervalMilliseconds(monitorInterval);
+ if (log.isDebugEnabled()) {
+ log.debug("VMServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
+ }
+ }
+
+ @Override
+ public void destroy() {
+ getMinCheckKnowledgeSession().dispose();
+ getScaleCheckKnowledgeSession().dispose();
+ setDestroyed(true);
+ stopScheduler();
+ if (log.isDebugEnabled()) {
+ log.debug("VMServiceClusterMonitor Drools session has been disposed. " + this.toString());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "VMServiceClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() +
+ ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy +
+ ", lbReferenceType=" + lbReferenceType +
+ ", hasPrimary=" + hasPrimary + " ]";
+ }
+
+ public String getLbReferenceType() {
+ return lbReferenceType;
+ }
+
+ public void setLbReferenceType(String lbReferenceType) {
+ this.lbReferenceType = lbReferenceType;
+ }
+
+ public boolean isHasPrimary() {
+ return hasPrimary;
+ }
+
+ public void setHasPrimary(boolean hasPrimary) {
+ this.hasPrimary = hasPrimary;
+ }
+
+ @Override
+ public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
+ // TODO
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
index 5f3b590..73d85f0 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
@@ -24,7 +24,7 @@ import org.apache.stratos.autoscaler.AutoscalerContext;
import org.apache.stratos.autoscaler.NetworkPartitionContext;
import org.apache.stratos.autoscaler.PartitionContext;
import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
[5/5] git commit: Adding autoscaler topology event listeners
introduced by service grouping
Posted by im...@apache.org.
Adding autoscaler topology event listeners introduced by service grouping
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/e1f37d63
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/e1f37d63
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/e1f37d63
Branch: refs/heads/docker-grouping-merge
Commit: e1f37d63f291d091f6bf9daf216a66a171162ec1
Parents: a5dcbaa
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Oct 31 09:33:56 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Oct 31 09:33:56 2014 +0530
----------------------------------------------------------------------
.../stratos/autoscaler/AutoscalerContext.java | 47 +-
.../autoscaler/api/AutoScalerServiceImpl.java | 2 +-
.../AutoscalerHealthStatEventReceiver.java | 2 +-
.../AutoscalerTopologyEventReceiver.java | 803 ++++++++++++++++---
.../monitor/AbstractClusterMonitor.java | 287 -------
.../monitor/ApplicationMonitorFactory.java | 235 ------
.../monitor/ClusterMonitorFactory.java | 444 ----------
.../monitor/KubernetesClusterMonitor.java | 510 ------------
.../KubernetesServiceClusterMonitor.java | 202 -----
.../monitor/ParentComponentMonitor.java | 5 +-
.../autoscaler/monitor/VMClusterMonitor.java | 639 ---------------
.../autoscaler/monitor/VMLbClusterMonitor.java | 181 -----
.../monitor/VMServiceClusterMonitor.java | 235 ------
.../monitor/application/ApplicationMonitor.java | 2 +-
.../application/ApplicationMonitorFactory.java | 225 ++++++
.../monitor/cluster/AbstractClusterMonitor.java | 290 +++++++
.../monitor/cluster/ClusterMonitorFactory.java | 442 ++++++++++
.../cluster/KubernetesClusterMonitor.java | 520 ++++++++++++
.../KubernetesServiceClusterMonitor.java | 197 +++++
.../monitor/cluster/VMClusterMonitor.java | 692 ++++++++++++++++
.../monitor/cluster/VMLbClusterMonitor.java | 181 +++++
.../cluster/VMServiceClusterMonitor.java | 235 ++++++
.../status/checker/StatusChecker.java | 2 +-
23 files changed, 3498 insertions(+), 2880 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
index 2d10954..e8553bc 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java
@@ -25,7 +25,8 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor;
/**
* It holds all cluster monitors which are active in stratos.
@@ -35,16 +36,15 @@ public class AutoscalerContext {
private static final Log log = LogFactory.getLog(AutoscalerContext.class);
private static final AutoscalerContext INSTANCE = new AutoscalerContext();
- private AutoscalerContext() {
- try {
- setClusterMonitors(new HashMap<String, AbstractClusterMonitor>());
- } catch (Exception e) {
- log.error("Rule evaluateMinCheck error", e);
- }
- }
-
// Map<ClusterId, AbstractClusterMonitor>
private Map<String, AbstractClusterMonitor> clusterMonitors;
+ // Map<ApplicationId, ApplicationMonitor>
+ private Map<String, ApplicationMonitor> applicationMonitors;
+
+ private AutoscalerContext() {
+ clusterMonitors = new HashMap<String, AbstractClusterMonitor>();
+ applicationMonitors = new HashMap<String, ApplicationMonitor>();
+ }
public static AutoscalerContext getInstance() {
return INSTANCE;
@@ -58,22 +58,27 @@ public class AutoscalerContext {
return clusterMonitors.get(clusterId);
}
- public Map<String, AbstractClusterMonitor> getClusterMonitors() {
- return clusterMonitors;
+ public AbstractClusterMonitor removeClusterMonitor(String clusterId) {
+ return clusterMonitors.remove(clusterId);
}
- public void setClusterMonitors(Map<String, AbstractClusterMonitor> clusterMonitors) {
- this.clusterMonitors = clusterMonitors;
+ public void addAppMonitor(ApplicationMonitor applicationMonitor) {
+ applicationMonitors.put(applicationMonitor.getId(), applicationMonitor);
}
- public AbstractClusterMonitor removeClusterMonitor(String clusterId) {
+ public ApplicationMonitor getAppMonitor(String applicationId) {
+ return applicationMonitors.get(applicationId);
+ }
+
+ public void removeAppMonitor(String applicationId) {
+ applicationMonitors.remove(applicationId);
+ }
+
+ public boolean appMonitorExist(String applicationId) {
+ return applicationMonitors.containsKey(applicationId);
+ }
- AbstractClusterMonitor monitor = clusterMonitors.remove(clusterId);
- if (monitor == null) {
- log.fatal("ClusterMonitor not found for cluster id: " + clusterId);
- } else {
- log.info("Removed ClusterMonitor [cluster id]: " + clusterId);
- }
- return monitor;
+ public boolean clusterMonitorExist(String clusterId) {
+ return clusterMonitors.containsKey(clusterId);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java
index 7748c09..3066034 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java
@@ -27,7 +27,7 @@ import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
import org.apache.stratos.autoscaler.exception.*;
import org.apache.stratos.autoscaler.interfaces.AutoScalerServiceInterface;
import org.apache.stratos.autoscaler.kubernetes.KubernetesManager;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
import org.apache.stratos.autoscaler.partition.PartitionGroup;
import org.apache.stratos.autoscaler.partition.PartitionManager;
import org.apache.stratos.autoscaler.policy.PolicyManager;
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
index cd9aa9d..a5c6577 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -21,7 +21,7 @@ package org.apache.stratos.autoscaler.message.receiver.health;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.Service;
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index e03ff52..d6a140a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -21,37 +21,35 @@ package org.apache.stratos.autoscaler.message.receiver.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.exception.PartitionValidationException;
-import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.ClusterMonitorFactory;
-import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
+import org.apache.stratos.autoscaler.*;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.*;
+import org.apache.stratos.autoscaler.grouping.topic.InstanceNotificationPublisher;
+import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
+import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor;
+import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitorFactory;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitorFactory;
+import org.apache.stratos.autoscaler.monitor.cluster.KubernetesClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.group.GroupMonitor;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.autoscaler.status.checker.StatusChecker;
+import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListener;
-import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
-import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberMaintenanceListener;
-import org.apache.stratos.messaging.listener.topology.MemberReadyToShutdownEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberStartedEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
-import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.listener.topology.*;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.rule.FactHandle;
+import java.util.List;
+import java.util.Set;
+
/**
* Autoscaler topology receiver.
*/
@@ -61,6 +59,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
private TopologyEventReceiver topologyEventReceiver;
private boolean terminated;
+ private boolean topologyInitialized;
public AutoscalerTopologyEventReceiver() {
this.topologyEventReceiver = new TopologyEventReceiver();
@@ -97,146 +96,614 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
- try {
+ if (!topologyInitialized) {
+ log.info("[CompleteTopologyEvent] Received: " + event.getClass());
+
TopologyManager.acquireReadLock();
- for (Service service : TopologyManager.getTopology().getServices()) {
- for (Cluster cluster : service.getClusters()) {
- startClusterMonitor(cluster);
+ try {
+ for (Application application : TopologyManager.getTopology().getApplications()) {
+ startApplicationMonitor(application.getUniqueIdentifier());
}
+
+ topologyInitialized = true;
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLock();
}
- } catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
- log.error(msg, e);
- } finally {
- TopologyManager.releaseReadLock();
}
}
});
- topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
+ topologyEventReceiver.addEventListener(new ApplicationCreatedEventListener() {
@Override
protected void onEvent(Event event) {
try {
- MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
- String clusterId = memberReadyToShutdownEvent.getClusterId();
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractClusterMonitor monitor;
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- return;
+ log.info("[ApplicationCreatedEvent] Received: " + event.getClass());
+ ApplicationCreatedEvent applicationCreatedEvent = (ApplicationCreatedEvent) event;
+ try {
+
+ //acquire read lock
+ TopologyManager.acquireReadLockForApplication(
+ applicationCreatedEvent.getApplication().getUniqueIdentifier());
+ //start the application monitor
+ startApplicationMonitor(applicationCreatedEvent.getApplication().getUniqueIdentifier());
+ } catch (Exception e) {
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
+ } finally {
+ //release read lock
+ TopologyManager.releaseReadLockForApplication(
+ applicationCreatedEvent.getApplication().getUniqueIdentifier());
}
- monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
- } catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
+ } catch (ClassCastException e) {
+ String msg = "Error while casting the event " + e.getLocalizedMessage();
log.error(msg, e);
}
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ClusterActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterActivatedEvent] Received: " + event.getClass());
+
+ ClusterActivatedEvent clusterActivatedEvent = (ClusterActivatedEvent) event;
+ String appId = clusterActivatedEvent.getAppId();
+ String clusterId = clusterActivatedEvent.getClusterId();
+ AbstractClusterMonitor clusterMonitor =
+ AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+
+ //changing the status in the monitor, will notify its parent monitor
+ if(clusterMonitor!= null) {
+ clusterMonitor.setStatus(ClusterStatus.Active);
+ }
+
}
});
topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
@Override
protected void onEvent(Event event) {
+
+ log.info("[ClusterCreatedEvent] Received: " + event.getClass());
+
+ ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event;
+ String clusterId = clusterCreatedEvent.getClusterId();
+ AbstractClusterMonitor clusterMonitor =
+ AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+
+ //changing the status in the monitor, will notify its parent monitor
+ clusterMonitor.setStatus(ClusterStatus.Created);
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ClusterInActivateEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ log.info("[ClusterInActivateEvent] Received: " + event.getClass());
+
+ ClusterInactivateEvent clusterInactivateEvent = (ClusterInactivateEvent) event;
+ String appId = clusterInactivateEvent.getAppId();
+ String clusterId = clusterInactivateEvent.getClusterId();
+ AbstractClusterMonitor clusterMonitor =
+ AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+
+ //changing the status in the monitor, will notify its parent monitor
+ if(clusterMonitor!= null) {
+ clusterMonitor.setStatus(ClusterStatus.Inactive);
+ }
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ClusterTerminatingEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ log.info("[ClusterTerminatingEvent] Received: " + event.getClass());
+
+ ClusterTerminatingEvent clusterTerminatingEvent = (ClusterTerminatingEvent) event;
+ String clusterId = clusterTerminatingEvent.getClusterId();
+ AbstractClusterMonitor clusterMonitor =
+ AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+
+ //changing the status in the monitor, will notify its parent monitor
+ if (clusterMonitor != null) {
+ if (clusterMonitor.getStatus() == ClusterStatus.Active) {
+ // terminated gracefully
+ clusterMonitor.setStatus(ClusterStatus.Terminating);
+ InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
+ } else {
+ clusterMonitor.setStatus(ClusterStatus.Terminating);
+ clusterMonitor.terminateAllMembers();
+ }
+
+ } else {
+ log.warn("No Cluster Monitor found for cluster id " + clusterId);
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ClusterTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ log.info("[ClusterTerminatedEvent] Received: " + event.getClass());
+
+ ClusterTerminatedEvent clusterTerminatedEvent = (ClusterTerminatedEvent) event;
+ String appId = clusterTerminatedEvent.getAppId();
+ String clusterId = clusterTerminatedEvent.getClusterId();
+ AbstractClusterMonitor clusterMonitor =
+ AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+
+ //changing the status in the monitor, will notify its parent monitor
+ if (clusterMonitor != null) {
+ clusterMonitor.setStatus(ClusterStatus.Terminated);
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new GroupActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ log.info("[GroupActivatedEvent] Received: " + event.getClass());
+
+ GroupActivatedEvent groupActivatedEvent = (GroupActivatedEvent) event;
+ String appId = groupActivatedEvent.getAppId();
+ String groupId = groupActivatedEvent.getGroupId();
+
+ ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId);
+ GroupMonitor monitor = (GroupMonitor) appMonitor.findGroupMonitorWithId(groupId);
+
+ //changing the status in the monitor, will notify its parent monitor
+ if (monitor != null) {
+ monitor.setStatus(GroupStatus.Active);
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new GroupInActivateEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ log.info("[GroupInActivateEvent] Received: " + event.getClass());
+
+ GroupInactivateEvent groupInactivateEvent = (GroupInactivateEvent) event;
+ String appId = groupInactivateEvent.getAppId();
+ String groupId = groupInactivateEvent.getGroupId();
+
+ ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId);
+ GroupMonitor monitor = (GroupMonitor) appMonitor.findGroupMonitorWithId(groupId);
+
+ //changing the status in the monitor, will notify its parent monitor
+ if (monitor != null) {
+ monitor.setStatus(GroupStatus.Inactive);
+ }
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new GroupTerminatingEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ log.info("[GroupTerminatingEvent] Received: " + event.getClass());
+
+ GroupTerminatingEvent groupTerminatingEvent = (GroupTerminatingEvent) event;
+ String appId = groupTerminatingEvent.getAppId();
+ String groupId = groupTerminatingEvent.getGroupId();
+
+ ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId);
+ GroupMonitor monitor = (GroupMonitor) appMonitor.findGroupMonitorWithId(groupId);
+
+ //changing the status in the monitor, will notify its parent monitor
+ if (monitor != null) {
+ monitor.setStatus(GroupStatus.Terminating);
+ }
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new GroupTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ log.info("[GroupTerminatedEvent] Received: " + event.getClass());
+
+ GroupTerminatedEvent groupTerminatedEvent = (GroupTerminatedEvent) event;
+ String appId = groupTerminatedEvent.getAppId();
+ String groupId = groupTerminatedEvent.getGroupId();
+
+ ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId);
+ GroupMonitor monitor = (GroupMonitor) appMonitor.findGroupMonitorWithId(groupId);
+
+ //changing the status in the monitor, will notify its parent monitor
+ if (monitor != null) {
+ monitor.setStatus(GroupStatus.Terminated);
+ }
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ApplicationActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ log.info("[ApplicationActivatedEvent] Received: " + event.getClass());
+
+ ApplicationActivatedEvent applicationActivatedEvent = (ApplicationActivatedEvent) event;
+ String appId = applicationActivatedEvent.getAppId();
+
+ ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId);
+ if(appMonitor != null) {
+ appMonitor.setStatus(ApplicationStatus.Active);
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ApplicationUndeployedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ log.info("[ApplicationUndeployedEvent] Received: " + event.getClass());
+
+ ApplicationUndeployedEvent applicationUndeployedEvent = (ApplicationUndeployedEvent) event;
+
+ ApplicationMonitor appMonitor = AutoscalerContext.getInstance().
+ getAppMonitor(applicationUndeployedEvent.getApplicationId());
+
+ // if any of Cluster Monitors are not added yet, should send the
+ // Cluster Terminated event for those clusters
+ Set<ClusterDataHolder> clusterDataHolders = applicationUndeployedEvent.getClusterData();
+ if (clusterDataHolders != null) {
+ for (ClusterDataHolder clusterDataHolder : clusterDataHolders) {
+ AbstractClusterMonitor clusterMonitor =
+ AutoscalerContext.getInstance().getClusterMonitor(clusterDataHolder.getClusterId());
+ if (clusterMonitor == null) {
+ // Cluster Monitor not found; send Cluster Terminated event to cleanup
+ StatusEventPublisher.sendClusterTerminatedEvent(
+ applicationUndeployedEvent.getApplicationId(),
+ clusterDataHolder.getServiceType(),
+ clusterDataHolder.getClusterId());
+ } else {
+ // if the Cluster Monitor exists, mark it as destroyed to stop it from spawning
+ // more instances
+ clusterMonitor.setDestroyed(true);
+ }
+ }
+ }
+
+ if (appMonitor != null) {
+ // set Application Monitor state to 'Terminating'
+ appMonitor.setStatus(ApplicationStatus.Terminating);
+
+ } else {
+ // ApplicationMonitor is not found, send Terminating event to clean up
+ StatusEventPublisher.sendApplicationTerminatedEvent(
+ applicationUndeployedEvent.getApplicationId(), applicationUndeployedEvent.getClusterData());
+ }
+ }
+ });
+
+
+ topologyEventReceiver.addEventListener(new ApplicationTerminatingEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ log.info("[ApplicationTerminatingEvent] Received: " + event.getClass());
+
+ ApplicationTerminatingEvent appTerminatingEvent = (ApplicationTerminatingEvent) event;
+
+ // acquire read locks for application and relevant clusters
+ TopologyManager.acquireReadLockForApplication(appTerminatingEvent.getAppId());
+
try {
- log.info("Event received: " + event);
- ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event;
- TopologyManager.acquireReadLock();
- Service service = TopologyManager.getTopology().getService(clusterCreatedEvent.getServiceName());
- Cluster cluster = service.getCluster(clusterCreatedEvent.getClusterId());
- startClusterMonitor(cluster);
+ ApplicationMonitor appMonitor = AutoscalerContext.getInstance().
+ getAppMonitor(appTerminatingEvent.getAppId());
+
+ if (appMonitor != null) {
+ // update the status as Terminating
+ appMonitor.setStatus(ApplicationStatus.Terminating);
+
+ } else {
+ log.warn("Application Monitor cannot be found for the undeployed [application] "
+ + appTerminatingEvent.getAppId());
+ }
+
+ } finally {
+ TopologyManager.
+ releaseReadLockForApplication(appTerminatingEvent.getAppId());
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ApplicationTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ log.info("[ApplicationTerminatedEvent] Received: " + event.getClass());
+
+ ApplicationTerminatedEvent applicationRemovedEvent = (ApplicationTerminatedEvent) event;
+ Set<ClusterDataHolder> clusterDataHolders = applicationRemovedEvent.getClusterData();
+
+ try {
+ //TODO remove monitors as well as any starting or pending threads
+ ApplicationMonitor monitor = AutoscalerContext.getInstance().
+ getAppMonitor(applicationRemovedEvent.getAppId());
+ if (monitor != null) {
+ for (ClusterDataHolder clusterData : clusterDataHolders) {
+ //stopping the cluster monitor and remove it from the AS
+ VMClusterMonitor clusterMonitor = ((VMClusterMonitor)
+ AutoscalerContext.getInstance().getClusterMonitor(clusterData.getClusterId()));
+ if (clusterMonitor != null) {
+ clusterMonitor.setDestroyed(true);
+ AutoscalerContext.getInstance().removeClusterMonitor(clusterData.getClusterId());
+ } else {
+ log.warn("Cluster Monitor not found for [ cluster id ] " +
+ clusterData.getClusterId() + ", unable to remove");
+ }
+ }
+ //removing the application monitor
+ AutoscalerContext.getInstance().
+ removeAppMonitor(applicationRemovedEvent.getAppId());
+ } else {
+ log.warn("Application Monitor cannot be found for the terminated [application] "
+ + applicationRemovedEvent.getAppId() + ", unable to remove");
+ }
+
+
} catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
+ String msg = "Error processing event " + e.getMessage();
log.error(msg, e);
- } finally {
- TopologyManager.releaseReadLock();
}
}
});
- topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
+ topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
@Override
protected void onEvent(Event event) {
try {
- ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
- String clusterId = clusterRemovedEvent.getClusterId();
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent =
+ (MemberReadyToShutdownEvent) event;
AutoscalerContext asCtx = AutoscalerContext.getInstance();
AbstractClusterMonitor monitor;
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
+ String clusterId = memberReadyToShutdownEvent.getClusterId();
+ String memberId = memberReadyToShutdownEvent.getMemberId();
+
+ if (asCtx.clusterMonitorExist(clusterId)) {
+ monitor = asCtx.getClusterMonitor(clusterId);
+ } else {
if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ log.debug(String.format("A cluster monitor is not found " +
+ "in autoscaler context [cluster] %s", clusterId));
}
return;
}
- monitor.handleClusterRemovedEvent(clusterRemovedEvent);
- asCtx.removeClusterMonitor(clusterId);
- monitor.destroy();
- log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
- clusterId));
- } catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
- log.error(msg, e);
+
+ if(monitor instanceof VMClusterMonitor) {
+ VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) monitor;
+ NetworkPartitionContext nwPartitionCtxt;
+ nwPartitionCtxt = vmClusterMonitor.getNetworkPartitionCtxt(
+ memberReadyToShutdownEvent.getNetworkPartitionId());
+
+ String partitionId = vmClusterMonitor.getPartitionOfMember(memberId);
+ PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+
+ // terminate the member
+ CloudControllerClient ccClient = CloudControllerClient.getInstance();
+ ccClient.terminate(memberId);
+
+ // remove from active member list
+ partitionCtxt.removeActiveMemberById(memberId);
+
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member is terminated and removed from the active " +
+ "members list: [member] %s [partition] %s [cluster] %s ",
+ memberId, partitionId, clusterId));
+ }
+ } else if(monitor instanceof KubernetesClusterMonitor) {
+ KubernetesClusterMonitor kubernetesClusterMonitor = (KubernetesClusterMonitor) monitor;
+ kubernetesClusterMonitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+ }
+ } catch (TerminationException e) {
+ log.error(e);
}
}
+
});
- topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
+ topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
@Override
protected void onEvent(Event event) {
- }
+ ClusterRemovedEvent clusterRemovedEvent = null;
+ try {
+ clusterRemovedEvent = (ClusterRemovedEvent) event;
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(clusterRemovedEvent.getServiceName(),
+ clusterRemovedEvent.getClusterId());
+
+ String clusterId = clusterRemovedEvent.getClusterId();
+ String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy();
+
+ AbstractClusterMonitor monitor;
+
+ if (clusterRemovedEvent.isLbCluster()) {
+ DeploymentPolicy depPolicy = PolicyManager.getInstance().
+ getDeploymentPolicy(deploymentPolicy);
+ if (depPolicy != null) {
+ List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
+ .getNetworkPartitionLbHolders(depPolicy);
+
+ for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
+ // removes lb cluster ids
+ boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
+ if (isRemoved) {
+ log.info("Removed the lb cluster [id]:"
+ + clusterId
+ + " reference from Network Partition [id]: "
+ + networkPartitionLbHolder
+ .getNetworkPartitionId());
+
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(networkPartitionLbHolder);
+ }
+
+ }
+ }
+ monitor = AutoscalerContext.getInstance()
+ .removeClusterMonitor(clusterId);
+ } else {
+ monitor = (AbstractClusterMonitor) AutoscalerContext.getInstance()
+ .removeClusterMonitor(clusterId);
+ }
+
+ // runTerminateAllRule(monitor);
+ if (monitor != null) {
+ monitor.destroy();
+ log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ",
+ clusterId));
+ }
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(clusterRemovedEvent.getServiceName(),
+ clusterRemovedEvent.getClusterId());
+ }
+ }
});
topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
@Override
protected void onEvent(Event event) {
+
+ MemberTerminatedEvent memberTerminatedEvent = null;
try {
- MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+ //TopologyManager.acquireReadLock();
+
+ memberTerminatedEvent = (MemberTerminatedEvent) event;
+ String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
String clusterId = memberTerminatedEvent.getClusterId();
- AbstractClusterMonitor monitor;
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
+ String partitionId = memberTerminatedEvent.getPartitionId();
+
+ TopologyManager.acquireReadLockForCluster(memberTerminatedEvent.getServiceName(),
+ memberTerminatedEvent.getClusterId());
+
+ AbstractClusterMonitor monitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+ if(monitor == null) {
+ log.error(String.format("Cluster monitor not found in autoscaler context: [clusterId] %s ", clusterId));
return;
}
- monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
+
+ if(monitor instanceof VMClusterMonitor) {
+ VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) monitor;
+ NetworkPartitionContext networkPartitionContext = vmClusterMonitor.
+ getNetworkPartitionCtxt(networkPartitionId);
+
+ PartitionContext partitionContext = networkPartitionContext.
+ getPartitionCtxt(partitionId);
+ String memberId = memberTerminatedEvent.getMemberId();
+ partitionContext.removeMemberStatsContext(memberId);
+
+
+ if (partitionContext.removeTerminationPendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from termination pending " +
+ "members list: [member] %s", memberId));
+ }
+ } else if (partitionContext.removePendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from pending members list: " +
+ "[member] %s", memberId));
+ }
+ } else if (partitionContext.removeActiveMemberById(memberId)) {
+ log.warn(String.format("Member is in the wrong list and it is removed " +
+ "from active members list", memberId));
+ } else {
+ log.warn(String.format("Member is not available in any of the list " +
+ "active, pending and termination pending", memberId));
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been removed " +
+ " successfully: [member] %s", memberId));
+ }
+ //Checking whether the cluster state can be changed either from in_active to created/terminating to terminated
+ StatusChecker.getInstance().onMemberTermination(clusterId);
+ } else if(monitor instanceof KubernetesClusterMonitor) {
+ KubernetesClusterMonitor kubernetesClusterMonitor = (KubernetesClusterMonitor) monitor;
+ kubernetesClusterMonitor.handleMemberTerminatedEvent(memberTerminatedEvent);
+ }
+
} catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
- log.error(msg, e);
+ log.error("Error processing event", e);
+ } finally {
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(memberTerminatedEvent.getServiceName(),
+ memberTerminatedEvent.getClusterId());
}
}
-
});
topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
+
+ MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(memberActivatedEvent.getServiceName(),
+ memberActivatedEvent.getClusterId());
+
try {
- MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+ String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
String clusterId = memberActivatedEvent.getClusterId();
- AbstractClusterMonitor monitor;
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
+ String partitionId = memberActivatedEvent.getPartitionId();
+ String memberId = memberActivatedEvent.getMemberId();
+
+ AbstractClusterMonitor monitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+ if(monitor == null) {
+ log.error(String.format("Cluster monitor not found in autoscaler context: [clusterId] %s ", clusterId));
return;
}
- monitor.handleMemberActivatedEvent(memberActivatedEvent);
+
+ if(monitor instanceof VMClusterMonitor) {
+ VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) monitor;
+ NetworkPartitionContext networkPartitionContext = vmClusterMonitor.
+ getNetworkPartitionCtxt(networkPartitionId);
+ PartitionContext partitionContext = networkPartitionContext.
+ getPartitionCtxt(partitionId);
+
+ partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ // TODO starting the pending clusters which are waiting for this member activation in a cluster
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been added " +
+ "successfully: [member] %s", memberId));
+ }
+ partitionContext.movePendingMemberToActiveMembers(memberId);
+ //triggering the status checker
+ StatusChecker.getInstance().onMemberStatusChange(memberActivatedEvent.getClusterId());
+ } else if(monitor instanceof KubernetesClusterMonitor) {
+ KubernetesClusterMonitor kubernetesClusterMonitor = (KubernetesClusterMonitor) monitor;
+ kubernetesClusterMonitor.handleMemberActivatedEvent(memberActivatedEvent);
+ }
+
} catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
- log.error(msg, e);
+ log.error("Error processing event", e);
+ } finally {
+ //TopologyManager.releaseReadLock();
+ TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(),
+ memberActivatedEvent.getClusterId());
}
}
});
@@ -244,34 +711,51 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
@Override
protected void onEvent(Event event) {
+
+ MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+
+ //TopologyManager.acquireReadLock();
+ TopologyManager.acquireReadLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+ memberMaintenanceModeEvent.getClusterId());
+
try {
- MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
- String clusterId = maintenanceModeEvent.getClusterId();
- AbstractClusterMonitor monitor;
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
+
+ String memberId = memberMaintenanceModeEvent.getMemberId();
+ String partitionId = memberMaintenanceModeEvent.getPartitionId();
+ String networkPartitionId = memberMaintenanceModeEvent.getNetworkPartitionId();
+
+ PartitionContext partitionContext;
+ String clusterId = memberMaintenanceModeEvent.getClusterId();
+
+ AbstractClusterMonitor monitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
+ if(monitor == null) {
+ log.error(String.format("Cluster monitor not found in autoscaler context: [clusterId] %s ", clusterId));
+ return;
+ }
+
+ if(monitor instanceof VMClusterMonitor) {
+ VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) monitor;
+ partitionContext = vmClusterMonitor.getNetworkPartitionCtxt(networkPartitionId).
+ getPartitionCtxt(partitionId);
+ partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ log.debug(String.format("Member has been moved as pending termination: " +
+ "[member] %s", memberId));
}
- return;
+ partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
+ } else if(monitor instanceof KubernetesClusterMonitor) {
+ KubernetesClusterMonitor kubernetesClusterMonitor = (KubernetesClusterMonitor) monitor;
+ kubernetesClusterMonitor.handleMemberMaintenanceModeEvent(memberMaintenanceModeEvent);
}
- monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
+
} catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
- log.error(msg, e);
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLockForCluster(memberMaintenanceModeEvent.getServiceName(),
+ memberMaintenanceModeEvent.getClusterId());
}
}
});
-
-
- topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- }
- });
}
private class ClusterMonitorAdder implements Runnable {
@@ -371,4 +855,85 @@ public class AutoscalerTopologyEventReceiver implements Runnable {
}
}
}
+
+ protected synchronized void startApplicationMonitor(String applicationId) {
+ Thread th = null;
+ if (!AutoscalerContext.getInstance().appMonitorExist(applicationId)) {
+ th = new Thread(
+ new ApplicationMonitorAdder(applicationId));
+ }
+
+ if (th != null) {
+ th.start();
+ // try {
+ // th.join();
+ // } catch (InterruptedException ignore) {
+
+ if (log.isDebugEnabled()) {
+ log.debug(String
+ .format("Application monitor thread has been started successfully: " +
+ "[application] %s ", applicationId));
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String
+ .format("Application monitor thread already exists: " +
+ "[application] %s ", applicationId));
+ }
+ }
+ }
+
+ private class ApplicationMonitorAdder implements Runnable {
+ private String appId;
+
+ public ApplicationMonitorAdder(String appId) {
+ this.appId = appId;
+ }
+
+ public void run() {
+ ApplicationMonitor applicationMonitor = null;
+ int retries = 5;
+ boolean success = false;
+ do {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e1) {
+ }
+ try {
+ long start = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("application monitor is going to be started for [application] " +
+ appId);
+ }
+ applicationMonitor = ApplicationMonitorFactory.getApplicationMonitor(appId);
+
+ long end = System.currentTimeMillis();
+ log.info("Time taken to start app monitor: " + (end - start) / 1000);
+ success = true;
+ } catch (DependencyBuilderException e) {
+ String msg = "Application monitor creation failed for Application: ";
+ log.warn(msg, e);
+ retries--;
+ } catch (TopologyInConsistentException e) {
+ String msg = "Application monitor creation failed for Application: ";
+ log.warn(msg, e);
+ retries--;
+ }
+ } while (!success && retries != 0);
+
+ if (applicationMonitor == null) {
+ String msg = "Application monitor creation failed, even after retrying for 5 times, "
+ + "for Application: " + appId;
+ log.error(msg);
+ throw new RuntimeException(msg);
+ }
+
+ AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Application monitor has been added successfully: " +
+ "[application] %s", applicationMonitor.getId()));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
deleted file mode 100644
index 030bc53..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
-import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent;
-import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
-import org.apache.stratos.autoscaler.monitor.events.MonitorTerminateAllEvent;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.GroupStatus;
-import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.drools.runtime.StatefulKnowledgeSession;
-import org.drools.runtime.rule.FactHandle;
-
-/*
- * Every cluster monitor, which are monitoring a cluster, should extend this class.
- */
-public abstract class AbstractClusterMonitor extends Monitor implements Runnable {
-
- private String clusterId;
- private String serviceId;
- protected ClusterStatus status;
- private int monitoringIntervalMilliseconds;
-
- protected FactHandle minCheckFactHandle;
- protected FactHandle scaleCheckFactHandle;
- private StatefulKnowledgeSession minCheckKnowledgeSession;
- private StatefulKnowledgeSession scaleCheckKnowledgeSession;
- private boolean isDestroyed;
-
- private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
- protected boolean hasFaultyMember = false;
-
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-
- protected AbstractClusterMonitor(String clusterId, String serviceId,
- AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
-
- super();
- this.clusterId = clusterId;
- this.serviceId = serviceId;
- this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
- this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
- this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
- }
-
- protected abstract void readConfigurations();
-
- public void startScheduler() {
- scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
- }
-
- protected void stopScheduler() {
- scheduler.shutdownNow();
- }
-
- protected abstract void monitor();
-
- public abstract void destroy();
-
- //handle health events
- public abstract void handleAverageLoadAverageEvent(
- AverageLoadAverageEvent averageLoadAverageEvent);
-
- public abstract void handleGradientOfLoadAverageEvent(
- GradientOfLoadAverageEvent gradientOfLoadAverageEvent);
-
- public abstract void handleSecondDerivativeOfLoadAverageEvent(
- SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent);
-
- public abstract void handleAverageMemoryConsumptionEvent(
- AverageMemoryConsumptionEvent averageMemoryConsumptionEvent);
-
- public abstract void handleGradientOfMemoryConsumptionEvent(
- GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent);
-
- public abstract void handleSecondDerivativeOfMemoryConsumptionEvent(
- SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent);
-
- public abstract void handleAverageRequestsInFlightEvent(
- AverageRequestsInFlightEvent averageRequestsInFlightEvent);
-
- public abstract void handleGradientOfRequestsInFlightEvent(
- GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent);
-
- public abstract void handleSecondDerivativeOfRequestsInFlightEvent(
- SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent);
-
- public abstract void handleMemberAverageMemoryConsumptionEvent(
- MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent);
-
- public abstract void handleMemberGradientOfMemoryConsumptionEvent(
- MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent);
-
- public abstract void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
- MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent);
-
-
- public abstract void handleMemberAverageLoadAverageEvent(
- MemberAverageLoadAverageEvent memberAverageLoadAverageEvent);
-
- public abstract void handleMemberGradientOfLoadAverageEvent(
- MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent);
-
- public abstract void handleMemberSecondDerivativeOfLoadAverageEvent(
- MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent);
-
- public abstract void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent);
-
- //handle topology events
- public abstract void handleMemberStartedEvent(MemberStartedEvent memberStartedEvent);
-
- public abstract void handleMemberActivatedEvent(MemberActivatedEvent memberActivatedEvent);
-
- public abstract void handleMemberMaintenanceModeEvent(
- MemberMaintenanceModeEvent maintenanceModeEvent);
-
- public abstract void handleMemberReadyToShutdownEvent(
- MemberReadyToShutdownEvent memberReadyToShutdownEvent);
-
- public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent memberTerminatedEvent);
-
- public abstract void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent);
-
- public abstract void handleDynamicUpdates(Properties properties) throws InvalidArgumentException;
-
- public String getClusterId() {
- return clusterId;
- }
-
- public void setClusterId(String clusterId) {
- this.clusterId = clusterId;
- }
-
- public void setStatus(ClusterStatus status) {
- this.status = status;
- }
-
- public ClusterStatus getStatus() {
- return status;
- }
-
- public String getServiceId() {
- return serviceId;
- }
-
- public void setServiceId(String serviceId) {
- this.serviceId = serviceId;
- }
-
- public int getMonitorIntervalMilliseconds() {
- return monitoringIntervalMilliseconds;
- }
-
- public void setMonitorIntervalMilliseconds(int monitorIntervalMilliseconds) {
- this.monitoringIntervalMilliseconds = monitorIntervalMilliseconds;
- }
-
- public FactHandle getMinCheckFactHandle() {
- return minCheckFactHandle;
- }
-
- public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
- this.minCheckFactHandle = minCheckFactHandle;
- }
-
- public FactHandle getScaleCheckFactHandle() {
- return scaleCheckFactHandle;
- }
-
- public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
- this.scaleCheckFactHandle = scaleCheckFactHandle;
- }
-
- public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
- return minCheckKnowledgeSession;
- }
-
- public void setMinCheckKnowledgeSession(
- StatefulKnowledgeSession minCheckKnowledgeSession) {
- this.minCheckKnowledgeSession = minCheckKnowledgeSession;
- }
-
- public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
- return scaleCheckKnowledgeSession;
- }
-
- public void setScaleCheckKnowledgeSession(
- StatefulKnowledgeSession scaleCheckKnowledgeSession) {
- this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
- }
-
- public boolean isDestroyed() {
- return isDestroyed;
- }
-
- public void setDestroyed(boolean isDestroyed) {
- this.isDestroyed = isDestroyed;
- }
-
- public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
- return autoscalerRuleEvaluator;
- }
-
- public void setAutoscalerRuleEvaluator(
- AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
- this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
- }
-
-
- @Override
- public void onParentEvent(MonitorStatusEvent statusEvent) {
- // send the ClusterTerminating event
- if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() ==
- ApplicationStatus.Terminating) {
- StatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId);
- }
- }
-
- @Override
- public void onChildEvent(MonitorStatusEvent statusEvent) {
-
- }
-
- @Override
- public void onEvent(MonitorTerminateAllEvent terminateAllEvent) {
-
- }
-
- @Override
- public void onEvent(MonitorScalingEvent scalingEvent) {
-
- }
-
- public void setHasFaultyMember(boolean hasFaultyMember) {
- this.hasFaultyMember = hasFaultyMember;
- }
-
- public boolean isHasFaultyMember() {
- return hasFaultyMember;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
deleted file mode 100644
index 9cf3709..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.DependencyBuilderException;
-import org.apache.stratos.autoscaler.exception.PartitionValidationException;
-import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.exception.TopologyInConsistentException;
-import org.apache.stratos.autoscaler.grouping.dependency.context.ApplicationContext;
-import org.apache.stratos.autoscaler.grouping.dependency.context.ClusterContext;
-import org.apache.stratos.autoscaler.grouping.dependency.context.GroupContext;
-import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor;
-import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.group.GroupMonitor;
-import org.apache.stratos.autoscaler.partition.PartitionGroup;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.status.checker.StatusChecker;
-import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.util.Constants;
-
-import java.util.Map;
-
-/**
- * Factory class to get the Monitors.
- */
-public class ApplicationMonitorFactory {
- private static final Log log = LogFactory.getLog(ApplicationMonitorFactory.class);
-
- /**
- * Factor method used to create relevant monitors based on the given context
- *
- * @param context Application/Group/Cluster context
- * @param appId appId of the application which requires to create app monitor
- * @param parentMonitor parent of the monitor
- * @return Monitor which can be ApplicationMonitor/GroupMonitor/ClusterMonitor
- * @throws TopologyInConsistentException throws while traversing thr topology
- * @throws DependencyBuilderException throws while building dependency for app monitor
- * @throws PolicyValidationException throws while validating the policy associated with cluster
- * @throws PartitionValidationException throws while validating the partition used in a cluster
- */
- public static Monitor getMonitor(ParentComponentMonitor parentMonitor, ApplicationContext context, String appId)
- throws TopologyInConsistentException,
- DependencyBuilderException, PolicyValidationException, PartitionValidationException {
- Monitor monitor;
-
- if (context instanceof GroupContext) {
- monitor = getGroupMonitor(parentMonitor, context, appId);
- } else if (context instanceof ClusterContext) {
- monitor = getClusterMonitor(parentMonitor, (ClusterContext) context, appId);
- //Start the thread
- Thread th = new Thread((AbstractClusterMonitor) monitor);
- th.start();
- } else {
- monitor = getApplicationMonitor(appId);
- }
- return monitor;
- }
-
- /**
- * This will create the GroupMonitor based on given groupId by going thr Topology
- *
- * @param parentMonitor parent of the monitor
- * @param context groupId of the group
- * @param appId appId of the relevant application
- * @return Group monitor
- * @throws DependencyBuilderException throws while building dependency for app monitor
- * @throws TopologyInConsistentException throws while traversing thr topology
- */
- public static Monitor getGroupMonitor(ParentComponentMonitor parentMonitor, ApplicationContext context, String appId)
- throws DependencyBuilderException,
- TopologyInConsistentException {
- GroupMonitor groupMonitor;
- TopologyManager.acquireReadLockForApplication(appId);
-
- try {
- Group group = TopologyManager.getTopology().getApplication(appId).getGroupRecursively(context.getId());
- groupMonitor = new GroupMonitor(group, appId);
- groupMonitor.setAppId(appId);
- if(parentMonitor != null) {
- groupMonitor.setParent(parentMonitor);
- //Setting the dependent behaviour of the monitor
- if(parentMonitor.isDependent() || (context.isDependent() && context.hasChild())) {
- groupMonitor.setHasDependent(true);
- } else {
- groupMonitor.setHasDependent(false);
- }
- //TODO make sure when it is async
-
- if (group.getStatus() != groupMonitor.getStatus()) {
- //updating the status, if the group is not in created state when creating group Monitor
- //so that groupMonitor will notify the parent (useful when restarting stratos)
- groupMonitor.setStatus(group.getStatus());
- }
- }
-
- } finally {
- TopologyManager.releaseReadLockForApplication(appId);
-
- }
- return groupMonitor;
-
- }
-
- /**
- * This will create a new app monitor based on the give appId by getting the
- * application from Topology
- *
- * @param appId appId of the application which requires to create app monitor
- * @return ApplicationMonitor
- * @throws DependencyBuilderException throws while building dependency for app monitor
- * @throws TopologyInConsistentException throws while traversing thr topology
- */
- public static ApplicationMonitor getApplicationMonitor(String appId)
- throws DependencyBuilderException,
- TopologyInConsistentException {
- ApplicationMonitor applicationMonitor;
- TopologyManager.acquireReadLockForApplication(appId);
- try {
- Application application = TopologyManager.getTopology().getApplication(appId);
- if (application != null) {
- applicationMonitor = new ApplicationMonitor(application);
- applicationMonitor.setHasDependent(false);
-
- } else {
- String msg = "[Application] " + appId + " cannot be found in the Topology";
- throw new TopologyInConsistentException(msg);
- }
- } finally {
- TopologyManager.releaseReadLockForApplication(appId);
- }
-
- return applicationMonitor;
-
- }
-
- /**
- * Updates ClusterContext for given cluster
- *
- * @param parentMonitor parent of the monitor
- * @param context
- * @return ClusterMonitor - Updated ClusterContext
- * @throws org.apache.stratos.autoscaler.exception.PolicyValidationException
- * @throws org.apache.stratos.autoscaler.exception.PartitionValidationException
- */
- public static VMClusterMonitor getClusterMonitor(ParentComponentMonitor parentMonitor,
- ClusterContext context, String appId)
- throws PolicyValidationException,
- PartitionValidationException,
- TopologyInConsistentException {
- //Retrieving the Cluster from Topology
- String clusterId = context.getId();
- String serviceName = context.getServiceName();
-
- Cluster cluster;
- AbstractClusterMonitor clusterMonitor;
- //acquire read lock for the service and cluster
- TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
- try {
- Topology topology = TopologyManager.getTopology();
- if (topology.serviceExists(serviceName)) {
- Service service = topology.getService(serviceName);
- if (service.clusterExists(clusterId)) {
- cluster = service.getCluster(clusterId);
- if (log.isDebugEnabled()) {
- log.debug("Dependency check starting the [cluster]" + clusterId);
- }
- // startClusterMonitor(this, cluster);
- //context.setCurrentStatus(Status.Created);
- } else {
- String msg = "[Cluster] " + clusterId + " cannot be found in the " +
- "Topology for [service] " + serviceName;
- throw new TopologyInConsistentException(msg);
- }
- } else {
- String msg = "[Service] " + serviceName + " cannot be found in the Topology";
- throw new TopologyInConsistentException(msg);
-
- }
-
-
- clusterMonitor = ClusterMonitorFactory.getMonitor(cluster);
- if (clusterMonitor instanceof VMClusterMonitor) {
- return (VMClusterMonitor) clusterMonitor;
- } else if (clusterMonitor != null) {
- log.warn("Unknown cluster monitor found: " + clusterMonitor.getClass().toString());
- }
- return null;
- } finally {
- TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
- }
- }
-
-
- private static Properties convertMemberPropsToMemberContextProps(
- java.util.Properties properties) {
- Properties props = new Properties();
- for (Map.Entry<Object, Object> e : properties.entrySet()) {
- Property prop = new Property();
- prop.setName((String) e.getKey());
- prop.setValue((String) e.getValue());
- props.addProperties(prop);
- }
- return props;
- }
-}
[3/5] Adding autoscaler topology event listeners introduced by
service grouping
Posted by im...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
deleted file mode 100644
index 38ed1a6..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
+++ /dev/null
@@ -1,639 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-abstract public class VMClusterMonitor extends AbstractClusterMonitor {
-
- private static final Log log = LogFactory.getLog(VMClusterMonitor.class);
- // Map<NetworkpartitionId, Network Partition Context>
- protected Map<String, NetworkPartitionContext> networkPartitionCtxts;
- protected DeploymentPolicy deploymentPolicy;
- protected AutoscalePolicy autoscalePolicy;
-
- protected VMClusterMonitor(String clusterId, String serviceId,
- AutoscalerRuleEvaluator autoscalerRuleEvaluator,
- DeploymentPolicy deploymentPolicy, AutoscalePolicy autoscalePolicy,
- Map<String, NetworkPartitionContext> networkPartitionCtxts) {
- super(clusterId, serviceId, autoscalerRuleEvaluator);
- this.deploymentPolicy = deploymentPolicy;
- this.autoscalePolicy = autoscalePolicy;
- this.networkPartitionCtxts = networkPartitionCtxts;
- }
-
- @Override
- public void handleAverageLoadAverageEvent(
- AverageLoadAverageEvent averageLoadAverageEvent) {
-
- String networkPartitionId = averageLoadAverageEvent.getNetworkPartitionId();
- String clusterId = averageLoadAverageEvent.getClusterId();
- float value = averageLoadAverageEvent.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, value));
- }
-
- NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
- if (null != networkPartitionContext) {
- networkPartitionContext.setAverageLoadAverage(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
-
- }
-
- @Override
- public void handleGradientOfLoadAverageEvent(
- GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
-
- String networkPartitionId = gradientOfLoadAverageEvent.getNetworkPartitionId();
- String clusterId = gradientOfLoadAverageEvent.getClusterId();
- float value = gradientOfLoadAverageEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, value));
- }
- NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
- if (null != networkPartitionContext) {
- networkPartitionContext.setLoadAverageGradient(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleSecondDerivativeOfLoadAverageEvent(
- SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
-
- String networkPartitionId = secondDerivativeOfLoadAverageEvent.getNetworkPartitionId();
- String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
- float value = secondDerivativeOfLoadAverageEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
- + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
- }
- NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
- if (null != networkPartitionContext) {
- networkPartitionContext.setLoadAverageSecondDerivative(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleAverageMemoryConsumptionEvent(
- AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
-
- String networkPartitionId = averageMemoryConsumptionEvent.getNetworkPartitionId();
- String clusterId = averageMemoryConsumptionEvent.getClusterId();
- float value = averageMemoryConsumptionEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s "
- + "[value] %s", clusterId, networkPartitionId, value));
- }
- NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
- if (null != networkPartitionContext) {
- networkPartitionContext.setAverageMemoryConsumption(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String
- .format("Network partition context is not available for :"
- + " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleGradientOfMemoryConsumptionEvent(
- GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
-
- String networkPartitionId = gradientOfMemoryConsumptionEvent.getNetworkPartitionId();
- String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
- float value = gradientOfMemoryConsumptionEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
- + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
- }
- NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
- if (null != networkPartitionContext) {
- networkPartitionContext.setMemoryConsumptionGradient(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleSecondDerivativeOfMemoryConsumptionEvent(
- SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
-
- String networkPartitionId = secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId();
- String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
- float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
- + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
- }
- NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
- if (null != networkPartitionContext) {
- networkPartitionContext.setMemoryConsumptionSecondDerivative(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleAverageRequestsInFlightEvent(
- AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
-
- String networkPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId();
- String clusterId = averageRequestsInFlightEvent.getClusterId();
- float value = averageRequestsInFlightEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, value));
- }
- NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
- if (null != networkPartitionContext) {
- networkPartitionContext.setAverageRequestsInFlight(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleGradientOfRequestsInFlightEvent(
- GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
-
- String networkPartitionId = gradientOfRequestsInFlightEvent.getNetworkPartitionId();
- String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
- float value = gradientOfRequestsInFlightEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, value));
- }
- NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
- if (null != networkPartitionContext) {
- networkPartitionContext.setRequestsInFlightGradient(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleSecondDerivativeOfRequestsInFlightEvent(
- SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
-
- String networkPartitionId = secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId();
- String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
- float value = secondDerivativeOfRequestsInFlightEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second derivative of Rif event: [cluster] %s "
- + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value));
- }
- NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
- if (null != networkPartitionContext) {
- networkPartitionContext.setRequestsInFlightSecondDerivative(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- @Override
- public void handleMemberAverageMemoryConsumptionEvent(
- MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
-
- String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
- NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
- PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberAverageMemoryConsumptionEvent.getValue();
- memberStatsContext.setAverageMemoryConsumption(value);
- }
-
- @Override
- public void handleMemberGradientOfMemoryConsumptionEvent(
- MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
-
- String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
- NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
- PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberGradientOfMemoryConsumptionEvent.getValue();
- memberStatsContext.setGradientOfMemoryConsumption(value);
- }
-
- @Override
- public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
- MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
-
- }
-
- @Override
- public void handleMemberAverageLoadAverageEvent(
- MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
-
- String memberId = memberAverageLoadAverageEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
- NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
- PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberAverageLoadAverageEvent.getValue();
- memberStatsContext.setAverageLoadAverage(value);
- }
-
- @Override
- public void handleMemberGradientOfLoadAverageEvent(
- MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
-
- String memberId = memberGradientOfLoadAverageEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
- NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
- PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberGradientOfLoadAverageEvent.getValue();
- memberStatsContext.setGradientOfLoadAverage(value);
- }
-
- @Override
- public void handleMemberSecondDerivativeOfLoadAverageEvent(
- MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
-
- String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
- NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
- PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
- memberStatsContext.setSecondDerivativeOfLoadAverage(value);
- }
-
- @Override
- public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
-
- String memberId = memberFaultEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- if (null == member) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
- }
- return;
- }
- if (!member.isActive()) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member activated event has not received for the member %s. "
- + "Therefore ignoring" + " the member fault health stat", memberId));
- }
- return;
- }
-
- NetworkPartitionContext nwPartitionCtxt;
- nwPartitionCtxt = getNetworkPartitionCtxt(member);
- String partitionId = getPartitionOfMember(memberId);
- PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
- if (!partitionCtxt.activeMemberExist(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Could not find the active member in partition context, "
- + "[member] %s ", memberId));
- }
- return;
- }
- // terminate the faulty member
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- try {
- ccClient.terminate(memberId);
- } catch (TerminationException e) {
- String msg = "TerminationException " + e.getLocalizedMessage();
- log.error(msg, e);
- }
- // remove from active member list
- partitionCtxt.removeActiveMemberById(memberId);
- if (log.isInfoEnabled()) {
- String clusterId = memberFaultEvent.getClusterId();
- log.info(String.format("Faulty member is terminated and removed from the active members list: "
- + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
- }
- }
-
- @Override
- public void handleMemberStartedEvent(
- MemberStartedEvent memberStartedEvent) {
-
- }
-
- @Override
- public void handleMemberActivatedEvent(
- MemberActivatedEvent memberActivatedEvent) {
-
- String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
- String partitionId = memberActivatedEvent.getPartitionId();
- String memberId = memberActivatedEvent.getMemberId();
- NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
- PartitionContext partitionContext;
- partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
- partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added successfully: "
- + "[member] %s", memberId));
- }
- partitionContext.movePendingMemberToActiveMembers(memberId);
- }
-
- @Override
- public void handleMemberMaintenanceModeEvent(
- MemberMaintenanceModeEvent maintenanceModeEvent) {
-
- String networkPartitionId = maintenanceModeEvent.getNetworkPartitionId();
- String partitionId = maintenanceModeEvent.getPartitionId();
- String memberId = maintenanceModeEvent.getMemberId();
- NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
- PartitionContext partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
- partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member has been moved as pending termination: "
- + "[member] %s", memberId));
- }
- partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
- }
-
- @Override
- public void handleMemberReadyToShutdownEvent(
- MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
-
- NetworkPartitionContext nwPartitionCtxt;
- String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
- nwPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-
- // start a new member in the same Partition
- String memberId = memberReadyToShutdownEvent.getMemberId();
- String partitionId = getPartitionOfMember(memberId);
- PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
- // terminate the shutdown ready member
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- try {
- ccClient.terminate(memberId);
- // remove from active member list
- partitionCtxt.removeActiveMemberById(memberId);
-
- String clusterId = memberReadyToShutdownEvent.getClusterId();
- log.info(String.format("Member is terminated and removed from the active members list: "
- + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
- } catch (TerminationException e) {
- String msg = "TerminationException" + e.getLocalizedMessage();
- log.error(msg, e);
- }
- }
-
- @Override
- public void handleMemberTerminatedEvent(
- MemberTerminatedEvent memberTerminatedEvent) {
-
- String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
- String memberId = memberTerminatedEvent.getMemberId();
- String partitionId = memberTerminatedEvent.getPartitionId();
- NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
- PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
- partitionContext.removeMemberStatsContext(memberId);
-
- if (partitionContext.removeTerminationPendingMember(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is removed from termination pending members list: "
- + "[member] %s", memberId));
- }
- } else if (partitionContext.removePendingMember(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is removed from pending members list: "
- + "[member] %s", memberId));
- }
- } else if (partitionContext.removeActiveMemberById(memberId)) {
- log.warn(String.format("Member is in the wrong list and it is removed from "
- + "active members list", memberId));
- } else if (partitionContext.removeObsoleteMember(memberId)) {
- log.warn(String.format("Member's obsolated timeout has been expired and "
- + "it is removed from obsolated members list", memberId));
- } else {
- log.warn(String.format("Member is not available in any of the list active, "
- + "pending and termination pending", memberId));
- }
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been removed successfully: "
- + "[member] %s", memberId));
- }
- }
-
- @Override
- public void handleClusterRemovedEvent(
- ClusterRemovedEvent clusterRemovedEvent) {
-
- }
-
- private String getNetworkPartitionIdByMemberId(String memberId) {
- for (Service service : TopologyManager.getTopology().getServices()) {
- for (Cluster cluster : service.getClusters()) {
- if (cluster.memberExists(memberId)) {
- return cluster.getMember(memberId).getNetworkPartitionId();
- }
- }
- }
- return null;
- }
-
- private Member getMemberByMemberId(String memberId) {
- try {
- TopologyManager.acquireReadLock();
- for (Service service : TopologyManager.getTopology().getServices()) {
- for (Cluster cluster : service.getClusters()) {
- if (cluster.memberExists(memberId)) {
- return cluster.getMember(memberId);
- }
- }
- }
- return null;
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
-
- public NetworkPartitionContext getNetworkPartitionCtxt(Member member) {
- log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId());
- String networkPartitionId = member.getNetworkPartitionId();
- if (networkPartitionCtxts.containsKey(networkPartitionId)) {
- log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId));
- return networkPartitionCtxts.get(networkPartitionId);
- }
- log.info("returning null getNetworkPartitionCtxt");
- return null;
- }
-
- public String getPartitionOfMember(String memberId) {
- for (Service service : TopologyManager.getTopology().getServices()) {
- for (Cluster cluster : service.getClusters()) {
- if (cluster.memberExists(memberId)) {
- return cluster.getMember(memberId).getPartitionId();
- }
- }
- }
- return null;
- }
-
- public DeploymentPolicy getDeploymentPolicy() {
- return deploymentPolicy;
- }
-
- public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) {
- this.deploymentPolicy = deploymentPolicy;
- }
-
- public AutoscalePolicy getAutoscalePolicy() {
- return autoscalePolicy;
- }
-
- public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
- this.autoscalePolicy = autoscalePolicy;
- }
-
- public Map<String, NetworkPartitionContext> getNetworkPartitionCtxts() {
- return networkPartitionCtxts;
- }
-
- public NetworkPartitionContext getNetworkPartitionCtxt(String networkPartitionId) {
- return networkPartitionCtxts.get(networkPartitionId);
- }
-
- public void setPartitionCtxt(Map<String, NetworkPartitionContext> partitionCtxt) {
- this.networkPartitionCtxts = partitionCtxt;
- }
-
- public boolean partitionCtxtAvailable(String partitionId) {
- return networkPartitionCtxts.containsKey(partitionId);
- }
-
- public void addNetworkPartitionCtxt(NetworkPartitionContext ctxt) {
- this.networkPartitionCtxts.put(ctxt.getId(), ctxt);
- }
-
- public NetworkPartitionContext getPartitionCtxt(String id) {
- return this.networkPartitionCtxts.get(id);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
deleted file mode 100644
index 3e6cddc..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-public class VMLbClusterMonitor extends VMClusterMonitor {
-
- private static final Log log = LogFactory.getLog(VMLbClusterMonitor.class);
-
- public VMLbClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
- AutoscalePolicy autoscalePolicy) {
- super(clusterId, serviceId,
- new AutoscalerRuleEvaluator(
- StratosConstants.VM_MIN_CHECK_DROOL_FILE,
- StratosConstants.VM_SCALE_CHECK_DROOL_FILE),
- deploymentPolicy, autoscalePolicy,
- new ConcurrentHashMap<String, NetworkPartitionContext>());
- readConfigurations();
- }
-
- @Override
- public void run() {
-
- while (!isDestroyed()) {
- if (log.isDebugEnabled()) {
- log.debug("Cluster monitor is running.. " + this.toString());
- }
- try {
- if (!ClusterStatus.Inactive.equals(status)) {
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("LB Cluster monitor is suspended as the cluster is in " +
- ClusterStatus.Inactive + " mode......");
- }
- }
- } catch (Exception e) {
- log.error("Cluster monitor: Monitor failed. " + this.toString(), e);
- }
- try {
- Thread.sleep(getMonitorIntervalMilliseconds());
- } catch (InterruptedException ignore) {
- }
- }
- }
-
- @Override
- protected void monitor() {
- // TODO make this concurrent
- for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
-
- // minimum check per partition
- for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts()
- .values()) {
-
- if (partitionContext != null) {
- getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- getMinCheckKnowledgeSession().setGlobal("isPrimary", false);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running minimum check for partition %s ",
- partitionContext.getPartitionId()));
- }
-
- minCheckFactHandle =
- AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession(),
- minCheckFactHandle,
- partitionContext);
- // start only in the first partition context
- break;
- }
-
- }
-
- }
- }
-
- @Override
- public void destroy() {
- getMinCheckKnowledgeSession().dispose();
- getMinCheckKnowledgeSession().dispose();
- setDestroyed(true);
- stopScheduler();
- if (log.isDebugEnabled()) {
- log.debug("VMLbClusterMonitor Drools session has been disposed. " + this.toString());
- }
- }
-
- @Override
- protected void readConfigurations() {
- XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
- int monitorInterval = conf.getInt(AutoScalerConstants.VMLb_Cluster_MONITOR_INTERVAL, 90000);
- setMonitorIntervalMilliseconds(monitorInterval);
- if (log.isDebugEnabled()) {
- log.debug("VMLbClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
- }
- }
-
- @Override
- public void handleClusterRemovedEvent(
- ClusterRemovedEvent clusterRemovedEvent) {
-
- String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy();
- String clusterId = clusterRemovedEvent.getClusterId();
- DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
- if (depPolicy != null) {
- List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
- .getNetworkPartitionLbHolders(depPolicy);
-
- for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
- // removes lb cluster ids
- boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
- if (isRemoved) {
- log.info("Removed the lb cluster [id]:"
- + clusterId
- + " reference from Network Partition [id]: "
- + networkPartitionLbHolder
- .getNetworkPartitionId());
-
- }
- if (log.isDebugEnabled()) {
- log.debug(networkPartitionLbHolder);
- }
-
- }
- }
- }
-
- @Override
- public String toString() {
- return "VMLbClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() + "]";
- }
-
- @Override
- public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
- // TODO
-
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
deleted file mode 100644
index 9a26b42..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-public class VMServiceClusterMonitor extends VMClusterMonitor {
-
- private static final Log log = LogFactory.getLog(VMServiceClusterMonitor.class);
- private String lbReferenceType;
- private boolean hasPrimary;
-
- public VMServiceClusterMonitor(String clusterId, String serviceId,
- DeploymentPolicy deploymentPolicy,
- AutoscalePolicy autoscalePolicy) {
- super(clusterId, serviceId,
- new AutoscalerRuleEvaluator(StratosConstants.VM_MIN_CHECK_DROOL_FILE,
- StratosConstants.VM_SCALE_CHECK_DROOL_FILE),
- deploymentPolicy, autoscalePolicy,
- new ConcurrentHashMap<String, NetworkPartitionContext>());
- readConfigurations();
- }
-
- @Override
- public void run() {
- while (!isDestroyed()) {
- try {
- if ((this.status.getCode() <= ClusterStatus.Active.getCode()) ||
- (this.status == ClusterStatus.Inactive && !hasDependent) ||
- !this.hasFaultyMember) {
- if (log.isDebugEnabled()) {
- log.debug("Cluster monitor is running.. " + this.toString());
- }
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Cluster monitor is suspended as the cluster is in " +
- ClusterStatus.Inactive + " mode......");
- }
- }
- } catch (Exception e) {
- log.error("Cluster monitor: Monitor failed." + this.toString(), e);
- }
- try {
- Thread.sleep(getMonitorIntervalMilliseconds());
- } catch (InterruptedException ignore) {
- }
- }
- }
-
- @Override
- protected void monitor() {
-
- //TODO make this concurrent
- for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
- // store primary members in the network partition context
- List<String> primaryMemberListInNetworkPartition = new ArrayList<String>();
-
- //minimum check per partition
- for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) {
- // store primary members in the partition context
- List<String> primaryMemberListInPartition = new ArrayList<String>();
- // get active primary members in this partition context
- for (MemberContext memberContext : partitionContext.getActiveMembers()) {
- if (isPrimaryMember(memberContext)) {
- primaryMemberListInPartition.add(memberContext.getMemberId());
- }
- }
- // get pending primary members in this partition context
- for (MemberContext memberContext : partitionContext.getPendingMembers()) {
- if (isPrimaryMember(memberContext)) {
- primaryMemberListInPartition.add(memberContext.getMemberId());
- }
- }
- primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition);
- getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- getMinCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
- getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
- getMinCheckKnowledgeSession().setGlobal("primaryMemberCount", primaryMemberListInPartition.size());
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running minimum check for partition %s ", partitionContext.getPartitionId()));
- }
-
- minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession()
- , minCheckFactHandle, partitionContext);
-
- }
-
- boolean rifReset = networkPartitionContext.isRifReset();
- boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset();
- boolean loadAverageReset = networkPartitionContext.isLoadAverageReset();
- if (log.isDebugEnabled()) {
- log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset
- + " flag of loadAverageReset" + loadAverageReset);
- }
- if (rifReset || memoryConsumptionReset || loadAverageReset) {
- getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy);
- getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", autoscalePolicy);
- getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
- getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
- getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
- getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
- getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
- getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId()));
- log.debug(" Primary members : " + primaryMemberListInNetworkPartition);
- }
-
- scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(getScaleCheckKnowledgeSession()
- , scaleCheckFactHandle, networkPartitionContext);
-
- networkPartitionContext.setRifReset(false);
- networkPartitionContext.setMemoryConsumptionReset(false);
- networkPartitionContext.setLoadAverageReset(false);
- } else if (log.isDebugEnabled()) {
- log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " +
- "cycle for network partition %s", networkPartitionContext.getId()));
- }
- }
- }
-
- private boolean isPrimaryMember(MemberContext memberContext) {
- Properties props = memberContext.getProperties();
- if (log.isDebugEnabled()) {
- log.debug(" Properties [" + props + "] ");
- }
- if (props != null && props.getProperties() != null) {
- for (Property prop : props.getProperties()) {
- if (prop.getName().equals("PRIMARY")) {
- if (Boolean.parseBoolean(prop.getValue())) {
- log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
- "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
- return true;
- }
- }
- }
- }
- return false;
- }
-
- @Override
- protected void readConfigurations() {
- XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
- int monitorInterval = conf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000);
- setMonitorIntervalMilliseconds(monitorInterval);
- if (log.isDebugEnabled()) {
- log.debug("VMServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
- }
- }
-
- @Override
- public void destroy() {
- getMinCheckKnowledgeSession().dispose();
- getScaleCheckKnowledgeSession().dispose();
- setDestroyed(true);
- stopScheduler();
- if (log.isDebugEnabled()) {
- log.debug("VMServiceClusterMonitor Drools session has been disposed. " + this.toString());
- }
- }
-
- @Override
- public String toString() {
- return "VMServiceClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() +
- ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy +
- ", lbReferenceType=" + lbReferenceType +
- ", hasPrimary=" + hasPrimary + " ]";
- }
-
- public String getLbReferenceType() {
- return lbReferenceType;
- }
-
- public void setLbReferenceType(String lbReferenceType) {
- this.lbReferenceType = lbReferenceType;
- }
-
- public boolean isHasPrimary() {
- return hasPrimary;
- }
-
- public void setHasPrimary(boolean hasPrimary) {
- this.hasPrimary = hasPrimary;
- }
-
- @Override
- public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
- // TODO
-
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java
index 2bf85a6..cf92a18 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.exception.DependencyBuilderException;
import org.apache.stratos.autoscaler.exception.TopologyInConsistentException;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
import org.apache.stratos.autoscaler.monitor.Monitor;
import org.apache.stratos.autoscaler.monitor.MonitorStatusEventBuilder;
import org.apache.stratos.autoscaler.monitor.ParentComponentMonitor;
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java
new file mode 100644
index 0000000..bc23dd4
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.application;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.exception.DependencyBuilderException;
+import org.apache.stratos.autoscaler.exception.PartitionValidationException;
+import org.apache.stratos.autoscaler.exception.PolicyValidationException;
+import org.apache.stratos.autoscaler.exception.TopologyInConsistentException;
+import org.apache.stratos.autoscaler.grouping.dependency.context.ApplicationContext;
+import org.apache.stratos.autoscaler.grouping.dependency.context.ClusterContext;
+import org.apache.stratos.autoscaler.grouping.dependency.context.GroupContext;
+import org.apache.stratos.autoscaler.monitor.Monitor;
+import org.apache.stratos.autoscaler.monitor.ParentComponentMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitorFactory;
+import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.group.GroupMonitor;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+import java.util.Map;
+
+/**
+ * Factory class to get the Monitors.
+ */
+public class ApplicationMonitorFactory {
+ private static final Log log = LogFactory.getLog(ApplicationMonitorFactory.class);
+
+ /**
+ * Factor method used to create relevant monitors based on the given context
+ *
+ * @param context Application/Group/Cluster context
+ * @param appId appId of the application which requires to create app monitor
+ * @param parentMonitor parent of the monitor
+ * @return Monitor which can be ApplicationMonitor/GroupMonitor/ClusterMonitor
+ * @throws TopologyInConsistentException throws while traversing thr topology
+ * @throws DependencyBuilderException throws while building dependency for app monitor
+ * @throws PolicyValidationException throws while validating the policy associated with cluster
+ * @throws PartitionValidationException throws while validating the partition used in a cluster
+ */
+ public static Monitor getMonitor(ParentComponentMonitor parentMonitor, ApplicationContext context, String appId)
+ throws TopologyInConsistentException,
+ DependencyBuilderException, PolicyValidationException, PartitionValidationException {
+ Monitor monitor;
+
+ if (context instanceof GroupContext) {
+ monitor = getGroupMonitor(parentMonitor, context, appId);
+ } else if (context instanceof ClusterContext) {
+ monitor = getClusterMonitor(parentMonitor, (ClusterContext) context, appId);
+ //Start the thread
+ Thread th = new Thread((AbstractClusterMonitor) monitor);
+ th.start();
+ } else {
+ monitor = getApplicationMonitor(appId);
+ }
+ return monitor;
+ }
+
+ /**
+ * This will create the GroupMonitor based on given groupId by going thr Topology
+ *
+ * @param parentMonitor parent of the monitor
+ * @param context groupId of the group
+ * @param appId appId of the relevant application
+ * @return Group monitor
+ * @throws DependencyBuilderException throws while building dependency for app monitor
+ * @throws TopologyInConsistentException throws while traversing thr topology
+ */
+ public static Monitor getGroupMonitor(ParentComponentMonitor parentMonitor, ApplicationContext context, String appId)
+ throws DependencyBuilderException,
+ TopologyInConsistentException {
+ GroupMonitor groupMonitor;
+ TopologyManager.acquireReadLockForApplication(appId);
+
+ try {
+ Group group = TopologyManager.getTopology().getApplication(appId).getGroupRecursively(context.getId());
+ groupMonitor = new GroupMonitor(group, appId);
+ groupMonitor.setAppId(appId);
+ if(parentMonitor != null) {
+ groupMonitor.setParent(parentMonitor);
+ //Setting the dependent behaviour of the monitor
+ if(parentMonitor.isDependent() || (context.isDependent() && context.hasChild())) {
+ groupMonitor.setHasDependent(true);
+ } else {
+ groupMonitor.setHasDependent(false);
+ }
+ //TODO make sure when it is async
+
+ if (group.getStatus() != groupMonitor.getStatus()) {
+ //updating the status, if the group is not in created state when creating group Monitor
+ //so that groupMonitor will notify the parent (useful when restarting stratos)
+ groupMonitor.setStatus(group.getStatus());
+ }
+ }
+
+ } finally {
+ TopologyManager.releaseReadLockForApplication(appId);
+
+ }
+ return groupMonitor;
+
+ }
+
+ /**
+ * This will create a new app monitor based on the give appId by getting the
+ * application from Topology
+ *
+ * @param appId appId of the application which requires to create app monitor
+ * @return ApplicationMonitor
+ * @throws DependencyBuilderException throws while building dependency for app monitor
+ * @throws TopologyInConsistentException throws while traversing thr topology
+ */
+ public static ApplicationMonitor getApplicationMonitor(String appId)
+ throws DependencyBuilderException,
+ TopologyInConsistentException {
+ ApplicationMonitor applicationMonitor;
+ TopologyManager.acquireReadLockForApplication(appId);
+ try {
+ Application application = TopologyManager.getTopology().getApplication(appId);
+ if (application != null) {
+ applicationMonitor = new ApplicationMonitor(application);
+ applicationMonitor.setHasDependent(false);
+
+ } else {
+ String msg = "[Application] " + appId + " cannot be found in the Topology";
+ throw new TopologyInConsistentException(msg);
+ }
+ } finally {
+ TopologyManager.releaseReadLockForApplication(appId);
+ }
+
+ return applicationMonitor;
+
+ }
+
+ /**
+ * Updates ClusterContext for given cluster
+ *
+ * @param parentMonitor parent of the monitor
+ * @param context
+ * @return ClusterMonitor - Updated ClusterContext
+ * @throws org.apache.stratos.autoscaler.exception.PolicyValidationException
+ * @throws org.apache.stratos.autoscaler.exception.PartitionValidationException
+ */
+ public static VMClusterMonitor getClusterMonitor(ParentComponentMonitor parentMonitor,
+ ClusterContext context, String appId)
+ throws PolicyValidationException,
+ PartitionValidationException,
+ TopologyInConsistentException {
+ //Retrieving the Cluster from Topology
+ String clusterId = context.getId();
+ String serviceName = context.getServiceName();
+
+ Cluster cluster;
+ AbstractClusterMonitor clusterMonitor;
+ //acquire read lock for the service and cluster
+ TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
+ try {
+ Topology topology = TopologyManager.getTopology();
+ if (topology.serviceExists(serviceName)) {
+ Service service = topology.getService(serviceName);
+ if (service.clusterExists(clusterId)) {
+ cluster = service.getCluster(clusterId);
+ if (log.isDebugEnabled()) {
+ log.debug("Dependency check starting the [cluster]" + clusterId);
+ }
+ // startClusterMonitor(this, cluster);
+ //context.setCurrentStatus(Status.Created);
+ } else {
+ String msg = "[Cluster] " + clusterId + " cannot be found in the " +
+ "Topology for [service] " + serviceName;
+ throw new TopologyInConsistentException(msg);
+ }
+ } else {
+ String msg = "[Service] " + serviceName + " cannot be found in the Topology";
+ throw new TopologyInConsistentException(msg);
+
+ }
+
+
+ clusterMonitor = ClusterMonitorFactory.getMonitor(cluster);
+ if (clusterMonitor instanceof VMClusterMonitor) {
+ return (VMClusterMonitor) clusterMonitor;
+ } else if (clusterMonitor != null) {
+ log.warn("Unknown cluster monitor found: " + clusterMonitor.getClass().toString());
+ }
+ return null;
+ } finally {
+ TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
+ }
+ }
+
+
+ private static Properties convertMemberPropsToMemberContextProps(
+ java.util.Properties properties) {
+ Properties props = new Properties();
+ for (Map.Entry<Object, Object> e : properties.entrySet()) {
+ Property prop = new Property();
+ prop.setName((String) e.getKey());
+ prop.setValue((String) e.getValue());
+ props.addProperties(prop);
+ }
+ return props;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
new file mode 100644
index 0000000..f043f51
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
+import org.apache.stratos.autoscaler.monitor.Monitor;
+import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent;
+import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import org.apache.stratos.autoscaler.monitor.events.MonitorTerminateAllEvent;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.GroupStatus;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.FactHandle;
+
+/*
+ * Every cluster monitor, which are monitoring a cluster, should extend this class.
+ */
+public abstract class AbstractClusterMonitor extends Monitor implements Runnable {
+
+ private String clusterId;
+ private String serviceId;
+ protected ClusterStatus status;
+ private int monitoringIntervalMilliseconds;
+
+ protected FactHandle minCheckFactHandle;
+ protected FactHandle scaleCheckFactHandle;
+ private StatefulKnowledgeSession minCheckKnowledgeSession;
+ private StatefulKnowledgeSession scaleCheckKnowledgeSession;
+ private boolean isDestroyed;
+
+ private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+ protected boolean hasFaultyMember = false;
+
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+ protected AbstractClusterMonitor(String clusterId, String serviceId,
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+
+ super();
+ this.clusterId = clusterId;
+ this.serviceId = serviceId;
+ this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+ this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
+ this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
+ }
+
+ protected abstract void readConfigurations();
+
+ public void startScheduler() {
+ scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+
+ protected void stopScheduler() {
+ scheduler.shutdownNow();
+ }
+
+ protected abstract void monitor();
+
+ public abstract void destroy();
+
+ //handle health events
+ public abstract void handleAverageLoadAverageEvent(
+ AverageLoadAverageEvent averageLoadAverageEvent);
+
+ public abstract void handleGradientOfLoadAverageEvent(
+ GradientOfLoadAverageEvent gradientOfLoadAverageEvent);
+
+ public abstract void handleSecondDerivativeOfLoadAverageEvent(
+ SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent);
+
+ public abstract void handleAverageMemoryConsumptionEvent(
+ AverageMemoryConsumptionEvent averageMemoryConsumptionEvent);
+
+ public abstract void handleGradientOfMemoryConsumptionEvent(
+ GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent);
+
+ public abstract void handleSecondDerivativeOfMemoryConsumptionEvent(
+ SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent);
+
+ public abstract void handleAverageRequestsInFlightEvent(
+ AverageRequestsInFlightEvent averageRequestsInFlightEvent);
+
+ public abstract void handleGradientOfRequestsInFlightEvent(
+ GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent);
+
+ public abstract void handleSecondDerivativeOfRequestsInFlightEvent(
+ SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent);
+
+ public abstract void handleMemberAverageMemoryConsumptionEvent(
+ MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent);
+
+ public abstract void handleMemberGradientOfMemoryConsumptionEvent(
+ MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent);
+
+ public abstract void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+ MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent);
+
+
+ public abstract void handleMemberAverageLoadAverageEvent(
+ MemberAverageLoadAverageEvent memberAverageLoadAverageEvent);
+
+ public abstract void handleMemberGradientOfLoadAverageEvent(
+ MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent);
+
+ public abstract void handleMemberSecondDerivativeOfLoadAverageEvent(
+ MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent);
+
+ public abstract void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent);
+
+ //handle topology events
+ public abstract void handleMemberStartedEvent(MemberStartedEvent memberStartedEvent);
+
+ public abstract void handleMemberActivatedEvent(MemberActivatedEvent memberActivatedEvent);
+
+ public abstract void handleMemberMaintenanceModeEvent(
+ MemberMaintenanceModeEvent maintenanceModeEvent);
+
+ public abstract void handleMemberReadyToShutdownEvent(
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent);
+
+ public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent memberTerminatedEvent);
+
+ public abstract void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent);
+
+ public abstract void handleDynamicUpdates(Properties properties) throws InvalidArgumentException;
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(String clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public void setStatus(ClusterStatus status) {
+ this.status = status;
+ }
+
+ public ClusterStatus getStatus() {
+ return status;
+ }
+
+ public String getServiceId() {
+ return serviceId;
+ }
+
+ public void setServiceId(String serviceId) {
+ this.serviceId = serviceId;
+ }
+
+ public int getMonitorIntervalMilliseconds() {
+ return monitoringIntervalMilliseconds;
+ }
+
+ public void setMonitorIntervalMilliseconds(int monitorIntervalMilliseconds) {
+ this.monitoringIntervalMilliseconds = monitorIntervalMilliseconds;
+ }
+
+ public FactHandle getMinCheckFactHandle() {
+ return minCheckFactHandle;
+ }
+
+ public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
+ this.minCheckFactHandle = minCheckFactHandle;
+ }
+
+ public FactHandle getScaleCheckFactHandle() {
+ return scaleCheckFactHandle;
+ }
+
+ public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
+ this.scaleCheckFactHandle = scaleCheckFactHandle;
+ }
+
+ public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
+ return minCheckKnowledgeSession;
+ }
+
+ public void setMinCheckKnowledgeSession(
+ StatefulKnowledgeSession minCheckKnowledgeSession) {
+ this.minCheckKnowledgeSession = minCheckKnowledgeSession;
+ }
+
+ public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
+ return scaleCheckKnowledgeSession;
+ }
+
+ public void setScaleCheckKnowledgeSession(
+ StatefulKnowledgeSession scaleCheckKnowledgeSession) {
+ this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
+ }
+
+ public boolean isDestroyed() {
+ return isDestroyed;
+ }
+
+ public void setDestroyed(boolean isDestroyed) {
+ this.isDestroyed = isDestroyed;
+ }
+
+ public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
+ return autoscalerRuleEvaluator;
+ }
+
+ public void setAutoscalerRuleEvaluator(
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+ this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+ }
+
+
+ @Override
+ public void onParentEvent(MonitorStatusEvent statusEvent) {
+ // send the ClusterTerminating event
+ if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() ==
+ ApplicationStatus.Terminating) {
+ StatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId);
+ }
+ }
+
+ @Override
+ public void onChildEvent(MonitorStatusEvent statusEvent) {
+
+ }
+
+ @Override
+ public void onEvent(MonitorTerminateAllEvent terminateAllEvent) {
+
+ }
+
+ @Override
+ public void onEvent(MonitorScalingEvent scalingEvent) {
+
+ }
+
+ public void setHasFaultyMember(boolean hasFaultyMember) {
+ this.hasFaultyMember = hasFaultyMember;
+ }
+
+ public boolean isHasFaultyMember() {
+ return hasFaultyMember;
+ }
+
+ public abstract void terminateAllMembers();
+}
[2/5] Adding autoscaler topology event listeners introduced by
service grouping
Posted by im...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
new file mode 100644
index 0000000..e286187
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.exception.PartitionValidationException;
+import org.apache.stratos.autoscaler.exception.PolicyValidationException;
+import org.apache.stratos.autoscaler.partition.PartitionGroup;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
+import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.util.Constants;
+
+/*
+ * Factory class for creating cluster monitors.
+ */
+public class ClusterMonitorFactory {
+
+ private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
+
+ /**
+ * @param cluster the cluster to be monitored
+ * @return the created cluster monitor
+ * @throws PolicyValidationException when deployment policy is not valid
+ * @throws PartitionValidationException when partition is not valid
+ */
+ public static AbstractClusterMonitor getMonitor(Cluster cluster)
+ throws PolicyValidationException, PartitionValidationException {
+
+ AbstractClusterMonitor clusterMonitor;
+ if (cluster.isKubernetesCluster()) {
+ clusterMonitor = getDockerServiceClusterMonitor(cluster);
+ } else if (cluster.isLbCluster()) {
+ clusterMonitor = getVMLbClusterMonitor(cluster);
+ } else {
+ clusterMonitor = getVMServiceClusterMonitor(cluster);
+ }
+
+ return clusterMonitor;
+ }
+
+ private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster)
+ throws PolicyValidationException, PartitionValidationException {
+ // FIXME fix the following code to correctly update
+ // AutoscalerContext context = AutoscalerContext.getInstance();
+ if (null == cluster) {
+ return null;
+ }
+
+ String autoscalePolicyName = cluster.getAutoscalePolicyName();
+ String deploymentPolicyName = cluster.getDeploymentPolicyName();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Deployment policy name: " + deploymentPolicyName);
+ log.debug("Autoscaler policy name: " + autoscalePolicyName);
+ }
+
+ AutoscalePolicy policy =
+ PolicyManager.getInstance()
+ .getAutoscalePolicy(autoscalePolicyName);
+ DeploymentPolicy deploymentPolicy =
+ PolicyManager.getInstance()
+ .getDeploymentPolicy(deploymentPolicyName);
+
+ if (deploymentPolicy == null) {
+ String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
+ log.error(msg);
+ throw new PolicyValidationException(msg);
+ }
+
+ Partition[] allPartitions = deploymentPolicy.getAllPartitions();
+ if (allPartitions == null) {
+ String msg =
+ "Deployment Policy's Partitions are null. Policy name: " +
+ deploymentPolicyName;
+ log.error(msg);
+ throw new PolicyValidationException(msg);
+ }
+
+ CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
+
+ VMServiceClusterMonitor clusterMonitor =
+ new VMServiceClusterMonitor(cluster.getClusterId(),
+ cluster.getServiceName(),
+ deploymentPolicy, policy);
+ clusterMonitor.setStatus(ClusterStatus.Created);
+
+ for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
+
+ NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
+ partitionGroup.getPartitionAlgo(),
+ partitionGroup.getPartitions());
+
+ for (Partition partition : partitionGroup.getPartitions()) {
+ PartitionContext partitionContext = new PartitionContext(partition);
+ partitionContext.setServiceName(cluster.getServiceName());
+ partitionContext.setProperties(cluster.getProperties());
+ partitionContext.setNetworkPartitionId(partitionGroup.getId());
+
+ for (Member member : cluster.getMembers()) {
+ String memberId = member.getMemberId();
+ if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
+ MemberContext memberContext = new MemberContext();
+ memberContext.setClusterId(member.getClusterId());
+ memberContext.setMemberId(memberId);
+ memberContext.setInitTime(member.getInitTime());
+ memberContext.setPartition(partition);
+ memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
+
+ if (MemberStatus.Activated.equals(member.getStatus())) {
+ if (log.isDebugEnabled()) {
+ String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
+ log.debug(msg);
+ }
+ partitionContext.addActiveMember(memberContext);
+// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
+// partitionContext.incrementCurrentActiveMemberCount(1);
+
+ } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
+ if (log.isDebugEnabled()) {
+ String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
+ log.debug(msg);
+ }
+ partitionContext.addPendingMember(memberContext);
+
+// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
+ } else if (MemberStatus.Suspended.equals(member.getStatus())) {
+// partitionContext.addFaultyMember(memberId);
+ }
+ partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been added: [member] %s", memberId));
+ }
+ }
+
+ }
+ networkPartitionContext.addPartitionContext(partitionContext);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Partition context has been added: [partition] %s",
+ partitionContext.getPartitionId()));
+ }
+ }
+
+ clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Network partition context has been added: [network partition] %s",
+ networkPartitionContext.getId()));
+ }
+ }
+
+
+ // find lb reference type
+ java.util.Properties props = cluster.getProperties();
+
+ if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
+ String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+ clusterMonitor.setLbReferenceType(value);
+ if (log.isDebugEnabled()) {
+ log.debug("Set the lb reference type: " + value);
+ }
+ }
+
+ // set hasPrimary property
+ // hasPrimary is true if there are primary members available in that cluster
+ clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
+
+ log.info("VMServiceClusterMonitor created: " + clusterMonitor.toString());
+ return clusterMonitor;
+ }
+
+ private static Properties convertMemberPropsToMemberContextProps(
+ java.util.Properties properties) {
+ Properties props = new Properties();
+ for (Map.Entry<Object, Object> e : properties.entrySet()) {
+ Property prop = new Property();
+ prop.setName((String) e.getKey());
+ prop.setValue((String) e.getValue());
+ props.addProperties(prop);
+ }
+ return props;
+ }
+
+
+ private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
+ throws PolicyValidationException, PartitionValidationException {
+ // FIXME fix the following code to correctly update
+ // AutoscalerContext context = AutoscalerContext.getInstance();
+ if (null == cluster) {
+ return null;
+ }
+
+ String autoscalePolicyName = cluster.getAutoscalePolicyName();
+ String deploymentPolicyName = cluster.getDeploymentPolicyName();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Deployment policy name: " + deploymentPolicyName);
+ log.debug("Autoscaler policy name: " + autoscalePolicyName);
+ }
+
+ AutoscalePolicy policy =
+ PolicyManager.getInstance()
+ .getAutoscalePolicy(autoscalePolicyName);
+ DeploymentPolicy deploymentPolicy =
+ PolicyManager.getInstance()
+ .getDeploymentPolicy(deploymentPolicyName);
+
+ if (deploymentPolicy == null) {
+ String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
+ log.error(msg);
+ throw new PolicyValidationException(msg);
+ }
+
+ String clusterId = cluster.getClusterId();
+ VMLbClusterMonitor clusterMonitor =
+ new VMLbClusterMonitor(clusterId,
+ cluster.getServiceName(),
+ deploymentPolicy, policy);
+ clusterMonitor.setStatus(ClusterStatus.Created);
+ // partition group = network partition context
+ for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
+
+ NetworkPartitionLbHolder networkPartitionLbHolder =
+ PartitionManager.getInstance()
+ .getNetworkPartitionLbHolder(partitionGroup.getId());
+// PartitionManager.getInstance()
+// .getNetworkPartitionLbHolder(partitionGroup.getId());
+ // FIXME pick a random partition
+ Partition partition =
+ partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
+ PartitionContext partitionContext = new PartitionContext(partition);
+ partitionContext.setServiceName(cluster.getServiceName());
+ partitionContext.setProperties(cluster.getProperties());
+ partitionContext.setNetworkPartitionId(partitionGroup.getId());
+ partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
+
+ NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
+ partitionGroup.getPartitionAlgo(),
+ partitionGroup.getPartitions());
+ for (Member member : cluster.getMembers()) {
+ String memberId = member.getMemberId();
+ if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) {
+ MemberContext memberContext = new MemberContext();
+ memberContext.setClusterId(member.getClusterId());
+ memberContext.setMemberId(memberId);
+ memberContext.setPartition(partition);
+ memberContext.setInitTime(member.getInitTime());
+
+ if (MemberStatus.Activated.equals(member.getStatus())) {
+ if (log.isDebugEnabled()) {
+ String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
+ log.debug(msg);
+ }
+ partitionContext.addActiveMember(memberContext);
+// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
+// partitionContext.incrementCurrentActiveMemberCount(1);
+ } else if (MemberStatus.Created.equals(member.getStatus()) ||
+ MemberStatus.Starting.equals(member.getStatus())) {
+ if (log.isDebugEnabled()) {
+ String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
+ log.debug(msg);
+ }
+ partitionContext.addPendingMember(memberContext);
+// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
+ } else if (MemberStatus.Suspended.equals(member.getStatus())) {
+// partitionContext.addFaultyMember(memberId);
+ }
+
+ partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been added: [member] %s", memberId));
+ }
+ }
+
+ }
+ networkPartitionContext.addPartitionContext(partitionContext);
+
+ // populate lb cluster id in network partition context.
+ java.util.Properties props = cluster.getProperties();
+
+ // get service type of load balanced cluster
+ String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
+
+ if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
+ String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+
+ if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
+ networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
+
+ } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) {
+ String serviceName = cluster.getServiceName();
+ // TODO: check if this is correct
+ networkPartitionLbHolder.addServiceLB(serviceName, clusterId);
+
+ if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) {
+ networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
+ if (log.isDebugEnabled()) {
+ log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType);
+ }
+ }
+ }
+ }
+
+ clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
+ }
+
+ log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
+ return clusterMonitor;
+ }
+
+ /**
+ * @param cluster - the cluster which needs to be monitored
+ * @return - the cluster monitor
+ */
+ private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster)
+ throws PolicyValidationException {
+
+ if (null == cluster) {
+ return null;
+ }
+
+ String autoscalePolicyName = cluster.getAutoscalePolicyName();
+ if (log.isDebugEnabled()) {
+ log.debug("Autoscaler policy name: " + autoscalePolicyName);
+ }
+
+ AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
+
+ if (policy == null) {
+ String msg = "Autoscale Policy is null. Policy name: " + autoscalePolicyName;
+ log.error(msg);
+ throw new PolicyValidationException(msg);
+ }
+
+ java.util.Properties props = cluster.getProperties();
+ String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
+ KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
+ cluster.getClusterId());
+
+ String minReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS);
+ if (minReplicasProperty != null && !minReplicasProperty.isEmpty()) {
+ int minReplicas = Integer.parseInt(minReplicasProperty);
+ kubernetesClusterCtxt.setMinReplicas(minReplicas);
+ }
+
+ String maxReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MAX_REPLICAS);
+ if (maxReplicasProperty != null && !maxReplicasProperty.isEmpty()) {
+ int maxReplicas = Integer.parseInt(maxReplicasProperty);
+ kubernetesClusterCtxt.setMaxReplicas(maxReplicas);
+ }
+
+ KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor(
+ kubernetesClusterCtxt,
+ cluster.getClusterId(),
+ cluster.getServiceName(),
+ policy.getId());
+
+ dockerClusterMonitor.setStatus(ClusterStatus.Created);
+
+ //populate the members after restarting
+ for (Member member : cluster.getMembers()) {
+ String memberId = member.getMemberId();
+ String clusterId = member.getClusterId();
+ MemberContext memberContext = new MemberContext();
+ memberContext.setMemberId(memberId);
+ memberContext.setClusterId(clusterId);
+ memberContext.setInitTime(member.getInitTime());
+
+ // if there is at least one member in the topology, that means service has been created already
+ // this is to avoid calling startContainer() method again
+ kubernetesClusterCtxt.setServiceClusterCreated(true);
+
+ if (MemberStatus.Activated.equals(member.getStatus())) {
+ if (log.isDebugEnabled()) {
+ String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
+ log.debug(msg);
+ }
+ dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
+ } else if (MemberStatus.Created.equals(member.getStatus())
+ || MemberStatus.Starting.equals(member.getStatus())) {
+ if (log.isDebugEnabled()) {
+ String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
+ log.debug(msg);
+ }
+ dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
+ }
+
+ kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been added: [member] %s", memberId));
+ }
+ }
+
+ // find lb reference type
+ if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
+ String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+ dockerClusterMonitor.setLbReferenceType(value);
+ if (log.isDebugEnabled()) {
+ log.debug("Set the lb reference type: " + value);
+ }
+ }
+
+ log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
+ return dockerClusterMonitor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
new file mode 100644
index 0000000..39fbd46
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/*
+ * Every kubernetes cluster monitor should extend this class
+ */
+public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
+
+ private static final Log log = LogFactory.getLog(KubernetesClusterMonitor.class);
+
+ private KubernetesClusterContext kubernetesClusterCtxt;
+ protected String autoscalePolicyId;
+
+ protected KubernetesClusterMonitor(String clusterId, String serviceId,
+ KubernetesClusterContext kubernetesClusterContext,
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator,
+ String autoscalePolicyId) {
+
+ super(clusterId, serviceId, autoscalerRuleEvaluator);
+ this.kubernetesClusterCtxt = kubernetesClusterContext;
+ this.autoscalePolicyId = autoscalePolicyId;
+ }
+
+ @Override
+ public void handleAverageLoadAverageEvent(
+ AverageLoadAverageEvent averageLoadAverageEvent) {
+
+ String clusterId = averageLoadAverageEvent.getClusterId();
+ float value = averageLoadAverageEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Avg load avg event: [cluster] %s [value] %s",
+ clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setAverageLoadAverage(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+
+ }
+
+ @Override
+ public void handleGradientOfLoadAverageEvent(
+ GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
+
+ String clusterId = gradientOfLoadAverageEvent.getClusterId();
+ float value = gradientOfLoadAverageEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s",
+ clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setLoadAverageGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfLoadAverageEvent(
+ SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
+
+ String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
+ float value = secondDerivativeOfLoadAverageEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
+ + "[value] %s", clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setLoadAverageSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleAverageMemoryConsumptionEvent(
+ AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
+
+ String clusterId = averageMemoryConsumptionEvent.getClusterId();
+ float value = averageMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Avg Memory Consumption event: [cluster] %s "
+ + "[value] %s", averageMemoryConsumptionEvent.getClusterId(), value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setAverageMemoryConsumption(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleGradientOfMemoryConsumptionEvent(
+ GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
+
+ String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
+ float value = gradientOfMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
+ + "[value] %s", clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setMemoryConsumptionGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfMemoryConsumptionEvent(
+ SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
+
+ String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
+ float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
+ + "[value] %s", clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setMemoryConsumptionSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleAverageRequestsInFlightEvent(
+ AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
+
+ float value = averageRequestsInFlightEvent.getValue();
+ String clusterId = averageRequestsInFlightEvent.getClusterId();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Average Rif event: [cluster] %s [value] %s",
+ clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setAverageRequestsInFlight(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleGradientOfRequestsInFlightEvent(
+ GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
+
+ String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
+ float value = gradientOfRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s",
+ clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setRequestsInFlightGradient(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleSecondDerivativeOfRequestsInFlightEvent(
+ SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
+
+ String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
+ float value = secondDerivativeOfRequestsInFlightEvent.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second derivative of Rif event: [cluster] %s "
+ + "[value] %s", clusterId, value));
+ }
+ KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
+ if (null != kubernetesClusterContext) {
+ kubernetesClusterContext.setRequestsInFlightSecondDerivative(value);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Kubernetes cluster context is not available for :" +
+ " [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ @Override
+ public void handleMemberAverageMemoryConsumptionEvent(
+ MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
+
+ String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
+ KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+ MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberAverageMemoryConsumptionEvent.getValue();
+ memberStatsContext.setAverageMemoryConsumption(value);
+ }
+
+ @Override
+ public void handleMemberGradientOfMemoryConsumptionEvent(
+ MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
+
+ String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
+ KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+ MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberGradientOfMemoryConsumptionEvent.getValue();
+ memberStatsContext.setGradientOfMemoryConsumption(value);
+ }
+
+ @Override
+ public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+ MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
+
+ }
+
+ @Override
+ public void handleMemberAverageLoadAverageEvent(
+ MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
+
+ KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+ String memberId = memberAverageLoadAverageEvent.getMemberId();
+ float value = memberAverageLoadAverageEvent.getValue();
+ MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ memberStatsContext.setAverageLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberGradientOfLoadAverageEvent(
+ MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
+
+ String memberId = memberGradientOfLoadAverageEvent.getMemberId();
+ KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+ MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberGradientOfLoadAverageEvent.getValue();
+ memberStatsContext.setGradientOfLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberSecondDerivativeOfLoadAverageEvent(
+ MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
+
+ String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
+ KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
+ MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
+ if (null == memberStatsContext) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return;
+ }
+ float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
+ memberStatsContext.setSecondDerivativeOfLoadAverage(value);
+ }
+
+ @Override
+ public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
+ // kill the container
+ String memberId = memberFaultEvent.getMemberId();
+ Member member = getMemberByMemberId(memberId);
+ if (null == member) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+ }
+ return;
+ }
+ if (!member.isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member activated event has not received for the member %s. "
+ + "Therefore ignoring" + " the member fault health stat", memberId));
+ }
+ return;
+ }
+
+ if (!getKubernetesClusterCtxt().activeMemberExist(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Could not find the active member in kubernetes cluster context, "
+ + "[member] %s ", memberId));
+ }
+ return;
+ }
+ // terminate the faulty member
+ CloudControllerClient ccClient = CloudControllerClient.getInstance();
+ try {
+ ccClient.terminateContainer(memberId);
+ // remove from active member list
+ getKubernetesClusterCtxt().removeActiveMemberById(memberId);
+ if (log.isInfoEnabled()) {
+ String clusterId = memberFaultEvent.getClusterId();
+ String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
+ log.info(String.format("Faulty member is terminated and removed from the active members list: "
+ + "[member] %s [kub-cluster] %s [cluster] %s ", memberId, kubernetesClusterID, clusterId));
+ }
+ } catch (TerminationException e) {
+ String msg = "Cannot delete a container " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+
+ @Override
+ public void handleMemberStartedEvent(
+ MemberStartedEvent memberStartedEvent) {
+
+ }
+
+ @Override
+ public void handleMemberActivatedEvent(
+ MemberActivatedEvent memberActivatedEvent) {
+
+ KubernetesClusterContext kubernetesClusterContext;
+ kubernetesClusterContext = getKubernetesClusterCtxt();
+ String memberId = memberActivatedEvent.getMemberId();
+ kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId));
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been added successfully: "
+ + "[member] %s", memberId));
+ }
+ kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
+ }
+
+ @Override
+ public void handleMemberMaintenanceModeEvent(
+ MemberMaintenanceModeEvent maintenanceModeEvent) {
+
+ // no need to do anything here
+ // we will not be receiving this event for containers
+ // we will only receive member terminated event
+ }
+
+ @Override
+ public void handleMemberReadyToShutdownEvent(
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
+
+ // no need to do anything here
+ // we will not be receiving this event for containers
+ // we will only receive member terminated event
+ }
+
+ @Override
+ public void handleMemberTerminatedEvent(
+ MemberTerminatedEvent memberTerminatedEvent) {
+
+ String memberId = memberTerminatedEvent.getMemberId();
+ if (getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from termination pending members list: "
+ + "[member] %s", memberId));
+ }
+ } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member is removed from pending members list: "
+ + "[member] %s", memberId));
+ }
+ } else if (getKubernetesClusterCtxt().removeActiveMemberById(memberId)) {
+ log.warn(String.format("Member is in the wrong list and it is removed from "
+ + "active members list", memberId));
+ } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) {
+ log.warn(String.format("Member's obsolated timeout has been expired and "
+ + "it is removed from obsolated members list", memberId));
+ } else {
+ log.warn(String.format("Member is not available in any of the list active, "
+ + "pending and termination pending", memberId));
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Member stat context has been removed successfully: "
+ + "[member] %s", memberId));
+ }
+ }
+
+ @Override
+ public void handleClusterRemovedEvent(
+ ClusterRemovedEvent clusterRemovedEvent) {
+ getKubernetesClusterCtxt().getPendingMembers().clear();
+ getKubernetesClusterCtxt().getActiveMembers().clear();
+ getKubernetesClusterCtxt().getTerminationPendingMembers().clear();
+ getKubernetesClusterCtxt().getObsoletedMembers().clear();
+ }
+
+ public KubernetesClusterContext getKubernetesClusterCtxt() {
+ return kubernetesClusterCtxt;
+ }
+
+ public void setKubernetesClusterCtxt(
+ KubernetesClusterContext kubernetesClusterCtxt) {
+ this.kubernetesClusterCtxt = kubernetesClusterCtxt;
+ }
+
+ public AutoscalePolicy getAutoscalePolicy() {
+ return PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyId);
+ }
+
+ private Member getMemberByMemberId(String memberId) {
+ try {
+ TopologyManager.acquireReadLock();
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.memberExists(memberId)) {
+ return cluster.getMember(memberId);
+ }
+ }
+ }
+ return null;
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ @Override
+ public void terminateAllMembers() {
+ try {
+ CloudControllerClient.getInstance().terminateAllContainers(getKubernetesClusterCtxt().getClusterId());
+ } catch (TerminationException e) {
+ log.error(String.format("Could not terminate containers: [cluster-id] %s",
+ getKubernetesClusterCtxt().getClusterId()), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
new file mode 100644
index 0000000..2615651
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.List;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+
+/*
+ * It is monitoring a kubernetes service cluster periodically.
+ */
+public final class KubernetesServiceClusterMonitor extends KubernetesClusterMonitor {
+
+ private static final Log log = LogFactory.getLog(KubernetesServiceClusterMonitor.class);
+
+ private String lbReferenceType;
+
+ public KubernetesServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt,
+ String serviceClusterID, String serviceId,
+ String autoscalePolicyId) {
+ super(serviceClusterID, serviceId, kubernetesClusterCtxt,
+ new AutoscalerRuleEvaluator(
+ StratosConstants.CONTAINER_MIN_CHECK_DROOL_FILE,
+ StratosConstants.CONTAINER_SCALE_CHECK_DROOL_FILE),
+ autoscalePolicyId);
+ readConfigurations();
+ }
+
+ @Override
+ public void run() {
+
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor is running..." + this.toString());
+ }
+ try {
+ if (!ClusterStatus.Active.getNextStates().contains(getStatus())) {
+ monitor();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
+ + getStatus() + "state");
+ }
+ }
+ } catch (Exception e) {
+ log.error("KubernetesServiceClusterMonitor: Monitor failed." + this.toString(),
+ e);
+ }
+ }
+
+ @Override
+ protected void monitor() {
+ minCheck();
+ scaleCheck();
+ }
+
+ private void scaleCheck() {
+ boolean rifReset = getKubernetesClusterCtxt().isRifReset();
+ boolean memoryConsumptionReset = getKubernetesClusterCtxt().isMemoryConsumptionReset();
+ boolean loadAverageReset = getKubernetesClusterCtxt().isLoadAverageReset();
+ if (log.isDebugEnabled()) {
+ log.debug("flag of rifReset : " + rifReset
+ + " flag of memoryConsumptionReset : "
+ + memoryConsumptionReset + " flag of loadAverageReset : "
+ + loadAverageReset);
+ }
+ String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
+ String clusterId = getClusterId();
+ if (rifReset || memoryConsumptionReset || loadAverageReset) {
+ getScaleCheckKnowledgeSession().setGlobal("clusterId", clusterId);
+ getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", getAutoscalePolicy());
+ getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+ getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+ getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Running scale check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
+ }
+ scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(
+ getScaleCheckKnowledgeSession(), scaleCheckFactHandle, getKubernetesClusterCtxt());
+ getKubernetesClusterCtxt().setRifReset(false);
+ getKubernetesClusterCtxt().setMemoryConsumptionReset(false);
+ getKubernetesClusterCtxt().setLoadAverageReset(false);
+ } else if (log.isDebugEnabled()) {
+ log.debug(String.format("Scale check will not run since none of the statistics have not received yet for "
+ + "[kub-cluster] : %s [cluster] : %s", kubernetesClusterID, clusterId));
+ }
+ }
+
+ private void minCheck() {
+ getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+ String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format(
+ "Running min check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
+ }
+ minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(
+ getMinCheckKnowledgeSession(), minCheckFactHandle,
+ getKubernetesClusterCtxt());
+ }
+
+ @Override
+ public void destroy() {
+ getMinCheckKnowledgeSession().dispose();
+ getScaleCheckKnowledgeSession().dispose();
+ setDestroyed(true);
+ stopScheduler();
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
+ }
+ }
+
+ @Override
+ protected void readConfigurations() {
+ XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+ int monitorInterval = conf.getInt(AutoScalerConstants.KubernetesService_Cluster_MONITOR_INTERVAL, 60000);
+ setMonitorIntervalMilliseconds(monitorInterval);
+ if (log.isDebugEnabled()) {
+ log.debug("KubernetesServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "KubernetesServiceClusterMonitor "
+ + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
+ + ", clusterId=" + getClusterId()
+ + ", serviceId=" + getServiceId() + "]";
+ }
+
+ public String getLbReferenceType() {
+ return lbReferenceType;
+ }
+
+ public void setLbReferenceType(String lbReferenceType) {
+ this.lbReferenceType = lbReferenceType;
+ }
+
+ @Override
+ public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
+
+ if (properties != null) {
+ Property[] propertyArray = properties.getProperties();
+ if (propertyArray == null) {
+ return;
+ }
+ List<Property> propertyList = Arrays.asList(propertyArray);
+
+ for (Property property : propertyList) {
+ String key = property.getName();
+ String value = property.getValue();
+
+ if (StratosConstants.KUBERNETES_MIN_REPLICAS.equals(key)) {
+ int min = Integer.parseInt(value);
+ int max = getKubernetesClusterCtxt().getMaxReplicas();
+ if (min > max) {
+ String msg = String.format("%s should be less than %s . But %s is not less than %s.",
+ StratosConstants.KUBERNETES_MIN_REPLICAS, StratosConstants.KUBERNETES_MAX_REPLICAS, min, max);
+ log.error(msg);
+ throw new InvalidArgumentException(msg);
+ }
+ getKubernetesClusterCtxt().setMinReplicas(min);
+ break;
+ }
+ }
+
+ }
+ }
+}
\ No newline at end of file
[4/5] Adding autoscaler topology event listeners introduced by
service grouping
Posted by im...@apache.org.
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
deleted file mode 100644
index d7238bf..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.PartitionValidationException;
-import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.partition.PartitionGroup;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.util.Constants;
-
-/*
- * Factory class for creating cluster monitors.
- */
-public class ClusterMonitorFactory {
-
- private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class);
-
- /**
- * @param cluster the cluster to be monitored
- * @return the created cluster monitor
- * @throws PolicyValidationException when deployment policy is not valid
- * @throws PartitionValidationException when partition is not valid
- */
- public static AbstractClusterMonitor getMonitor(Cluster cluster)
- throws PolicyValidationException, PartitionValidationException {
-
- AbstractClusterMonitor clusterMonitor;
- if (cluster.isKubernetesCluster()) {
- clusterMonitor = getDockerServiceClusterMonitor(cluster);
- } else if (cluster.isLbCluster()) {
- clusterMonitor = getVMLbClusterMonitor(cluster);
- } else {
- clusterMonitor = getVMServiceClusterMonitor(cluster);
- }
-
- return clusterMonitor;
- }
-
- private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster)
- throws PolicyValidationException, PartitionValidationException {
- // FIXME fix the following code to correctly update
- // AutoscalerContext context = AutoscalerContext.getInstance();
- if (null == cluster) {
- return null;
- }
-
- String autoscalePolicyName = cluster.getAutoscalePolicyName();
- String deploymentPolicyName = cluster.getDeploymentPolicyName();
-
- if (log.isDebugEnabled()) {
- log.debug("Deployment policy name: " + deploymentPolicyName);
- log.debug("Autoscaler policy name: " + autoscalePolicyName);
- }
-
- AutoscalePolicy policy =
- PolicyManager.getInstance()
- .getAutoscalePolicy(autoscalePolicyName);
- DeploymentPolicy deploymentPolicy =
- PolicyManager.getInstance()
- .getDeploymentPolicy(deploymentPolicyName);
-
- if (deploymentPolicy == null) {
- String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
- log.error(msg);
- throw new PolicyValidationException(msg);
- }
-
- Partition[] allPartitions = deploymentPolicy.getAllPartitions();
- if (allPartitions == null) {
- String msg =
- "Deployment Policy's Partitions are null. Policy name: " +
- deploymentPolicyName;
- log.error(msg);
- throw new PolicyValidationException(msg);
- }
-
- CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
-
- VMServiceClusterMonitor clusterMonitor =
- new VMServiceClusterMonitor(cluster.getClusterId(),
- cluster.getServiceName(),
- deploymentPolicy, policy);
- clusterMonitor.setStatus(ClusterStatus.Created);
-
- for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
-
- NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
- partitionGroup.getPartitionAlgo(),
- partitionGroup.getPartitions());
-
- for (Partition partition : partitionGroup.getPartitions()) {
- PartitionContext partitionContext = new PartitionContext(partition);
- partitionContext.setServiceName(cluster.getServiceName());
- partitionContext.setProperties(cluster.getProperties());
- partitionContext.setNetworkPartitionId(partitionGroup.getId());
-
- for (Member member : cluster.getMembers()) {
- String memberId = member.getMemberId();
- if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
- MemberContext memberContext = new MemberContext();
- memberContext.setClusterId(member.getClusterId());
- memberContext.setMemberId(memberId);
- memberContext.setInitTime(member.getInitTime());
- memberContext.setPartition(partition);
- memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
-
- if (MemberStatus.Activated.equals(member.getStatus())) {
- if (log.isDebugEnabled()) {
- String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
- log.debug(msg);
- }
- partitionContext.addActiveMember(memberContext);
-// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-// partitionContext.incrementCurrentActiveMemberCount(1);
-
- } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
- if (log.isDebugEnabled()) {
- String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
- log.debug(msg);
- }
- partitionContext.addPendingMember(memberContext);
-
-// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
- } else if (MemberStatus.Suspended.equals(member.getStatus())) {
-// partitionContext.addFaultyMember(memberId);
- }
- partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added: [member] %s", memberId));
- }
- }
-
- }
- networkPartitionContext.addPartitionContext(partitionContext);
- if (log.isInfoEnabled()) {
- log.info(String.format("Partition context has been added: [partition] %s",
- partitionContext.getPartitionId()));
- }
- }
-
- clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
- if (log.isInfoEnabled()) {
- log.info(String.format("Network partition context has been added: [network partition] %s",
- networkPartitionContext.getId()));
- }
- }
-
-
- // find lb reference type
- java.util.Properties props = cluster.getProperties();
-
- if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
- String value = props.getProperty(Constants.LOAD_BALANCER_REF);
- clusterMonitor.setLbReferenceType(value);
- if (log.isDebugEnabled()) {
- log.debug("Set the lb reference type: " + value);
- }
- }
-
- // set hasPrimary property
- // hasPrimary is true if there are primary members available in that cluster
- clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
-
- log.info("VMServiceClusterMonitor created: " + clusterMonitor.toString());
- return clusterMonitor;
- }
-
- private static Properties convertMemberPropsToMemberContextProps(
- java.util.Properties properties) {
- Properties props = new Properties();
- for (Map.Entry<Object, Object> e : properties.entrySet()) {
- Property prop = new Property();
- prop.setName((String) e.getKey());
- prop.setValue((String) e.getValue());
- props.addProperties(prop);
- }
- return props;
- }
-
-
- private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster)
- throws PolicyValidationException, PartitionValidationException {
- // FIXME fix the following code to correctly update
- // AutoscalerContext context = AutoscalerContext.getInstance();
- if (null == cluster) {
- return null;
- }
-
- String autoscalePolicyName = cluster.getAutoscalePolicyName();
- String deploymentPolicyName = cluster.getDeploymentPolicyName();
-
- if (log.isDebugEnabled()) {
- log.debug("Deployment policy name: " + deploymentPolicyName);
- log.debug("Autoscaler policy name: " + autoscalePolicyName);
- }
-
- AutoscalePolicy policy =
- PolicyManager.getInstance()
- .getAutoscalePolicy(autoscalePolicyName);
- DeploymentPolicy deploymentPolicy =
- PolicyManager.getInstance()
- .getDeploymentPolicy(deploymentPolicyName);
-
- if (deploymentPolicy == null) {
- String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
- log.error(msg);
- throw new PolicyValidationException(msg);
- }
-
- String clusterId = cluster.getClusterId();
- VMLbClusterMonitor clusterMonitor =
- new VMLbClusterMonitor(clusterId,
- cluster.getServiceName(),
- deploymentPolicy, policy);
- clusterMonitor.setStatus(ClusterStatus.Created);
- // partition group = network partition context
- for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
-
- NetworkPartitionLbHolder networkPartitionLbHolder =
- PartitionManager.getInstance()
- .getNetworkPartitionLbHolder(partitionGroup.getId());
-// PartitionManager.getInstance()
-// .getNetworkPartitionLbHolder(partitionGroup.getId());
- // FIXME pick a random partition
- Partition partition =
- partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)];
- PartitionContext partitionContext = new PartitionContext(partition);
- partitionContext.setServiceName(cluster.getServiceName());
- partitionContext.setProperties(cluster.getProperties());
- partitionContext.setNetworkPartitionId(partitionGroup.getId());
- partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions
-
- NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
- partitionGroup.getPartitionAlgo(),
- partitionGroup.getPartitions());
- for (Member member : cluster.getMembers()) {
- String memberId = member.getMemberId();
- if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) {
- MemberContext memberContext = new MemberContext();
- memberContext.setClusterId(member.getClusterId());
- memberContext.setMemberId(memberId);
- memberContext.setPartition(partition);
- memberContext.setInitTime(member.getInitTime());
-
- if (MemberStatus.Activated.equals(member.getStatus())) {
- if (log.isDebugEnabled()) {
- String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
- log.debug(msg);
- }
- partitionContext.addActiveMember(memberContext);
-// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-// partitionContext.incrementCurrentActiveMemberCount(1);
- } else if (MemberStatus.Created.equals(member.getStatus()) ||
- MemberStatus.Starting.equals(member.getStatus())) {
- if (log.isDebugEnabled()) {
- String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
- log.debug(msg);
- }
- partitionContext.addPendingMember(memberContext);
-// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
- } else if (MemberStatus.Suspended.equals(member.getStatus())) {
-// partitionContext.addFaultyMember(memberId);
- }
-
- partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added: [member] %s", memberId));
- }
- }
-
- }
- networkPartitionContext.addPartitionContext(partitionContext);
-
- // populate lb cluster id in network partition context.
- java.util.Properties props = cluster.getProperties();
-
- // get service type of load balanced cluster
- String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
-
- if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
- String value = props.getProperty(Constants.LOAD_BALANCER_REF);
-
- if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
- networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
-
- } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) {
- String serviceName = cluster.getServiceName();
- // TODO: check if this is correct
- networkPartitionLbHolder.addServiceLB(serviceName, clusterId);
-
- if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) {
- networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
- if (log.isDebugEnabled()) {
- log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType);
- }
- }
- }
- }
-
- clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
- }
-
- log.info("VMLbClusterMonitor created: " + clusterMonitor.toString());
- return clusterMonitor;
- }
-
- /**
- * @param cluster - the cluster which needs to be monitored
- * @return - the cluster monitor
- */
- private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster)
- throws PolicyValidationException {
-
- if (null == cluster) {
- return null;
- }
-
- String autoscalePolicyName = cluster.getAutoscalePolicyName();
- if (log.isDebugEnabled()) {
- log.debug("Autoscaler policy name: " + autoscalePolicyName);
- }
-
- AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
-
- if (policy == null) {
- String msg = "Autoscale Policy is null. Policy name: " + autoscalePolicyName;
- log.error(msg);
- throw new PolicyValidationException(msg);
- }
-
- java.util.Properties props = cluster.getProperties();
- String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
- KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID,
- cluster.getClusterId());
-
- String minReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS);
- if (minReplicasProperty != null && !minReplicasProperty.isEmpty()) {
- int minReplicas = Integer.parseInt(minReplicasProperty);
- kubernetesClusterCtxt.setMinReplicas(minReplicas);
- }
-
- String maxReplicasProperty = props.getProperty(StratosConstants.KUBERNETES_MAX_REPLICAS);
- if (maxReplicasProperty != null && !maxReplicasProperty.isEmpty()) {
- int maxReplicas = Integer.parseInt(maxReplicasProperty);
- kubernetesClusterCtxt.setMaxReplicas(maxReplicas);
- }
-
- KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor(
- kubernetesClusterCtxt,
- cluster.getClusterId(),
- cluster.getServiceName(),
- policy.getId());
-
- dockerClusterMonitor.setStatus(ClusterStatus.Created);
-
- //populate the members after restarting
- for (Member member : cluster.getMembers()) {
- String memberId = member.getMemberId();
- String clusterId = member.getClusterId();
- MemberContext memberContext = new MemberContext();
- memberContext.setMemberId(memberId);
- memberContext.setClusterId(clusterId);
- memberContext.setInitTime(member.getInitTime());
-
- // if there is at least one member in the topology, that means service has been created already
- // this is to avoid calling startContainer() method again
- kubernetesClusterCtxt.setServiceClusterCreated(true);
-
- if (MemberStatus.Activated.equals(member.getStatus())) {
- if (log.isDebugEnabled()) {
- String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString());
- log.debug(msg);
- }
- dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext);
- } else if (MemberStatus.Created.equals(member.getStatus())
- || MemberStatus.Starting.equals(member.getStatus())) {
- if (log.isDebugEnabled()) {
- String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString());
- log.debug(msg);
- }
- dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext);
- }
-
- kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added: [member] %s", memberId));
- }
- }
-
- // find lb reference type
- if (props.containsKey(Constants.LOAD_BALANCER_REF)) {
- String value = props.getProperty(Constants.LOAD_BALANCER_REF);
- dockerClusterMonitor.setLbReferenceType(value);
- if (log.isDebugEnabled()) {
- log.debug("Set the lb reference type: " + value);
- }
- }
-
- log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString());
- return dockerClusterMonitor;
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
deleted file mode 100644
index 0254030..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-/*
- * Every kubernetes cluster monitor should extend this class
- */
-public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor {
-
- private static final Log log = LogFactory.getLog(KubernetesClusterMonitor.class);
-
- private KubernetesClusterContext kubernetesClusterCtxt;
- protected String autoscalePolicyId;
-
- protected KubernetesClusterMonitor(String clusterId, String serviceId,
- KubernetesClusterContext kubernetesClusterContext,
- AutoscalerRuleEvaluator autoscalerRuleEvaluator,
- String autoscalePolicyId) {
-
- super(clusterId, serviceId, autoscalerRuleEvaluator);
- this.kubernetesClusterCtxt = kubernetesClusterContext;
- this.autoscalePolicyId = autoscalePolicyId;
- }
-
- @Override
- public void handleAverageLoadAverageEvent(
- AverageLoadAverageEvent averageLoadAverageEvent) {
-
- String clusterId = averageLoadAverageEvent.getClusterId();
- float value = averageLoadAverageEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Avg load avg event: [cluster] %s [value] %s",
- clusterId, value));
- }
- KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setAverageLoadAverage(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
-
- }
-
- @Override
- public void handleGradientOfLoadAverageEvent(
- GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
-
- String clusterId = gradientOfLoadAverageEvent.getClusterId();
- float value = gradientOfLoadAverageEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s",
- clusterId, value));
- }
- KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setLoadAverageGradient(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
-
- @Override
- public void handleSecondDerivativeOfLoadAverageEvent(
- SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
-
- String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
- float value = secondDerivativeOfLoadAverageEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
- + "[value] %s", clusterId, value));
- }
- KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setLoadAverageSecondDerivative(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
-
- @Override
- public void handleAverageMemoryConsumptionEvent(
- AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
-
- String clusterId = averageMemoryConsumptionEvent.getClusterId();
- float value = averageMemoryConsumptionEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Avg Memory Consumption event: [cluster] %s "
- + "[value] %s", averageMemoryConsumptionEvent.getClusterId(), value));
- }
- KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setAverageMemoryConsumption(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
-
- @Override
- public void handleGradientOfMemoryConsumptionEvent(
- GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
-
- String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
- float value = gradientOfMemoryConsumptionEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
- + "[value] %s", clusterId, value));
- }
- KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setMemoryConsumptionGradient(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
-
- @Override
- public void handleSecondDerivativeOfMemoryConsumptionEvent(
- SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
-
- String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
- float value = secondDerivativeOfMemoryConsumptionEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
- + "[value] %s", clusterId, value));
- }
- KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setMemoryConsumptionSecondDerivative(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
-
- @Override
- public void handleAverageRequestsInFlightEvent(
- AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
-
- float value = averageRequestsInFlightEvent.getValue();
- String clusterId = averageRequestsInFlightEvent.getClusterId();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Average Rif event: [cluster] %s [value] %s",
- clusterId, value));
- }
- KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setAverageRequestsInFlight(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
-
- @Override
- public void handleGradientOfRequestsInFlightEvent(
- GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
-
- String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
- float value = gradientOfRequestsInFlightEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s",
- clusterId, value));
- }
- KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setRequestsInFlightGradient(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
-
- @Override
- public void handleSecondDerivativeOfRequestsInFlightEvent(
- SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
-
- String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
- float value = secondDerivativeOfRequestsInFlightEvent.getValue();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second derivative of Rif event: [cluster] %s "
- + "[value] %s", clusterId, value));
- }
- KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt();
- if (null != kubernetesClusterContext) {
- kubernetesClusterContext.setRequestsInFlightSecondDerivative(value);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Kubernetes cluster context is not available for :" +
- " [cluster] %s", clusterId));
- }
- }
- }
-
- @Override
- public void handleMemberAverageMemoryConsumptionEvent(
- MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
-
- String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
- KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
- MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberAverageMemoryConsumptionEvent.getValue();
- memberStatsContext.setAverageMemoryConsumption(value);
- }
-
- @Override
- public void handleMemberGradientOfMemoryConsumptionEvent(
- MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
-
- String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
- KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
- MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberGradientOfMemoryConsumptionEvent.getValue();
- memberStatsContext.setGradientOfMemoryConsumption(value);
- }
-
- @Override
- public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
- MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
-
- }
-
- @Override
- public void handleMemberAverageLoadAverageEvent(
- MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
-
- KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
- String memberId = memberAverageLoadAverageEvent.getMemberId();
- float value = memberAverageLoadAverageEvent.getValue();
- MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- memberStatsContext.setAverageLoadAverage(value);
- }
-
- @Override
- public void handleMemberGradientOfLoadAverageEvent(
- MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
-
- String memberId = memberGradientOfLoadAverageEvent.getMemberId();
- KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
- MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberGradientOfLoadAverageEvent.getValue();
- memberStatsContext.setGradientOfLoadAverage(value);
- }
-
- @Override
- public void handleMemberSecondDerivativeOfLoadAverageEvent(
- MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
-
- String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
- KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt();
- MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
- memberStatsContext.setSecondDerivativeOfLoadAverage(value);
- }
-
- @Override
- public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
- // kill the container
- String memberId = memberFaultEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- if (null == member) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
- }
- return;
- }
- if (!member.isActive()) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member activated event has not received for the member %s. "
- + "Therefore ignoring" + " the member fault health stat", memberId));
- }
- return;
- }
-
- if (!getKubernetesClusterCtxt().activeMemberExist(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Could not find the active member in kubernetes cluster context, "
- + "[member] %s ", memberId));
- }
- return;
- }
- // terminate the faulty member
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- try {
- ccClient.terminateContainer(memberId);
- // remove from active member list
- getKubernetesClusterCtxt().removeActiveMemberById(memberId);
- if (log.isInfoEnabled()) {
- String clusterId = memberFaultEvent.getClusterId();
- String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
- log.info(String.format("Faulty member is terminated and removed from the active members list: "
- + "[member] %s [kub-cluster] %s [cluster] %s ", memberId, kubernetesClusterID, clusterId));
- }
- } catch (TerminationException e) {
- String msg = "Cannot delete a container " + e.getLocalizedMessage();
- log.error(msg, e);
- }
- }
-
- @Override
- public void handleMemberStartedEvent(
- MemberStartedEvent memberStartedEvent) {
-
- }
-
- @Override
- public void handleMemberActivatedEvent(
- MemberActivatedEvent memberActivatedEvent) {
-
- KubernetesClusterContext kubernetesClusterContext;
- kubernetesClusterContext = getKubernetesClusterCtxt();
- String memberId = memberActivatedEvent.getMemberId();
- kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added successfully: "
- + "[member] %s", memberId));
- }
- kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
- }
-
- @Override
- public void handleMemberMaintenanceModeEvent(
- MemberMaintenanceModeEvent maintenanceModeEvent) {
-
- // no need to do anything here
- // we will not be receiving this event for containers
- // we will only receive member terminated event
- }
-
- @Override
- public void handleMemberReadyToShutdownEvent(
- MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
-
- // no need to do anything here
- // we will not be receiving this event for containers
- // we will only receive member terminated event
- }
-
- @Override
- public void handleMemberTerminatedEvent(
- MemberTerminatedEvent memberTerminatedEvent) {
-
- String memberId = memberTerminatedEvent.getMemberId();
- if (getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is removed from termination pending members list: "
- + "[member] %s", memberId));
- }
- } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member is removed from pending members list: "
- + "[member] %s", memberId));
- }
- } else if (getKubernetesClusterCtxt().removeActiveMemberById(memberId)) {
- log.warn(String.format("Member is in the wrong list and it is removed from "
- + "active members list", memberId));
- } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) {
- log.warn(String.format("Member's obsolated timeout has been expired and "
- + "it is removed from obsolated members list", memberId));
- } else {
- log.warn(String.format("Member is not available in any of the list active, "
- + "pending and termination pending", memberId));
- }
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been removed successfully: "
- + "[member] %s", memberId));
- }
- }
-
- @Override
- public void handleClusterRemovedEvent(
- ClusterRemovedEvent clusterRemovedEvent) {
- getKubernetesClusterCtxt().getPendingMembers().clear();
- getKubernetesClusterCtxt().getActiveMembers().clear();
- getKubernetesClusterCtxt().getTerminationPendingMembers().clear();
- getKubernetesClusterCtxt().getObsoletedMembers().clear();
- }
-
- public KubernetesClusterContext getKubernetesClusterCtxt() {
- return kubernetesClusterCtxt;
- }
-
- public void setKubernetesClusterCtxt(
- KubernetesClusterContext kubernetesClusterCtxt) {
- this.kubernetesClusterCtxt = kubernetesClusterCtxt;
- }
-
- public AutoscalePolicy getAutoscalePolicy() {
- return PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyId);
- }
-
- private Member getMemberByMemberId(String memberId) {
- try {
- TopologyManager.acquireReadLock();
- for (Service service : TopologyManager.getTopology().getServices()) {
- for (Cluster cluster : service.getClusters()) {
- if (cluster.memberExists(memberId)) {
- return cluster.getMember(memberId);
- }
- }
- }
- return null;
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
deleted file mode 100644
index 15b14b6..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.collections.ListUtils;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.KubernetesClusterContext;
-import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-
-import edu.emory.mathcs.backport.java.util.Arrays;
-
-/*
- * It is monitoring a kubernetes service cluster periodically.
- */
-public final class KubernetesServiceClusterMonitor extends KubernetesClusterMonitor {
-
- private static final Log log = LogFactory.getLog(KubernetesServiceClusterMonitor.class);
-
- private String lbReferenceType;
-
- public KubernetesServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt,
- String serviceClusterID, String serviceId,
- String autoscalePolicyId) {
- super(serviceClusterID, serviceId, kubernetesClusterCtxt,
- new AutoscalerRuleEvaluator(
- StratosConstants.CONTAINER_MIN_CHECK_DROOL_FILE,
- StratosConstants.CONTAINER_SCALE_CHECK_DROOL_FILE),
- autoscalePolicyId);
- readConfigurations();
- }
-
- @Override
- public void run() {
-
- if (log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor is running..." + this.toString());
- }
- try {
- if (!ClusterStatus.Active.getNextStates().contains(getStatus())) {
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
- + getStatus() + "state");
- }
- }
- } catch (Exception e) {
- log.error("KubernetesServiceClusterMonitor: Monitor failed." + this.toString(),
- e);
- }
- }
-
- @Override
- protected void monitor() {
- minCheck();
- scaleCheck();
- }
-
- private void scaleCheck() {
- boolean rifReset = getKubernetesClusterCtxt().isRifReset();
- boolean memoryConsumptionReset = getKubernetesClusterCtxt().isMemoryConsumptionReset();
- boolean loadAverageReset = getKubernetesClusterCtxt().isLoadAverageReset();
- if (log.isDebugEnabled()) {
- log.debug("flag of rifReset : " + rifReset
- + " flag of memoryConsumptionReset : "
- + memoryConsumptionReset + " flag of loadAverageReset : "
- + loadAverageReset);
- }
- String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
- String clusterId = getClusterId();
- if (rifReset || memoryConsumptionReset || loadAverageReset) {
- getScaleCheckKnowledgeSession().setGlobal("clusterId", clusterId);
- getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", getAutoscalePolicy());
- getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
- getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
- getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
- if (log.isDebugEnabled()) {
- log.debug(String.format(
- "Running scale check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
- }
- scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(
- getScaleCheckKnowledgeSession(), scaleCheckFactHandle, getKubernetesClusterCtxt());
- getKubernetesClusterCtxt().setRifReset(false);
- getKubernetesClusterCtxt().setMemoryConsumptionReset(false);
- getKubernetesClusterCtxt().setLoadAverageReset(false);
- } else if (log.isDebugEnabled()) {
- log.debug(String.format("Scale check will not run since none of the statistics have not received yet for "
- + "[kub-cluster] : %s [cluster] : %s", kubernetesClusterID, clusterId));
- }
- }
-
- private void minCheck() {
- getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID();
- if (log.isDebugEnabled()) {
- log.debug(String.format(
- "Running min check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId()));
- }
- minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(
- getMinCheckKnowledgeSession(), minCheckFactHandle,
- getKubernetesClusterCtxt());
- }
-
- @Override
- public void destroy() {
- getMinCheckKnowledgeSession().dispose();
- getScaleCheckKnowledgeSession().dispose();
- setDestroyed(true);
- stopScheduler();
- if (log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString());
- }
- }
-
- @Override
- protected void readConfigurations() {
- XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
- int monitorInterval = conf.getInt(AutoScalerConstants.KubernetesService_Cluster_MONITOR_INTERVAL, 60000);
- setMonitorIntervalMilliseconds(monitorInterval);
- if (log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
- }
- }
-
- @Override
- public String toString() {
- return "KubernetesServiceClusterMonitor "
- + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID()
- + ", clusterId=" + getClusterId()
- + ", serviceId=" + getServiceId() + "]";
- }
-
- public String getLbReferenceType() {
- return lbReferenceType;
- }
-
- public void setLbReferenceType(String lbReferenceType) {
- this.lbReferenceType = lbReferenceType;
- }
-
- @Override
- public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
-
- if (properties != null) {
- Property[] propertyArray = properties.getProperties();
- if (propertyArray == null) {
- return;
- }
- List<Property> propertyList = Arrays.asList(propertyArray);
-
- for (Property property : propertyList) {
- String key = property.getName();
- String value = property.getValue();
-
- if (StratosConstants.KUBERNETES_MIN_REPLICAS.equals(key)) {
- int min = Integer.parseInt(value);
- int max = getKubernetesClusterCtxt().getMaxReplicas();
- if (min > max) {
- String msg = String.format("%s should be less than %s . But %s is not less than %s.",
- StratosConstants.KUBERNETES_MIN_REPLICAS, StratosConstants.KUBERNETES_MAX_REPLICAS, min, max);
- log.error(msg);
- throw new InvalidArgumentException(msg);
- }
- getKubernetesClusterCtxt().setMinReplicas(min);
- break;
- }
- }
-
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
index fa8f425..8aeae94 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
@@ -28,10 +28,9 @@ import org.apache.stratos.autoscaler.grouping.dependency.DependencyBuilder;
import org.apache.stratos.autoscaler.grouping.dependency.DependencyTree;
import org.apache.stratos.autoscaler.grouping.dependency.context.ApplicationContext;
import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitorFactory;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
import org.apache.stratos.autoscaler.status.checker.StatusChecker;
-import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.GroupStatus;
import org.apache.stratos.messaging.domain.topology.ParentComponent;
import java.util.HashMap;