You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/04/17 21:26:00 UTC
[4/4] git commit: Updated TopologyEventReceiver, TenantEventReceiver,
InstanceNotifierEventReceiver,
HealthStatEventReceiver and implemented separate message queues for each
receiver instance
Updated TopologyEventReceiver, TenantEventReceiver, InstanceNotifierEventReceiver, HealthStatEventReceiver and implemented separate message queues for each receiver instance
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/bccad5be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/bccad5be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/bccad5be
Branch: refs/heads/master
Commit: bccad5be0cb6cd09940c13784a4e1634713fbb30
Parents: a04e657
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Apr 18 00:46:03 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Apr 18 00:46:03 2014 +0530
----------------------------------------------------------------------
.../internal/AutoscalerServerComponent.java | 16 +-
.../AutoscalerHealthStatEventReceiver.java | 763 ++++++++++++++++++
.../health/AutoscalerHealthStatReceiver.java | 774 -------------------
.../AutoscalerTopologyEventReceiver.java | 494 ++++++++++++
.../topology/AutoscalerTopologyReceiver.java | 502 ------------
.../topology/TopologyEventMessageListener.java | 53 --
.../topology/TopologyEventMessageQueue.java | 38 -
.../stratos/cartridge/agent/CartridgeAgent.java | 29 +-
.../extension/api/LoadBalancerExtension.java | 36 +-
.../LoadBalancerTenantEventReceiver.java | 202 +++++
.../balancer/LoadBalancerTenantReceiver.java | 210 -----
.../LoadBalancerTopologyEventReceiver.java | 208 +++++
.../balancer/LoadBalancerTopologyReceiver.java | 216 ------
.../internal/LoadBalancerServiceComponent.java | 12 +-
.../internal/ADCManagementServerComponent.java | 10 +-
.../StratosManagerTopologyEventReceiver.java | 297 +++++++
.../StratosManagerTopologyReceiver.java | 322 --------
.../stat/HealthStatEventMessageDelegator.java | 14 +-
.../stat/HealthStatEventMessageListener.java | 9 +-
.../stat/HealthStatEventMessageQueue.java | 17 +-
.../health/stat/HealthStatEventReceiver.java | 87 +++
.../health/stat/HealthStatReceiver.java | 82 --
.../InstanceNotifierEventMessageDelegator.java | 13 +-
.../InstanceNotifierEventMessageListener.java | 10 +-
.../InstanceNotifierEventMessageQueue.java | 17 +-
.../InstanceNotifierEventMessageReceiver.java | 86 ---
.../notifier/InstanceNotifierEventReceiver.java | 90 +++
.../tenant/TenantEventMessageDelegator.java | 14 +-
.../tenant/TenantEventMessageListener.java | 10 +-
.../tenant/TenantEventMessageQueue.java | 17 +-
.../receiver/tenant/TenantEventReceiver.java | 87 +++
.../message/receiver/tenant/TenantReceiver.java | 83 --
.../topology/TopologyEventMessageDelegator.java | 27 +-
.../topology/TopologyEventMessageListener.java | 17 +-
.../topology/TopologyEventMessageQueue.java | 20 +-
.../topology/TopologyEventReceiver.java | 87 +++
.../receiver/topology/TopologyReceiver.java | 99 ---
37 files changed, 2436 insertions(+), 2632 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index ac637ab..4823057 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -23,8 +23,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
import org.apache.stratos.autoscaler.exception.AutoScalerException;
-import org.apache.stratos.autoscaler.message.receiver.health.AutoscalerHealthStatReceiver;
-import org.apache.stratos.autoscaler.message.receiver.topology.AutoscalerTopologyReceiver;
+import org.apache.stratos.autoscaler.message.receiver.health.AutoscalerHealthStatEventReceiver;
+import org.apache.stratos.autoscaler.message.receiver.topology.AutoscalerTopologyEventReceiver;
import org.apache.stratos.autoscaler.partition.PartitionManager;
import org.apache.stratos.autoscaler.policy.PolicyManager;
import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
@@ -51,14 +51,14 @@ import java.util.List;
public class AutoscalerServerComponent {
private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class);
- AutoscalerTopologyReceiver asTopologyReceiver;
+ AutoscalerTopologyEventReceiver asTopologyReceiver;
// TopicSubscriber healthStatTopicSubscriber;
- AutoscalerHealthStatReceiver autoscalerHealthStatReceiver;
+ AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
protected void activate(ComponentContext componentContext) throws Exception {
try {
// Start topology receiver
- asTopologyReceiver = new AutoscalerTopologyReceiver();
+ asTopologyReceiver = new AutoscalerTopologyEventReceiver();
Thread topologyTopicSubscriberThread = new Thread(asTopologyReceiver);
topologyTopicSubscriberThread.start();
if (log.isDebugEnabled()) {
@@ -74,8 +74,8 @@ public class AutoscalerServerComponent {
// Start health stat receiver
- autoscalerHealthStatReceiver = new AutoscalerHealthStatReceiver();
- Thread healthDelegatorThread = new Thread(autoscalerHealthStatReceiver);
+ autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
+ Thread healthDelegatorThread = new Thread(autoscalerHealthStatEventReceiver);
healthDelegatorThread.start();
if (log.isDebugEnabled()) {
log.debug("Health message processor thread started");
@@ -121,7 +121,7 @@ public class AutoscalerServerComponent {
protected void deactivate(ComponentContext context) {
asTopologyReceiver.terminate();
- autoscalerHealthStatReceiver.terminate();
+ autoscalerHealthStatEventReceiver.terminate();
}
protected void setRegistryService(RegistryService registryService) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
new file mode 100644
index 0000000..2bcfb52
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.message.receiver.health;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
+import org.apache.stratos.autoscaler.policy.model.LoadAverage;
+import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
+import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.*;
+import org.apache.stratos.messaging.listener.health.stat.*;
+import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+
+/**
+ * A runnable that receives health stat events and applies them to the autoscaler's monitor contexts.
+ */
+public class AutoscalerHealthStatEventReceiver implements Runnable {
+
+    private static final Log log = LogFactory.getLog(AutoscalerHealthStatEventReceiver.class);
+
+    // Set by terminate() and polled by the keep-alive loop in run(). Those calls
+    // happen on different threads, so the flag is volatile to guarantee that the
+    // write becomes visible to the polling thread.
+    private volatile boolean terminated = false;
+
+    // Receiver on which the health stat event listeners are registered; run()
+    // starts it on its own thread.
+    private HealthStatEventReceiver healthStatEventReceiver;
+
+    /**
+     * Creates the underlying health stat event receiver and registers the
+     * autoscaler's event listeners on it. The receiver thread itself is
+     * started later, from {@link #run()}.
+     */
+    public AutoscalerHealthStatEventReceiver() {
+        healthStatEventReceiver = new HealthStatEventReceiver();
+        addEventListeners();
+    }
+
+    /**
+     * Waits for a fixed start-up delay, starts the health stat event receiver on
+     * its own thread and then keeps this thread alive until {@link #terminate()}
+     * has been called.
+     * <p>
+     * An interrupt never stops the keep-alive loop (only the terminated flag
+     * does), but it is no longer lost: it is remembered and the thread's
+     * interrupt status is restored just before this method returns.
+     */
+    @Override
+    public void run() {
+        boolean interrupted = false;
+        try {
+            //FIXME this activated before autoscaler deployer activated.
+            try {
+                Thread.sleep(15000);
+            } catch (InterruptedException e) {
+                // Start-up delay was cut short; carry on, but remember the interrupt.
+                interrupted = true;
+            }
+            Thread thread = new Thread(healthStatEventReceiver);
+            thread.start();
+            if (log.isInfoEnabled()) {
+                log.info("Autoscaler health stat event receiver thread started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Thread.sleep cleared the interrupt status when it threw, so
+                    // the next sleep blocks normally; only the flag ends the loop.
+                    interrupted = true;
+                }
+            }
+            if (log.isInfoEnabled()) {
+                log.info("Autoscaler health stat event receiver thread terminated");
+            }
+        } finally {
+            if (interrupted) {
+                // Hand the interrupt back to whoever owns this thread.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Registers one listener per health stat event type on the health stat
+     * event receiver.
+     * <p>
+     * Cluster level events (average, gradient and second derivative of load
+     * average, memory consumption and requests in flight) are written to the
+     * network partition context of the cluster's monitor. Member level events
+     * are written to the member's load average / memory consumption holder,
+     * and member fault events are passed to handleMemberFaultEvent.
+     */
+    private void addEventListeners() {
+        // Listen to health stat events that affect clusters
+        healthStatEventReceiver.addEventListener(new AverageLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                AverageLoadAverageEvent e = (AverageLoadAverageEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                logDebug("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
+                        clusterId, networkPartitionId, floatValue);
+                NetworkPartitionContext networkPartitionContext =
+                        findNetworkPartitionContext(clusterId, networkPartitionId);
+                if (null != networkPartitionContext) {
+                    networkPartitionContext.setAverageLoadAverage(floatValue);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new AverageMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                AverageMemoryConsumptionEvent e = (AverageMemoryConsumptionEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                logDebug("Avg Memory Consumption event: [cluster] %s [network-partition] %s [value] %s",
+                        clusterId, networkPartitionId, floatValue);
+                NetworkPartitionContext networkPartitionContext =
+                        findNetworkPartitionContext(clusterId, networkPartitionId);
+                if (null != networkPartitionContext) {
+                    networkPartitionContext.setAverageMemoryConsumption(floatValue);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new AverageRequestsInFlightEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                AverageRequestsInFlightEvent e = (AverageRequestsInFlightEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                logDebug("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
+                        clusterId, networkPartitionId, floatValue);
+                NetworkPartitionContext networkPartitionContext =
+                        findNetworkPartitionContext(clusterId, networkPartitionId);
+                if (null != networkPartitionContext) {
+                    networkPartitionContext.setAverageRequestsInFlight(floatValue);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new GradientOfLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                GradientOfLoadAverageEvent e = (GradientOfLoadAverageEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                logDebug("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
+                        clusterId, networkPartitionId, floatValue);
+                NetworkPartitionContext networkPartitionContext =
+                        findNetworkPartitionContext(clusterId, networkPartitionId);
+                if (null != networkPartitionContext) {
+                    networkPartitionContext.setLoadAverageGradient(floatValue);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new GradientOfMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                GradientOfMemoryConsumptionEvent e = (GradientOfMemoryConsumptionEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                logDebug("Grad of Memory Consumption event: [cluster] %s [network-partition] %s [value] %s",
+                        clusterId, networkPartitionId, floatValue);
+                NetworkPartitionContext networkPartitionContext =
+                        findNetworkPartitionContext(clusterId, networkPartitionId);
+                if (null != networkPartitionContext) {
+                    networkPartitionContext.setMemoryConsumptionGradient(floatValue);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new GradientOfRequestsInFlightEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                GradientOfRequestsInFlightEvent e = (GradientOfRequestsInFlightEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                logDebug("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
+                        clusterId, networkPartitionId, floatValue);
+                NetworkPartitionContext networkPartitionContext =
+                        findNetworkPartitionContext(clusterId, networkPartitionId);
+                if (null != networkPartitionContext) {
+                    networkPartitionContext.setRequestsInFlightGradient(floatValue);
+                }
+            }
+        });
+
+        // Listen to health stat events that affect individual members
+        healthStatEventReceiver.addEventListener(new MemberAverageLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberAverageLoadAverageEvent e = (MemberAverageLoadAverageEvent) event;
+                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
+                if (loadAverage != null) {
+                    Float floatValue = e.getValue();
+                    loadAverage.setAverage(floatValue);
+                    logDebug("Member avg of load avg event: [member] %s [value] %s",
+                            e.getMemberId(), floatValue);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new MemberAverageMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberAverageMemoryConsumptionEvent e = (MemberAverageMemoryConsumptionEvent) event;
+                MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
+                if (memoryConsumption != null) {
+                    Float floatValue = e.getValue();
+                    memoryConsumption.setAverage(floatValue);
+                    logDebug("Member avg Memory Consumption event: [member] %s [value] %s",
+                            e.getMemberId(), floatValue);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new MemberFaultEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberFaultEvent e = (MemberFaultEvent) event;
+                String clusterId = e.getClusterId();
+                String memberId = e.getMemberId();
+
+                if (memberId == null || memberId.isEmpty()) {
+                    if (log.isErrorEnabled()) {
+                        log.error("Member id not found in received message");
+                    }
+                } else {
+                    logDebug("Member fault event: [member] %s ", e.getMemberId());
+                    handleMemberFaultEvent(clusterId, memberId);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new MemberGradientOfLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberGradientOfLoadAverageEvent e = (MemberGradientOfLoadAverageEvent) event;
+                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
+                if (loadAverage != null) {
+                    Float floatValue = e.getValue();
+                    loadAverage.setGradient(floatValue);
+                    logDebug("Member grad of load avg event: [member] %s [value] %s",
+                            e.getMemberId(), floatValue);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new MemberGradientOfMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberGradientOfMemoryConsumptionEvent e = (MemberGradientOfMemoryConsumptionEvent) event;
+                MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
+                if (memoryConsumption != null) {
+                    Float floatValue = e.getValue();
+                    memoryConsumption.setGradient(floatValue);
+                    logDebug("Member grad of Memory Consumption event: [member] %s [value] %s",
+                            e.getMemberId(), floatValue);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberSecondDerivativeOfLoadAverageEvent e = (MemberSecondDerivativeOfLoadAverageEvent) event;
+                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
+                if (loadAverage != null) {
+                    Float floatValue = e.getValue();
+                    loadAverage.setSecondDerivative(floatValue);
+                    logDebug("Member Second Derivation of load avg event: [member] %s [value] %s",
+                            e.getMemberId(), floatValue);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                // NOTE(review): this event is currently ignored. The sibling
+                // member-level listeners store their value; confirm whether the
+                // memory consumption second derivative should be stored as well.
+            }
+        });
+
+        // Listen to second derivative health stat events that affect clusters
+        healthStatEventReceiver.addEventListener(new SecondDerivativeOfLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                SecondDerivativeOfLoadAverageEvent e = (SecondDerivativeOfLoadAverageEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                logDebug("Second Derivation of load avg event: [cluster] %s [network-partition] %s [value] %s",
+                        clusterId, networkPartitionId, floatValue);
+                NetworkPartitionContext networkPartitionContext =
+                        findNetworkPartitionContext(clusterId, networkPartitionId);
+                if (null != networkPartitionContext) {
+                    networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new SecondDerivativeOfMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                SecondDerivativeOfMemoryConsumptionEvent e = (SecondDerivativeOfMemoryConsumptionEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                logDebug("Second Derivation of Memory Consumption event: [cluster] %s [network-partition] %s [value] %s",
+                        clusterId, networkPartitionId, floatValue);
+                NetworkPartitionContext networkPartitionContext =
+                        findNetworkPartitionContext(clusterId, networkPartitionId);
+                if (null != networkPartitionContext) {
+                    networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
+                }
+            }
+        });
+        healthStatEventReceiver.addEventListener(new SecondDerivativeOfRequestsInFlightEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                SecondDerivativeOfRequestsInFlightEvent e = (SecondDerivativeOfRequestsInFlightEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                logDebug("Second derivative of Rif event: [cluster] %s [network-partition] %s [value] %s",
+                        clusterId, networkPartitionId, floatValue);
+                NetworkPartitionContext networkPartitionContext =
+                        findNetworkPartitionContext(clusterId, networkPartitionId);
+                if (null != networkPartitionContext) {
+                    networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
+                }
+            }
+        });
+    }
+
+    /**
+     * Looks up the network partition context that a cluster level health stat
+     * has to be written to.
+     * <p>
+     * The cluster monitor is tried first and the load balancer cluster monitor
+     * second. A debug message is logged when no monitor is registered for the
+     * cluster or when the monitor has no context for the network partition.
+     *
+     * @param clusterId          id of the cluster the event refers to
+     * @param networkPartitionId id of the network partition the event refers to
+     * @return the network partition context, or null if it cannot be resolved
+     */
+    private NetworkPartitionContext findNetworkPartitionContext(String clusterId, String networkPartitionId) {
+        AutoscalerContext asCtx = AutoscalerContext.getInstance();
+        AbstractMonitor monitor;
+
+        if (asCtx.monitorExist(clusterId)) {
+            monitor = asCtx.getMonitor(clusterId);
+        } else if (asCtx.lbMonitorExist(clusterId)) {
+            monitor = asCtx.getLBMonitor(clusterId);
+        } else {
+            logDebug("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId);
+            return null;
+        }
+        if (null == monitor) {
+            // The existence check passed but the lookup returned nothing; the
+            // event is dropped silently in that case.
+            return null;
+        }
+
+        NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+        if (null == networkPartitionContext) {
+            logDebug("Network partition context is not available for :" +
+                    " [network partition] %s", networkPartitionId);
+        }
+        return networkPartitionContext;
+    }
+
+    /**
+     * Logs a formatted message at debug level; the message is only formatted
+     * when debug logging is enabled.
+     *
+     * @param format a {@link String#format(String, Object...)} pattern
+     * @param args   the arguments referenced by the pattern
+     */
+    private void logDebug(String format, Object... args) {
+        if (log.isDebugEnabled()) {
+            log.debug(String.format(format, args));
+        }
+    }
+
+
+    /**
+     * Finds the load average holder of an active member.
+     * <p>
+     * The member is resolved from the topology, then its cluster monitor (or
+     * load balancer cluster monitor) is walked down through the network
+     * partition context and partition context to the member stats context.
+     * Every step is null-checked so that an event for a member whose contexts
+     * are missing is dropped instead of raising a NullPointerException.
+     *
+     * @param memberId id of the member the health stat belongs to
+     * @return the member's load average holder, or null when the member, its
+     *         monitor or one of its contexts cannot be found, or when the
+     *         member is not active
+     */
+    private LoadAverage findLoadAverage(String memberId) {
+        Member member = findMember(memberId);
+
+        if (null == member) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+            }
+            return null;
+        }
+        String clusterId = member.getClusterId();
+
+        AutoscalerContext asCtx = AutoscalerContext.getInstance();
+        AbstractMonitor monitor;
+
+        if (asCtx.monitorExist(clusterId)) {
+            monitor = asCtx.getMonitor(clusterId);
+        } else if (asCtx.lbMonitorExist(clusterId)) {
+            monitor = asCtx.getLBMonitor(clusterId);
+        } else {
+            monitor = null;
+        }
+        if (null == monitor) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+            }
+            return null;
+        }
+
+        // The member resolved above already carries its network partition id,
+        // so the topology does not have to be scanned a second time.
+        String networkPartitionId = member.getNetworkPartitionId();
+        NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+        if (null == networkPartitionContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+            return null;
+        }
+        PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(member.getPartitionId());
+        if (null == partitionContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Partition context is not available for : [partition] %s",
+                        member.getPartitionId()));
+            }
+            return null;
+        }
+
+        MemberStatsContext memberStatsContext = partitionContext.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return null;
+        }
+        if (!member.isActive()) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
+                        " the health stat", memberId));
+            }
+            return null;
+        }
+
+        return memberStatsContext.getLoadAverage();
+    }
+
+    /**
+     * Finds the memory consumption holder of an active member.
+     * <p>
+     * The cluster monitor is tried first and the load balancer cluster monitor
+     * second; null is returned only when neither exists. (Previously null was
+     * returned whenever the cluster monitor was missing, even if a load
+     * balancer cluster monitor had been found, so memory stats of load
+     * balancer members were always dropped.)
+     *
+     * @param memberId id of the member the health stat belongs to
+     * @return the member's memory consumption holder, or null when the member,
+     *         its monitor or one of its contexts cannot be found, or when the
+     *         member is not active
+     */
+    private MemoryConsumption findMemoryConsumption(String memberId) {
+        Member member = findMember(memberId);
+
+        if (null == member) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member not found in the Topology : [member] %s", memberId));
+            }
+            return null;
+        }
+
+        AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId());
+        if (null == monitor) {
+            // Not a service cluster; fall back to the load balancer cluster monitor.
+            monitor = AutoscalerContext.getInstance().getLBMonitor(member.getClusterId());
+            if (null == monitor) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Cluster monitor is not available for : [member] %s", memberId));
+                }
+                return null;
+            }
+        }
+
+        // The member resolved above already carries its network partition id,
+        // so the topology does not have to be scanned a second time.
+        String networkPartitionId = member.getNetworkPartitionId();
+        NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+        if (null == networkPartitionContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Network partition context is not available for :" +
+                        " [network partition] %s", networkPartitionId));
+            }
+            return null;
+        }
+        PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(member.getPartitionId());
+        if (null == partitionContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Partition context is not available for : [partition] %s",
+                        member.getPartitionId()));
+            }
+            return null;
+        }
+
+        MemberStatsContext memberStatsContext = partitionContext.getMemberStatsContext(memberId);
+        if (null == memberStatsContext) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return null;
+        }
+        if (!member.isActive()) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
+                        " the health stat", memberId));
+            }
+            return null;
+        }
+
+        return memberStatsContext.getMemoryConsumption();
+    }
+
+    /**
+     * Finds the network partition id of a member by scanning every cluster of
+     * every service in the topology.
+     * <p>
+     * The scan runs under the topology read lock, in the same way as
+     * findMember, so the topology is not read while it is being updated.
+     *
+     * @param memberId id of the member to look for
+     * @return the member's network partition id, or null if no cluster
+     *         contains the member
+     */
+    private String findNetworkPartitionId(String memberId) {
+        TopologyManager.acquireReadLock();
+        try {
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.memberExists(memberId)) {
+                        return cluster.getMember(memberId).getNetworkPartitionId();
+                    }
+                }
+            }
+            return null;
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    /**
+     * Finds a member by scanning every cluster of every service in the
+     * topology while holding the topology read lock.
+     *
+     * @param memberId id of the member to look for
+     * @return the member, or null if no cluster contains it
+     */
+    private Member findMember(String memberId) {
+        // Acquire before entering the try block: if acquisition itself fails,
+        // the finally block must not release a lock that was never taken.
+        TopologyManager.acquireReadLock();
+        try {
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.memberExists(memberId)) {
+                        return cluster.getMember(memberId);
+                    }
+                }
+            }
+            return null;
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    /**
+     * Handles a member fault event: if the member is known, active and listed
+     * as an active member of its partition context, it is terminated through
+     * the cloud controller client and removed from the active member list.
+     * <p>
+     * The event is ignored (with a debug message where one is useful) when the
+     * cluster monitor, the member or one of its contexts cannot be found.
+     *
+     * @param clusterId id of the cluster the faulty member belongs to
+     * @param memberId  id of the faulty member
+     */
+    private void handleMemberFaultEvent(String clusterId, String memberId) {
+        try {
+            AutoscalerContext asCtx = AutoscalerContext.getInstance();
+            AbstractMonitor monitor;
+
+            if (asCtx.monitorExist(clusterId)) {
+                monitor = asCtx.getMonitor(clusterId);
+            } else if (asCtx.lbMonitorExist(clusterId)) {
+                monitor = asCtx.getLBMonitor(clusterId);
+            } else {
+                monitor = null;
+            }
+            if (null == monitor) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+                }
+                return;
+            }
+
+            NetworkPartitionContext nwPartitionCtxt;
+            // Acquire before entering the try block so that the finally block
+            // never releases a lock that was not taken.
+            TopologyManager.acquireReadLock();
+            try {
+                Member member = findMember(memberId);
+
+                if (null == member) {
+                    return;
+                }
+                if (!member.isActive()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
+                                " the member fault health stat", memberId));
+                    }
+                    return;
+                }
+
+                nwPartitionCtxt = monitor.getNetworkPartitionCtxt(member);
+            } finally {
+                TopologyManager.releaseReadLock();
+            }
+
+            if (null == nwPartitionCtxt) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Network partition context is not available for : [member] %s", memberId));
+                }
+                return;
+            }
+
+            // Locate the partition context that tracks the faulty member
+            String partitionId = monitor.getPartitionOfMember(memberId);
+            PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+            if (null == partitionCtxt) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Partition context is not available for : [partition] %s", partitionId));
+                }
+                return;
+            }
+
+            if (!partitionCtxt.activeMemberExist(memberId)) {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Could not find the active member in partition context, [member] %s ", memberId));
+                }
+                return;
+            }
+            // terminate the faulty member
+            CloudControllerClient ccClient = CloudControllerClient.getInstance();
+            ccClient.terminate(memberId);
+
+            // remove from active member list
+            partitionCtxt.removeActiveMemberById(memberId);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Faulty member is terminated and removed from the active members list: [member] %s [partition] %s [cluster] %s ",
+                        memberId, partitionId, clusterId));
+            }
+        } catch (TerminationException e) {
+            // Pass the exception as the throwable so that the stack trace is logged.
+            log.error(String.format("Could not terminate the faulty member: [member] %s [cluster] %s",
+                    memberId, clusterId), e);
+        }
+    }
+
+    /**
+     * Requests the keep-alive loop in {@link #run()} to stop by raising the
+     * terminated flag.
+     */
+    public void terminate() {
+        terminated = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
deleted file mode 100644
index 288410b..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
+++ /dev/null
@@ -1,774 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.message.receiver.health;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.exception.SpawningException;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
-import org.apache.stratos.autoscaler.policy.model.LoadAverage;
-import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.health.stat.*;
-import org.apache.stratos.messaging.listener.health.stat.*;
-import org.apache.stratos.messaging.message.processor.health.stat.HealthStatMessageProcessorChain;
-import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventMessageDelegator;
-import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatReceiver;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-
-/**
- * A thread for processing topology messages and updating the topology data structure.
- */
-public class AutoscalerHealthStatReceiver implements Runnable {
-
- private static final Log log = LogFactory.getLog(AutoscalerHealthStatReceiver.class);
- private boolean terminated = false;
-
- private HealthStatReceiver healthStatReceiver;
-
- public AutoscalerHealthStatReceiver() {
- this.healthStatReceiver = new HealthStatReceiver(createMessageDelegator());
- }
-
- @Override
- public void run() {
- //FIXME this activated before autoscaler deployer activated.
- try {
- Thread.sleep(15000);
- } catch (InterruptedException ignore) {
- }
- Thread thread = new Thread(healthStatReceiver);
- thread.start();
- if(log.isInfoEnabled()) {
- log.info("Autoscaler health stat receiver thread started");
- }
-
- // Keep the thread live until terminated
- while (!terminated){
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
- if(log.isInfoEnabled()) {
- log.info("Autoscaler health stat receiver thread terminated");
- }
- }
-
- private HealthStatEventMessageDelegator createMessageDelegator() {
- HealthStatMessageProcessorChain processorChain = createEventProcessorChain();
- return new HealthStatEventMessageDelegator(processorChain);
- }
-
- private HealthStatMessageProcessorChain createEventProcessorChain() {
- // Listen to health stat events that affect clusters
- HealthStatMessageProcessorChain processorChain = new HealthStatMessageProcessorChain();
- processorChain.addEventListener(new AverageLoadAverageEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- AverageLoadAverageEvent e = (AverageLoadAverageEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractMonitor monitor;
-
- if(asCtx.monitorExist(clusterId)){
- monitor = asCtx.getMonitor(clusterId);
- }else if(asCtx.lbMonitorExist(clusterId)){
- monitor = asCtx.getLBMonitor(clusterId);
- }else{
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
- }
- return;
- }
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setAverageLoadAverage(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- }
-
- });
- processorChain.addEventListener(new AverageMemoryConsumptionEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
- AverageMemoryConsumptionEvent e = (AverageMemoryConsumptionEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractMonitor monitor;
-
- if(asCtx.monitorExist(clusterId)){
- monitor = asCtx.getMonitor(clusterId);
- }else if(asCtx.lbMonitorExist(clusterId)){
- monitor = asCtx.getLBMonitor(clusterId);
- }else{
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
- }
- return;
- }
-
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setAverageMemoryConsumption(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- }
-
- });
- processorChain.addEventListener(new AverageRequestsInFlightEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
- AverageRequestsInFlightEvent e = (AverageRequestsInFlightEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
- Float floatValue = e.getValue();
-
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractMonitor monitor;
-
- if(asCtx.monitorExist(clusterId)){
- monitor = asCtx.getMonitor(clusterId);
- }else if(asCtx.lbMonitorExist(clusterId)){
- monitor = asCtx.getLBMonitor(clusterId);
- }else{
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
- }
- return;
- }
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setAverageRequestsInFlight(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- }
-
- });
- processorChain.addEventListener(new GradientOfLoadAverageEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- GradientOfLoadAverageEvent e = (GradientOfLoadAverageEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractMonitor monitor;
-
- if(asCtx.monitorExist(clusterId)){
- monitor = asCtx.getMonitor(clusterId);
- }else if(asCtx.lbMonitorExist(clusterId)){
- monitor = asCtx.getLBMonitor(clusterId);
- }else{
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
- }
- return;
- }
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setLoadAverageGradient(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- }
-
- });
- processorChain.addEventListener(new GradientOfMemoryConsumptionEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
- GradientOfMemoryConsumptionEvent e = (GradientOfMemoryConsumptionEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Grad of Memory Consumption event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractMonitor monitor;
-
- if(asCtx.monitorExist(clusterId)){
- monitor = asCtx.getMonitor(clusterId);
- }else if(asCtx.lbMonitorExist(clusterId)){
- monitor = asCtx.getLBMonitor(clusterId);
- }else{
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
- }
- return;
- };
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setMemoryConsumptionGradient(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- }
-
- });
- processorChain.addEventListener(new GradientOfRequestsInFlightEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- GradientOfRequestsInFlightEvent e = (GradientOfRequestsInFlightEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractMonitor monitor;
-
- if(asCtx.monitorExist(clusterId)){
- monitor = asCtx.getMonitor(clusterId);
- }else if(asCtx.lbMonitorExist(clusterId)){
- monitor = asCtx.getLBMonitor(clusterId);
- }else{
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
- }
- return;
- }
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setRequestsInFlightGradient(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- }
-
- });
- processorChain.addEventListener(new MemberAverageLoadAverageEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- MemberAverageLoadAverageEvent e = (MemberAverageLoadAverageEvent) event;
- LoadAverage loadAverage = findLoadAverage(e.getMemberId());
- if(loadAverage != null) {
-
- Float floatValue = e.getValue();
- loadAverage.setAverage(floatValue);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member avg of load avg event: [member] %s [value] %s", e.getMemberId()
- , floatValue));
- }
- }
-
- }
-
- });
- processorChain.addEventListener(new MemberAverageMemoryConsumptionEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- MemberAverageMemoryConsumptionEvent e = (MemberAverageMemoryConsumptionEvent) event;
- MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
- if(memoryConsumption != null) {
-
- Float floatValue = e.getValue();
- memoryConsumption.setAverage(floatValue);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member avg Memory Consumption event: [member] %s [value] %s", e.getMemberId(),
- floatValue));
- }
- }
-
- }
-
- });
- processorChain.addEventListener(new MemberFaultEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- MemberFaultEvent e = (MemberFaultEvent) event;
- String clusterId = e.getClusterId();
- String memberId = e.getMemberId();
-
- if (memberId == null || memberId.isEmpty()) {
- if(log.isErrorEnabled()) {
- log.error("Member id not found in received message");
- }
- } else {
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member fault event: [member] %s ", e.getMemberId()));
- }
- handleMemberFaultEvent(clusterId, memberId);
- }
- }
-
- });
- processorChain.addEventListener(new MemberGradientOfLoadAverageEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- MemberGradientOfLoadAverageEvent e = (MemberGradientOfLoadAverageEvent) event;
- LoadAverage loadAverage = findLoadAverage(e.getMemberId());
- if(loadAverage != null) {
-
- Float floatValue = e.getValue();
- loadAverage.setGradient(floatValue);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member grad of load avg event: [member] %s [value] %s", e.getMemberId(),
- floatValue));
- }
- }
-
- }
-
- });
- processorChain.addEventListener(new MemberGradientOfMemoryConsumptionEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- MemberGradientOfMemoryConsumptionEvent e = (MemberGradientOfMemoryConsumptionEvent) event;
- MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
- if(memoryConsumption != null) {
-
- Float floatValue = e.getValue();
- memoryConsumption.setGradient(floatValue);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member grad of Memory Consumption event: [member] %s [value] %s", e.getMemberId(),
- floatValue));
- }
- }
-
- }
-
- });
- processorChain.addEventListener(new MemberSecondDerivativeOfLoadAverageEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- MemberSecondDerivativeOfLoadAverageEvent e = (MemberSecondDerivativeOfLoadAverageEvent) event;
- LoadAverage loadAverage = findLoadAverage(e.getMemberId());
- if(loadAverage != null) {
-
- Float floatValue = e.getValue();
- loadAverage.setSecondDerivative(floatValue);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member Second Derivation of load avg event: [member] %s [value] %s", e.getMemberId()
- , floatValue));
- }
- }
- }
-
- });
- processorChain.addEventListener(new MemberSecondDerivativeOfMemoryConsumptionEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- }
-
- });
- processorChain.addEventListener(new SecondDerivativeOfLoadAverageEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
- SecondDerivativeOfLoadAverageEvent e = (SecondDerivativeOfLoadAverageEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second Derivation of load avg event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractMonitor monitor;
-
- if(asCtx.monitorExist(clusterId)){
- monitor = asCtx.getMonitor(clusterId);
- }else if(asCtx.lbMonitorExist(clusterId)){
- monitor = asCtx.getLBMonitor(clusterId);
- }else{
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
- }
- return;
- }
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- }
-
- });
- processorChain.addEventListener(new SecondDerivativeOfMemoryConsumptionEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
- SecondDerivativeOfMemoryConsumptionEvent e = (SecondDerivativeOfMemoryConsumptionEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
-
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractMonitor monitor;
-
- if(asCtx.monitorExist(clusterId)){
- monitor = asCtx.getMonitor(clusterId);
- }else if(asCtx.lbMonitorExist(clusterId)){
- monitor = asCtx.getLBMonitor(clusterId);
- }else{
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
- }
- return;
- }
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
-
- }
-
- });
- processorChain.addEventListener(new SecondDerivativeOfRequestsInFlightEventListener() {
- @Override
- protected void onEvent(org.apache.stratos.messaging.event.Event event) {
- SecondDerivativeOfRequestsInFlightEvent e = (SecondDerivativeOfRequestsInFlightEvent) event;
- String clusterId = e.getClusterId();
- String networkPartitionId = e.getNetworkPartitionId();
- Float floatValue = e.getValue();
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Second derivative of Rif event: [cluster] %s [network-partition] %s [value] %s",
- clusterId, networkPartitionId, floatValue));
- }
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractMonitor monitor;
-
- if(asCtx.monitorExist(clusterId)){
- monitor = asCtx.getMonitor(clusterId);
- }else if(asCtx.lbMonitorExist(clusterId)){
- monitor = asCtx.getLBMonitor(clusterId);
- }else{
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
- }
- return;
- }
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
- } else {
- if(log.isDebugEnabled()) {
- log.debug(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- }
- }
-
- });
-
- return processorChain;
- }
-
-
- private LoadAverage findLoadAverage(String memberId) {
-// String memberId = event.getProperties().get("member_id");
- Member member = findMember(memberId);
-
- if(null == member){
- if(log.isDebugEnabled()) {
- log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
- }
- return null;
- }
- String clusterId = member.getClusterId();
-
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractMonitor monitor;
-
- if(asCtx.monitorExist(clusterId)){
- monitor = asCtx.getMonitor(clusterId);
- }else if(asCtx.lbMonitorExist(clusterId)){
- monitor = asCtx.getLBMonitor(clusterId);
- }else{
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
- }
- return null;
- }
- String networkPartitionId = findNetworkPartitionId(memberId);
- MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId)
- .getPartitionCtxt(member.getPartitionId())
- .getMemberStatsContext(memberId);
- if(null == memberStatsContext){
- if(log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return null;
- }
- else if(!member.isActive()){
- if(log.isDebugEnabled()){
- log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
- " the health stat", memberId));
- }
- return null;
- }
-
- LoadAverage loadAverage = memberStatsContext.getLoadAverage();
- return loadAverage;
- }
-
- private MemoryConsumption findMemoryConsumption(String memberId) {
-// String memberId = event.getProperties().get("member_id");
- Member member = findMember(memberId);
-
- if(null == member){
- if(log.isDebugEnabled()) {
- log.debug(String.format("Member not found in the Topology : [member] %s", memberId));
- }
- return null;
- }
- String clusterId = member.getClusterId();
- AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId());
- if(null == monitor){
-
- monitor = AutoscalerContext.getInstance().getLBMonitor(member.getClusterId());
- if(null == monitor){
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster monitor is not available for : [member] %s", memberId));
- }
- }
- return null;
- }
-
- String networkPartitionId = findNetworkPartitionId(memberId);
- MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId)
- .getPartitionCtxt(member.getPartitionId())
- .getMemberStatsContext(memberId);
- if(null == memberStatsContext){
- if(log.isDebugEnabled()) {
- log.debug(String.format("Member context is not available for : [member] %s", memberId));
- }
- return null;
- }else if(!member.isActive()){
- if(log.isDebugEnabled()){
- log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
- " the health stat", memberId));
- }
- return null;
- }
- MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption();
-
- return memoryConsumption;
- }
-
- private String findNetworkPartitionId(String memberId) {
- for(Service service: TopologyManager.getTopology().getServices()){
- for(Cluster cluster: service.getClusters()){
- if(cluster.memberExists(memberId)){
- return cluster.getMember(memberId).getNetworkPartitionId();
- }
- }
- }
- return null;
- }
-
- private Member findMember(String memberId) {
- try {
- TopologyManager.acquireReadLock();
- for(Service service : TopologyManager.getTopology().getServices()) {
- for(Cluster cluster : service.getClusters()) {
- if(cluster.memberExists(memberId)) {
- return cluster.getMember(memberId);
- }
- }
- }
- return null;
- }
- finally {
- TopologyManager.releaseReadLock();
- }
- }
-
- private void handleMemberFaultEvent(String clusterId, String memberId) {
- try {
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractMonitor monitor;
-
- if(asCtx.monitorExist(clusterId)){
- monitor = asCtx.getMonitor(clusterId);
- }else if(asCtx.lbMonitorExist(clusterId)){
- monitor = asCtx.getLBMonitor(clusterId);
- }else{
- if(log.isDebugEnabled()){
- log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
- }
- return;
- }
-
- NetworkPartitionContext nwPartitionCtxt;
- try{
- TopologyManager.acquireReadLock();
- Member member = findMember(memberId);
-
- if(null == member){
- return;
- }
- if(!member.isActive()){
- if(log.isDebugEnabled()){
- log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
- " the member fault health stat", memberId));
- }
- return;
- }
-
- nwPartitionCtxt = monitor.getNetworkPartitionCtxt(member);
-
- }finally{
- TopologyManager.releaseReadLock();
- }
- // start a new member in the same Partition
- String partitionId = monitor.getPartitionOfMember(memberId);
- Partition partition = monitor.getDeploymentPolicy().getPartitionById(partitionId);
- PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
- if(!partitionCtxt.activeMemberExist(memberId)){
- if(log.isDebugEnabled()){
- log.debug(String.format("Could not find the active member in partition context, [member] %s ", memberId));
- }
- return;
- }
- // terminate the faulty member
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- ccClient.terminate(memberId);
-
- // remove from active member list
- partitionCtxt.removeActiveMemberById(memberId);
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Faulty member is terminated and removed from the active members list: [member] %s [partition] %s [cluster] %s ",
- memberId, partitionId, clusterId));
- }
-
-
- } catch (TerminationException e) {
- log.error(e);
- }
- }
-
- public void terminate(){
- this.terminated = true;
- }
-}