You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2016/01/05 16:47:11 UTC

[02/50] [abbrv] stratos git commit: making TopologyEventReceiver and ApplicationSignUpEventReceiver singletons

Making TopologyEventReceiver and ApplicationSignUpEventReceiver singletons


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/033ab1f6
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/033ab1f6
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/033ab1f6

Branch: refs/heads/stratos-4.1.x
Commit: 033ab1f6c355c1847168da54393c73e22ad5c2ac
Parents: bb22134
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 15:33:21 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Mon Dec 7 18:48:09 2015 +0530

----------------------------------------------------------------------
 .../AutoscalerTopologyEventReceiver.java        | 18 +++----
 .../stratos/cartridge/agent/CartridgeAgent.java | 54 ++++++++++----------
 .../agent/CartridgeAgentEventListeners.java     | 43 ++++++++--------
 .../agent/test/JavaCartridgeAgentTest.java      |  6 +--
 ...cerCommonApplicationSignUpEventReceiver.java | 11 ++--
 ...LoadBalancerCommonTopologyEventReceiver.java | 37 +++++++-------
 .../extension/api/LoadBalancerExtension.java    |  2 -
 .../internal/LoadBalancerServiceComponent.java  | 18 +++----
 .../StratosManagerServiceComponent.java         |  4 +-
 .../StratosManagerTopologyEventReceiver.java    | 18 +++----
 .../message/receiver/StratosEventReceiver.java  | 30 +++++++++++
 .../signup/ApplicationSignUpEventReceiver.java  | 37 ++++++++++----
 .../topology/TopologyEventReceiver.java         | 39 ++++++++++----
 .../mock/iaas/services/impl/MockInstance.java   |  6 +--
 14 files changed, 193 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 8336f86..6fd64a7 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -60,18 +60,18 @@ public class AutoscalerTopologyEventReceiver {
     private ExecutorService executorService;
 
     public AutoscalerTopologyEventReceiver() {
-        this.topologyEventReceiver = new TopologyEventReceiver();
+        this.topologyEventReceiver = TopologyEventReceiver.getInstance();
         addEventListeners();
     }
 
-    public void execute() {
-        //FIXME this activated before autoscaler deployer activated.
-        topologyEventReceiver.setExecutorService(getExecutorService());
-        topologyEventReceiver.execute();
-        if (log.isInfoEnabled()) {
-            log.info("Autoscaler topology receiver thread started");
-        }
-    }
+//    public void execute() {
+//        //FIXME this activated before autoscaler deployer activated.
+//       // topologyEventReceiver.setExecutorService(getExecutorService());
+//        //topologyEventReceiver.execute();
+//        if (log.isInfoEnabled()) {
+//            log.info("Autoscaler topology receiver thread started");
+//        }
+//    }
 
     private void addEventListeners() {
         // Listen to topology events that affect clusters

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index 91f596e..18e6e0a 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -103,10 +103,10 @@ public class CartridgeAgent implements Runnable {
         } */
 
         // Start application event receiver thread
-        registerApplicationEventListeners();
-        if (log.isInfoEnabled()) {
-            log.info("Cartridge agent registering all event listeners ... done");
-        }
+        //registerApplicationEventListeners();
+//        if (log.isInfoEnabled()) {
+//            log.info("Cartridge agent registering all event listeners ... done");
+//        }
 
         // Execute instance started shell script
         extensionHandler.onInstanceStartedEvent();
@@ -197,29 +197,29 @@ public class CartridgeAgent implements Runnable {
         }
     }
 
-    protected void registerTenantEventListeners() {
-        if (log.isDebugEnabled()) {
-            log.debug("registerTenantEventListeners before");
-        }
-
-        eventListenerns.startTenantEventReceiver();
-
-        if (log.isDebugEnabled()) {
-            log.debug("registerTenantEventListeners after");
-        }
-    }
-
-    protected void registerApplicationEventListeners() {
-        if (log.isDebugEnabled()) {
-            log.debug("registerApplicationListeners before");
-        }
-
-        eventListenerns.startApplicationsEventReceiver();
-
-        if (log.isDebugEnabled()) {
-            log.debug("registerApplicationEventListeners after");
-        }
-    }
+//    protected void registerTenantEventListeners() {
+//        if (log.isDebugEnabled()) {
+//            log.debug("registerTenantEventListeners before");
+//        }
+//
+//        eventListenerns.startTenantEventReceiver();
+//
+//        if (log.isDebugEnabled()) {
+//            log.debug("registerTenantEventListeners after");
+//        }
+//    }
+
+//    protected void registerApplicationEventListeners() {
+//        if (log.isDebugEnabled()) {
+//            log.debug("registerApplicationListeners before");
+//        }
+//
+//        eventListenerns.startApplicationsEventReceiver();
+//
+//        if (log.isDebugEnabled()) {
+//            log.debug("registerApplicationEventListeners after");
+//        }
+//    }
 
     protected void validateRequiredSystemProperties() {
         String jndiPropertiesDir = System.getProperty(CartridgeAgentConstants.JNDI_PROPERTIES_DIR);

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
index e6bb41b..103d2c7 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
@@ -72,11 +72,10 @@ public class CartridgeAgentEventListeners {
         if (log.isDebugEnabled()) {
             log.debug("Creating cartridge agent event listeners...");
         }
-        this.applicationsEventReceiver = new ApplicationSignUpEventReceiver();
-        this.applicationsEventReceiver.setExecutorService(eventListenerExecutorService);
+        this.applicationsEventReceiver = ApplicationSignUpEventReceiver.getInstance();
 
-        this.topologyEventReceiver = new TopologyEventReceiver();
-        this.topologyEventReceiver.setExecutorService(eventListenerExecutorService);
+        this.topologyEventReceiver = TopologyEventReceiver.getInstance();
+        //this.topologyEventReceiver.setExecutorService(eventListenerExecutorService);
 
         this.instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
 
@@ -151,24 +150,24 @@ public class CartridgeAgentEventListeners {
 
     }
 
-    public void startApplicationsEventReceiver() {
-
-        if (log.isDebugEnabled()) {
-            log.debug("Starting cartridge agent application event message receiver");
-        }
-
-        eventListenerExecutorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                applicationsEventReceiver.execute();
-            }
-        });
-
-        if (log.isInfoEnabled()) {
-            log.info("Cartridge agent application receiver thread started, waiting for event messages ...");
-        }
-
-    }
+//    public void startApplicationsEventReceiver() {
+//
+//        if (log.isDebugEnabled()) {
+//            log.debug("Starting cartridge agent application event message receiver");
+//        }
+//
+//        eventListenerExecutorService.submit(new Runnable() {
+//            @Override
+//            public void run() {
+//                applicationsEventReceiver.execute();
+//            }
+//        });
+//
+//        if (log.isInfoEnabled()) {
+//            log.info("Cartridge agent application receiver thread started, waiting for event messages ...");
+//        }
+//
+//    }
 
 
     private void addInstanceNotifierEventListeners() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
index 0427eb6..a501507 100644
--- a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
+++ b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
@@ -122,9 +122,9 @@ public class JavaCartridgeAgentTest {
         String agentHome = setupJavaAgent();
 
         ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", 5);
-        topologyEventReceiver = new TopologyEventReceiver();
-        topologyEventReceiver.setExecutorService(executorService);
-        topologyEventReceiver.execute();
+        topologyEventReceiver = TopologyEventReceiver.getInstance();
+        //topologyEventReceiver.setExecutorService(executorService);
+        //topologyEventReceiver.execute();
 
         instanceStatusEventReceiver = new InstanceStatusEventReceiver();
         instanceStatusEventReceiver.setExecutorService(executorService);

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java
index d5819dc..95c4867 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonApplicationSignUpEventReceiver.java
@@ -41,19 +41,20 @@ import org.apache.stratos.messaging.message.receiver.application.signup.Applicat
  * Load balancer common application signup event receiver updates the topology in the given topology provider
  * with the hostnames found in application signup events.
  */
-public class LoadBalancerCommonApplicationSignUpEventReceiver extends ApplicationSignUpEventReceiver {
+public class LoadBalancerCommonApplicationSignUpEventReceiver {
 
     private static final Log log = LogFactory.getLog(LoadBalancerCommonApplicationSignUpEventReceiver.class);
-
+    private ApplicationSignUpEventReceiver applicationSignUpEventReceiver;
     private TopologyProvider topologyProvider;
 
     public LoadBalancerCommonApplicationSignUpEventReceiver(TopologyProvider topologyProvider) {
+        this.applicationSignUpEventReceiver = ApplicationSignUpEventReceiver.getInstance();
         this.topologyProvider = topologyProvider;
         addEventListeners();
     }
 
     private void addEventListeners() {
-        addEventListener(new CompleteApplicationSignUpsEventListener() {
+        applicationSignUpEventReceiver.addEventListener(new CompleteApplicationSignUpsEventListener() {
             private boolean initialized = false;
 
             @Override
@@ -96,7 +97,7 @@ public class LoadBalancerCommonApplicationSignUpEventReceiver extends Applicatio
             }
         });
 
-        addEventListener(new ApplicationSignUpAddedEventListener() {
+        applicationSignUpEventReceiver.addEventListener(new ApplicationSignUpAddedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {
@@ -110,7 +111,7 @@ public class LoadBalancerCommonApplicationSignUpEventReceiver extends Applicatio
             }
         });
 
-        addEventListener(new ApplicationSignUpRemovedEventListener() {
+        applicationSignUpEventReceiver.addEventListener(new ApplicationSignUpRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
index 4fb45a9..85142e3 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
@@ -38,15 +38,17 @@ import java.util.Properties;
  * Load balancer common topology receiver updates the topology in the given topology provider
  * according to topology events.
  */
-public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiver {
+public class LoadBalancerCommonTopologyEventReceiver {
 
     private static final Log log = LogFactory.getLog(LoadBalancerCommonTopologyEventReceiver.class);
 
     private TopologyProvider topologyProvider;
     private boolean initialized;
+    private TopologyEventReceiver topologyEventReceiver;
 
     public LoadBalancerCommonTopologyEventReceiver(TopologyProvider topologyProvider) {
         this.topologyProvider = topologyProvider;
+        this.topologyEventReceiver = TopologyEventReceiver.getInstance();
         addEventListeners();
     }
 
@@ -57,12 +59,12 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
         }
     }
 
-    public void execute() {
-        super.execute();
-        if (log.isInfoEnabled()) {
-            log.info("Load balancer topology receiver thread started");
-        }
-    }
+//    public void execute() {
+//        super.execute();
+//        if (log.isInfoEnabled()) {
+//            log.info("Load balancer topology receiver thread started");
+//        }
+//    }
 
     public void initializeTopology() {
         if (initialized) {
@@ -115,7 +117,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
      * Add default event listeners for updating the topology on topology events
      */
     public void addEventListeners() {
-        addEventListener(new CompleteTopologyEventListener() {
+        topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
             @Override
             protected void onEvent(Event event) {
                 if (!initialized) {
@@ -124,7 +126,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
             }
         });
 
-        addEventListener(new MemberActivatedEventListener() {
+        topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
 
@@ -142,11 +144,10 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
                     if (networkPartitionIdFilter != null && !networkPartitionIdFilter.equals("")) {
                         if (memberActivatedEvent.getNetworkPartitionId().equals(networkPartitionIdFilter)) {
                             addMember(serviceName, clusterId, memberId);
-                        }
-                        else{
+                        } else {
                             log.debug(String.format("Member exists in a different network partition." +
-                                    "[member id] %s [member network partition] %s [filter network partition] %s ",
-                                    memberId,memberActivatedEvent.getNetworkPartitionId(),networkPartitionIdFilter));
+                                            "[member id] %s [member network partition] %s [filter network partition] %s ",
+                                    memberId, memberActivatedEvent.getNetworkPartitionId(), networkPartitionIdFilter));
                         }
                     } else {
                         addMember(serviceName, clusterId, memberId);
@@ -159,7 +160,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
             }
         });
 
-        addEventListener(new MemberMaintenanceListener() {
+        topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
             @Override
             protected void onEvent(Event event) {
 
@@ -181,7 +182,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
             }
         });
 
-        addEventListener(new MemberSuspendedEventListener() {
+        topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
             @Override
             protected void onEvent(Event event) {
 
@@ -203,7 +204,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
             }
         });
 
-        addEventListener(new MemberTerminatedEventListener() {
+        topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
 
@@ -224,7 +225,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
             }
         });
 
-        addEventListener(new ClusterRemovedEventListener() {
+        topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
 
@@ -253,7 +254,7 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
             }
         });
 
-        addEventListener(new ServiceRemovedEventListener() {
+        topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index ae2b6dd..e7a2071 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -201,8 +201,6 @@ public class LoadBalancerExtension {
      */
     private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
         applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
-        applicationSignUpEventReceiver.setExecutorService(executorService);
-        applicationSignUpEventReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Application signup event receiver thread started");
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index a7761cd..cb74984 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -186,8 +186,8 @@ public class LoadBalancerServiceComponent {
         }
 
         applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
-        applicationSignUpEventReceiver.setExecutorService(executorService);
-        applicationSignUpEventReceiver.execute();
+//        applicationSignUpEventReceiver.setExecutorService(executorService);
+//        applicationSignUpEventReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Application signup event receiver thread started");
         }
@@ -266,13 +266,13 @@ public class LoadBalancerServiceComponent {
         }
 
         // Terminate application signup event receiver
-        if (applicationSignUpEventReceiver != null) {
-            try {
-                applicationSignUpEventReceiver.terminate();
-            } catch (Exception e) {
-                log.warn("An error occurred while terminating application signup event receiver", e);
-            }
-        }
+//        if (applicationSignUpEventReceiver != null) {
+//            try {
+//                applicationSignUpEventReceiver.terminate();
+//            } catch (Exception e) {
+//                log.warn("An error occurred while terminating application signup event receiver", e);
+//            }
+//        }
 
         // Terminate domain mapping event receiver
         if (domainMappingEventReceiver != null) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 47f401a..76d39a7 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -211,8 +211,8 @@ public class StratosManagerServiceComponent {
      */
     private void initializeTopologyEventReceiver() {
         topologyEventReceiver = new StratosManagerTopologyEventReceiver();
-        topologyEventReceiver.setExecutorService(executorService);
-        topologyEventReceiver.execute();
+//        topologyEventReceiver.setExecutorService(executorService);
+//        topologyEventReceiver.execute();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java
index 08ca3d6..51b21ac 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerTopologyEventReceiver.java
@@ -23,19 +23,19 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 
-public class StratosManagerTopologyEventReceiver extends TopologyEventReceiver {
+public class StratosManagerTopologyEventReceiver {
 
     private static final Log log = LogFactory.getLog(StratosManagerTopologyEventReceiver.class);
 
     public StratosManagerTopologyEventReceiver() {
     }
 
-    @Override
-    public void execute() {
-        super.execute();
-
-        if (log.isInfoEnabled()) {
-            log.info("Stratos manager topology event receiver thread started");
-        }
-    }
+//    @Override
+//    public void execute() {
+//        super.execute();
+//
+//        if (log.isInfoEnabled()) {
+//            log.info("Stratos manager topology event receiver thread started");
+//        }
+//    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
new file mode 100644
index 0000000..5ac89e6
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver;
+
+import java.util.concurrent.ExecutorService;
+
+public class StratosEventReceiver {
+
+    protected ExecutorService executorService;
+
+    public StratosEventReceiver () {
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index 55e3fd1..dde214d 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.receiver.application.signup;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
 import org.apache.stratos.messaging.event.initializer.CompleteApplicationSignUpsRequestEvent;
 import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
 import org.apache.stratos.messaging.util.MessagingUtil;
 
 import java.util.concurrent.ExecutorService;
@@ -33,26 +35,41 @@ import java.util.concurrent.ExecutorService;
 /**
  * Application signup event receiver.
  */
-public class ApplicationSignUpEventReceiver {
+public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
 
     private static final Log log = LogFactory.getLog(ApplicationSignUpEventReceiver.class);
 
     private ApplicationSignUpEventMessageDelegator messageDelegator;
     private ApplicationSignUpEventMessageListener messageListener;
     private EventSubscriber eventSubscriber;
-    private ExecutorService executorService;
+    private static volatile ApplicationSignUpEventReceiver instance;
 
-    public ApplicationSignUpEventReceiver() {
+    private ApplicationSignUpEventReceiver() { // private: instances are created only inside getInstance()
+        // TODO: make pool size configurable (the value 100 passed below is hard-coded)
+        this.executorService = StratosThreadPool.getExecutorService("application-signup-event-receiver", 100);
         ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue();
         this.messageDelegator = new ApplicationSignUpEventMessageDelegator(messageQueue);
         this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue);
+        execute(); // execute() is private as of this change, so construction is the only place it is invoked
+    }
+
+    public static ApplicationSignUpEventReceiver getInstance () { // returns the shared receiver, creating it on first call
+        if (instance == null) { // unsynchronized check; instance is declared volatile
+            synchronized (ApplicationSignUpEventReceiver.class) {
+                if (instance == null) { // re-check while holding the class lock so the constructor runs only once
+                    instance = new ApplicationSignUpEventReceiver();
+                }
+            }
+        }
+
+        return instance;
     }
 
     public void addEventListener(EventListener eventListener) {
         messageDelegator.addEventListener(eventListener);
     }
 
-    public void execute() {
+    private void execute() {
         try {
             // Start topic subscriber thread
             eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_SIGNUP_TOPIC.getTopicName(),
@@ -103,11 +120,11 @@ public class ApplicationSignUpEventReceiver {
         messageDelegator.terminate();
     }
 
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
+//    public ExecutorService getExecutorService() {
+//        return executorService;
+//    }
 
-    public void setExecutorService(ExecutorService executorService) {
-        this.executorService = executorService;
-    }
+//    public void setExecutorService(ExecutorService executorService) {
+//        this.executorService = executorService;
+//    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index b841d0a..50e078a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -21,11 +21,13 @@ package org.apache.stratos.messaging.message.receiver.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
 import org.apache.stratos.messaging.event.initializer.CompleteTopologyRequestEvent;
 import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
 import org.apache.stratos.messaging.util.MessagingUtil;
 
 import java.util.concurrent.ExecutorService;
@@ -34,26 +36,41 @@ import java.util.concurrent.ExecutorService;
  * A thread for receiving topology information from message broker and
  * build topology in topology manager.
  */
-public class TopologyEventReceiver {
+public class TopologyEventReceiver extends StratosEventReceiver {
 
     private static final Log log = LogFactory.getLog(TopologyEventReceiver.class);
 
     private TopologyEventMessageDelegator messageDelegator;
     private TopologyEventMessageListener messageListener;
     private EventSubscriber eventSubscriber;
-    private ExecutorService executorService;
+    private static volatile TopologyEventReceiver instance;
 
-    public TopologyEventReceiver() {
+    private TopologyEventReceiver() { // private: instances are created only inside getInstance()
+        // TODO: make pool size configurable (the value 100 passed below is hard-coded)
+        this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
         TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue();
         this.messageDelegator = new TopologyEventMessageDelegator(messageQueue);
         this.messageListener = new TopologyEventMessageListener(messageQueue);
+        execute(); // execute() is private as of this change, so construction is the only place it is invoked
+    }
+
+    public static TopologyEventReceiver getInstance () { // returns the shared receiver, creating it on first call
+        if (instance == null) { // unsynchronized check; instance is declared volatile
+            synchronized (TopologyEventReceiver.class) {
+                if (instance == null) { // re-check while holding the class lock so the constructor runs only once
+                    instance = new TopologyEventReceiver();
+                }
+            }
+        }
+
+        return instance;
     }
 
     public void addEventListener(EventListener eventListener) {
         messageDelegator.addEventListener(eventListener);
     }
 
-    public void execute() {
+    private void execute() {
         try {
             // Start topic subscriber thread
             eventSubscriber = new EventSubscriber(MessagingUtil.Topics.TOPOLOGY_TOPIC.getTopicName(), messageListener);
@@ -101,11 +118,11 @@ public class TopologyEventReceiver {
         });
     }
 
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public void setExecutorService(ExecutorService executorService) {
-        this.executorService = executorService;
-    }
+//    public ExecutorService getExecutorService() {
+//        return executorService;
+//    }
+//
+//    public void setExecutorService(ExecutorService executorService) {
+//        this.executorService = executorService;
+//    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/033ab1f6/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
index 7b31861..c752f9e 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
@@ -108,7 +108,7 @@ public class MockInstance implements Serializable {
     }
 
     private void startTopologyEventReceiver() {
-        topologyEventReceiver = new TopologyEventReceiver();
+        topologyEventReceiver = TopologyEventReceiver.getInstance();
         topologyEventReceiver.addEventListener(new MemberInitializedEventListener() {
             @Override
             protected void onEvent(Event event) {
@@ -151,8 +151,8 @@ public class MockInstance implements Serializable {
                 }
             }
         });
-        topologyEventReceiver.setExecutorService(eventListenerExecutorService);
-        topologyEventReceiver.execute();
+//        topologyEventReceiver.setExecutorService(eventListenerExecutorService);
+//        topologyEventReceiver.execute();
         if (log.isDebugEnabled()) {
             log.debug(String.format(
                     "Mock instance topology event message receiver started for mock member [member-id] %s",