You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/04/17 21:25:58 UTC

[2/4] Updated TopologyEventReceiver, TenantEventReceiver, InstanceNotifierEventReceiver, HealthStatEventReceiver and implemented separate message queues for each receiver instance

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
new file mode 100644
index 0000000..7efaa1c
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.context.LoadBalancerContext;
+import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
+import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
+import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * Load balancer topology receiver updates load balancer context according to
+ * incoming topology events.
+ */
+public class LoadBalancerTopologyEventReceiver implements Runnable {
+
+    private static final Log log = LogFactory.getLog(LoadBalancerTopologyEventReceiver.class);
+
+    private TopologyEventReceiver topologyEventReceiver;
+    private boolean terminated;
+
+    public LoadBalancerTopologyEventReceiver() {
+        this.topologyEventReceiver = new TopologyEventReceiver();
+        addEventListeners();
+    }
+
+    @Override
+    public void run() {
+        Thread thread = new Thread(topologyEventReceiver);
+        thread.start();
+        if (log.isInfoEnabled()) {
+            log.info("Load balancer topology receiver thread started");
+        }
+
+        // Keep the thread live until terminated
+        while (!terminated) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
+        }
+        if (log.isInfoEnabled()) {
+            log.info("Load balancer topology receiver thread terminated");
+        }
+    }
+
+    private void addEventListeners() {
+        // Listen to topology events that affect clusters
+        topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    TopologyManager.acquireReadLock();
+                    for (Service service : TopologyManager.getTopology().getServices()) {
+                        for (Cluster cluster : service.getClusters()) {
+                            if (clusterHasActiveMembers(cluster)) {
+                                LoadBalancerContextUtil.addClusterToLbContext(cluster);
+                            } else {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Cluster does not have any active members");
+                                }
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+
+            private boolean clusterHasActiveMembers(Cluster cluster) {
+                for (Member member : cluster.getMembers()) {
+                    if (member.getStatus() == MemberStatus.Activated) {
+                        return true;
+                    }
+                }
+                return false;
+            }
+        });
+        topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    TopologyManager.acquireReadLock();
+
+                    // Add cluster to load balancer context when its first member is activated
+                    MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+                    if (LoadBalancerContext.getInstance().getClusterIdClusterMap().containsCluster(memberActivatedEvent.getClusterId())) {
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("Cluster exists in load balancer context: [service] %s [cluster] %s",
+                                    memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId()));
+                        }
+                        return;
+                    }
+                    // Cluster not found in load balancer context, add it
+                    Service service = TopologyManager.getTopology().getService(memberActivatedEvent.getServiceName());
+                    if (service != null) {
+                        Cluster cluster = service.getCluster(memberActivatedEvent.getClusterId());
+                        if (cluster != null) {
+                            LoadBalancerContextUtil.addClusterToLbContext(cluster);
+                        } else {
+                            if (log.isErrorEnabled()) {
+                                log.error(String.format("Cluster not found in topology: [service] %s [cluster] %s",
+                                        memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId()));
+                            }
+                        }
+                    } else {
+                        if (log.isErrorEnabled()) {
+                            log.error(String.format("Service not found in topology: [service] %s", memberActivatedEvent.getServiceName()));
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+        topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    TopologyManager.acquireReadLock();
+
+                    // Remove cluster from context
+                    ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+                    Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterRemovedEvent.getClusterId());
+                    if (cluster != null) {
+                        LoadBalancerContextUtil.removeClusterFromLbContext(cluster.getClusterId());
+                    } else {
+                        if (log.isWarnEnabled()) {
+                            log.warn(String.format("Cluster not found in load balancer context: [service] %s [cluster] %s",
+                                    clusterRemovedEvent.getServiceName(), clusterRemovedEvent.getClusterId()));
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+        topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    TopologyManager.acquireReadLock();
+
+                    // Remove all clusters of given service from context
+                    ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event;
+                    Service service = TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName());
+                    if (service != null) {
+                        for (Cluster cluster : service.getClusters()) {
+                            LoadBalancerContextUtil.removeClusterFromLbContext(cluster.getClusterId());
+                        }
+                    } else {
+                        if (log.isWarnEnabled()) {
+                            log.warn(String.format("Service not found in topology: [service] %s", serviceRemovedEvent.getServiceName()));
+                        }
+                    }
+                } catch (Exception e) {
+                    log.error("Error processing event", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+    }
+
+    /**
+     * Terminate load balancer topology receiver thread.
+     */
+    public void terminate() {
+        topologyEventReceiver.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
deleted file mode 100644
index c039f1b..0000000
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.context.LoadBalancerContext;
-import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
-import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
-import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
-import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
-import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver;
-
-/**
- * Load balancer topology receiver updates load balancer context according to
- * incoming topology events.
- */
-public class LoadBalancerTopologyReceiver implements Runnable {
-
-    private static final Log log = LogFactory.getLog(LoadBalancerTopologyReceiver.class);
-
-    private TopologyReceiver topologyReceiver;
-    private boolean terminated;
-
-    public LoadBalancerTopologyReceiver() {
-        this.topologyReceiver = new TopologyReceiver(createMessageDelegator());
-    }
-
-    @Override
-    public void run() {
-        Thread thread = new Thread(topologyReceiver);
-        thread.start();
-        if (log.isInfoEnabled()) {
-            log.info("Load balancer topology receiver thread started");
-        }
-
-        // Keep the thread live until terminated
-        while (!terminated) {
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException ignore) {
-            }
-        }
-        if (log.isInfoEnabled()) {
-            log.info("Load balancer topology receiver thread terminated");
-        }
-    }
-
-    private TopologyEventMessageDelegator createMessageDelegator() {
-        TopologyMessageProcessorChain processorChain = createEventProcessorChain();
-        return new TopologyEventMessageDelegator(processorChain);
-    }
-
-    private TopologyMessageProcessorChain createEventProcessorChain() {
-        // Listen to topology events that affect clusters
-        TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
-        processorChain.addEventListener(new CompleteTopologyEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                try {
-                    TopologyManager.acquireReadLock();
-                    for (Service service : TopologyManager.getTopology().getServices()) {
-                        for (Cluster cluster : service.getClusters()) {
-                            if (clusterHasActiveMembers(cluster)) {
-                                LoadBalancerContextUtil.addClusterToLbContext(cluster);
-                            } else {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Cluster does not have any active members");
-                                }
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
-                }
-            }
-
-            private boolean clusterHasActiveMembers(Cluster cluster) {
-                for (Member member : cluster.getMembers()) {
-                    if (member.getStatus() == MemberStatus.Activated) {
-                        return true;
-                    }
-                }
-                return false;
-            }
-        });
-        processorChain.addEventListener(new MemberActivatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                try {
-                    TopologyManager.acquireReadLock();
-
-                    // Add cluster to load balancer context when its first member is activated
-                    MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
-                    if (LoadBalancerContext.getInstance().getClusterIdClusterMap().containsCluster(memberActivatedEvent.getClusterId())) {
-                        if (log.isDebugEnabled()) {
-                            log.debug(String.format("Cluster exists in load balancer context: [service] %s [cluster] %s",
-                                    memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId()));
-                        }
-                        return;
-                    }
-                    // Cluster not found in load balancer context, add it
-                    Service service = TopologyManager.getTopology().getService(memberActivatedEvent.getServiceName());
-                    if (service != null) {
-                        Cluster cluster = service.getCluster(memberActivatedEvent.getClusterId());
-                        if (cluster != null) {
-                            LoadBalancerContextUtil.addClusterToLbContext(cluster);
-                        } else {
-                            if (log.isErrorEnabled()) {
-                                log.error(String.format("Cluster not found in topology: [service] %s [cluster] %s",
-                                        memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId()));
-                            }
-                        }
-                    } else {
-                        if (log.isErrorEnabled()) {
-                            log.error(String.format("Service not found in topology: [service] %s", memberActivatedEvent.getServiceName()));
-                        }
-                    }
-                } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
-                }
-            }
-        });
-        processorChain.addEventListener(new ClusterRemovedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                try {
-                    TopologyManager.acquireReadLock();
-
-                    // Remove cluster from context
-                    ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
-                    Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterRemovedEvent.getClusterId());
-                    if (cluster != null) {
-                        LoadBalancerContextUtil.removeClusterFromLbContext(cluster.getClusterId());
-                    } else {
-                        if (log.isWarnEnabled()) {
-                            log.warn(String.format("Cluster not found in load balancer context: [service] %s [cluster] %s",
-                                    clusterRemovedEvent.getServiceName(), clusterRemovedEvent.getClusterId()));
-                        }
-                    }
-                } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
-                }
-            }
-        });
-        processorChain.addEventListener(new ServiceRemovedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                try {
-                    TopologyManager.acquireReadLock();
-
-                    // Remove all clusters of given service from context
-                    ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event;
-                    Service service = TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName());
-                    if (service != null) {
-                        for (Cluster cluster : service.getClusters()) {
-                            LoadBalancerContextUtil.removeClusterFromLbContext(cluster.getClusterId());
-                        }
-                    } else {
-                        if (log.isWarnEnabled()) {
-                            log.warn(String.format("Service not found in topology: [service] %s", serviceRemovedEvent.getServiceName()));
-                        }
-                    }
-                } catch (Exception e) {
-                    log.error("Error processing event", e);
-                } finally {
-                    TopologyManager.releaseReadLock();
-                }
-            }
-        });
-        return processorChain;
-    }
-
-    /**
-     * Terminate load balancer topology receiver thread.
-     */
-    public void terminate() {
-        topologyReceiver.terminate();
-        terminated = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index da7f3de..f7158a7 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -24,8 +24,8 @@ import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.load.balancer.EndpointDeployer;
-import org.apache.stratos.load.balancer.LoadBalancerTenantReceiver;
-import org.apache.stratos.load.balancer.LoadBalancerTopologyReceiver;
+import org.apache.stratos.load.balancer.LoadBalancerTenantEventReceiver;
+import org.apache.stratos.load.balancer.LoadBalancerTopologyEventReceiver;
 import org.apache.stratos.load.balancer.TenantAwareLoadBalanceEndpointException;
 import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
 import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier;
@@ -101,8 +101,8 @@ public class LoadBalancerServiceComponent {
     private static final Log log = LogFactory.getLog(LoadBalancerServiceComponent.class);
 
     private boolean activated = false;
-    private LoadBalancerTopologyReceiver topologyReceiver;
-    private LoadBalancerTenantReceiver tenantReceiver;
+    private LoadBalancerTopologyEventReceiver topologyReceiver;
+    private LoadBalancerTenantEventReceiver tenantReceiver;
     private LoadBalancerStatisticsNotifier statisticsNotifier;
 
     protected void activate(ComponentContext ctxt) {
@@ -127,7 +127,7 @@ public class LoadBalancerServiceComponent {
                 // Configure jndi.properties
                 JndiConfigurator.configure(configuration);
 
-                tenantReceiver = new LoadBalancerTenantReceiver();
+                tenantReceiver = new LoadBalancerTenantEventReceiver();
                 Thread tenantReceiverThread = new Thread(tenantReceiver);
                 tenantReceiverThread.start();
                 if (log.isInfoEnabled()) {
@@ -142,7 +142,7 @@ public class LoadBalancerServiceComponent {
                 }
 
                 // Start topology receiver
-                topologyReceiver = new LoadBalancerTopologyReceiver();
+                topologyReceiver = new LoadBalancerTopologyEventReceiver();
                 Thread topologyReceiverThread = new Thread(topologyReceiver);
                 topologyReceiverThread.start();
                 if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
index 81c1dbe..c843fdb 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
@@ -24,7 +24,7 @@ import org.apache.stratos.manager.listener.InstanceStatusListener;
 import org.apache.stratos.manager.publisher.TenantEventPublisher;
 import org.apache.stratos.manager.publisher.TenantSynchronizerTaskScheduler;
 import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
-import org.apache.stratos.manager.topology.receiver.StratosManagerTopologyReceiver;
+import org.apache.stratos.manager.topology.receiver.StratosManagerTopologyEventReceiver;
 import org.apache.stratos.manager.utils.CartridgeConfigFileReader;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
@@ -60,7 +60,7 @@ import org.wso2.carbon.utils.ConfigurationContextService;
 public class ADCManagementServerComponent {
 
     private static final Log log = LogFactory.getLog(ADCManagementServerComponent.class);
-    private StratosManagerTopologyReceiver stratosManagerTopologyReceiver;
+    private StratosManagerTopologyEventReceiver stratosManagerTopologyEventReceiver;
 
     protected void activate(ComponentContext componentContext) throws Exception {
 		try {
@@ -102,8 +102,8 @@ public class ADCManagementServerComponent {
             Thread topologyReceiverThread = new Thread(topologyReceiver);
             topologyReceiverThread.start();*/
 
-            stratosManagerTopologyReceiver = new StratosManagerTopologyReceiver();
-            Thread topologyReceiverThread = new Thread(stratosManagerTopologyReceiver);
+            stratosManagerTopologyEventReceiver = new StratosManagerTopologyEventReceiver();
+            Thread topologyReceiverThread = new Thread(stratosManagerTopologyEventReceiver);
             topologyReceiverThread.start();
             log.info("Topology receiver thread started");
 
@@ -174,6 +174,6 @@ public class ADCManagementServerComponent {
     protected void deactivate(ComponentContext context) {
 
         //terminate Stratos Manager Topology Receiver
-        stratosManagerTopologyReceiver.terminate();
+        stratosManagerTopologyEventReceiver.terminate();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
new file mode 100644
index 0000000..94ba486
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.topology.receiver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.manager.topology.model.TopologyClusterInformationModel;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.listener.topology.*;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+public class StratosManagerTopologyEventReceiver implements Runnable {
+
+    private static final Log log = LogFactory.getLog(StratosManagerTopologyEventReceiver.class);
+
+    private TopologyEventReceiver topologyEventReceiver;
+    private boolean terminated;
+
+    public StratosManagerTopologyEventReceiver() {
+        this.terminated = false;
+        this.topologyEventReceiver = new TopologyEventReceiver();
+        addEventListeners();
+    }
+
+    private void addEventListeners() {
+        //add listner to Complete Topology Event
+        topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                if (TopologyClusterInformationModel.getInstance().isInitialized()) {
+                    return;
+                }
+
+                log.info("[CompleteTopologyEventListener] Received: " + event.getClass());
+
+                try {
+                    TopologyManager.acquireReadLock();
+
+                    for (Service service : TopologyManager.getTopology()
+                            .getServices()) {
+                        // iterate through all clusters
+                        for (Cluster cluster : service.getClusters()) {
+                            TopologyClusterInformationModel.getInstance()
+                                    .addCluster(cluster);
+                        }
+                    }
+
+                    TopologyClusterInformationModel.getInstance().setInitialized(true);
+
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+
+        //Cluster Created event listner
+        topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[ClusterCreatedEventListener] Received: " + event.getClass());
+
+                ClusterCreatedEvent clustercreatedEvent = (ClusterCreatedEvent) event;
+
+                String serviceType = clustercreatedEvent.getServiceName();
+                //acquire read lock
+                TopologyManager.acquireReadLock();
+
+                try {
+                    Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clustercreatedEvent.getClusterId());
+                    TopologyClusterInformationModel.getInstance().addCluster(cluster);
+
+                } finally {
+                    //release read lock
+                    TopologyManager.releaseReadLock();
+                }
+
+            }
+        });
+
+
+        //Cluster Removed event listner
+        topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[ClusterRemovedEventListener] Received: " + event.getClass());
+
+                ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+                TopologyClusterInformationModel.getInstance().removeCluster(clusterRemovedEvent.getClusterId());
+            }
+        });
+        
+        
+      //Instance Spawned event listner
+        topologyEventReceiver.addEventListener(new InstanceSpawnedEventListener() {
+
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[InstanceSpawnedEventListener] Received: " + event.getClass());
+
+                InstanceSpawnedEvent instanceSpawnedEvent = (InstanceSpawnedEvent) event;
+
+                String clusterDomain = instanceSpawnedEvent.getClusterId();
+
+                String serviceType = instanceSpawnedEvent.getServiceName();
+                //acquire read lock
+                TopologyManager.acquireReadLock();
+
+                try {
+                    Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
+                    TopologyClusterInformationModel.getInstance().addCluster(cluster);
+                } finally {
+                    //release read lock
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+
+        //Member Started event listner
+        topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[MemberStartedEventListener] Received: " + event.getClass());
+
+                MemberStartedEvent memberStartedEvent = (MemberStartedEvent) event;
+
+                String clusterDomain = memberStartedEvent.getClusterId();
+
+                String serviceType = memberStartedEvent.getServiceName();
+                //acquire read lock
+                TopologyManager.acquireReadLock();
+
+                try {
+                    Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
+                    TopologyClusterInformationModel.getInstance().addCluster(cluster);
+                } finally {
+                    //release read lock
+                    TopologyManager.releaseReadLock();
+                }
+
+            }
+        });
+
+        //Member Activated event listner
+        topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[MemberActivatedEventListener] Received: " + event.getClass());
+
+                MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+                String clusterDomain = memberActivatedEvent.getClusterId();
+
+                String serviceType = memberActivatedEvent.getServiceName();
+                //acquire read lock
+                TopologyManager.acquireReadLock();
+
+                try {
+                    Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
+                    TopologyClusterInformationModel.getInstance().addCluster(cluster);
+                } finally {
+                    //release read lock
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+
+        //Member Suspended event listner
+        topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[MemberSuspendedEventListener] Received: " + event.getClass());
+
+                MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event;
+
+                String clusterDomain = memberSuspendedEvent.getClusterId();
+
+                String serviceType = memberSuspendedEvent.getServiceName();
+                //acquire read lock
+                TopologyManager.acquireReadLock();
+
+                try {
+                    Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
+                    TopologyClusterInformationModel.getInstance().addCluster(cluster);
+
+                } finally {
+                    //release read lock
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+
+        //Member Terminated event listner
+        topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                log.info("[MemberTerminatedEventListener] Received: " + event.getClass());
+
+                MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+
+                String clusterDomain = memberTerminatedEvent.getClusterId();
+
+                String serviceType = memberTerminatedEvent.getServiceName();
+                //acquire read lock
+                TopologyManager.acquireReadLock();
+
+                try {
+                    Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
+
+                    // check and remove terminated member
+                    if (cluster.memberExists(memberTerminatedEvent.getMemberId())) {
+                        // release the read lock and acquire the write lock
+                        TopologyManager.releaseReadLock();
+                        TopologyManager.acquireWriteLock();
+
+                        try {
+                            // re-check the state; another thread might have acquired the write lock and modified
+                            if (cluster.memberExists(memberTerminatedEvent.getMemberId())) {
+                                // remove the member from the cluster
+                                Member terminatedMember = cluster.getMember(memberTerminatedEvent.getMemberId());
+                                cluster.removeMember(terminatedMember);
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Removed the terminated member with id " + memberTerminatedEvent.getMemberId() + " from the cluster");
+                                }
+                            }
+
+                            // downgrade to read lock - 1. acquire read lock, 2. release write lock
+                            // acquire read lock
+                            TopologyManager.acquireReadLock();
+
+                        } finally {
+                            // release the write lock
+                            TopologyManager.releaseWriteLock();
+                        }
+                    }
+                    TopologyClusterInformationModel.getInstance().addCluster(cluster);
+                } finally {
+                    //release read lock
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+    }
+
+
+    @Override
+    public void run() {
+        Thread thread = new Thread(topologyEventReceiver);
+        thread.start();
+        log.info("Stratos Manager topology receiver thread started");
+
+        //Keep running till terminate is set from deactivate method of the component
+        while (!terminated) {
+            //loop while terminate = false
+        	try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
+        }
+        log.info("Stratos Manager topology receiver thread terminated");
+    }
+
+    //terminate Topology Receiver
+    public void terminate () {
+        topologyEventReceiver.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java
deleted file mode 100644
index 46b3313..0000000
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyReceiver.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.manager.topology.receiver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.manager.topology.model.TopologyClusterInformationModel;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
-import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
-import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent;
-import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
-import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
-import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
-import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
-import org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListener;
-import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
-import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
-import org.apache.stratos.messaging.listener.topology.InstanceSpawnedEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberStartedEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberSuspendedEventListener;
-import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
-import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver;
-
-public class StratosManagerTopologyReceiver implements Runnable {
-
-    private static final Log log = LogFactory.getLog(StratosManagerTopologyReceiver.class);
-
-    private TopologyReceiver stratosManagerTopologyReceiver;
-    private boolean terminate;
-
-    public StratosManagerTopologyReceiver() {
-        this.terminate = false;
-        this.stratosManagerTopologyReceiver = new TopologyReceiver(createMessageDelegator());
-    }
-
-    private TopologyEventMessageDelegator createMessageDelegator() {
-        TopologyMessageProcessorChain processorChain = createEventProcessorChain();
-        return new TopologyEventMessageDelegator(processorChain);
-    }
-
-    private TopologyMessageProcessorChain createEventProcessorChain() {
-
-        TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
-
-        //add listner to Complete Topology Event
-        processorChain.addEventListener(new CompleteTopologyEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-
-            	if (TopologyClusterInformationModel.getInstance().isInitialized()) {
-            		return;
-            	}
-            	
-                log.info("[CompleteTopologyEventListener] Received: " + event.getClass());
-
-                try {
-                    TopologyManager.acquireReadLock();
-
-					for (Service service : TopologyManager.getTopology()
-							.getServices()) {
-						// iterate through all clusters
-						for (Cluster cluster : service.getClusters()) {
-							TopologyClusterInformationModel.getInstance()
-									.addCluster(cluster);
-						}
-					}
-					
-					TopologyClusterInformationModel.getInstance().setInitialized(true);
-                
-                } finally {
-                    TopologyManager.releaseReadLock();
-                }
-            }
-        });
-
-        //Cluster Created event listner
-        processorChain.addEventListener(new ClusterCreatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-
-                log.info("[ClusterCreatedEventListener] Received: " + event.getClass());
-
-                ClusterCreatedEvent clustercreatedEvent = (ClusterCreatedEvent) event;
-
-                    String serviceType = clustercreatedEvent.getServiceName();
-                    //acquire read lock
-                    TopologyManager.acquireReadLock();
-
-                    try {
-                        Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clustercreatedEvent.getClusterId());
-                        TopologyClusterInformationModel.getInstance().addCluster(cluster);
-
-                    } finally {
-                        //release read lock
-                        TopologyManager.releaseReadLock();
-                    }
-                
-            }
-        });
-
-
-        //Cluster Removed event listner
-        processorChain.addEventListener(new ClusterRemovedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-
-                log.info("[ClusterRemovedEventListener] Received: " + event.getClass());
-
-                ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
-                TopologyClusterInformationModel.getInstance().removeCluster(clusterRemovedEvent.getClusterId());
-            }
-        });
-        
-        
-      //Instance Spawned event listner
-        processorChain.addEventListener(new InstanceSpawnedEventListener() {
-        	
-            @Override
-            protected void onEvent(Event event) {
-
-                log.info("[InstanceSpawnedEventListener] Received: " + event.getClass());
-
-                InstanceSpawnedEvent instanceSpawnedEvent = (InstanceSpawnedEvent) event;
-
-                String clusterDomain = instanceSpawnedEvent.getClusterId();
-                
-                    String serviceType = instanceSpawnedEvent.getServiceName();
-                    //acquire read lock
-                    TopologyManager.acquireReadLock();
-
-                    try {
-                        Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
-                        TopologyClusterInformationModel.getInstance().addCluster(cluster);
-                    } finally {
-                        //release read lock
-                        TopologyManager.releaseReadLock();
-                    }                
-            }
-        });
-
-        //Member Started event listner
-        processorChain.addEventListener(new MemberStartedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-
-                log.info("[MemberStartedEventListener] Received: " + event.getClass());
-
-                MemberStartedEvent memberStartedEvent = (MemberStartedEvent) event;
-
-                String clusterDomain = memberStartedEvent.getClusterId();
-          
-                    String serviceType = memberStartedEvent.getServiceName();
-                    //acquire read lock
-                    TopologyManager.acquireReadLock();
-
-                    try {
-                        Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
-                        TopologyClusterInformationModel.getInstance().addCluster(cluster);
-                    } finally {
-                        //release read lock
-                        TopologyManager.releaseReadLock();
-                    }
-
-            }
-        });
-
-        //Member Activated event listner
-        processorChain.addEventListener(new MemberActivatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-
-                log.info("[MemberActivatedEventListener] Received: " + event.getClass());
-
-                MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
-
-                String clusterDomain = memberActivatedEvent.getClusterId();
-
-                    String serviceType = memberActivatedEvent.getServiceName();
-                    //acquire read lock
-                    TopologyManager.acquireReadLock();
-
-                    try {
-                        Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
-                        TopologyClusterInformationModel.getInstance().addCluster(cluster);
-                    } finally {
-                        //release read lock
-                        TopologyManager.releaseReadLock();
-                    }                
-            }
-        });
-
-        //Member Suspended event listner
-        processorChain.addEventListener(new MemberSuspendedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-
-                log.info("[MemberSuspendedEventListener] Received: " + event.getClass());
-
-                MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event;
-
-                String clusterDomain = memberSuspendedEvent.getClusterId();
-
-                    String serviceType = memberSuspendedEvent.getServiceName();
-                    //acquire read lock
-                    TopologyManager.acquireReadLock();
-
-                    try {
-                        Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
-                        TopologyClusterInformationModel.getInstance().addCluster(cluster);
-
-                    } finally {
-                        //release read lock
-                        TopologyManager.releaseReadLock();
-                    }
-            }
-        });
-
-        //Member Terminated event listner
-        processorChain.addEventListener(new MemberTerminatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-
-                log.info("[MemberTerminatedEventListener] Received: " + event.getClass());
-
-                MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
-
-                String clusterDomain = memberTerminatedEvent.getClusterId();
-
-                    String serviceType = memberTerminatedEvent.getServiceName();
-                    //acquire read lock
-                    TopologyManager.acquireReadLock();
-
-                    try {
-                        Cluster cluster = TopologyManager.getTopology().getService(serviceType).getCluster(clusterDomain);
-
-                        // check and remove terminated member
-                        if (cluster.memberExists(memberTerminatedEvent.getMemberId())) {
-                            // release the read lock and acquire the write lock
-                            TopologyManager.releaseReadLock();
-                            TopologyManager.acquireWriteLock();
-
-                                try {
-                                    // re-check the state; another thread might have acquired the write lock and modified
-                                    if (cluster.memberExists(memberTerminatedEvent.getMemberId())) {
-                                        // remove the member from the cluster
-                                        Member terminatedMember = cluster.getMember(memberTerminatedEvent.getMemberId());
-                                        cluster.removeMember(terminatedMember);
-                                        if (log.isDebugEnabled()) {
-                                            log.debug("Removed the terminated member with id " + memberTerminatedEvent.getMemberId() + " from the cluster");
-                                        }
-                                    }
-
-                                    // downgrade to read lock - 1. acquire read lock, 2. release write lock
-                                    // acquire read lock
-                                    TopologyManager.acquireReadLock();
-
-                                } finally {
-                                    // release the write lock
-                                    TopologyManager.releaseWriteLock();
-                                }
-                        }
-                        TopologyClusterInformationModel.getInstance().addCluster(cluster);
-                    } finally {
-                        //release read lock
-                        TopologyManager.releaseReadLock();
-                    }
-            }
-        });
-
-        return processorChain;
-    }
-
-
-    @Override
-    public void run() {
-
-        Thread thread = new Thread(stratosManagerTopologyReceiver);
-        thread.start();
-        log.info("Stratos Manager topology receiver thread started");
-
-        //Keep running till terminate is set from deactivate method of the component
-        while (!terminate) {
-            //loop while terminate = false
-        	try {
-                Thread.sleep(1000);
-            } catch (InterruptedException ignore) {
-            }
-        }
-        log.info("Stratos Manager topology receiver thread terminated");
-    }
-
-    //terminate Topology Receiver
-    public void terminate () {
-        stratosManagerTopologyReceiver.terminate();
-        terminate = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
index 7786c39..17727ed 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
@@ -21,6 +21,7 @@ package org.apache.stratos.messaging.message.receiver.health.stat;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
 import org.apache.stratos.messaging.message.processor.health.stat.HealthStatMessageProcessorChain;
 
@@ -31,18 +32,21 @@ import javax.jms.TextMessage;
  * Implements logic for processing health stat event messages based on a given
  * topology process chain.
  */
-public class HealthStatEventMessageDelegator implements Runnable {
+class HealthStatEventMessageDelegator implements Runnable {
 
     private static final Log log = LogFactory.getLog(HealthStatEventMessageDelegator.class);
+
+    private HealthStatEventMessageQueue messageQueue;
     private MessageProcessorChain processorChain;
     private boolean terminated;
 
-    public HealthStatEventMessageDelegator() {
+    public HealthStatEventMessageDelegator(HealthStatEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
         this.processorChain = new HealthStatMessageProcessorChain();
     }
 
-    public HealthStatEventMessageDelegator(MessageProcessorChain processorChain) {
-        this.processorChain = processorChain;
+    public void addEventListener(EventListener eventListener) {
+        processorChain.addEventListener(eventListener);
     }
 
     @Override
@@ -54,7 +58,7 @@ public class HealthStatEventMessageDelegator implements Runnable {
 
             while (!terminated) {
                 try {
-                    TextMessage message = HealthStatEventMessageQueue.getInstance().take();
+                    TextMessage message = messageQueue.take();
 
                     String messageText = message.getText();
                     if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
index 733521e..5e818bc 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
@@ -21,7 +21,6 @@ package org.apache.stratos.messaging.message.receiver.health.stat;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.message.receiver.tenant.TenantEventMessageQueue;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -36,6 +35,12 @@ public class HealthStatEventMessageListener implements MessageListener {
 
     private static final Log log = LogFactory.getLog(HealthStatEventMessageListener.class);
 
+    private HealthStatEventMessageQueue messageQueue;
+
+    public HealthStatEventMessageListener(HealthStatEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
     @Override
     public void onMessage(Message message) {
         if (message instanceof TextMessage) {
@@ -45,7 +50,7 @@ public class HealthStatEventMessageListener implements MessageListener {
                     log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText()));
                 }
                 // Add received message to the queue
-                HealthStatEventMessageQueue.getInstance().add(receivedMessage);
+                messageQueue.add(receivedMessage);
 
             } catch (JMSException e) {
                 log.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
index 801667c..d9fea8f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageQueue.java
@@ -26,20 +26,5 @@ import java.util.concurrent.LinkedBlockingQueue;
 /**
  * Implements a blocking queue for managing instance notifier event messages.
  */
-public class HealthStatEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
-    private static volatile HealthStatEventMessageQueue instance;
-
-    private HealthStatEventMessageQueue(){
-    }
-
-    public static HealthStatEventMessageQueue getInstance() {
-        if (instance == null) {
-            synchronized (HealthStatEventMessageQueue.class){
-                if (instance == null) {
-                    instance = new HealthStatEventMessageQueue();
-                }
-            }
-        }
-        return instance;
-    }
+class HealthStatEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
new file mode 100644
index 0000000..8b07180
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.health.stat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.util.Constants;
+
+/**
+ * A thread for receiving health stat information from message broker
+ */
+public class HealthStatEventReceiver implements Runnable {
+    private static final Log log = LogFactory.getLog(HealthStatEventReceiver.class);
+
+    private HealthStatEventMessageDelegator messageDelegator;
+    private HealthStatEventMessageListener messageListener;
+    private TopicSubscriber topicSubscriber;
+    private boolean terminated;
+
+    public HealthStatEventReceiver() {
+        HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue();
+        this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue);
+        this.messageListener = new HealthStatEventMessageListener(messageQueue);
+    }
+
+    public void addEventListener(EventListener eventListener) {
+        messageDelegator.addEventListener(eventListener);
+    }
+
+    @Override
+    public void run() {
+        try {
+            // Start topic subscriber thread
+            topicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
+            topicSubscriber.setMessageListener(messageListener);
+            Thread subscriberThread = new Thread(topicSubscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Health stats event message receiver thread started");
+            }
+
+            // Start health stat event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Health stats event message delegator thread started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated) {
+            	try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ignore) {
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Topology receiver failed", e);
+            }
+        }
+    }
+
+    public void terminate() {
+        topicSubscriber.terminate();
+        messageDelegator.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
deleted file mode 100644
index 0b33abc..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.receiver.health.stat;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.util.Constants;
-
-/**
- * A thread for receiving health stat information from message broker
- */
-public class HealthStatReceiver implements Runnable {
-    private static final Log log = LogFactory.getLog(HealthStatReceiver.class);
-    private HealthStatEventMessageDelegator messageDelegator;
-    private TopicSubscriber topicSubscriber;
-    private boolean terminated;
-
-    public HealthStatReceiver() {
-        this.messageDelegator = new HealthStatEventMessageDelegator();
-    }
-
-    public HealthStatReceiver(HealthStatEventMessageDelegator messageDelegator) {
-        this.messageDelegator = messageDelegator;
-    }
-
-    @Override
-    public void run() {
-        try {
-            // Start topic subscriber thread
-            topicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
-            topicSubscriber.setMessageListener(new HealthStatEventMessageListener());
-            Thread subscriberThread = new Thread(topicSubscriber);
-            subscriberThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("Health stats event message receiver thread started");
-            }
-
-            // Start health stat event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("Health stats event message delegator thread started");
-            }
-
-            // Keep the thread live until terminated
-            while (!terminated) {
-            	try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
-                }
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Topology receiver failed", e);
-            }
-        }
-    }
-
-    public void terminate() {
-        topicSubscriber.terminate();
-        messageDelegator.terminate();
-        terminated = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
index 3ad3015..f086b8c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
@@ -21,6 +21,7 @@ package org.apache.stratos.messaging.message.receiver.instance.notifier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
 import org.apache.stratos.messaging.message.processor.instance.notifier.InstanceNotifierMessageProcessorChain;
 import org.apache.stratos.messaging.util.Constants;
@@ -32,18 +33,20 @@ import javax.jms.TextMessage;
  * Implements logic for processing instance notifier event messages based on a given
  * topology process chain.
  */
-public class InstanceNotifierEventMessageDelegator implements Runnable {
+class InstanceNotifierEventMessageDelegator implements Runnable {
 
     private static final Log log = LogFactory.getLog(InstanceNotifierEventMessageDelegator.class);
+    private InstanceNotifierEventMessageQueue messageQueue;
     private MessageProcessorChain processorChain;
     private boolean terminated;
 
-    public InstanceNotifierEventMessageDelegator() {
+    public InstanceNotifierEventMessageDelegator(InstanceNotifierEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
         this.processorChain = new InstanceNotifierMessageProcessorChain();
     }
 
-    public InstanceNotifierEventMessageDelegator(MessageProcessorChain processorChain) {
-        this.processorChain = processorChain;
+    public void addEventListener(EventListener eventListener) {
+        processorChain.addEventListener(eventListener);
     }
 
     @Override
@@ -55,7 +58,7 @@ public class InstanceNotifierEventMessageDelegator implements Runnable {
 
             while (!terminated) {
                 try {
-                    TextMessage message = InstanceNotifierEventMessageQueue.getInstance().take();
+                    TextMessage message = messageQueue.take();
 
                     // Retrieve the header
                     String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
index 4e134ac..d8cc6b5 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
@@ -31,10 +31,16 @@ import javax.jms.TextMessage;
  * Implements functionality for receiving text based event messages from the instance notifier
  * message broker topic and add them to the event queue.
  */
-public class InstanceNotifierEventMessageListener implements MessageListener {
+class InstanceNotifierEventMessageListener implements MessageListener {
 
     private static final Log log = LogFactory.getLog(InstanceNotifierEventMessageListener.class);
 
+    private InstanceNotifierEventMessageQueue messageQueue;
+
+    public InstanceNotifierEventMessageListener(InstanceNotifierEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
     @Override
     public void onMessage(Message message) {
         if (message instanceof TextMessage) {
@@ -44,7 +50,7 @@ public class InstanceNotifierEventMessageListener implements MessageListener {
                     log.debug(String.format("Instance notifier message received: %s", ((TextMessage) message).getText()));
                 }
                 // Add received message to the queue
-                InstanceNotifierEventMessageQueue.getInstance().add(receivedMessage);
+                messageQueue.add(receivedMessage);
 
             } catch (JMSException e) {
                 log.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java
index 1a49fac..a27f586 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java
@@ -25,20 +25,5 @@ import java.util.concurrent.LinkedBlockingQueue;
 /**
  * Implements a blocking queue for managing instance notifier event messages.
  */
-public class InstanceNotifierEventMessageQueue extends LinkedBlockingQueue<TextMessage>{
-    private static volatile InstanceNotifierEventMessageQueue instance;
-
-    private InstanceNotifierEventMessageQueue(){
-    }
-
-    public static InstanceNotifierEventMessageQueue getInstance() {
-        if (instance == null) {
-            synchronized (InstanceNotifierEventMessageQueue.class){
-                if (instance == null) {
-                    instance = new InstanceNotifierEventMessageQueue();
-                }
-            }
-        }
-        return instance;
-    }
+class InstanceNotifierEventMessageQueue extends LinkedBlockingQueue<TextMessage> {
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java
deleted file mode 100644
index 88f11f3..0000000
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.messaging.message.receiver.instance.notifier;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.util.Constants;
-
-/**
- * A thread for receiving instance notifier information from message broker.
- */
-public class InstanceNotifierEventMessageReceiver implements Runnable {
-    private static final Log log = LogFactory.getLog(InstanceNotifierEventMessageReceiver.class);
-    private InstanceNotifierEventMessageDelegator messageDelegator;
-    private TopicSubscriber topicSubscriber;
-    private boolean terminated;
-
-    public InstanceNotifierEventMessageReceiver() {
-        this.messageDelegator = new InstanceNotifierEventMessageDelegator();
-    }
-
-    public InstanceNotifierEventMessageReceiver(InstanceNotifierEventMessageDelegator messageDelegator) {
-        this.messageDelegator = messageDelegator;
-    }
-
-    @Override
-    public void run() {
-        try {
-            // Start topic subscriber thread
-            topicSubscriber = new TopicSubscriber(Constants.INSTANCE_NOTIFIER_TOPIC);
-            topicSubscriber.setMessageListener(new InstanceNotifierEventMessageListener());
-            Thread subscriberThread = new Thread(topicSubscriber);
-            subscriberThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("InstanceNotifier event message receiver thread started");
-            }
-
-            // Start instance notifier event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("InstanceNotifier event message delegator thread started");
-            }
-
-            // Keep the thread live until terminated
-            while (!terminated) {
-            	try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
-                }
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("InstanceNotifier receiver failed", e);
-            }
-        }
-    }
-
-    public boolean isSubscribed() {
-        return ((topicSubscriber != null) && (topicSubscriber.isSubscribed()));
-    }
-
-    public void terminate() {
-        topicSubscriber.terminate();
-        messageDelegator.terminate();
-        terminated = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
new file mode 100644
index 0000000..57fea76
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.instance.notifier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.util.Constants;
+
+/**
+ * A thread for receiving instance notifier information from message broker.
+ */
+public class InstanceNotifierEventReceiver implements Runnable {
+    private static final Log log = LogFactory.getLog(InstanceNotifierEventReceiver.class);
+    private InstanceNotifierEventMessageDelegator messageDelegator;
+    private InstanceNotifierEventMessageListener messageListener;
+    private TopicSubscriber topicSubscriber;
+    private boolean terminated;
+
+    public InstanceNotifierEventReceiver() {
+        InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue();
+        this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue);
+        this.messageListener = new InstanceNotifierEventMessageListener(messageQueue);
+    }
+
+    public void addEventListener(EventListener eventListener) {
+        messageDelegator.addEventListener(eventListener);
+    }
+
+    @Override
+    public void run() {
+        try {
+            // Start topic subscriber thread
+            topicSubscriber = new TopicSubscriber(Constants.INSTANCE_NOTIFIER_TOPIC);
+            topicSubscriber.setMessageListener(messageListener);
+            Thread subscriberThread = new Thread(topicSubscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("InstanceNotifier event message receiver thread started");
+            }
+
+            // Start instance notifier event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("InstanceNotifier event message delegator thread started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated) {
+            	try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ignore) {
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("InstanceNotifier receiver failed", e);
+            }
+        }
+    }
+
+    public boolean isSubscribed() {
+        return ((topicSubscriber != null) && (topicSubscriber.isSubscribed()));
+    }
+
+    public void terminate() {
+        topicSubscriber.terminate();
+        messageDelegator.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
index ef68da2..de05a34 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
@@ -21,6 +21,7 @@ package org.apache.stratos.messaging.message.receiver.tenant;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
 import org.apache.stratos.messaging.message.processor.tenant.TenantMessageProcessorChain;
 import org.apache.stratos.messaging.util.Constants;
@@ -32,18 +33,21 @@ import javax.jms.TextMessage;
  * Implements logic for processing topology event messages based on a given
  * topology process chain.
  */
-public class TenantEventMessageDelegator implements Runnable {
+class TenantEventMessageDelegator implements Runnable {
 
     private static final Log log = LogFactory.getLog(TenantEventMessageDelegator.class);
+
+    private TenantEventMessageQueue messageQueue;
     private MessageProcessorChain processorChain;
     private boolean terminated;
 
-    public TenantEventMessageDelegator() {
+    public TenantEventMessageDelegator(TenantEventMessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
         this.processorChain = new TenantMessageProcessorChain();
     }
 
-    public TenantEventMessageDelegator(MessageProcessorChain processorChain) {
-        this.processorChain = processorChain;
+    public void addEventListener(EventListener eventListener) {
+        processorChain.addEventListener(eventListener);
     }
 
     @Override
@@ -55,7 +59,7 @@ public class TenantEventMessageDelegator implements Runnable {
 
             while (!terminated) {
                 try {
-                    TextMessage message = TenantEventMessageQueue.getInstance().take();
+                    TextMessage message = messageQueue.take();
 
                     // Retrieve the header
                     String type = message.getStringProperty(Constants.EVENT_CLASS_NAME);