You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/10/31 05:04:06 UTC
[3/5] Adding autoscaler topology event listeners introduced by
service grouping
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
deleted file mode 100644
index 38ed1a6..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java
+++ /dev/null
@@ -1,639 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
-import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
-import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and performs the minimum instance check and the scaling check using the
- * underlying rules engine.
- */
-abstract public class VMClusterMonitor extends AbstractClusterMonitor {
-
- // Logger shared by every instance of this monitor.
- private static final Log log = LogFactory.getLog(VMClusterMonitor.class);
- // Network partition contexts handled by this monitor, keyed by network partition id.
- protected Map<String, NetworkPartitionContext> networkPartitionCtxts;
- // Deployment policy handed in at construction time; replaceable through setDeploymentPolicy.
- protected DeploymentPolicy deploymentPolicy;
- // Autoscale policy handed in at construction time; replaceable through setAutoscalePolicy.
- protected AutoscalePolicy autoscalePolicy;
-
-    /**
-     * Creates the monitor and keeps references to the policies and the network
-     * partition contexts it is given. The map is stored as-is, not copied.
-     *
-     * @param clusterId               forwarded to the parent monitor
-     * @param serviceId               forwarded to the parent monitor
-     * @param autoscalerRuleEvaluator forwarded to the parent monitor
-     * @param deploymentPolicy        deployment policy kept by this monitor
-     * @param autoscalePolicy         autoscale policy kept by this monitor
-     * @param networkPartitionCtxts   network partition contexts keyed by network partition id
-     */
-    protected VMClusterMonitor(String clusterId, String serviceId,
-                               AutoscalerRuleEvaluator autoscalerRuleEvaluator,
-                               DeploymentPolicy deploymentPolicy, AutoscalePolicy autoscalePolicy,
-                               Map<String, NetworkPartitionContext> networkPartitionCtxts) {
-        super(clusterId, serviceId, autoscalerRuleEvaluator);
-        this.networkPartitionCtxts = networkPartitionCtxts;
-        this.autoscalePolicy = autoscalePolicy;
-        this.deploymentPolicy = deploymentPolicy;
-    }
-
-    /**
-     * Stores the average load average carried by the event on the matching
-     * network partition context. When no context is registered for the
-     * network partition the event is only reported at debug level.
-     *
-     * @param averageLoadAverageEvent event providing cluster id, network partition id and value
-     */
-    @Override
-    public void handleAverageLoadAverageEvent(
-            AverageLoadAverageEvent averageLoadAverageEvent) {
-
-        final String nwPartitionId = averageLoadAverageEvent.getNetworkPartitionId();
-        final String eventClusterId = averageLoadAverageEvent.getClusterId();
-        final float avgLoadAverage = averageLoadAverageEvent.getValue();
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
-                    eventClusterId, nwPartitionId, avgLoadAverage));
-        }
-
-        final NetworkPartitionContext ctxt = getNetworkPartitionCtxt(nwPartitionId);
-        if (ctxt == null) {
-            // Nothing to update for an unknown network partition.
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                        " [network partition] %s", nwPartitionId));
-            }
-            return;
-        }
-        ctxt.setAverageLoadAverage(avgLoadAverage);
-    }
-
-    /**
-     * Stores the load average gradient carried by the event on the matching
-     * network partition context. When no context is registered for the
-     * network partition the event is only reported at debug level.
-     *
-     * @param gradientOfLoadAverageEvent event providing cluster id, network partition id and value
-     */
-    @Override
-    public void handleGradientOfLoadAverageEvent(
-            GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
-
-        final String nwPartitionId = gradientOfLoadAverageEvent.getNetworkPartitionId();
-        final String eventClusterId = gradientOfLoadAverageEvent.getClusterId();
-        final float gradient = gradientOfLoadAverageEvent.getValue();
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
-                    eventClusterId, nwPartitionId, gradient));
-        }
-
-        final NetworkPartitionContext ctxt = getNetworkPartitionCtxt(nwPartitionId);
-        if (ctxt == null) {
-            // Nothing to update for an unknown network partition.
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                        " [network partition] %s", nwPartitionId));
-            }
-            return;
-        }
-        ctxt.setLoadAverageGradient(gradient);
-    }
-
-    /**
-     * Stores the second derivative of the load average carried by the event on
-     * the matching network partition context. When no context is registered
-     * for the network partition the event is only reported at debug level.
-     *
-     * @param secondDerivativeOfLoadAverageEvent event providing cluster id, network partition id and value
-     */
-    @Override
-    public void handleSecondDerivativeOfLoadAverageEvent(
-            SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) {
-
-        final String nwPartitionId = secondDerivativeOfLoadAverageEvent.getNetworkPartitionId();
-        final String eventClusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
-        final float secondDerivative = secondDerivativeOfLoadAverageEvent.getValue();
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Second Derivation of load avg event: [cluster] %s "
-                    + "[network-partition] %s [value] %s", eventClusterId, nwPartitionId, secondDerivative));
-        }
-
-        final NetworkPartitionContext ctxt = getNetworkPartitionCtxt(nwPartitionId);
-        if (ctxt == null) {
-            // Nothing to update for an unknown network partition.
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                        " [network partition] %s", nwPartitionId));
-            }
-            return;
-        }
-        ctxt.setLoadAverageSecondDerivative(secondDerivative);
-    }
-
-    /**
-     * Stores the average memory consumption carried by the event on the
-     * matching network partition context. When no context is registered for
-     * the network partition the event is only reported at debug level.
-     *
-     * @param averageMemoryConsumptionEvent event providing cluster id, network partition id and value
-     */
-    @Override
-    public void handleAverageMemoryConsumptionEvent(
-            AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) {
-
-        final String nwPartitionId = averageMemoryConsumptionEvent.getNetworkPartitionId();
-        final String eventClusterId = averageMemoryConsumptionEvent.getClusterId();
-        final float avgMemoryConsumption = averageMemoryConsumptionEvent.getValue();
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s "
-                    + "[value] %s", eventClusterId, nwPartitionId, avgMemoryConsumption));
-        }
-
-        final NetworkPartitionContext ctxt = getNetworkPartitionCtxt(nwPartitionId);
-        if (ctxt == null) {
-            // Nothing to update for an unknown network partition.
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :"
-                        + " [network partition] %s", nwPartitionId));
-            }
-            return;
-        }
-        ctxt.setAverageMemoryConsumption(avgMemoryConsumption);
-    }
-
-    /**
-     * Stores the memory consumption gradient carried by the event on the
-     * matching network partition context. When no context is registered for
-     * the network partition the event is only reported at debug level.
-     *
-     * @param gradientOfMemoryConsumptionEvent event providing cluster id, network partition id and value
-     */
-    @Override
-    public void handleGradientOfMemoryConsumptionEvent(
-            GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) {
-
-        final String nwPartitionId = gradientOfMemoryConsumptionEvent.getNetworkPartitionId();
-        final String eventClusterId = gradientOfMemoryConsumptionEvent.getClusterId();
-        final float gradient = gradientOfMemoryConsumptionEvent.getValue();
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Grad of Memory Consumption event: [cluster] %s "
-                    + "[network-partition] %s [value] %s", eventClusterId, nwPartitionId, gradient));
-        }
-
-        final NetworkPartitionContext ctxt = getNetworkPartitionCtxt(nwPartitionId);
-        if (ctxt == null) {
-            // Nothing to update for an unknown network partition.
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                        " [network partition] %s", nwPartitionId));
-            }
-            return;
-        }
-        ctxt.setMemoryConsumptionGradient(gradient);
-    }
-
-    /**
-     * Stores the second derivative of the memory consumption carried by the
-     * event on the matching network partition context. When no context is
-     * registered for the network partition the event is only reported at
-     * debug level.
-     *
-     * @param secondDerivativeOfMemoryConsumptionEvent event providing cluster id, network partition id and value
-     */
-    @Override
-    public void handleSecondDerivativeOfMemoryConsumptionEvent(
-            SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) {
-
-        final String nwPartitionId = secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId();
-        final String eventClusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
-        final float secondDerivative = secondDerivativeOfMemoryConsumptionEvent.getValue();
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s "
-                    + "[network-partition] %s [value] %s", eventClusterId, nwPartitionId, secondDerivative));
-        }
-
-        final NetworkPartitionContext ctxt = getNetworkPartitionCtxt(nwPartitionId);
-        if (ctxt == null) {
-            // Nothing to update for an unknown network partition.
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                        " [network partition] %s", nwPartitionId));
-            }
-            return;
-        }
-        ctxt.setMemoryConsumptionSecondDerivative(secondDerivative);
-    }
-
-    /**
-     * Stores the average requests-in-flight value carried by the event on the
-     * matching network partition context. When no context is registered for
-     * the network partition the event is only reported at debug level.
-     *
-     * @param averageRequestsInFlightEvent event providing cluster id, network partition id and value
-     */
-    @Override
-    public void handleAverageRequestsInFlightEvent(
-            AverageRequestsInFlightEvent averageRequestsInFlightEvent) {
-
-        final String nwPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId();
-        final String eventClusterId = averageRequestsInFlightEvent.getClusterId();
-        final float avgRequestsInFlight = averageRequestsInFlightEvent.getValue();
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
-                    eventClusterId, nwPartitionId, avgRequestsInFlight));
-        }
-
-        final NetworkPartitionContext ctxt = getNetworkPartitionCtxt(nwPartitionId);
-        if (ctxt == null) {
-            // Nothing to update for an unknown network partition.
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                        " [network partition] %s", nwPartitionId));
-            }
-            return;
-        }
-        ctxt.setAverageRequestsInFlight(avgRequestsInFlight);
-    }
-
-    /**
-     * Stores the requests-in-flight gradient carried by the event on the
-     * matching network partition context. When no context is registered for
-     * the network partition the event is only reported at debug level.
-     *
-     * @param gradientOfRequestsInFlightEvent event providing cluster id, network partition id and value
-     */
-    @Override
-    public void handleGradientOfRequestsInFlightEvent(
-            GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) {
-
-        final String nwPartitionId = gradientOfRequestsInFlightEvent.getNetworkPartitionId();
-        final String eventClusterId = gradientOfRequestsInFlightEvent.getClusterId();
-        final float gradient = gradientOfRequestsInFlightEvent.getValue();
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
-                    eventClusterId, nwPartitionId, gradient));
-        }
-
-        final NetworkPartitionContext ctxt = getNetworkPartitionCtxt(nwPartitionId);
-        if (ctxt == null) {
-            // Nothing to update for an unknown network partition.
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                        " [network partition] %s", nwPartitionId));
-            }
-            return;
-        }
-        ctxt.setRequestsInFlightGradient(gradient);
-    }
-
-    /**
-     * Stores the second derivative of the requests-in-flight value carried by
-     * the event on the matching network partition context. When no context is
-     * registered for the network partition the event is only reported at
-     * debug level.
-     *
-     * @param secondDerivativeOfRequestsInFlightEvent event providing cluster id, network partition id and value
-     */
-    @Override
-    public void handleSecondDerivativeOfRequestsInFlightEvent(
-            SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) {
-
-        final String nwPartitionId = secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId();
-        final String eventClusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
-        final float secondDerivative = secondDerivativeOfRequestsInFlightEvent.getValue();
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Second derivative of Rif event: [cluster] %s "
-                    + "[network-partition] %s [value] %s", eventClusterId, nwPartitionId, secondDerivative));
-        }
-
-        final NetworkPartitionContext ctxt = getNetworkPartitionCtxt(nwPartitionId);
-        if (ctxt == null) {
-            // Nothing to update for an unknown network partition.
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :" +
-                        " [network partition] %s", nwPartitionId));
-            }
-            return;
-        }
-        ctxt.setRequestsInFlightSecondDerivative(secondDerivative);
-    }
-
- @Override
- public void handleMemberAverageMemoryConsumptionEvent(
- MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) {
-
- String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
- NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
- PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberAverageMemoryConsumptionEvent.getValue();
- memberStatsContext.setAverageMemoryConsumption(value);
- }
-
- @Override
- public void handleMemberGradientOfMemoryConsumptionEvent(
- MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) {
-
- String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
- Member member = getMemberByMemberId(memberId);
- String networkPartitionId = getNetworkPartitionIdByMemberId(memberId);
- NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
- PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId());
- MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
- if (null == memberStatsContext) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return;
- }
- float value = memberGradientOfMemoryConsumptionEvent.getValue();
- memberStatsContext.setGradientOfMemoryConsumption(value);
- }
-
-    /**
-     * Ignores the event: this monitor takes no action for it.
-     * NOTE(review): the load-average counterpart of this handler records the
-     * value on the member stats context - confirm that ignoring the memory
-     * consumption value here is intended.
-     *
-     * @param memberSecondDerivativeOfMemoryConsumptionEvent the received event (unused)
-     */
-    @Override
-    public void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
-            MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) {
-        // no-op
-    }
-
-    /**
-     * Stores the average load average reported for a single member on that
-     * member's stats context.
-     * <p>
-     * The event is dropped with a debug message, instead of failing with a
-     * NullPointerException, when the member is not found in the topology or
-     * when its network partition context, partition context or member stats
-     * context is not available.
-     *
-     * @param memberAverageLoadAverageEvent event providing the member id and the value
-     */
-    @Override
-    public void handleMemberAverageLoadAverageEvent(
-            MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) {
-
-        String memberId = memberAverageLoadAverageEvent.getMemberId();
-        Member member = getMemberByMemberId(memberId);
-        if (null == member) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
-            }
-            return;
-        }
-        // The ids are read from the member that was just looked up, so the
-        // topology does not have to be searched a second time.
-        String networkPartitionId = member.getNetworkPartitionId();
-        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        if (null == networkPartitionCtxt) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :"
-                        + " [network partition] %s", networkPartitionId));
-            }
-            return;
-        }
-        String partitionId = member.getPartitionId();
-        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(partitionId);
-        if (null == partitionCtxt) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Partition context is not available for : [partition] %s", partitionId));
-            }
-            return;
-        }
-        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberAverageLoadAverageEvent.getValue();
-        memberStatsContext.setAverageLoadAverage(value);
-    }
-
-    /**
-     * Stores the load average gradient reported for a single member on that
-     * member's stats context.
-     * <p>
-     * The event is dropped with a debug message, instead of failing with a
-     * NullPointerException, when the member is not found in the topology or
-     * when its network partition context, partition context or member stats
-     * context is not available.
-     *
-     * @param memberGradientOfLoadAverageEvent event providing the member id and the value
-     */
-    @Override
-    public void handleMemberGradientOfLoadAverageEvent(
-            MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) {
-
-        String memberId = memberGradientOfLoadAverageEvent.getMemberId();
-        Member member = getMemberByMemberId(memberId);
-        if (null == member) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
-            }
-            return;
-        }
-        // The ids are read from the member that was just looked up, so the
-        // topology does not have to be searched a second time.
-        String networkPartitionId = member.getNetworkPartitionId();
-        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        if (null == networkPartitionCtxt) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :"
-                        + " [network partition] %s", networkPartitionId));
-            }
-            return;
-        }
-        String partitionId = member.getPartitionId();
-        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(partitionId);
-        if (null == partitionCtxt) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Partition context is not available for : [partition] %s", partitionId));
-            }
-            return;
-        }
-        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberGradientOfLoadAverageEvent.getValue();
-        memberStatsContext.setGradientOfLoadAverage(value);
-    }
-
-    /**
-     * Stores the second derivative of the load average reported for a single
-     * member on that member's stats context.
-     * <p>
-     * The event is dropped with a debug message, instead of failing with a
-     * NullPointerException, when the member is not found in the topology or
-     * when its network partition context, partition context or member stats
-     * context is not available.
-     *
-     * @param memberSecondDerivativeOfLoadAverageEvent event providing the member id and the value
-     */
-    @Override
-    public void handleMemberSecondDerivativeOfLoadAverageEvent(
-            MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) {
-
-        String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
-        Member member = getMemberByMemberId(memberId);
-        if (null == member) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
-            }
-            return;
-        }
-        // The ids are read from the member that was just looked up, so the
-        // topology does not have to be searched a second time.
-        String networkPartitionId = member.getNetworkPartitionId();
-        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        if (null == networkPartitionCtxt) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :"
-                        + " [network partition] %s", networkPartitionId));
-            }
-            return;
-        }
-        String partitionId = member.getPartitionId();
-        PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(partitionId);
-        if (null == partitionCtxt) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Partition context is not available for : [partition] %s", partitionId));
-            }
-            return;
-        }
-        MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId);
-        if (null == memberStatsContext) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return;
-        }
-        float value = memberSecondDerivativeOfLoadAverageEvent.getValue();
-        memberStatsContext.setSecondDerivativeOfLoadAverage(value);
-    }
-
-    /**
-     * Handles a fault reported for a member: asks the cloud controller to
-     * terminate it and removes it from the active member list of its partition.
-     * <p>
-     * The event is ignored when the member is not in the topology, is not
-     * active, has no network partition or partition context registered in this
-     * monitor, or is not listed as an active member of its partition context.
-     *
-     * @param memberFaultEvent event providing the member id and cluster id
-     */
-    @Override
-    public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) {
-
-        String memberId = memberFaultEvent.getMemberId();
-        Member member = getMemberByMemberId(memberId);
-        if (null == member) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
-            }
-            return;
-        }
-        if (!member.isActive()) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member activated event has not received for the member %s. "
-                        + "Therefore ignoring" + " the member fault health stat", memberId));
-            }
-            return;
-        }
-
-        // getNetworkPartitionCtxt(Member) returns null for an unknown network
-        // partition, so the result must be checked before it is used.
-        NetworkPartitionContext nwPartitionCtxt = getNetworkPartitionCtxt(member);
-        if (null == nwPartitionCtxt) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Network partition context is not available for :"
-                        + " [network partition] %s", member.getNetworkPartitionId()));
-            }
-            return;
-        }
-        // The partition id is taken from the member already looked up instead
-        // of searching the topology again.
-        String partitionId = member.getPartitionId();
-        PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-        if (null == partitionCtxt) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Partition context is not available for : [partition] %s", partitionId));
-            }
-            return;
-        }
-        if (!partitionCtxt.activeMemberExist(memberId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Could not find the active member in partition context, "
-                        + "[member] %s ", memberId));
-            }
-            return;
-        }
-        // terminate the faulty member
-        CloudControllerClient ccClient = CloudControllerClient.getInstance();
-        try {
-            ccClient.terminate(memberId);
-        } catch (TerminationException e) {
-            String msg = "TerminationException " + e.getLocalizedMessage();
-            log.error(msg, e);
-        }
-        // remove from active member list
-        // NOTE(review): the member is removed, and reported as terminated,
-        // even when terminate() failed above - confirm this is intended.
-        partitionCtxt.removeActiveMemberById(memberId);
-        if (log.isInfoEnabled()) {
-            String clusterId = memberFaultEvent.getClusterId();
-            log.info(String.format("Faulty member is terminated and removed from the active members list: "
-                    + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
-        }
-    }
-
-    /**
-     * Ignores the event: this monitor takes no action when a member is started.
-     *
-     * @param memberStartedEvent the received event (unused)
-     */
-    @Override
-    public void handleMemberStartedEvent(
-            MemberStartedEvent memberStartedEvent) {
-        // no-op
-    }
-
-    /**
-     * Registers a fresh stats context for the activated member on its
-     * partition context and moves the member from the pending list to the
-     * active list.
-     * <p>
-     * When the network partition context or the partition context named by
-     * the event is not registered in this monitor, the event is logged as a
-     * warning and ignored instead of failing with a NullPointerException.
-     *
-     * @param memberActivatedEvent event providing network partition id, partition id and member id
-     */
-    @Override
-    public void handleMemberActivatedEvent(
-            MemberActivatedEvent memberActivatedEvent) {
-
-        String networkPartitionId = memberActivatedEvent.getNetworkPartitionId();
-        String partitionId = memberActivatedEvent.getPartitionId();
-        String memberId = memberActivatedEvent.getMemberId();
-        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        if (null == networkPartitionCtxt) {
-            log.warn(String.format("Network partition context is not available for :"
-                    + " [network partition] %s, ignoring member activated event of [member] %s",
-                    networkPartitionId, memberId));
-            return;
-        }
-        PartitionContext partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
-        if (null == partitionContext) {
-            log.warn(String.format("Partition context is not available for : [partition] %s, "
-                    + "ignoring member activated event of [member] %s", partitionId, memberId));
-            return;
-        }
-        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Member stat context has been added successfully: "
-                    + "[member] %s", memberId));
-        }
-        partitionContext.movePendingMemberToActiveMembers(memberId);
-    }
-
-    /**
-     * Moves the member named by the event from the active list to the
-     * termination pending list of its partition context.
-     * <p>
-     * When the network partition context or the partition context named by
-     * the event is not registered in this monitor, the event is logged as a
-     * warning and ignored instead of failing with a NullPointerException.
-     *
-     * @param maintenanceModeEvent event providing network partition id, partition id and member id
-     */
-    @Override
-    public void handleMemberMaintenanceModeEvent(
-            MemberMaintenanceModeEvent maintenanceModeEvent) {
-
-        String networkPartitionId = maintenanceModeEvent.getNetworkPartitionId();
-        String partitionId = maintenanceModeEvent.getPartitionId();
-        String memberId = maintenanceModeEvent.getMemberId();
-        NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        if (null == networkPartitionCtxt) {
-            log.warn(String.format("Network partition context is not available for :"
-                    + " [network partition] %s, ignoring maintenance mode event of [member] %s",
-                    networkPartitionId, memberId));
-            return;
-        }
-        PartitionContext partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId);
-        if (null == partitionContext) {
-            log.warn(String.format("Partition context is not available for : [partition] %s, "
-                    + "ignoring maintenance mode event of [member] %s", partitionId, memberId));
-            return;
-        }
-        // NOTE(review): a new stats context is registered here exactly as in the
-        // member activated handler - confirm that this is intended for a member
-        // that is on its way out.
-        partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-        partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
-        // Logged after the move so the message describes what has actually happened.
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Member has been moved as pending termination: "
-                    + "[member] %s", memberId));
-        }
-    }
-
-    /**
-     * Asks the cloud controller to terminate a member that reported it is
-     * ready to shut down and, when termination succeeds, removes the member
-     * from the active member list of its partition context.
-     * <p>
-     * The event is ignored with a warning when the network partition context
-     * named by the event is not registered in this monitor. When only the
-     * partition context is missing the member is still terminated, but there
-     * is no active member list to update.
-     *
-     * @param memberReadyToShutdownEvent event providing network partition id, member id and cluster id
-     */
-    @Override
-    public void handleMemberReadyToShutdownEvent(
-            MemberReadyToShutdownEvent memberReadyToShutdownEvent) {
-
-        String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId();
-        String memberId = memberReadyToShutdownEvent.getMemberId();
-        NetworkPartitionContext nwPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId);
-        if (null == nwPartitionCtxt) {
-            log.warn(String.format("Network partition context is not available for :"
-                    + " [network partition] %s, ignoring ready to shutdown event of [member] %s",
-                    networkPartitionId, memberId));
-            return;
-        }
-
-        String partitionId = getPartitionOfMember(memberId);
-        PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-        // terminate the shutdown ready member
-        CloudControllerClient ccClient = CloudControllerClient.getInstance();
-        try {
-            ccClient.terminate(memberId);
-            if (null == partitionCtxt) {
-                log.warn(String.format("Member is terminated but partition context is not available for : "
-                        + "[partition] %s [member] %s", partitionId, memberId));
-                return;
-            }
-            // remove from active member list
-            partitionCtxt.removeActiveMemberById(memberId);
-
-            String clusterId = memberReadyToShutdownEvent.getClusterId();
-            log.info(String.format("Member is terminated and removed from the active members list: "
-                    + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId));
-        } catch (TerminationException e) {
-            // Same message layout as the member fault handler (space before the detail).
-            String msg = "TerminationException " + e.getLocalizedMessage();
-            log.error(msg, e);
-        }
-    }
-
-    /**
-     * Cleans up the bookkeeping of a terminated member: removes its stats
-     * context and then removes the member from the first list that contains
-     * it, trying termination pending, pending, active and obsolete members in
-     * that order.
-     * <p>
-     * When the network partition context or the partition context named by
-     * the event is not registered in this monitor, the event is logged as a
-     * warning and ignored instead of failing with a NullPointerException.
-     *
-     * @param memberTerminatedEvent event providing network partition id, partition id and member id
-     */
-    @Override
-    public void handleMemberTerminatedEvent(
-            MemberTerminatedEvent memberTerminatedEvent) {
-
-        String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId();
-        String memberId = memberTerminatedEvent.getMemberId();
-        String partitionId = memberTerminatedEvent.getPartitionId();
-        NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId);
-        if (null == networkPartitionContext) {
-            log.warn(String.format("Network partition context is not available for :"
-                    + " [network partition] %s, ignoring member terminated event of [member] %s",
-                    networkPartitionId, memberId));
-            return;
-        }
-        PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId);
-        if (null == partitionContext) {
-            log.warn(String.format("Partition context is not available for : [partition] %s, "
-                    + "ignoring member terminated event of [member] %s", partitionId, memberId));
-            return;
-        }
-        partitionContext.removeMemberStatsContext(memberId);
-
-        if (partitionContext.removeTerminationPendingMember(memberId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member is removed from termination pending members list: "
-                        + "[member] %s", memberId));
-            }
-        } else if (partitionContext.removePendingMember(memberId)) {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Member is removed from pending members list: "
-                        + "[member] %s", memberId));
-            }
-        } else if (partitionContext.removeActiveMemberById(memberId)) {
-            // The three warnings below now contain a %s placeholder; without it
-            // String.format silently dropped the member id argument.
-            log.warn(String.format("Member is in the wrong list and it is removed from "
-                    + "active members list: [member] %s", memberId));
-        } else if (partitionContext.removeObsoleteMember(memberId)) {
-            log.warn(String.format("Member's obsolated timeout has been expired and "
-                    + "it is removed from obsolated members list: [member] %s", memberId));
-        } else {
-            log.warn(String.format("Member is not available in any of the list active, "
-                    + "pending and termination pending: [member] %s", memberId));
-        }
-
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Member stat context has been removed successfully: "
-                    + "[member] %s", memberId));
-        }
-    }
-
-    /**
-     * Ignores the event: this monitor takes no action when a cluster is removed.
-     *
-     * @param clusterRemovedEvent the received event (unused)
-     */
-    @Override
-    public void handleClusterRemovedEvent(
-            ClusterRemovedEvent clusterRemovedEvent) {
-        // no-op
-    }
-
-    /**
-     * Searches every cluster of every service in the topology for the member
-     * and returns the network partition id of the first match.
-     * <p>
-     * The topology read lock is held during the search, in the same way
-     * getMemberByMemberId does it.
-     * NOTE(review): assumes the topology read lock may be taken again by a
-     * thread that already holds it - confirm against TopologyManager.
-     *
-     * @param memberId id of the member to look up
-     * @return the member's network partition id, or null when no cluster contains the member
-     */
-    private String getNetworkPartitionIdByMemberId(String memberId) {
-        TopologyManager.acquireReadLock();
-        try {
-            for (Service service : TopologyManager.getTopology().getServices()) {
-                for (Cluster cluster : service.getClusters()) {
-                    if (cluster.memberExists(memberId)) {
-                        return cluster.getMember(memberId).getNetworkPartitionId();
-                    }
-                }
-            }
-            return null;
-        } finally {
-            TopologyManager.releaseReadLock();
-        }
-    }
-
-    /**
-     * Searches every cluster of every service in the topology for the member
-     * while holding the topology read lock.
-     *
-     * @param memberId id of the member to look up
-     * @return the first matching member, or null when no cluster contains the member
-     */
-    private Member getMemberByMemberId(String memberId) {
-        // Acquire before entering try: if acquisition fails, the finally block
-        // must not release a lock that was never obtained.
-        TopologyManager.acquireReadLock();
-        try {
-            for (Service service : TopologyManager.getTopology().getServices()) {
-                for (Cluster cluster : service.getClusters()) {
-                    if (cluster.memberExists(memberId)) {
-                        return cluster.getMember(memberId);
-                    }
-                }
-            }
-            return null;
-        } finally {
-            TopologyManager.releaseReadLock();
-        }
-    }
-
-    /**
-     * Returns the network partition context registered for the network
-     * partition the given member belongs to.
-     * <p>
-     * The lookup is logged at debug level only; the earlier info-level
-     * messages were leftover debugging output emitted on every call.
-     *
-     * @param member member whose network partition id is used as the key; may be null
-     * @return the matching context, or null when member is null or no context is registered
-     */
-    public NetworkPartitionContext getNetworkPartitionCtxt(Member member) {
-        if (null == member) {
-            return null;
-        }
-        String networkPartitionId = member.getNetworkPartitionId();
-        // A single get() yields the same result as containsKey() followed by get().
-        NetworkPartitionContext networkPartitionCtxt = networkPartitionCtxts.get(networkPartitionId);
-        if (log.isDebugEnabled()) {
-            if (null == networkPartitionCtxt) {
-                log.debug(String.format("Network partition context is not available for :"
-                        + " [network partition] %s", networkPartitionId));
-            } else {
-                log.debug(String.format("Returning network partition context: [network partition] %s",
-                        networkPartitionId));
-            }
-        }
-        return networkPartitionCtxt;
-    }
-
-    /**
-     * Searches every cluster of every service in the topology for the member
-     * and returns the partition id of the first match.
-     * <p>
-     * The topology read lock is held during the search, in the same way
-     * getMemberByMemberId does it.
-     * NOTE(review): assumes the topology read lock may be taken again by a
-     * thread that already holds it - confirm against TopologyManager.
-     *
-     * @param memberId id of the member to look up
-     * @return the member's partition id, or null when no cluster contains the member
-     */
-    public String getPartitionOfMember(String memberId) {
-        TopologyManager.acquireReadLock();
-        try {
-            for (Service service : TopologyManager.getTopology().getServices()) {
-                for (Cluster cluster : service.getClusters()) {
-                    if (cluster.memberExists(memberId)) {
-                        return cluster.getMember(memberId).getPartitionId();
-                    }
-                }
-            }
-            return null;
-        } finally {
-            TopologyManager.releaseReadLock();
-        }
-    }
-
-    /**
-     * @return the deployment policy currently held by this monitor
-     */
-    public DeploymentPolicy getDeploymentPolicy() {
-        return this.deploymentPolicy;
-    }
-
-    /**
-     * Replaces the deployment policy held by this monitor.
-     *
-     * @param deploymentPolicy the new deployment policy
-     */
-    public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) {
-        this.deploymentPolicy = deploymentPolicy;
-    }
-
-    /**
-     * @return the autoscale policy currently held by this monitor
-     */
-    public AutoscalePolicy getAutoscalePolicy() {
-        return this.autoscalePolicy;
-    }
-
-    /**
-     * Replaces the autoscale policy held by this monitor.
-     *
-     * @param autoscalePolicy the new autoscale policy
-     */
-    public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
-        this.autoscalePolicy = autoscalePolicy;
-    }
-
-    /**
-     * Returns the internal map itself, not a copy.
-     *
-     * @return the network partition contexts keyed by network partition id
-     */
-    public Map<String, NetworkPartitionContext> getNetworkPartitionCtxts() {
-        return this.networkPartitionCtxts;
-    }
-
-    /**
-     * Looks up a network partition context by its id.
-     *
-     * @param networkPartitionId key of the context in the map
-     * @return the value the map holds for that key (null when the map has no mapping for it)
-     */
-    public NetworkPartitionContext getNetworkPartitionCtxt(String networkPartitionId) {
-        return this.networkPartitionCtxts.get(networkPartitionId);
-    }
-
-    /**
-     * Replaces the whole map of network partition contexts. Despite the
-     * method name, the map holds network partition contexts.
-     *
-     * @param partitionCtxt new map of network partition contexts
-     */
-    public void setPartitionCtxt(Map<String, NetworkPartitionContext> partitionCtxt) {
-        networkPartitionCtxts = partitionCtxt;
-    }
-
-    /**
-     * Tells whether the network partition context map contains the given key.
-     * Despite the parameter name, the id is checked against the keys of the
-     * network partition context map.
-     *
-     * @param partitionId key to look for
-     * @return true when the map contains the key
-     */
-    public boolean partitionCtxtAvailable(String partitionId) {
-        return this.networkPartitionCtxts.containsKey(partitionId);
-    }
-
-    /**
-     * Registers a network partition context under the id it reports,
-     * replacing any context already stored for that id.
-     *
-     * @param ctxt the context to register
-     */
-    public void addNetworkPartitionCtxt(NetworkPartitionContext ctxt) {
-        final String ctxtId = ctxt.getId();
-        this.networkPartitionCtxts.put(ctxtId, ctxt);
-    }
-
-    /**
-     * Looks up a network partition context by id. Performs the same map
-     * lookup as getNetworkPartitionCtxt(String).
-     *
-     * @param id key of the context in the map
-     * @return the value the map holds for that key (null when the map has no mapping for it)
-     */
-    public NetworkPartitionContext getPartitionCtxt(String id) {
-        return networkPartitionCtxts.get(id);
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
deleted file mode 100644
index 3e6cddc..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
-import org.apache.stratos.autoscaler.partition.PartitionManager;
-import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-public class VMLbClusterMonitor extends VMClusterMonitor {
-
- private static final Log log = LogFactory.getLog(VMLbClusterMonitor.class);
-
- public VMLbClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
- AutoscalePolicy autoscalePolicy) {
- super(clusterId, serviceId,
- new AutoscalerRuleEvaluator(
- StratosConstants.VM_MIN_CHECK_DROOL_FILE,
- StratosConstants.VM_SCALE_CHECK_DROOL_FILE),
- deploymentPolicy, autoscalePolicy,
- new ConcurrentHashMap<String, NetworkPartitionContext>());
- readConfigurations();
- }
-
- @Override
- public void run() {
-
- while (!isDestroyed()) {
- if (log.isDebugEnabled()) {
- log.debug("Cluster monitor is running.. " + this.toString());
- }
- try {
- if (!ClusterStatus.Inactive.equals(status)) {
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("LB Cluster monitor is suspended as the cluster is in " +
- ClusterStatus.Inactive + " mode......");
- }
- }
- } catch (Exception e) {
- log.error("Cluster monitor: Monitor failed. " + this.toString(), e);
- }
- try {
- Thread.sleep(getMonitorIntervalMilliseconds());
- } catch (InterruptedException ignore) {
- }
- }
- }
-
- @Override
- protected void monitor() {
- // TODO make this concurrent
- for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
-
- // minimum check per partition
- for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts()
- .values()) {
-
- if (partitionContext != null) {
- getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- getMinCheckKnowledgeSession().setGlobal("isPrimary", false);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running minimum check for partition %s ",
- partitionContext.getPartitionId()));
- }
-
- minCheckFactHandle =
- AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession(),
- minCheckFactHandle,
- partitionContext);
- // start only in the first partition context
- break;
- }
-
- }
-
- }
- }
-
- @Override
- public void destroy() {
- getMinCheckKnowledgeSession().dispose();
- getMinCheckKnowledgeSession().dispose();
- setDestroyed(true);
- stopScheduler();
- if (log.isDebugEnabled()) {
- log.debug("VMLbClusterMonitor Drools session has been disposed. " + this.toString());
- }
- }
-
- @Override
- protected void readConfigurations() {
- XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
- int monitorInterval = conf.getInt(AutoScalerConstants.VMLb_Cluster_MONITOR_INTERVAL, 90000);
- setMonitorIntervalMilliseconds(monitorInterval);
- if (log.isDebugEnabled()) {
- log.debug("VMLbClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
- }
- }
-
- @Override
- public void handleClusterRemovedEvent(
- ClusterRemovedEvent clusterRemovedEvent) {
-
- String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy();
- String clusterId = clusterRemovedEvent.getClusterId();
- DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
- if (depPolicy != null) {
- List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance()
- .getNetworkPartitionLbHolders(depPolicy);
-
- for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) {
- // removes lb cluster ids
- boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId);
- if (isRemoved) {
- log.info("Removed the lb cluster [id]:"
- + clusterId
- + " reference from Network Partition [id]: "
- + networkPartitionLbHolder
- .getNetworkPartitionId());
-
- }
- if (log.isDebugEnabled()) {
- log.debug(networkPartitionLbHolder);
- }
-
- }
- }
- }
-
- @Override
- public String toString() {
- return "VMLbClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() + "]";
- }
-
- @Override
- public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
- // TODO
-
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
deleted file mode 100644
index 9a26b42..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.autoscaler.util.AutoScalerConstants;
-import org.apache.stratos.autoscaler.util.ConfUtil;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.common.constants.StratosConstants;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-public class VMServiceClusterMonitor extends VMClusterMonitor {
-
- private static final Log log = LogFactory.getLog(VMServiceClusterMonitor.class);
- private String lbReferenceType;
- private boolean hasPrimary;
-
- public VMServiceClusterMonitor(String clusterId, String serviceId,
- DeploymentPolicy deploymentPolicy,
- AutoscalePolicy autoscalePolicy) {
- super(clusterId, serviceId,
- new AutoscalerRuleEvaluator(StratosConstants.VM_MIN_CHECK_DROOL_FILE,
- StratosConstants.VM_SCALE_CHECK_DROOL_FILE),
- deploymentPolicy, autoscalePolicy,
- new ConcurrentHashMap<String, NetworkPartitionContext>());
- readConfigurations();
- }
-
- @Override
- public void run() {
- while (!isDestroyed()) {
- try {
- if ((this.status.getCode() <= ClusterStatus.Active.getCode()) ||
- (this.status == ClusterStatus.Inactive && !hasDependent) ||
- !this.hasFaultyMember) {
- if (log.isDebugEnabled()) {
- log.debug("Cluster monitor is running.. " + this.toString());
- }
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Cluster monitor is suspended as the cluster is in " +
- ClusterStatus.Inactive + " mode......");
- }
- }
- } catch (Exception e) {
- log.error("Cluster monitor: Monitor failed." + this.toString(), e);
- }
- try {
- Thread.sleep(getMonitorIntervalMilliseconds());
- } catch (InterruptedException ignore) {
- }
- }
- }
-
- @Override
- protected void monitor() {
-
- //TODO make this concurrent
- for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
- // store primary members in the network partition context
- List<String> primaryMemberListInNetworkPartition = new ArrayList<String>();
-
- //minimum check per partition
- for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) {
- // store primary members in the partition context
- List<String> primaryMemberListInPartition = new ArrayList<String>();
- // get active primary members in this partition context
- for (MemberContext memberContext : partitionContext.getActiveMembers()) {
- if (isPrimaryMember(memberContext)) {
- primaryMemberListInPartition.add(memberContext.getMemberId());
- }
- }
- // get pending primary members in this partition context
- for (MemberContext memberContext : partitionContext.getPendingMembers()) {
- if (isPrimaryMember(memberContext)) {
- primaryMemberListInPartition.add(memberContext.getMemberId());
- }
- }
- primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition);
- getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- getMinCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
- getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
- getMinCheckKnowledgeSession().setGlobal("primaryMemberCount", primaryMemberListInPartition.size());
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running minimum check for partition %s ", partitionContext.getPartitionId()));
- }
-
- minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession()
- , minCheckFactHandle, partitionContext);
-
- }
-
- boolean rifReset = networkPartitionContext.isRifReset();
- boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset();
- boolean loadAverageReset = networkPartitionContext.isLoadAverageReset();
- if (log.isDebugEnabled()) {
- log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset
- + " flag of loadAverageReset" + loadAverageReset);
- }
- if (rifReset || memoryConsumptionReset || loadAverageReset) {
- getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
- //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy);
- getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", autoscalePolicy);
- getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
- getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
- getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
- getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
- getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
- getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId()));
- log.debug(" Primary members : " + primaryMemberListInNetworkPartition);
- }
-
- scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(getScaleCheckKnowledgeSession()
- , scaleCheckFactHandle, networkPartitionContext);
-
- networkPartitionContext.setRifReset(false);
- networkPartitionContext.setMemoryConsumptionReset(false);
- networkPartitionContext.setLoadAverageReset(false);
- } else if (log.isDebugEnabled()) {
- log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " +
- "cycle for network partition %s", networkPartitionContext.getId()));
- }
- }
- }
-
- private boolean isPrimaryMember(MemberContext memberContext) {
- Properties props = memberContext.getProperties();
- if (log.isDebugEnabled()) {
- log.debug(" Properties [" + props + "] ");
- }
- if (props != null && props.getProperties() != null) {
- for (Property prop : props.getProperties()) {
- if (prop.getName().equals("PRIMARY")) {
- if (Boolean.parseBoolean(prop.getValue())) {
- log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
- "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
- return true;
- }
- }
- }
- }
- return false;
- }
-
- @Override
- protected void readConfigurations() {
- XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
- int monitorInterval = conf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000);
- setMonitorIntervalMilliseconds(monitorInterval);
- if (log.isDebugEnabled()) {
- log.debug("VMServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds());
- }
- }
-
- @Override
- public void destroy() {
- getMinCheckKnowledgeSession().dispose();
- getScaleCheckKnowledgeSession().dispose();
- setDestroyed(true);
- stopScheduler();
- if (log.isDebugEnabled()) {
- log.debug("VMServiceClusterMonitor Drools session has been disposed. " + this.toString());
- }
- }
-
- @Override
- public String toString() {
- return "VMServiceClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() +
- ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy +
- ", lbReferenceType=" + lbReferenceType +
- ", hasPrimary=" + hasPrimary + " ]";
- }
-
- public String getLbReferenceType() {
- return lbReferenceType;
- }
-
- public void setLbReferenceType(String lbReferenceType) {
- this.lbReferenceType = lbReferenceType;
- }
-
- public boolean isHasPrimary() {
- return hasPrimary;
- }
-
- public void setHasPrimary(boolean hasPrimary) {
- this.hasPrimary = hasPrimary;
- }
-
- @Override
- public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException {
- // TODO
-
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java
index 2bf85a6..cf92a18 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.exception.DependencyBuilderException;
import org.apache.stratos.autoscaler.exception.TopologyInConsistentException;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
import org.apache.stratos.autoscaler.monitor.Monitor;
import org.apache.stratos.autoscaler.monitor.MonitorStatusEventBuilder;
import org.apache.stratos.autoscaler.monitor.ParentComponentMonitor;
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java
new file mode 100644
index 0000000..bc23dd4
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.application;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.exception.DependencyBuilderException;
+import org.apache.stratos.autoscaler.exception.PartitionValidationException;
+import org.apache.stratos.autoscaler.exception.PolicyValidationException;
+import org.apache.stratos.autoscaler.exception.TopologyInConsistentException;
+import org.apache.stratos.autoscaler.grouping.dependency.context.ApplicationContext;
+import org.apache.stratos.autoscaler.grouping.dependency.context.ClusterContext;
+import org.apache.stratos.autoscaler.grouping.dependency.context.GroupContext;
+import org.apache.stratos.autoscaler.monitor.Monitor;
+import org.apache.stratos.autoscaler.monitor.ParentComponentMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitorFactory;
+import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.group.GroupMonitor;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.cloud.controller.stub.pojo.Property;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+import java.util.Map;
+
+/**
+ * Factory class to get the Monitors.
+ */
+public class ApplicationMonitorFactory {
+ private static final Log log = LogFactory.getLog(ApplicationMonitorFactory.class);
+
+ /**
+ * Factory method used to create the relevant monitor based on the given context
+ *
+ * @param context Application/Group/Cluster context
+ * @param appId id of the application for which the monitor is created
+ * @param parentMonitor parent of the monitor
+ * @return Monitor which can be ApplicationMonitor/GroupMonitor/ClusterMonitor
+ * @throws TopologyInConsistentException thrown while traversing the topology
+ * @throws DependencyBuilderException thrown while building dependency for app monitor
+ * @throws PolicyValidationException thrown while validating the policy associated with cluster
+ * @throws PartitionValidationException thrown while validating the partition used in a cluster
+ */
+ public static Monitor getMonitor(ParentComponentMonitor parentMonitor, ApplicationContext context, String appId)
+ throws TopologyInConsistentException,
+ DependencyBuilderException, PolicyValidationException, PartitionValidationException {
+ Monitor monitor;
+
+ if (context instanceof GroupContext) {
+ monitor = getGroupMonitor(parentMonitor, context, appId);
+ } else if (context instanceof ClusterContext) {
+ monitor = getClusterMonitor(parentMonitor, (ClusterContext) context, appId);
+ //Start the thread
+ Thread th = new Thread((AbstractClusterMonitor) monitor);
+ th.start();
+ } else {
+ monitor = getApplicationMonitor(appId);
+ }
+ return monitor;
+ }
+
+ /**
+ * This will create the GroupMonitor for the group identified by the given context by looking it up in the Topology
+ *
+ * @param parentMonitor parent of the monitor
+ * @param context context of the group; its id is used to look up the group
+ * @param appId appId of the relevant application
+ * @return Group monitor
+ * @throws DependencyBuilderException thrown while building dependency for app monitor
+ * @throws TopologyInConsistentException thrown while traversing the topology
+ */
+ public static Monitor getGroupMonitor(ParentComponentMonitor parentMonitor, ApplicationContext context, String appId)
+ throws DependencyBuilderException,
+ TopologyInConsistentException {
+ GroupMonitor groupMonitor;
+ TopologyManager.acquireReadLockForApplication(appId);
+
+ try {
+ Group group = TopologyManager.getTopology().getApplication(appId).getGroupRecursively(context.getId());
+ groupMonitor = new GroupMonitor(group, appId);
+ groupMonitor.setAppId(appId);
+ if(parentMonitor != null) {
+ groupMonitor.setParent(parentMonitor);
+ //Setting the dependent behaviour of the monitor
+ if(parentMonitor.isDependent() || (context.isDependent() && context.hasChild())) {
+ groupMonitor.setHasDependent(true);
+ } else {
+ groupMonitor.setHasDependent(false);
+ }
+ //TODO make sure when it is async
+
+ if (group.getStatus() != groupMonitor.getStatus()) {
+ //updating the status, if the group is not in created state when creating group Monitor
+ //so that groupMonitor will notify the parent (useful when restarting stratos)
+ groupMonitor.setStatus(group.getStatus());
+ }
+ }
+
+ } finally {
+ TopologyManager.releaseReadLockForApplication(appId);
+
+ }
+ return groupMonitor;
+
+ }
+
+ /**
+ * This will create a new app monitor based on the given appId by getting the
+ * application from Topology
+ *
+ * @param appId id of the application for which the app monitor is created
+ * @return ApplicationMonitor
+ * @throws DependencyBuilderException thrown while building dependency for app monitor
+ * @throws TopologyInConsistentException thrown if the application cannot be found in the Topology
+ */
+ public static ApplicationMonitor getApplicationMonitor(String appId)
+ throws DependencyBuilderException,
+ TopologyInConsistentException {
+ ApplicationMonitor applicationMonitor;
+ TopologyManager.acquireReadLockForApplication(appId);
+ try {
+ Application application = TopologyManager.getTopology().getApplication(appId);
+ if (application != null) {
+ applicationMonitor = new ApplicationMonitor(application);
+ applicationMonitor.setHasDependent(false);
+
+ } else {
+ String msg = "[Application] " + appId + " cannot be found in the Topology";
+ throw new TopologyInConsistentException(msg);
+ }
+ } finally {
+ TopologyManager.releaseReadLockForApplication(appId);
+ }
+
+ return applicationMonitor;
+
+ }
+
+ /**
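+ * Looks up the cluster of the given context in the Topology and obtains its monitor from ClusterMonitorFactory
+ *
+ * @param parentMonitor parent of the monitor
+ * @param context context of the cluster; supplies the cluster id and service name
+ * @return the VMClusterMonitor for the cluster, or null if the factory did not return a VMClusterMonitor
+ * @throws org.apache.stratos.autoscaler.exception.PolicyValidationException
+ * @throws org.apache.stratos.autoscaler.exception.PartitionValidationException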
+ */
+ public static VMClusterMonitor getClusterMonitor(ParentComponentMonitor parentMonitor,
+ ClusterContext context, String appId)
+ throws PolicyValidationException,
+ PartitionValidationException,
+ TopologyInConsistentException {
+ //Retrieving the Cluster from Topology
+ String clusterId = context.getId();
+ String serviceName = context.getServiceName();
+
+ Cluster cluster;
+ AbstractClusterMonitor clusterMonitor;
+ //acquire read lock for the service and cluster
+ TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
+ try {
+ Topology topology = TopologyManager.getTopology();
+ if (topology.serviceExists(serviceName)) {
+ Service service = topology.getService(serviceName);
+ if (service.clusterExists(clusterId)) {
+ cluster = service.getCluster(clusterId);
+ if (log.isDebugEnabled()) {
+ log.debug("Dependency check starting the [cluster]" + clusterId);
+ }
+ // startClusterMonitor(this, cluster);
+ //context.setCurrentStatus(Status.Created);
+ } else {
+ String msg = "[Cluster] " + clusterId + " cannot be found in the " +
+ "Topology for [service] " + serviceName;
+ throw new TopologyInConsistentException(msg);
+ }
+ } else {
+ String msg = "[Service] " + serviceName + " cannot be found in the Topology";
+ throw new TopologyInConsistentException(msg);
+
+ }
+
+
+ clusterMonitor = ClusterMonitorFactory.getMonitor(cluster);
+ if (clusterMonitor instanceof VMClusterMonitor) {
+ return (VMClusterMonitor) clusterMonitor;
+ } else if (clusterMonitor != null) {
+ log.warn("Unknown cluster monitor found: " + clusterMonitor.getClass().toString());
+ }
+ return null;
+ } finally {
+ TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
+ }
+ }
+
+
+ private static Properties convertMemberPropsToMemberContextProps(
+ java.util.Properties properties) {
+ Properties props = new Properties();
+ for (Map.Entry<Object, Object> e : properties.entrySet()) {
+ Property prop = new Property();
+ prop.setName((String) e.getKey());
+ prop.setValue((String) e.getValue());
+ props.addProperties(prop);
+ }
+ return props;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/e1f37d63/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
new file mode 100644
index 0000000..f043f51
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.monitor.cluster;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
+import org.apache.stratos.autoscaler.monitor.Monitor;
+import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent;
+import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import org.apache.stratos.autoscaler.monitor.events.MonitorTerminateAllEvent;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
+import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.GroupStatus;
+import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent;
+import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.FactHandle;
+
+/*
+ * Every cluster monitor, which are monitoring a cluster, should extend this class.
+ */
+public abstract class AbstractClusterMonitor extends Monitor implements Runnable {
+
+ private String clusterId;
+ private String serviceId;
+ protected ClusterStatus status;
+ private int monitoringIntervalMilliseconds;
+
+ protected FactHandle minCheckFactHandle;
+ protected FactHandle scaleCheckFactHandle;
+ private StatefulKnowledgeSession minCheckKnowledgeSession;
+ private StatefulKnowledgeSession scaleCheckKnowledgeSession;
+ private boolean isDestroyed;
+
+ private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+ protected boolean hasFaultyMember = false;
+
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+ protected AbstractClusterMonitor(String clusterId, String serviceId,
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+
+ super();
+ this.clusterId = clusterId;
+ this.serviceId = serviceId;
+ this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+ this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
+ this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
+ }
+
+ protected abstract void readConfigurations();
+
+ public void startScheduler() {
+ scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+
+ protected void stopScheduler() {
+ scheduler.shutdownNow();
+ }
+
+ protected abstract void monitor();
+
+ public abstract void destroy();
+
+ //handle health events
+ public abstract void handleAverageLoadAverageEvent(
+ AverageLoadAverageEvent averageLoadAverageEvent);
+
+ public abstract void handleGradientOfLoadAverageEvent(
+ GradientOfLoadAverageEvent gradientOfLoadAverageEvent);
+
+ public abstract void handleSecondDerivativeOfLoadAverageEvent(
+ SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent);
+
+ public abstract void handleAverageMemoryConsumptionEvent(
+ AverageMemoryConsumptionEvent averageMemoryConsumptionEvent);
+
+ public abstract void handleGradientOfMemoryConsumptionEvent(
+ GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent);
+
+ public abstract void handleSecondDerivativeOfMemoryConsumptionEvent(
+ SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent);
+
+ public abstract void handleAverageRequestsInFlightEvent(
+ AverageRequestsInFlightEvent averageRequestsInFlightEvent);
+
+ public abstract void handleGradientOfRequestsInFlightEvent(
+ GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent);
+
+ public abstract void handleSecondDerivativeOfRequestsInFlightEvent(
+ SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent);
+
+ public abstract void handleMemberAverageMemoryConsumptionEvent(
+ MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent);
+
+ public abstract void handleMemberGradientOfMemoryConsumptionEvent(
+ MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent);
+
+ public abstract void handleMemberSecondDerivativeOfMemoryConsumptionEvent(
+ MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent);
+
+
+ public abstract void handleMemberAverageLoadAverageEvent(
+ MemberAverageLoadAverageEvent memberAverageLoadAverageEvent);
+
+ public abstract void handleMemberGradientOfLoadAverageEvent(
+ MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent);
+
+ public abstract void handleMemberSecondDerivativeOfLoadAverageEvent(
+ MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent);
+
+ public abstract void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent);
+
+ //handle topology events
+ public abstract void handleMemberStartedEvent(MemberStartedEvent memberStartedEvent);
+
+ public abstract void handleMemberActivatedEvent(MemberActivatedEvent memberActivatedEvent);
+
+ public abstract void handleMemberMaintenanceModeEvent(
+ MemberMaintenanceModeEvent maintenanceModeEvent);
+
+ public abstract void handleMemberReadyToShutdownEvent(
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent);
+
+ public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent memberTerminatedEvent);
+
+ public abstract void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent);
+
+ public abstract void handleDynamicUpdates(Properties properties) throws InvalidArgumentException;
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(String clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public void setStatus(ClusterStatus status) {
+ this.status = status;
+ }
+
+ public ClusterStatus getStatus() {
+ return status;
+ }
+
+ public String getServiceId() {
+ return serviceId;
+ }
+
+ public void setServiceId(String serviceId) {
+ this.serviceId = serviceId;
+ }
+
+ public int getMonitorIntervalMilliseconds() {
+ return monitoringIntervalMilliseconds;
+ }
+
+ public void setMonitorIntervalMilliseconds(int monitorIntervalMilliseconds) {
+ this.monitoringIntervalMilliseconds = monitorIntervalMilliseconds;
+ }
+
+ public FactHandle getMinCheckFactHandle() {
+ return minCheckFactHandle;
+ }
+
+ public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
+ this.minCheckFactHandle = minCheckFactHandle;
+ }
+
+ public FactHandle getScaleCheckFactHandle() {
+ return scaleCheckFactHandle;
+ }
+
+ public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) {
+ this.scaleCheckFactHandle = scaleCheckFactHandle;
+ }
+
+ public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
+ return minCheckKnowledgeSession;
+ }
+
+ public void setMinCheckKnowledgeSession(
+ StatefulKnowledgeSession minCheckKnowledgeSession) {
+ this.minCheckKnowledgeSession = minCheckKnowledgeSession;
+ }
+
+ public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
+ return scaleCheckKnowledgeSession;
+ }
+
+ public void setScaleCheckKnowledgeSession(
+ StatefulKnowledgeSession scaleCheckKnowledgeSession) {
+ this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession;
+ }
+
+ public boolean isDestroyed() {
+ return isDestroyed;
+ }
+
+ public void setDestroyed(boolean isDestroyed) {
+ this.isDestroyed = isDestroyed;
+ }
+
+ public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
+ return autoscalerRuleEvaluator;
+ }
+
+ public void setAutoscalerRuleEvaluator(
+ AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
+ this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
+ }
+
+
+ @Override
+ public void onParentEvent(MonitorStatusEvent statusEvent) {
+ // send the ClusterTerminating event
+ if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() ==
+ ApplicationStatus.Terminating) {
+ StatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId);
+ }
+ }
+
+ @Override
+ public void onChildEvent(MonitorStatusEvent statusEvent) {
+
+ }
+
+ @Override
+ public void onEvent(MonitorTerminateAllEvent terminateAllEvent) {
+
+ }
+
+ @Override
+ public void onEvent(MonitorScalingEvent scalingEvent) {
+
+ }
+
+ public void setHasFaultyMember(boolean hasFaultyMember) {
+ this.hasFaultyMember = hasFaultyMember;
+ }
+
+ public boolean isHasFaultyMember() {
+ return hasFaultyMember;
+ }
+
+ public abstract void terminateAllMembers();
+}