You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2013/12/23 10:31:42 UTC

git commit: Moving the AutoscalerTopologyReceiver.java to correct package

Updated Branches:
  refs/heads/master a189459ce -> aa539dbdf


Moving the AutoscalerTopologyReceiver.java to correct package


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/aa539dbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/aa539dbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/aa539dbd

Branch: refs/heads/master
Commit: aa539dbdfd726fbe97e505e1135c8e3bdbe0ceeb
Parents: a189459
Author: Lahiru Sandaruwan <la...@apache.org>
Authored: Mon Dec 23 15:06:22 2013 +0530
Committer: Lahiru Sandaruwan <la...@apache.org>
Committed: Mon Dec 23 15:06:22 2013 +0530

----------------------------------------------------------------------
 .../topology/AutoscalerTopologyReceiver.java    | 401 +++++++++++++++++++
 .../topology/AutoscalerTopologyReceiver.java    | 401 -------------------
 2 files changed, 401 insertions(+), 401 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/aa539dbd/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
new file mode 100644
index 0000000..b216df1
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.exception.PartitionValidationException;
+import org.apache.stratos.autoscaler.exception.PolicyValidationException;
+import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
+import org.apache.stratos.autoscaler.monitor.ClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.LbClusterMonitor;
+import org.apache.stratos.autoscaler.util.AutoscalerUtil;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.listener.topology.*;
+import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver;
+
+import java.util.Collection;
+
+/**
+ * Autoscaler topology receiver.
+ */
+public class AutoscalerTopologyReceiver implements Runnable {
+
+    private static final Log log = LogFactory.getLog(AutoscalerTopologyReceiver.class);
+
+    private TopologyReceiver topologyReceiver;
+    // Set by terminate() and polled by the thread executing run(). Declared volatile so
+    // that the write made by the terminating thread is guaranteed to become visible to
+    // the polling thread.
+    private volatile boolean terminated;
+
+    /**
+     * Creates the receiver and builds its {@link TopologyReceiver} from the message
+     * delegator returned by {@link #createMessageDelegator()}.
+     */
+    public AutoscalerTopologyReceiver() {
+		this.topologyReceiver = new TopologyReceiver(createMessageDelegator());
+    }
+
+    /**
+     * Starts the topology receiver on its own thread after a fixed start-up delay and then
+     * keeps this thread alive until {@link #terminate()} has been called.
+     * <p>
+     * An interrupt does not end the wait (only {@code terminate()} does). An interrupt
+     * received while waiting is remembered and re-asserted on this thread before the
+     * method returns, so callers further up the stack can still observe it.
+     */
+    @Override
+    public void run() {
+        boolean interrupted = false;
+        try {
+            //FIXME this activated before autoscaler deployer activated.
+            try {
+                Thread.sleep(15000);
+            } catch (InterruptedException e) {
+                // Start-up still proceeds, as before, but the interrupt is not lost.
+                interrupted = true;
+            }
+            Thread thread = new Thread(topologyReceiver);
+            thread.start();
+            if (log.isInfoEnabled()) {
+                log.info("Autoscaler topology receiver thread started");
+            }
+
+            // Keep the thread live until terminated. Sleep between checks instead of
+            // spinning on the flag, which would occupy a CPU core for the whole lifetime
+            // of the receiver.
+            while (!terminated) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
+            }
+            if (log.isInfoEnabled()) {
+                log.info("Autoscaler topology receiver thread terminated");
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Builds the message delegator handed to the topology receiver.
+     * <p>
+     * The processor chain from {@link #createEventProcessorChain()} is extended with a
+     * complete-topology listener that, for every cluster of every service in the current
+     * topology, starts a thread running an {@code LBClusterMonitorAdder} (LB clusters) or a
+     * {@code ClusterMonitorAdder} (all other clusters).
+     *
+     * @return a delegator wrapping the configured processor chain
+     */
+    private TopologyEventMessageDelegator createMessageDelegator() {
+        TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+        processorChain.addEventListener(new CompleteTopologyEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                // Acquire before entering try so that the finally clause can only ever
+                // release a read lock that was really obtained.
+                TopologyManager.acquireReadLock();
+                try {
+                    for (Service service : TopologyManager.getTopology().getServices()) {
+                        for (Cluster cluster : service.getClusters()) {
+                            Runnable monitorAdder;
+                            if (cluster.isLbCluster()) {
+                                monitorAdder = new LBClusterMonitorAdder(cluster);
+                            } else {
+                                monitorAdder = new ClusterMonitorAdder(cluster);
+                            }
+                            new Thread(monitorAdder).start();
+                        }
+                    }
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+
+        });
+        return new TopologyEventMessageDelegator(processorChain);
+    }
+
+    private TopologyMessageProcessorChain createEventProcessorChain() {
+        // Listen to topology events that affect clusters
+        TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
+        processorChain.addEventListener(new ClusterCreatedEventListener() {
+            /**
+             * Starts a monitor-adder thread for the cluster named in the event. When the
+             * service or cluster lookup in the topology yields {@code null}, a warning is
+             * logged and no thread is started.
+             */
+            @Override
+            protected void onEvent(Event event) {
+                // Cast and lock acquisition happen before try, so a failure in either
+                // cannot reach a finally clause that releases a lock which is not held.
+                ClusterCreatedEvent e = (ClusterCreatedEvent) event;
+                TopologyManager.acquireReadLock();
+                try {
+                    Service service = TopologyManager.getTopology().getService(e.getServiceName());
+                    Cluster cluster = (service == null) ? null : service.getCluster(e.getClusterId());
+                    if (cluster == null) {
+                        if (log.isWarnEnabled()) {
+                            log.warn(String.format(
+                                    "Cluster not found in topology, monitor not created: [service] %s [cluster] %s",
+                                    e.getServiceName(), e.getClusterId()));
+                        }
+                        return;
+                    }
+
+                    Runnable monitorAdder;
+                    if (cluster.isLbCluster()) {
+                        monitorAdder = new LBClusterMonitorAdder(cluster);
+                    } else {
+                        monitorAdder = new ClusterMonitorAdder(cluster);
+                    }
+                    new Thread(monitorAdder).start();
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+
+        });
+        
+        processorChain.addEventListener(new ClusterRemovedEventListener() {
+            /**
+             * Removes the monitor registered for the cluster named in the event from the
+             * autoscaler context and destroys it.
+             * <p>
+             * Whether the LB registry or the regular registry is used is decided from the
+             * cluster found in the topology. If the cluster cannot be looked up there, the
+             * decision falls back to {@code moniterExist(clusterId)}, the same test the
+             * member listeners in this class use. A missing monitor is logged and skipped.
+             */
+            @Override
+            protected void onEvent(Event event) {
+                // Cast and lock acquisition happen before try, so a failure in either
+                // cannot reach a finally clause that releases a lock which is not held.
+                ClusterRemovedEvent e = (ClusterRemovedEvent) event;
+                TopologyManager.acquireReadLock();
+                try {
+                    String serviceName = e.getServiceName();
+                    String clusterId = e.getClusterId();
+                    AutoscalerContext context = AutoscalerContext.getInstance();
+
+                    Service service = TopologyManager.getTopology().getService(serviceName);
+                    Cluster cluster = (service == null) ? null : service.getCluster(clusterId);
+
+                    boolean lbCluster;
+                    if (cluster != null) {
+                        lbCluster = cluster.isLbCluster();
+                    } else {
+                        // The cluster is not available in the topology (any more); decide
+                        // by which registry knows the cluster id.
+                        lbCluster = !context.moniterExist(clusterId);
+                    }
+
+                    AbstractMonitor monitor;
+                    if (lbCluster) {
+                        monitor = context.removeLbMonitor(clusterId);
+                    } else {
+                        monitor = context.removeMonitor(clusterId);
+                    }
+
+                    if (monitor == null) {
+                        if (log.isWarnEnabled()) {
+                            log.warn(String.format(
+                                    "No cluster monitor found to remove: [service] %s [cluster] %s",
+                                    serviceName, clusterId));
+                        }
+                        return;
+                    }
+
+                    monitor.destroy();
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Cluster monitor has been removed: [cluster] %s ", clusterId));
+                    }
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+
+        });
+        
+        processorChain.addEventListener(new MemberStartedEventListener() {
+            /** Member-started events are received here, but no action is taken for them. */
+            @Override
+            protected void onEvent(Event event) {
+            		
+            }
+
+        });
+        
+        processorChain.addEventListener(new MemberTerminatedEventListener() {
+            /**
+             * Updates the monitor state for a terminated member: removes the member's stats
+             * context from its partition context and decreases the member count of that
+             * partition by one.
+             * <p>
+             * The monitor is taken from the regular registry when
+             * {@code moniterExist(clusterId)} is true and from the LB registry otherwise.
+             * If the monitor or one of the contexts cannot be found, a warning is logged
+             * and the event is ignored.
+             */
+            @Override
+            protected void onEvent(Event event) {
+                // Cast and lock acquisition happen before try, so a failure in either
+                // cannot reach a finally clause that releases a lock which is not held.
+                MemberTerminatedEvent e = (MemberTerminatedEvent) event;
+                TopologyManager.acquireReadLock();
+                try {
+                    String networkPartitionId = e.getNetworkPartitionId();
+                    String partitionId = e.getPartitionId();
+                    String clusterId = e.getClusterId();
+
+                    AbstractMonitor monitor;
+                    if (AutoscalerContext.getInstance().moniterExist(clusterId)) {
+                        monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+                    } else {
+                        //This is LB member
+                        monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
+                    }
+
+                    NetworkPartitionContext networkPartitionContext =
+                            (monitor == null) ? null : monitor.getNetworkPartitionCtxt(networkPartitionId);
+                    PartitionContext partitionContext =
+                            (networkPartitionContext == null) ? null : networkPartitionContext.getPartitionCtxt(partitionId);
+                    if (partitionContext == null) {
+                        if (log.isWarnEnabled()) {
+                            log.warn(String.format(
+                                    "Monitor or partition context not found, member terminated event ignored: "
+                                            + "[cluster] %s [network partition] %s [partition] %s [member] %s",
+                                    clusterId, networkPartitionId, partitionId, e.getMemberId()));
+                        }
+                        return;
+                    }
+
+                    partitionContext.removeMemberStatsContext(e.getMemberId());
+                    networkPartitionContext.decreaseMemberCountInPartitionBy(partitionId, 1);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+
+        });
+        
+        processorChain.addEventListener(new MemberActivatedEventListener() {
+            /**
+             * Updates the monitor state for an activated member: adds a new
+             * {@link MemberStatsContext} for the member to its partition context,
+             * increments that partition's current member count by one and removes the
+             * member from the partition's pending members.
+             * <p>
+             * The monitor is taken from the regular registry when
+             * {@code moniterExist(clusterId)} is true and from the LB registry otherwise.
+             * If the monitor or one of the contexts cannot be found, a warning is logged
+             * and the event is ignored.
+             */
+            @Override
+            protected void onEvent(Event event) {
+                // Cast and lock acquisition happen before try, so a failure in either
+                // cannot reach a finally clause that releases a lock which is not held.
+                MemberActivatedEvent e = (MemberActivatedEvent) event;
+                TopologyManager.acquireReadLock();
+                try {
+                    String memberId = e.getMemberId();
+                    String partitionId = e.getPartitionId();
+                    String networkPartitionId = e.getNetworkPartitionId();
+                    String clusterId = e.getClusterId();
+
+                    AbstractMonitor monitor;
+                    if (AutoscalerContext.getInstance().moniterExist(clusterId)) {
+                        monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+                    } else {
+                        monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
+                    }
+
+                    // Both kinds of monitor are navigated the same way from here on.
+                    NetworkPartitionContext networkPartitionContext =
+                            (monitor == null) ? null : monitor.getNetworkPartitionCtxt(networkPartitionId);
+                    PartitionContext partitionContext =
+                            (networkPartitionContext == null) ? null : networkPartitionContext.getPartitionCtxt(partitionId);
+                    if (partitionContext == null) {
+                        if (log.isWarnEnabled()) {
+                            log.warn(String.format(
+                                    "Monitor or partition context not found, member activated event ignored: "
+                                            + "[cluster] %s [network partition] %s [partition] %s [member] %s",
+                                    clusterId, networkPartitionId, partitionId, memberId));
+                        }
+                        return;
+                    }
+
+                    partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
+                    partitionContext.incrementCurrentMemberCount(1);
+                    partitionContext.removePendingMember(memberId);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+        
+        processorChain.addEventListener(new ServiceRemovedEventListener() {
+            /**
+             * Currently a no-op: the monitor clean-up for a removed service is commented
+             * out below, so service-removed events are received but not acted upon.
+             */
+            @Override
+            protected void onEvent(Event event) {
+//                try {
+//                    TopologyManager.acquireReadLock();
+//
+//                    // Remove all clusters of given service from context
+//                    ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent)event;
+//                    for(Service service : TopologyManager.getTopology().getServices()) {
+//                        for(Cluster cluster : service.getClusters()) {
+//                            removeMonitor(cluster.getHostName());
+//                        }
+//                    }
+//                }
+//                finally {
+//                    TopologyManager.releaseReadLock();
+//                }
+            }
+        });
+        return processorChain;
+    }
+    
+    /**
+     * Task that obtains the monitor for an LB cluster from {@link AutoscalerUtil}, starts
+     * it on a new thread and registers it with the {@link AutoscalerContext}.
+     * <p>
+     * Declared static because it uses no state of the enclosing receiver instance.
+     */
+    private static class LBClusterMonitorAdder implements Runnable {
+        private final Cluster cluster;
+
+        public LBClusterMonitorAdder(Cluster cluster) {
+            this.cluster = cluster;
+        }
+
+        /**
+         * Creates, starts and registers the LB cluster monitor.
+         *
+         * @throws RuntimeException if monitor creation fails policy or partition
+         *                          validation; the failure is logged before it is thrown
+         */
+        @Override
+        public void run() {
+            LbClusterMonitor monitor;
+            try {
+                monitor = AutoscalerUtil.getLBClusterMonitor(cluster);
+            } catch (PolicyValidationException e) {
+                throw creationFailed(e);
+            } catch (PartitionValidationException e) {
+                throw creationFailed(e);
+            }
+
+            Thread th = new Thread(monitor);
+            th.start();
+            AutoscalerContext.getInstance().addLbMonitor(monitor);
+            log.info(String.format("LB Cluster monitor has been added: [cluster] %s",
+                    cluster.getClusterId()));
+        }
+
+        /** Logs the creation failure and wraps it, keeping the original cause. */
+        private RuntimeException creationFailed(Exception cause) {
+            String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+            log.error(msg, cause);
+            return new RuntimeException(msg, cause);
+        }
+    }
+    
+    /**
+     * Task that obtains the monitor for a (non-LB) cluster from {@link AutoscalerUtil},
+     * starts it on a new thread and registers it with the {@link AutoscalerContext}.
+     * <p>
+     * Declared static because it uses no state of the enclosing receiver instance.
+     */
+    private static class ClusterMonitorAdder implements Runnable {
+        private final Cluster cluster;
+
+        public ClusterMonitorAdder(Cluster cluster) {
+            this.cluster = cluster;
+        }
+
+        /**
+         * Creates, starts and registers the cluster monitor.
+         *
+         * @throws RuntimeException if monitor creation fails policy or partition
+         *                          validation; the failure is logged before it is thrown
+         */
+        @Override
+        public void run() {
+            ClusterMonitor monitor;
+            try {
+                monitor = AutoscalerUtil.getClusterMonitor(cluster);
+            } catch (PolicyValidationException e) {
+                throw creationFailed(e);
+            } catch (PartitionValidationException e) {
+                throw creationFailed(e);
+            }
+
+            Thread th = new Thread(monitor);
+            th.start();
+            AutoscalerContext.getInstance().addMonitor(monitor);
+            log.info(String.format("Cluster monitor has been added: [cluster] %s",
+                    cluster.getClusterId()));
+        }
+
+        /** Logs the creation failure and wraps it, keeping the original cause. */
+        private RuntimeException creationFailed(Exception cause) {
+            String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
+            log.error(msg, cause);
+            return new RuntimeException(msg, cause);
+        }
+    }
+
+//    private void addClusterToContext(Cluster cluster) {
+//        ClusterContext ctxt;
+//        try {
+//            ctxt = AutoscalerUtil.getClusterMonitor(cluster);
+//        } catch (PolicyValidationException e) {
+//            String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId();
+//            log.error(msg, e);
+//            throw new RuntimeException(msg, e);
+//        }
+//        AutoscalerContext ruleCtxt = AutoscalerContext.getInstance();
+//        ClusterMonitor monitor =
+//                                 new ClusterMonitor(cluster.getClusterId(), ctxt,
+//                                                    ruleCtxt.getStatefulSession());
+//        Thread th = new Thread(monitor);
+//        th.start();
+//        AutoscalerContext.getInstance().addMonitor(monitor);
+//        if (log.isDebugEnabled()) {
+//            log.debug(String.format("Cluster monitor has been added: [cluster] %s",
+//                                    cluster.getClusterId()));
+//        }
+//    }
+
+    /**
+     * Removes the monitor registered for the given cluster from the autoscaler context
+     * and destroys it. When the context returns no monitor for the id, nothing is
+     * destroyed and the situation is only logged at debug level.
+     *
+     * @param clusterId id of the cluster whose monitor should be removed
+     */
+    private void removeMonitor(String clusterId) {
+        ClusterMonitor monitor = AutoscalerContext.getInstance().removeMonitor(clusterId);
+        if (monitor == null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("No cluster monitor found to remove: [cluster] %s ", clusterId));
+            }
+            return;
+        }
+        monitor.destroy();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Cluster monitor has been removed: [cluster] %s ", clusterId));
+        }
+    }
+
+    /**
+     * Looks up a cluster by id across all services of the current topology.
+     *
+     * @param clusterId id to search for; may be {@code null}
+     * @return the first cluster whose id equals {@code clusterId}, or {@code null} when
+     *         {@code clusterId} is {@code null} or no cluster matches
+     */
+    private Cluster findCluster(String clusterId) {
+        Cluster match = null;
+        if (clusterId != null) {
+            Collection<Service> allServices = TopologyManager.getTopology().getServices();
+            search:
+            for (Service candidateService : allServices) {
+                for (Cluster candidate : candidateService.getClusters()) {
+                    if (clusterId.equals(candidate.getClusterId())) {
+                        match = candidate;
+                        break search;
+                    }
+                }
+            }
+        }
+        return match;
+    }
+
+    /**
+     * Terminates the autoscaler topology receiver: asks the wrapped topology receiver to
+     * terminate and then sets the flag that {@link #run()} polls to leave its keep-alive
+     * loop.
+     */
+    public void terminate() {
+        topologyReceiver.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/aa539dbd/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/AutoscalerTopologyReceiver.java
deleted file mode 100644
index b216df1..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/AutoscalerTopologyReceiver.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.AutoscalerContext;
-import org.apache.stratos.autoscaler.MemberStatsContext;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.exception.PartitionValidationException;
-import org.apache.stratos.autoscaler.exception.PolicyValidationException;
-import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
-import org.apache.stratos.autoscaler.monitor.ClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.LbClusterMonitor;
-import org.apache.stratos.autoscaler.util.AutoscalerUtil;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.listener.topology.*;
-import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver;
-
-import java.util.Collection;
-
-/**
- * Load balancer topology receiver.
- */
-public class AutoscalerTopologyReceiver implements Runnable {
-
-    private static final Log log = LogFactory.getLog(AutoscalerTopologyReceiver.class);
-
-    private TopologyReceiver topologyReceiver;
-    private boolean terminated;
-
-    public AutoscalerTopologyReceiver() {
-		this.topologyReceiver = new TopologyReceiver(createMessageDelegator());
-    }
-
-    @Override
-    public void run() {
-        //FIXME this activated before autoscaler deployer activated.
-        try {
-            Thread.sleep(15000);
-        } catch (InterruptedException ignore) {
-        }
-        Thread thread = new Thread(topologyReceiver);
-        thread.start();
-        if(log.isInfoEnabled()) {
-            log.info("Autoscaler topology receiver thread started");
-        }
-
-        // Keep the thread live until terminated
-        while (!terminated);
-        if(log.isInfoEnabled()) {
-            log.info("Autoscaler topology receiver thread terminated");
-        }
-    }
-
-    private TopologyEventMessageDelegator createMessageDelegator() {
-        TopologyMessageProcessorChain processorChain = createEventProcessorChain();
-        processorChain.addEventListener(new CompleteTopologyEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-
-                try {
-                    TopologyManager.acquireReadLock();
-                    for(Service service : TopologyManager.getTopology().getServices()) {
-                        for(Cluster cluster : service.getClusters()) {
-                        	    Thread th; 
-                        		if(cluster.isLbCluster()){
-                        			th = new Thread(new LBClusterMonitorAdder(cluster));
-                        		}else{
-                        			th = new Thread(new ClusterMonitorAdder(cluster));
-                        		}
-                                
-                                th.start();
-                        }
-                    }
-                }
-                finally {
-                    TopologyManager.releaseReadLock();
-                }
-            }
-
-        });
-        return new TopologyEventMessageDelegator(processorChain);
-    }
-
-    private TopologyMessageProcessorChain createEventProcessorChain() {
-        // Listen to topology events that affect clusters
-        TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
-        processorChain.addEventListener(new ClusterCreatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                try {
-                    ClusterCreatedEvent e = (ClusterCreatedEvent) event;
-                    TopologyManager.acquireReadLock();
-                    Service service = TopologyManager.getTopology().getService(e.getServiceName());
-                    Cluster cluster = service.getCluster(e.getClusterId());
-                    if (cluster.isLbCluster()) {
-                        Thread th = new Thread(new LBClusterMonitorAdder(cluster));
-                        th.start();
-                    } else {
-                        Thread th = new Thread(new ClusterMonitorAdder(cluster));
-                        th.start();
-                    }
-                } finally {
-                    TopologyManager.releaseReadLock();
-                }
-            }
-
-        });
-        
-        processorChain.addEventListener(new ClusterRemovedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                try {
-                    ClusterRemovedEvent e = (ClusterRemovedEvent) event;
-                    TopologyManager.acquireReadLock();
-                    String serviceName = e.getServiceName();
-                    String clusterId = e.getClusterId();
-
-                    AbstractMonitor monitor;
-
-                    if(TopologyManager.getTopology().getService(serviceName).getCluster(clusterId).isLbCluster()){
-                        monitor = AutoscalerContext.getInstance().removeLbMonitor(clusterId);
-
-                    } else {
-                        monitor = AutoscalerContext.getInstance().removeMonitor(clusterId);
-                    }
-
-                    monitor.destroy();
-                    if(log.isDebugEnabled()) {
-                        log.debug(String.format("Cluster monitor has been removed: [cluster] %s ", clusterId));
-                    }
-                }
-                finally {
-                    TopologyManager.releaseReadLock();
-                }
-            }
-
-        });
-        
-        processorChain.addEventListener(new MemberStartedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-            		
-            }
-
-        });
-        
-        processorChain.addEventListener(new MemberTerminatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-             
-            	try {
-            		TopologyManager.acquireReadLock();
-					MemberTerminatedEvent e = (MemberTerminatedEvent) event;
-                    String networkPartitionId = e.getNetworkPartitionId();
-                    String clusterId = e.getClusterId();
-                    AbstractMonitor monitor;
-
-                    if(AutoscalerContext.getInstance().moniterExist(clusterId)){
-
-                        monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
-                    } else {
-
-                        //This is LB member
-                        monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
-                    }
-
-					NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-
-                    networkPartitionContext.getPartitionCtxt(e.getPartitionId())
-                            .removeMemberStatsContext(e.getMemberId());
-                    networkPartitionContext.decreaseMemberCountInPartitionBy(e.getPartitionId(), 1);
-//					ClusterContext clusCtx = monitor.getClusterCtxt();
-//					String networkPartitionId = monitor.
-//                    if (networkPartitionId != null) {
-//                        NetworkPartitionContext networkPartContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
-//                        networkPartContext.decrementCurrentMemberCount(1);
-//                    }
-
-				} finally {
-					TopologyManager.releaseReadLock();
-				}
-            }
-
-        });
-        
-        processorChain.addEventListener(new MemberActivatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-
-            	try {
-					TopologyManager.acquireReadLock();
-
-					MemberActivatedEvent e = (MemberActivatedEvent)event;
-                    String memberId = e.getMemberId();
-                    String partitionId = e.getPartitionId();
-                    String networkPartitionId = e.getNetworkPartitionId();
-
-                    String serviceName = e.getServiceName();
-                    PartitionContext partitionContext;
-					String clusterId = e.getClusterId();
-                    AbstractMonitor monitor;
-
-					if(AutoscalerContext.getInstance().moniterExist(clusterId)) {
-                        monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
-					    partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-					} else {
-					    monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId);
-					    partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
-					}
-//					ClusterContext clusCtx = monitor.getClusterCtxt();
-//                    monitor.getNetworkPartitionCtxt(e.getId()).getPartitionCtxt(partitionId);
-//                            .addMemberStatsContext(new MemberStatsContext(e.getMemberId()));
-                    partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
-//					PartitionContext partCtxt = monitor.getNetworkPartitionCtxt(e.getId())
-//                            .getPartitionCtxt(partitionId);
-					partitionContext.incrementCurrentMemberCount(1);
-					partitionContext.removePendingMember(memberId);
-
-				}
-                finally{
-                	TopologyManager.releaseReadLock();
-                }
-            }
-        });
-        
-        processorChain.addEventListener(new ServiceRemovedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-//                try {
-//                    TopologyManager.acquireReadLock();
-//
-//                    // Remove all clusters of given service from context
-//                    ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent)event;
-//                    for(Service service : TopologyManager.getTopology().getServices()) {
-//                        for(Cluster cluster : service.getClusters()) {
-//                            removeMonitor(cluster.getHostName());
-//                        }
-//                    }
-//                }
-//                finally {
-//                    TopologyManager.releaseReadLock();
-//                }
-            }
-        });
-        return processorChain;
-    }
-    
-    /**
-     * Runnable that builds an {@link LbClusterMonitor} for one cluster, starts it
-     * on its own thread and registers it with the {@link AutoscalerContext}.
-     */
-    private class LBClusterMonitorAdder implements Runnable {
-        private Cluster cluster;
-
-        public LBClusterMonitorAdder(Cluster cluster) {
-            this.cluster = cluster;
-        }
-
-        /**
-         * Creates the monitor, starts its thread, then registers it.
-         *
-         * @throws RuntimeException if policy or partition validation fails while
-         *                          the monitor is being created
-         */
-        public void run() {
-            LbClusterMonitor lbMonitor = createMonitor();
-
-            // The monitor thread is started before the monitor is registered;
-            // this ordering is kept as-is.
-            new Thread(lbMonitor).start();
-            AutoscalerContext.getInstance().addLbMonitor(lbMonitor);
-
-            String clusterId = cluster.getClusterId();
-            log.info(String.format("LB Cluster monitor has been added: [cluster] %s", clusterId));
-        }
-
-        /** Asks AutoscalerUtil for the LB monitor, converting validation failures. */
-        private LbClusterMonitor createMonitor() {
-            try {
-                return AutoscalerUtil.getLBClusterMonitor(cluster);
-            } catch (PolicyValidationException e) {
-                throw creationFailure(e);
-            } catch (PartitionValidationException e) {
-                throw creationFailure(e);
-            }
-        }
-
-        /** Logs the failure and wraps it in the unchecked exception that run() propagates. */
-        private RuntimeException creationFailure(Exception cause) {
-            String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
-            log.error(msg, cause);
-            return new RuntimeException(msg, cause);
-        }
-    }
-    
-    /**
-     * Runnable that builds a {@link ClusterMonitor} for one cluster, starts it on
-     * its own thread and registers it with the {@link AutoscalerContext}.
-     */
-    private class ClusterMonitorAdder implements Runnable {
-        private Cluster cluster;
-
-        public ClusterMonitorAdder(Cluster cluster) {
-            this.cluster = cluster;
-        }
-
-        /**
-         * Creates the monitor, starts its thread, then registers it.
-         *
-         * @throws RuntimeException if policy or partition validation fails while
-         *                          the monitor is being created
-         */
-        public void run() {
-            ClusterMonitor clusterMonitor = createMonitor();
-
-            // The monitor thread is started before the monitor is registered;
-            // this ordering is kept as-is.
-            new Thread(clusterMonitor).start();
-            AutoscalerContext.getInstance().addMonitor(clusterMonitor);
-
-            String clusterId = cluster.getClusterId();
-            log.info(String.format("Cluster monitor has been added: [cluster] %s", clusterId));
-        }
-
-        /** Asks AutoscalerUtil for the monitor, converting validation failures. */
-        private ClusterMonitor createMonitor() {
-            try {
-                return AutoscalerUtil.getClusterMonitor(cluster);
-            } catch (PolicyValidationException e) {
-                throw creationFailure(e);
-            } catch (PartitionValidationException e) {
-                throw creationFailure(e);
-            }
-        }
-
-        /** Logs the failure and wraps it in the unchecked exception that run() propagates. */
-        private RuntimeException creationFailure(Exception cause) {
-            String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId();
-            log.error(msg, cause);
-            return new RuntimeException(msg, cause);
-        }
-    }
-
-//    private void addClusterToContext(Cluster cluster) {
-//        ClusterContext ctxt;
-//        try {
-//            ctxt = AutoscalerUtil.getClusterMonitor(cluster);
-//        } catch (PolicyValidationException e) {
-//            String msg = "Cluster monitor creation failed for cluster: "+cluster.getClusterId();
-//            log.error(msg, e);
-//            throw new RuntimeException(msg, e);
-//        }
-//        AutoscalerContext ruleCtxt = AutoscalerContext.getInstance();
-//        ClusterMonitor monitor =
-//                                 new ClusterMonitor(cluster.getClusterId(), ctxt,
-//                                                    ruleCtxt.getStatefulSession());
-//        Thread th = new Thread(monitor);
-//        th.start();
-//        AutoscalerContext.getInstance().addMonitor(monitor);
-//        if (log.isDebugEnabled()) {
-//            log.debug(String.format("Cluster monitor has been added: [cluster] %s",
-//                                    cluster.getClusterId()));
-//        }
-//    }
-
-    /**
-     * Unregisters the cluster monitor of the given cluster from the
-     * {@link AutoscalerContext} and destroys it.
-     *
-     * If the context hands back no monitor for the id, a warning is logged and
-     * the method returns instead of failing with a NullPointerException.
-     *
-     * @param clusterId id of the cluster whose monitor should be removed
-     */
-    private void removeMonitor(String clusterId) {
-        ClusterMonitor monitor = AutoscalerContext.getInstance().removeMonitor(clusterId);
-        if (monitor == null) {
-            // NOTE(review): assumes AutoscalerContext.removeMonitor() yields null when
-            // nothing is registered for this id - confirm against its implementation.
-            log.warn(String.format("Cluster monitor not found, nothing to remove: [cluster] %s ", clusterId));
-            return;
-        }
-
-        monitor.destroy();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Cluster monitor has been removed: [cluster] %s ", clusterId));
-        }
-    }
-
-    /**
-     * Looks up a cluster by id across every service of the current topology.
-     *
-     * @param clusterId id to search for; may be null
-     * @return the first cluster whose id equals {@code clusterId}, or null when the
-     *         id is null or no cluster matches
-     */
-    private Cluster findCluster(String clusterId) {
-        if (clusterId != null) {
-            for (Service svc : TopologyManager.getTopology().getServices()) {
-                for (Cluster candidate : svc.getClusters()) {
-                    if (clusterId.equals(candidate.getClusterId())) {
-                        return candidate;
-                    }
-                }
-            }
-        }
-        // Either no id was supplied (topology not consulted) or nothing matched.
-        return null;
-    }
-
-    /**
-     * Terminates the autoscaler's topology receiver.
-     *
-     * Delegates to {@code topologyReceiver.terminate()} and then sets this
-     * instance's {@code terminated} flag to true.
-     * NOTE(review): presumably the flag is polled by this receiver's own run
-     * loop to stop it - confirm against the part of the class not shown here.
-     */
-    public void terminate() {
-        topologyReceiver.terminate();
-        terminated = true;
-    }
-}