You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/04/17 21:26:00 UTC

[4/4] git commit: Updated TopologyEventReceiver, TenantEventReceiver, InstanceNotifierEventReceiver, HealthStatEventReceiver and implemented separate message queues for each receiver instance

Updated TopologyEventReceiver, TenantEventReceiver, InstanceNotifierEventReceiver, HealthStatEventReceiver and implemented separate message queues for each receiver instance


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/bccad5be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/bccad5be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/bccad5be

Branch: refs/heads/master
Commit: bccad5be0cb6cd09940c13784a4e1634713fbb30
Parents: a04e657
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Apr 18 00:46:03 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Apr 18 00:46:03 2014 +0530

----------------------------------------------------------------------
 .../internal/AutoscalerServerComponent.java     |  16 +-
 .../AutoscalerHealthStatEventReceiver.java      | 763 ++++++++++++++++++
 .../health/AutoscalerHealthStatReceiver.java    | 774 -------------------
 .../AutoscalerTopologyEventReceiver.java        | 494 ++++++++++++
 .../topology/AutoscalerTopologyReceiver.java    | 502 ------------
 .../topology/TopologyEventMessageListener.java  |  53 --
 .../topology/TopologyEventMessageQueue.java     |  38 -
 .../stratos/cartridge/agent/CartridgeAgent.java |  29 +-
 .../extension/api/LoadBalancerExtension.java    |  36 +-
 .../LoadBalancerTenantEventReceiver.java        | 202 +++++
 .../balancer/LoadBalancerTenantReceiver.java    | 210 -----
 .../LoadBalancerTopologyEventReceiver.java      | 208 +++++
 .../balancer/LoadBalancerTopologyReceiver.java  | 216 ------
 .../internal/LoadBalancerServiceComponent.java  |  12 +-
 .../internal/ADCManagementServerComponent.java  |  10 +-
 .../StratosManagerTopologyEventReceiver.java    | 297 +++++++
 .../StratosManagerTopologyReceiver.java         | 322 --------
 .../stat/HealthStatEventMessageDelegator.java   |  14 +-
 .../stat/HealthStatEventMessageListener.java    |   9 +-
 .../stat/HealthStatEventMessageQueue.java       |  17 +-
 .../health/stat/HealthStatEventReceiver.java    |  87 +++
 .../health/stat/HealthStatReceiver.java         |  82 --
 .../InstanceNotifierEventMessageDelegator.java  |  13 +-
 .../InstanceNotifierEventMessageListener.java   |  10 +-
 .../InstanceNotifierEventMessageQueue.java      |  17 +-
 .../InstanceNotifierEventMessageReceiver.java   |  86 ---
 .../notifier/InstanceNotifierEventReceiver.java |  90 +++
 .../tenant/TenantEventMessageDelegator.java     |  14 +-
 .../tenant/TenantEventMessageListener.java      |  10 +-
 .../tenant/TenantEventMessageQueue.java         |  17 +-
 .../receiver/tenant/TenantEventReceiver.java    |  87 +++
 .../message/receiver/tenant/TenantReceiver.java |  83 --
 .../topology/TopologyEventMessageDelegator.java |  27 +-
 .../topology/TopologyEventMessageListener.java  |  17 +-
 .../topology/TopologyEventMessageQueue.java     |  20 +-
 .../topology/TopologyEventReceiver.java         |  87 +++
 .../receiver/topology/TopologyReceiver.java     |  99 ---
 37 files changed, 2436 insertions(+), 2632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index ac637ab..4823057 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -23,8 +23,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
 import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
 import org.apache.stratos.autoscaler.exception.AutoScalerException;
-import org.apache.stratos.autoscaler.message.receiver.health.AutoscalerHealthStatReceiver;
-import org.apache.stratos.autoscaler.message.receiver.topology.AutoscalerTopologyReceiver;
+import org.apache.stratos.autoscaler.message.receiver.health.AutoscalerHealthStatEventReceiver;
+import org.apache.stratos.autoscaler.message.receiver.topology.AutoscalerTopologyEventReceiver;
 import org.apache.stratos.autoscaler.partition.PartitionManager;
 import org.apache.stratos.autoscaler.policy.PolicyManager;
 import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
@@ -51,14 +51,14 @@ import java.util.List;
 public class AutoscalerServerComponent {
 
     private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class);
-    AutoscalerTopologyReceiver asTopologyReceiver;
+    AutoscalerTopologyEventReceiver asTopologyReceiver;
 //    TopicSubscriber healthStatTopicSubscriber;
-    AutoscalerHealthStatReceiver autoscalerHealthStatReceiver;
+    AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
 
     protected void activate(ComponentContext componentContext) throws Exception {
         try {
             // Start topology receiver
-        	asTopologyReceiver = new AutoscalerTopologyReceiver();
+        	asTopologyReceiver = new AutoscalerTopologyEventReceiver();
             Thread topologyTopicSubscriberThread = new Thread(asTopologyReceiver);
             topologyTopicSubscriberThread.start();
             if (log.isDebugEnabled()) {
@@ -74,8 +74,8 @@ public class AutoscalerServerComponent {
 
 
             // Start health stat receiver
-            autoscalerHealthStatReceiver = new AutoscalerHealthStatReceiver();
-            Thread healthDelegatorThread = new Thread(autoscalerHealthStatReceiver);
+            autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
+            Thread healthDelegatorThread = new Thread(autoscalerHealthStatEventReceiver);
             healthDelegatorThread.start();
             if (log.isDebugEnabled()) {
                 log.debug("Health message processor thread started");
@@ -121,7 +121,7 @@ public class AutoscalerServerComponent {
 
     protected void deactivate(ComponentContext context) {
     	asTopologyReceiver.terminate();
-    	autoscalerHealthStatReceiver.terminate();
+    	autoscalerHealthStatEventReceiver.terminate();
     }
     
     protected void setRegistryService(RegistryService registryService) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
new file mode 100644
index 0000000..2bcfb52
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.message.receiver.health;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
+import org.apache.stratos.autoscaler.policy.model.LoadAverage;
+import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
+import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.*;
+import org.apache.stratos.messaging.listener.health.stat.*;
+import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+
+/**
+ * A thread for processing topology messages and updating the topology data structure.
+ */
+public class AutoscalerHealthStatEventReceiver implements Runnable {
+
+    private static final Log log = LogFactory.getLog(AutoscalerHealthStatEventReceiver.class);
+    private boolean terminated = false;
+
+    private HealthStatEventReceiver healthStatEventReceiver;
+
+    public AutoscalerHealthStatEventReceiver() {
+		this.healthStatEventReceiver = new HealthStatEventReceiver();
+        addEventListeners();
+    }
+
+    @Override
+    public void run() {
+        //FIXME this activated before autoscaler deployer activated.
+        try {
+            Thread.sleep(15000);
+        } catch (InterruptedException ignore) {
+        }
+        Thread thread = new Thread(healthStatEventReceiver);
+        thread.start();
+        if(log.isInfoEnabled()) {
+            log.info("Autoscaler health stat event receiver thread started");
+        }
+
+        // Keep the thread live until terminated
+        while (!terminated){
+        	try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
+        }
+        if(log.isInfoEnabled()) {
+            log.info("Autoscaler health stat event receiver thread terminated");
+        }
+    }
+
+    private void addEventListeners() {
+        // Listen to health stat events that affect clusters
+        healthStatEventReceiver.addEventListener(new AverageLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                AverageLoadAverageEvent e = (AverageLoadAverageEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractMonitor monitor;
+
+                if(asCtx.monitorExist(clusterId)){
+                    monitor = asCtx.getMonitor(clusterId);
+                }else if(asCtx.lbMonitorExist(clusterId)){
+                    monitor = asCtx.getLBMonitor(clusterId);
+                }else{
+                    if(log.isDebugEnabled()){
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+                    }
+                    return;
+                }
+                if(null != monitor){
+                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+                    if(null != networkPartitionContext){
+                        networkPartitionContext.setAverageLoadAverage(floatValue);
+                    } else {
+                        if(log.isDebugEnabled()) {
+                           log.debug(String.format("Network partition context is not available for :" +
+                                   " [network partition] %s", networkPartitionId));
+                        }
+                    }
+                }
+
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new AverageMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+
+                AverageMemoryConsumptionEvent e = (AverageMemoryConsumptionEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractMonitor monitor;
+
+                if(asCtx.monitorExist(clusterId)){
+                    monitor = asCtx.getMonitor(clusterId);
+                }else if(asCtx.lbMonitorExist(clusterId)){
+                    monitor = asCtx.getLBMonitor(clusterId);
+                }else{
+                    if(log.isDebugEnabled()){
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+                    }
+                    return;
+                }
+
+                if(null != monitor){
+                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+                    if(null != networkPartitionContext){
+                        networkPartitionContext.setAverageMemoryConsumption(floatValue);
+                    } else {
+                        if(log.isDebugEnabled()) {
+                           log.debug(String.format("Network partition context is not available for :" +
+                                   " [network partition] %s", networkPartitionId));
+                        }
+                    }
+                }
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new AverageRequestsInFlightEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+
+                AverageRequestsInFlightEvent e = (AverageRequestsInFlightEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractMonitor monitor;
+
+                if(asCtx.monitorExist(clusterId)){
+                    monitor = asCtx.getMonitor(clusterId);
+                }else if(asCtx.lbMonitorExist(clusterId)){
+                    monitor = asCtx.getLBMonitor(clusterId);
+                }else{
+                    if(log.isDebugEnabled()){
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+                    }
+                    return;
+                }
+                if(null != monitor){
+                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+                    if(null != networkPartitionContext){
+                        networkPartitionContext.setAverageRequestsInFlight(floatValue);
+                    } else {
+                        if(log.isDebugEnabled()) {
+                           log.debug(String.format("Network partition context is not available for :" +
+                                   " [network partition] %s", networkPartitionId));
+                        }
+                    }
+                }
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new GradientOfLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                GradientOfLoadAverageEvent e = (GradientOfLoadAverageEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractMonitor monitor;
+
+                if(asCtx.monitorExist(clusterId)){
+                    monitor = asCtx.getMonitor(clusterId);
+                }else if(asCtx.lbMonitorExist(clusterId)){
+                    monitor = asCtx.getLBMonitor(clusterId);
+                }else{
+                    if(log.isDebugEnabled()){
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+                    }
+                    return;
+                }
+                if(null != monitor){
+                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+                    if(null != networkPartitionContext){
+                        networkPartitionContext.setLoadAverageGradient(floatValue);
+                    } else {
+                        if(log.isDebugEnabled()) {
+                           log.debug(String.format("Network partition context is not available for :" +
+                                   " [network partition] %s", networkPartitionId));
+                        }
+                    }
+                }
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new GradientOfMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+
+                GradientOfMemoryConsumptionEvent e = (GradientOfMemoryConsumptionEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Grad of Memory Consumption event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractMonitor monitor;
+
+                if(asCtx.monitorExist(clusterId)){
+                    monitor = asCtx.getMonitor(clusterId);
+                }else if(asCtx.lbMonitorExist(clusterId)){
+                    monitor = asCtx.getLBMonitor(clusterId);
+                }else{
+                    if(log.isDebugEnabled()){
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+                    }
+                    return;
+                };
+                if(null != monitor){
+                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+                    if(null != networkPartitionContext){
+                        networkPartitionContext.setMemoryConsumptionGradient(floatValue);
+                    } else {
+                        if(log.isDebugEnabled()) {
+                           log.debug(String.format("Network partition context is not available for :" +
+                                   " [network partition] %s", networkPartitionId));
+                        }
+                    }
+                }
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new GradientOfRequestsInFlightEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                GradientOfRequestsInFlightEvent e = (GradientOfRequestsInFlightEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractMonitor monitor;
+
+                if(asCtx.monitorExist(clusterId)){
+                    monitor = asCtx.getMonitor(clusterId);
+                }else if(asCtx.lbMonitorExist(clusterId)){
+                    monitor = asCtx.getLBMonitor(clusterId);
+                }else{
+                    if(log.isDebugEnabled()){
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+                    }
+                    return;
+                }
+                if(null != monitor){
+                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+                    if(null != networkPartitionContext){
+                        networkPartitionContext.setRequestsInFlightGradient(floatValue);
+                    } else {
+                        if(log.isDebugEnabled()) {
+                           log.debug(String.format("Network partition context is not available for :" +
+                                   " [network partition] %s", networkPartitionId));
+                        }
+                    }
+                }
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new MemberAverageLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberAverageLoadAverageEvent e = (MemberAverageLoadAverageEvent) event;
+                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
+                if(loadAverage != null) {
+
+                    Float floatValue = e.getValue();
+                    loadAverage.setAverage(floatValue);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member avg of load avg event: [member] %s [value] %s", e.getMemberId()
+                                , floatValue));
+                    }
+                }
+
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new MemberAverageMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberAverageMemoryConsumptionEvent e = (MemberAverageMemoryConsumptionEvent) event;
+                MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
+                if(memoryConsumption != null) {
+
+                    Float floatValue = e.getValue();
+                    memoryConsumption.setAverage(floatValue);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member avg Memory Consumption event: [member] %s [value] %s", e.getMemberId(),
+                                floatValue));
+                    }
+                }
+
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new MemberFaultEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberFaultEvent e = (MemberFaultEvent) event;
+                String clusterId = e.getClusterId();
+                String memberId = e.getMemberId();
+
+                if (memberId == null || memberId.isEmpty()) {
+                    if(log.isErrorEnabled()) {
+                        log.error("Member id not found in received message");
+                    }
+                } else {
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member fault event: [member] %s ", e.getMemberId()));
+                    }
+                    handleMemberFaultEvent(clusterId, memberId);
+                }
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new MemberGradientOfLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberGradientOfLoadAverageEvent e = (MemberGradientOfLoadAverageEvent) event;
+                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
+                if(loadAverage != null) {
+
+                    Float floatValue = e.getValue();
+                    loadAverage.setGradient(floatValue);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member grad of load avg event: [member] %s [value] %s", e.getMemberId(),
+                                floatValue));
+                    }
+                }
+
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new MemberGradientOfMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberGradientOfMemoryConsumptionEvent e = (MemberGradientOfMemoryConsumptionEvent) event;
+                MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
+                if(memoryConsumption != null) {
+
+                    Float floatValue = e.getValue();
+                    memoryConsumption.setGradient(floatValue);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member grad of Memory Consumption event: [member] %s [value] %s", e.getMemberId(),
+                                floatValue));
+                    }
+                }
+
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberSecondDerivativeOfLoadAverageEvent e = (MemberSecondDerivativeOfLoadAverageEvent) event;
+                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
+                if(loadAverage != null) {
+
+                    Float floatValue = e.getValue();
+                    loadAverage.setSecondDerivative(floatValue);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member Second Derivation of load avg event: [member] %s [value] %s", e.getMemberId()
+                                , floatValue));
+                    }
+                }
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new SecondDerivativeOfLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+
+                SecondDerivativeOfLoadAverageEvent e = (SecondDerivativeOfLoadAverageEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Second Derivation of load avg event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractMonitor monitor;
+
+                if(asCtx.monitorExist(clusterId)){
+                    monitor = asCtx.getMonitor(clusterId);
+                }else if(asCtx.lbMonitorExist(clusterId)){
+                    monitor = asCtx.getLBMonitor(clusterId);
+                }else{
+                    if(log.isDebugEnabled()){
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+                    }
+                    return;
+                }
+                if(null != monitor){
+                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+                    if(null != networkPartitionContext){
+                        networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
+                    } else {
+                        if(log.isDebugEnabled()) {
+                           log.debug(String.format("Network partition context is not available for :" +
+                                   " [network partition] %s", networkPartitionId));
+                        }
+                    }
+                }
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new SecondDerivativeOfMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+
+                SecondDerivativeOfMemoryConsumptionEvent e = (SecondDerivativeOfMemoryConsumptionEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractMonitor monitor;
+
+                if(asCtx.monitorExist(clusterId)){
+                    monitor = asCtx.getMonitor(clusterId);
+                }else if(asCtx.lbMonitorExist(clusterId)){
+                    monitor = asCtx.getLBMonitor(clusterId);
+                }else{
+                    if(log.isDebugEnabled()){
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+                    }
+                    return;
+                }
+                if(null != monitor){
+                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+                    if(null != networkPartitionContext){
+                        networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
+                    } else {
+                        if(log.isDebugEnabled()) {
+                           log.debug(String.format("Network partition context is not available for :" +
+                                   " [network partition] %s", networkPartitionId));
+                        }
+                    }
+                }
+
+            }
+
+        });
+        healthStatEventReceiver.addEventListener(new SecondDerivativeOfRequestsInFlightEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                SecondDerivativeOfRequestsInFlightEvent e = (SecondDerivativeOfRequestsInFlightEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Second derivative of Rif event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                AutoscalerContext asCtx = AutoscalerContext.getInstance();
+                AbstractMonitor monitor;
+
+                if(asCtx.monitorExist(clusterId)){
+                    monitor = asCtx.getMonitor(clusterId);
+                }else if(asCtx.lbMonitorExist(clusterId)){
+                    monitor = asCtx.getLBMonitor(clusterId);
+                }else{
+                    if(log.isDebugEnabled()){
+                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+                    }
+                    return;
+                }
+                if(null != monitor){
+                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+                    if(null != networkPartitionContext){
+                        networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
+                    } else {
+                        if(log.isDebugEnabled()) {
+                           log.debug(String.format("Network partition context is not available for :" +
+                                   " [network partition] %s", networkPartitionId));
+                        }
+                    }
+                }
+            }
+
+        });
+    }
+
+
+    private LoadAverage findLoadAverage(String memberId) {
+//        String memberId = event.getProperties().get("member_id");
+        Member member = findMember(memberId);
+        
+        if(null == member){
+        	if(log.isDebugEnabled()) {
+                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
+            }
+        	return null;
+        }
+        String clusterId = member.getClusterId();
+
+        AutoscalerContext asCtx = AutoscalerContext.getInstance();
+        AbstractMonitor monitor;
+
+        if(asCtx.monitorExist(clusterId)){
+            monitor = asCtx.getMonitor(clusterId);
+        }else if(asCtx.lbMonitorExist(clusterId)){
+            monitor = asCtx.getLBMonitor(clusterId);
+        }else{
+            if(log.isDebugEnabled()){
+                log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+            }
+            return null;
+        }
+        String networkPartitionId = findNetworkPartitionId(memberId);
+        MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId)
+                        .getPartitionCtxt(member.getPartitionId())
+                        .getMemberStatsContext(memberId);
+        if(null == memberStatsContext){
+            if(log.isDebugEnabled()) {
+               log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return null;
+        }
+        else if(!member.isActive()){
+            if(log.isDebugEnabled()){
+                log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
+                        " the health stat", memberId));
+            }
+            return null;
+        }
+
+        LoadAverage loadAverage = memberStatsContext.getLoadAverage();
+        return loadAverage;
+    }
+
+    private MemoryConsumption findMemoryConsumption(String memberId) {
+//        String memberId = event.getProperties().get("member_id");
+        Member member = findMember(memberId);
+        
+        if(null == member){
+        	if(log.isDebugEnabled()) {
+                log.debug(String.format("Member not found in the Topology : [member] %s", memberId));
+            }
+        	return null;
+        }
+        String clusterId = member.getClusterId();
+        AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId());
+        if(null == monitor){
+
+            monitor = AutoscalerContext.getInstance().getLBMonitor(member.getClusterId());
+            if(null == monitor){
+                if(log.isDebugEnabled()) {
+                   log.debug(String.format("Cluster monitor is not available for : [member] %s", memberId));
+                }
+            }
+            return null;
+        }
+
+        String networkPartitionId = findNetworkPartitionId(memberId);
+        MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId)
+                        .getPartitionCtxt(member.getPartitionId())
+                        .getMemberStatsContext(memberId);
+        if(null == memberStatsContext){
+            if(log.isDebugEnabled()) {
+               log.debug(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return null;
+        }else if(!member.isActive()){
+            if(log.isDebugEnabled()){
+                log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
+                        " the health stat", memberId));
+            }
+            return null;
+        }
+        MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption();
+
+        return memoryConsumption;
+    }
+
+    private String findNetworkPartitionId(String memberId) {
+        for(Service service: TopologyManager.getTopology().getServices()){
+            for(Cluster cluster: service.getClusters()){
+                if(cluster.memberExists(memberId)){
+                    return cluster.getMember(memberId).getNetworkPartitionId();
+                }
+            }
+        }
+        return null;
+    }
+
+    private Member findMember(String memberId) {
+        try {
+            TopologyManager.acquireReadLock();
+            for(Service service : TopologyManager.getTopology().getServices()) {
+                for(Cluster cluster : service.getClusters()) {
+                    if(cluster.memberExists(memberId)) {
+                        return cluster.getMember(memberId);
+                    }
+                }
+            }
+            return null;
+        }
+        finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    private void handleMemberFaultEvent(String clusterId, String memberId) {
+        try {
+        	AutoscalerContext asCtx = AutoscalerContext.getInstance();
+        	AbstractMonitor monitor;
+        	
+        	if(asCtx.monitorExist(clusterId)){
+        		monitor = asCtx.getMonitor(clusterId);
+        	}else if(asCtx.lbMonitorExist(clusterId)){
+        		monitor = asCtx.getLBMonitor(clusterId);
+        	}else{
+                if(log.isDebugEnabled()){
+                    log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
+                }
+                return;
+        	}
+        	
+        	NetworkPartitionContext nwPartitionCtxt;
+            try{
+            	TopologyManager.acquireReadLock();
+            	Member member = findMember(memberId);
+            	
+            	if(null == member){
+            		return;
+            	}
+                if(!member.isActive()){
+                    if(log.isDebugEnabled()){
+                        log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
+                                " the member fault health stat", memberId));
+                    }
+                    return;
+                }
+	            
+	            nwPartitionCtxt = monitor.getNetworkPartitionCtxt(member);
+	            
+            }finally{
+            	TopologyManager.releaseReadLock();
+            }
+            // start a new member in the same Partition
+            String partitionId = monitor.getPartitionOfMember(memberId);
+            Partition partition = monitor.getDeploymentPolicy().getPartitionById(partitionId);
+            PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+
+            if(!partitionCtxt.activeMemberExist(memberId)){
+                if(log.isDebugEnabled()){
+                    log.debug(String.format("Could not find the active member in partition context, [member] %s ", memberId));
+                }
+                return;
+            }
+            // terminate the faulty member
+            CloudControllerClient ccClient = CloudControllerClient.getInstance();
+            ccClient.terminate(memberId);
+
+            // remove from active member list
+            partitionCtxt.removeActiveMemberById(memberId);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Faulty member is terminated and removed from the active members list: [member] %s [partition] %s [cluster] %s ",
+                                       memberId, partitionId, clusterId));
+            }
+
+
+        } catch (TerminationException e) {
+            log.error(e);
+        }
+    }
+
+    public void terminate(){
+    	this.terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
deleted file mode 100644
index 288410b..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
+++ /dev/null
@@ -1,774 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.message.receiver.health;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.exception.SpawningException;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
-import org.apache.stratos.autoscaler.policy.model.LoadAverage;
-import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.health.stat.*;
-import org.apache.stratos.messaging.listener.health.stat.*;
-import org.apache.stratos.messaging.message.processor.health.stat.HealthStatMessageProcessorChain;
-import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventMessageDelegator;
-import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatReceiver;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-
-/**
- * A thread for processing topology messages and updating the topology data structure.
- */
-public class AutoscalerHealthStatReceiver implements Runnable {
-
-    private static final Log log = LogFactory.getLog(AutoscalerHealthStatReceiver.class);
-    private boolean terminated = false;
-
-    private HealthStatReceiver healthStatReceiver;
-
-    public AutoscalerHealthStatReceiver() {
-		this.healthStatReceiver = new HealthStatReceiver(createMessageDelegator());
-    }
-
-    @Override
-    public void run() {
-        //FIXME this activated before autoscaler deployer activated.
-        try {
-            Thread.sleep(15000);
-        } catch (InterruptedException ignore) {
-        }
-        Thread thread = new Thread(healthStatReceiver);
-        thread.start();
-        if(log.isInfoEnabled()) {
-            log.info("Autoscaler health stat receiver thread started");
-        }
-
-        // Keep the thread live until terminated
-        while (!terminated){
-        	try {
-                Thread.sleep(1000);
-            } catch (InterruptedException ignore) {
-            }
-        }
-        if(log.isInfoEnabled()) {
-            log.info("Autoscaler health stat receiver thread terminated");
-        }
-    }
-
-    private HealthStatEventMessageDelegator createMessageDelegator() {
-        HealthStatMessageProcessorChain processorChain = createEventProcessorChain();
-        return new HealthStatEventMessageDelegator(processorChain);
-    }
-
-    private HealthStatMessageProcessorChain createEventProcessorChain() {
-        // Listen to health stat events that affect clusters
-        HealthStatMessageProcessorChain processorChain = new HealthStatMessageProcessorChain();
-        processorChain.addEventListener(new AverageLoadAverageEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                AverageLoadAverageEvent e = (AverageLoadAverageEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
-                AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
-
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
-                    }
-                    return;
-                }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setAverageLoadAverage(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
-                        }
-                    }
-                }
-
-            }
-
-        });
-        processorChain.addEventListener(new AverageMemoryConsumptionEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
-                AverageMemoryConsumptionEvent e = (AverageMemoryConsumptionEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
-                AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
-
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
-                    }
-                    return;
-                }
-
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setAverageMemoryConsumption(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
-                        }
-                    }
-                }
-            }
-
-        });
-        processorChain.addEventListener(new AverageRequestsInFlightEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
-                AverageRequestsInFlightEvent e = (AverageRequestsInFlightEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-                Float floatValue = e.getValue();
-
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
-                AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
-
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
-                    }
-                    return;
-                }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setAverageRequestsInFlight(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
-                        }
-                    }
-                }
-            }
-
-        });
-        processorChain.addEventListener(new GradientOfLoadAverageEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                GradientOfLoadAverageEvent e = (GradientOfLoadAverageEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
-                AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
-
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
-                    }
-                    return;
-                }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setLoadAverageGradient(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
-                        }
-                    }
-                }
-            }
-
-        });
-        processorChain.addEventListener(new GradientOfMemoryConsumptionEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
-                GradientOfMemoryConsumptionEvent e = (GradientOfMemoryConsumptionEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Grad of Memory Consumption event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
-                AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
-
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
-                    }
-                    return;
-                };
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setMemoryConsumptionGradient(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
-                        }
-                    }
-                }
-            }
-
-        });
-        processorChain.addEventListener(new GradientOfRequestsInFlightEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                GradientOfRequestsInFlightEvent e = (GradientOfRequestsInFlightEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
-                AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
-
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
-                    }
-                    return;
-                }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setRequestsInFlightGradient(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
-                        }
-                    }
-                }
-            }
-
-        });
-        processorChain.addEventListener(new MemberAverageLoadAverageEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                MemberAverageLoadAverageEvent e = (MemberAverageLoadAverageEvent) event;
-                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
-                if(loadAverage != null) {
-
-                    Float floatValue = e.getValue();
-                    loadAverage.setAverage(floatValue);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member avg of load avg event: [member] %s [value] %s", e.getMemberId()
-                                , floatValue));
-                    }
-                }
-
-            }
-
-        });
-        processorChain.addEventListener(new MemberAverageMemoryConsumptionEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                MemberAverageMemoryConsumptionEvent e = (MemberAverageMemoryConsumptionEvent) event;
-                MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
-                if(memoryConsumption != null) {
-
-                    Float floatValue = e.getValue();
-                    memoryConsumption.setAverage(floatValue);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member avg Memory Consumption event: [member] %s [value] %s", e.getMemberId(),
-                                floatValue));
-                    }
-                }
-
-            }
-
-        });
-        processorChain.addEventListener(new MemberFaultEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                MemberFaultEvent e = (MemberFaultEvent) event;
-                String clusterId = e.getClusterId();
-                String memberId = e.getMemberId();
-
-                if (memberId == null || memberId.isEmpty()) {
-                    if(log.isErrorEnabled()) {
-                        log.error("Member id not found in received message");
-                    }
-                } else {
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member fault event: [member] %s ", e.getMemberId()));
-                    }
-                    handleMemberFaultEvent(clusterId, memberId);
-                }
-            }
-
-        });
-        processorChain.addEventListener(new MemberGradientOfLoadAverageEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                MemberGradientOfLoadAverageEvent e = (MemberGradientOfLoadAverageEvent) event;
-                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
-                if(loadAverage != null) {
-
-                    Float floatValue = e.getValue();
-                    loadAverage.setGradient(floatValue);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member grad of load avg event: [member] %s [value] %s", e.getMemberId(),
-                                floatValue));
-                    }
-                }
-
-            }
-
-        });
-        processorChain.addEventListener(new MemberGradientOfMemoryConsumptionEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                MemberGradientOfMemoryConsumptionEvent e = (MemberGradientOfMemoryConsumptionEvent) event;
-                MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
-                if(memoryConsumption != null) {
-
-                    Float floatValue = e.getValue();
-                    memoryConsumption.setGradient(floatValue);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member grad of Memory Consumption event: [member] %s [value] %s", e.getMemberId(),
-                                floatValue));
-                    }
-                }
-
-            }
-
-        });
-        processorChain.addEventListener(new MemberSecondDerivativeOfLoadAverageEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                MemberSecondDerivativeOfLoadAverageEvent e = (MemberSecondDerivativeOfLoadAverageEvent) event;
-                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
-                if(loadAverage != null) {
-
-                    Float floatValue = e.getValue();
-                    loadAverage.setSecondDerivative(floatValue);
-
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("Member Second Derivation of load avg event: [member] %s [value] %s", e.getMemberId()
-                                , floatValue));
-                    }
-                }
-            }
-
-        });
-        processorChain.addEventListener(new MemberSecondDerivativeOfMemoryConsumptionEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-            }
-
-        });
-        processorChain.addEventListener(new SecondDerivativeOfLoadAverageEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
-                SecondDerivativeOfLoadAverageEvent e = (SecondDerivativeOfLoadAverageEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Second Derivation of load avg event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
-                AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
-
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
-                    }
-                    return;
-                }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
-                        }
-                    }
-                }
-            }
-
-        });
-        processorChain.addEventListener(new SecondDerivativeOfMemoryConsumptionEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-
-                SecondDerivativeOfMemoryConsumptionEvent e = (SecondDerivativeOfMemoryConsumptionEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
-                AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
-
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
-                    }
-                    return;
-                }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
-                        }
-                    }
-                }
-
-            }
-
-        });
-        processorChain.addEventListener(new SecondDerivativeOfRequestsInFlightEventListener() {
-            @Override
-            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
-                SecondDerivativeOfRequestsInFlightEvent e = (SecondDerivativeOfRequestsInFlightEvent) event;
-                String clusterId = e.getClusterId();
-                String networkPartitionId = e.getNetworkPartitionId();
-                Float floatValue = e.getValue();
-
-                if (log.isDebugEnabled()) {
-                    log.debug(String.format("Second derivative of Rif event: [cluster] %s [network-partition] %s [value] %s",
-                            clusterId, networkPartitionId, floatValue));
-                }
-                AutoscalerContext asCtx = AutoscalerContext.getInstance();
-                AbstractMonitor monitor;
-
-                if(asCtx.monitorExist(clusterId)){
-                    monitor = asCtx.getMonitor(clusterId);
-                }else if(asCtx.lbMonitorExist(clusterId)){
-                    monitor = asCtx.getLBMonitor(clusterId);
-                }else{
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
-                    }
-                    return;
-                }
-                if(null != monitor){
-                    NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-                    if(null != networkPartitionContext){
-                        networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
-                    } else {
-                        if(log.isDebugEnabled()) {
-                           log.debug(String.format("Network partition context is not available for :" +
-                                   " [network partition] %s", networkPartitionId));
-                        }
-                    }
-                }
-            }
-
-        });
-
-        return processorChain;
-    }
-
-
-    private LoadAverage findLoadAverage(String memberId) {
-//        String memberId = event.getProperties().get("member_id");
-        Member member = findMember(memberId);
-        
-        if(null == member){
-        	if(log.isDebugEnabled()) {
-                log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
-            }
-        	return null;
-        }
-        String clusterId = member.getClusterId();
-
-        AutoscalerContext asCtx = AutoscalerContext.getInstance();
-        AbstractMonitor monitor;
-
-        if(asCtx.monitorExist(clusterId)){
-            monitor = asCtx.getMonitor(clusterId);
-        }else if(asCtx.lbMonitorExist(clusterId)){
-            monitor = asCtx.getLBMonitor(clusterId);
-        }else{
-            if(log.isDebugEnabled()){
-                log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
-            }
-            return null;
-        }
-        String networkPartitionId = findNetworkPartitionId(memberId);
-        MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId)
-                        .getPartitionCtxt(member.getPartitionId())
-                        .getMemberStatsContext(memberId);
-        if(null == memberStatsContext){
-            if(log.isDebugEnabled()) {
-               log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return null;
-        }
-        else if(!member.isActive()){
-            if(log.isDebugEnabled()){
-                log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
-                        " the health stat", memberId));
-            }
-            return null;
-        }
-
-        LoadAverage loadAverage = memberStatsContext.getLoadAverage();
-        return loadAverage;
-    }
-
-    private MemoryConsumption findMemoryConsumption(String memberId) {
-//        String memberId = event.getProperties().get("member_id");
-        Member member = findMember(memberId);
-        
-        if(null == member){
-        	if(log.isDebugEnabled()) {
-                log.debug(String.format("Member not found in the Topology : [member] %s", memberId));
-            }
-        	return null;
-        }
-        String clusterId = member.getClusterId();
-        AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId());
-        if(null == monitor){
-
-            monitor = AutoscalerContext.getInstance().getLBMonitor(member.getClusterId());
-            if(null == monitor){
-                if(log.isDebugEnabled()) {
-                   log.debug(String.format("Cluster monitor is not available for : [member] %s", memberId));
-                }
-            }
-            return null;
-        }
-
-        String networkPartitionId = findNetworkPartitionId(memberId);
-        MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId)
-                        .getPartitionCtxt(member.getPartitionId())
-                        .getMemberStatsContext(memberId);
-        if(null == memberStatsContext){
-            if(log.isDebugEnabled()) {
-               log.debug(String.format("Member context is not available for : [member] %s", memberId));
-            }
-            return null;
-        }else if(!member.isActive()){
-            if(log.isDebugEnabled()){
-                log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
-                        " the health stat", memberId));
-            }
-            return null;
-        }
-        MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption();
-
-        return memoryConsumption;
-    }
-
-    private String findNetworkPartitionId(String memberId) {
-        for(Service service: TopologyManager.getTopology().getServices()){
-            for(Cluster cluster: service.getClusters()){
-                if(cluster.memberExists(memberId)){
-                    return cluster.getMember(memberId).getNetworkPartitionId();
-                }
-            }
-        }
-        return null;
-    }
-
-    private Member findMember(String memberId) {
-        try {
-            TopologyManager.acquireReadLock();
-            for(Service service : TopologyManager.getTopology().getServices()) {
-                for(Cluster cluster : service.getClusters()) {
-                    if(cluster.memberExists(memberId)) {
-                        return cluster.getMember(memberId);
-                    }
-                }
-            }
-            return null;
-        }
-        finally {
-            TopologyManager.releaseReadLock();
-        }
-    }
-
-    private void handleMemberFaultEvent(String clusterId, String memberId) {
-        try {
-        	AutoscalerContext asCtx = AutoscalerContext.getInstance();
-        	AbstractMonitor monitor;
-        	
-        	if(asCtx.monitorExist(clusterId)){
-        		monitor = asCtx.getMonitor(clusterId);
-        	}else if(asCtx.lbMonitorExist(clusterId)){
-        		monitor = asCtx.getLBMonitor(clusterId);
-        	}else{
-                if(log.isDebugEnabled()){
-                    log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId));
-                }
-                return;
-        	}
-        	
-        	NetworkPartitionContext nwPartitionCtxt;
-            try{
-            	TopologyManager.acquireReadLock();
-            	Member member = findMember(memberId);
-            	
-            	if(null == member){
-            		return;
-            	}
-                if(!member.isActive()){
-                    if(log.isDebugEnabled()){
-                        log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
-                                " the member fault health stat", memberId));
-                    }
-                    return;
-                }
-	            
-	            nwPartitionCtxt = monitor.getNetworkPartitionCtxt(member);
-	            
-            }finally{
-            	TopologyManager.releaseReadLock();
-            }
-            // start a new member in the same Partition
-            String partitionId = monitor.getPartitionOfMember(memberId);
-            Partition partition = monitor.getDeploymentPolicy().getPartitionById(partitionId);
-            PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
-            if(!partitionCtxt.activeMemberExist(memberId)){
-                if(log.isDebugEnabled()){
-                    log.debug(String.format("Could not find the active member in partition context, [member] %s ", memberId));
-                }
-                return;
-            }
-            // terminate the faulty member
-            CloudControllerClient ccClient = CloudControllerClient.getInstance();
-            ccClient.terminate(memberId);
-
-            // remove from active member list
-            partitionCtxt.removeActiveMemberById(memberId);
-
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Faulty member is terminated and removed from the active members list: [member] %s [partition] %s [cluster] %s ",
-                                       memberId, partitionId, clusterId));
-            }
-
-
-        } catch (TerminationException e) {
-            log.error(e);
-        }
-    }
-
-    public void terminate(){
-    	this.terminated = true;
-    }
-}