You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2015/12/24 14:04:19 UTC

[02/19] stratos git commit: making TopologyEventReceiver a singleton and fixing references

making TopologyEventReceiver a singleton and fixing references


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/20560136
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/20560136
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/20560136

Branch: refs/heads/stratos-4.1.x
Commit: 20560136a3ae5ca85959c4da9d544485837be4e5
Parents: 5ce932c
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Wed Dec 2 17:28:00 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 17 11:33:49 2015 +0530

----------------------------------------------------------------------
 .../AutoscalerTopologyEventReceiver.java        | 24 +++----
 .../internal/AutoscalerServiceComponent.java    | 18 ++---
 .../stratos/cartridge/agent/CartridgeAgent.java | 28 ++++----
 .../agent/CartridgeAgentEventListeners.java     | 72 ++++++++++----------
 .../extension/api/LoadBalancerExtension.java    | 33 ++++-----
 .../internal/LoadBalancerServiceComponent.java  | 24 +++----
 .../service/MetadataTopologyEventReceiver.java  | 35 +++++-----
 .../service/registry/MetadataApiRegistry.java   |  8 +--
 .../cep/extension/CEPTopologyEventReceiver.java | 20 +++---
 .../extension/FaultHandlingWindowProcessor.java | 10 +--
 .../cep/extension/CEPTopologyEventReceiver.java | 20 +++---
 .../extension/FaultHandlingWindowProcessor.java | 10 +--
 .../tests/PythonAgentIntegrationTest.java       |  6 +-
 .../integration/common/TopologyHandler.java     | 12 ++--
 14 files changed, 163 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 6fd64a7..daa70ae 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -510,16 +510,16 @@ public class AutoscalerTopologyEventReceiver {
     /**
      * Terminate load balancer topology receiver thread.
      */
-    public void terminate() {
-        topologyEventReceiver.terminate();
-        terminated = true;
-    }
-
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public void setExecutorService(ExecutorService executorService) {
-        this.executorService = executorService;
-    }
+//    public void terminate() {
+//        topologyEventReceiver.terminate();
+//        terminated = true;
+//    }
+//
+//    public ExecutorService getExecutorService() {
+//        return executorService;
+//    }
+//
+//    public void setExecutorService(ExecutorService executorService) {
+//        this.executorService = executorService;
+//    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
index 76844a0..4d4c54f 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
@@ -173,8 +173,8 @@ public class AutoscalerServiceComponent {
 
         // Start topology receiver
         asTopologyReceiver = new AutoscalerTopologyEventReceiver();
-        asTopologyReceiver.setExecutorService(executorService);
-        asTopologyReceiver.execute();
+//        asTopologyReceiver.setExecutorService(executorService);
+        //asTopologyReceiver.execute();
         if (log.isDebugEnabled()) {
             log.debug("Topology receiver executor service started");
         }
@@ -245,13 +245,13 @@ public class AutoscalerServiceComponent {
     }
 
     protected void deactivate(ComponentContext context) {
-        if (asTopologyReceiver != null) {
-            try {
-                asTopologyReceiver.terminate();
-            } catch (Exception e) {
-                log.warn("An error occurred while terminating autoscaler topology event receiver", e);
-            }
-        }
+//        if (asTopologyReceiver != null) {
+//            try {
+//                asTopologyReceiver.terminate();
+//            } catch (Exception e) {
+//                log.warn("An error occurred while terminating autoscaler topology event receiver", e);
+//            }
+//        }
 
         if (autoscalerHealthStatEventReceiver != null) {
             try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index 18e6e0a..b0bf326 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -60,10 +60,10 @@ public class CartridgeAgent implements Runnable {
         }
 
         // Start topology event receiver thread
-        registerTopologyEventListeners();
-        if (log.isInfoEnabled()) {
-            log.info("Cartridge agent registerTopologyEventListeners done");
-        }
+//        registerTopologyEventListeners();
+//        if (log.isInfoEnabled()) {
+//            log.info("Cartridge agent registerTopologyEventListeners done");
+//        }
 
         if (log.isInfoEnabled()) {
             log.info("Waiting for CompleteTopologyEvent..");
@@ -186,16 +186,16 @@ public class CartridgeAgent implements Runnable {
         }
     }
 
-    protected void registerTopologyEventListeners() {
-        if (log.isDebugEnabled()) {
-            log.debug("registerTopologyEventListeners before");
-        }
-        eventListenerns.startTopologyEventReceiver();
-
-        if (log.isDebugEnabled()) {
-            log.debug("registerTopologyEventListeners after");
-        }
-    }
+//    protected void registerTopologyEventListeners() {
+//        if (log.isDebugEnabled()) {
+//            log.debug("registerTopologyEventListeners before");
+//        }
+//        eventListenerns.startTopologyEventReceiver();
+//
+//        if (log.isDebugEnabled()) {
+//            log.debug("registerTopologyEventListeners after");
+//        }
+//    }
 
 //    protected void registerTenantEventListeners() {
 //        if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
index 103d2c7..1d64ff0 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
@@ -94,24 +94,24 @@ public class CartridgeAgentEventListeners {
         }
     }
 
-    public void startTopologyEventReceiver() {
-
-        if (log.isDebugEnabled()) {
-            log.debug("Starting cartridge agent topology event message receiver");
-        }
-
-        eventListenerExecutorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                topologyEventReceiver.execute();
-            }
-        });
-
-        if (log.isInfoEnabled()) {
-            log.info("Cartridge agent topology receiver thread started, waiting for event messages ...");
-        }
-
-    }
+//    public void startTopologyEventReceiver() {
+//
+//        if (log.isDebugEnabled()) {
+//            log.debug("Starting cartridge agent topology event message receiver");
+//        }
+//
+//        eventListenerExecutorService.submit(new Runnable() {
+//            @Override
+//            public void run() {
+//                topologyEventReceiver.execute();
+//            }
+//        });
+//
+//        if (log.isInfoEnabled()) {
+//            log.info("Cartridge agent topology receiver thread started, waiting for event messages ...");
+//        }
+//
+//    }
 
     public void startInstanceNotifierReceiver() {
 
@@ -131,24 +131,24 @@ public class CartridgeAgentEventListeners {
         }
     }
 
-    public void startTenantEventReceiver() {
-
-        if (log.isDebugEnabled()) {
-            log.debug("Starting cartridge agent tenant event message receiver");
-        }
-
-        eventListenerExecutorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                topologyEventReceiver.execute();
-            }
-        });
-
-        if (log.isInfoEnabled()) {
-            log.info("Cartridge agent tenant receiver thread started, waiting for event messages ...");
-        }
-
-    }
+//    public void startTenantEventReceiver() {
+//
+//        if (log.isDebugEnabled()) {
+//            log.debug("Starting cartridge agent tenant event message receiver");
+//        }
+//
+//        eventListenerExecutorService.submit(new Runnable() {
+//            @Override
+//            public void run() {
+//                topologyEventReceiver.execute();
+//            }
+//        });
+//
+//        if (log.isInfoEnabled()) {
+//            log.info("Cartridge agent tenant receiver thread started, waiting for event messages ...");
+//        }
+//
+//    }
 
 //    public void startApplicationsEventReceiver() {
 //

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index e7a2071..d2a8cb3 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -39,6 +39,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 
 import java.util.concurrent.ExecutorService;
 
@@ -123,8 +124,8 @@ public class LoadBalancerExtension {
         addTopologyEventListeners(topologyEventReceiver);
         // Add default topology provider event listeners
         topologyEventReceiver.addEventListeners();
-        topologyEventReceiver.setExecutorService(executorService);
-        topologyEventReceiver.execute();
+//        topologyEventReceiver.setExecutorService(executorService);
+//        topologyEventReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Topology receiver thread started");
         }
@@ -212,12 +213,12 @@ public class LoadBalancerExtension {
      * @param topologyEventReceiver topology event receiver instance
      */
     private void addTopologyEventListeners(final LoadBalancerCommonTopologyEventReceiver topologyEventReceiver) {
-        topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+        TopologyEventReceiver.getInstance().addEventListener(new CompleteTopologyEventListener() {
 
             @Override
             protected void onEvent(Event event) {
                 try {
-                  if (!loadBalancerStarted) {
+                    if (!loadBalancerStarted) {
                         configureAndStart();
                     }
                 } catch (Exception e) {
@@ -228,37 +229,37 @@ public class LoadBalancerExtension {
                 }
             }
         });
-        topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+        TopologyEventReceiver.getInstance().addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 reloadConfiguration();
             }
         });
-        topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
+        TopologyEventReceiver.getInstance().addEventListener(new MemberSuspendedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 reloadConfiguration();
             }
         });
-        topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
+        TopologyEventReceiver.getInstance().addEventListener(new MemberTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 reloadConfiguration();
             }
         });
-        topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
+        TopologyEventReceiver.getInstance().addEventListener(new ClusterRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 reloadConfiguration();
             }
         });
-        topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
+        TopologyEventReceiver.getInstance().addEventListener(new ServiceRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 reloadConfiguration();
             }
         });
-        topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
+        TopologyEventReceiver.getInstance().addEventListener(new MemberMaintenanceListener() {
             @Override
             protected void onEvent(Event event) {
                 reloadConfiguration();
@@ -338,12 +339,12 @@ public class LoadBalancerExtension {
      * Stop load balancer instance.
      */
     public void stop() {
-        try {
-            if (topologyEventReceiver != null) {
-                topologyEventReceiver.terminate();
-            }
-        } catch (Exception ignore) {
-        }
+//        try {
+//            if (topologyEventReceiver != null) {
+//                topologyEventReceiver.terminate();
+//            }
+//        } catch (Exception ignore) {
+//        }
 
         try {
             if (statisticsNotifier != null) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index cb74984..442686a 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -199,11 +199,11 @@ public class LoadBalancerServiceComponent {
         }
 
         topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider);
-        topologyEventReceiver.setExecutorService(executorService);
-        topologyEventReceiver.execute();
-        if (log.isInfoEnabled()) {
-            log.info("Topology receiver thread started");
-        }
+//        topologyEventReceiver.setExecutorService(executorService);
+//        topologyEventReceiver.execute();
+//        if (log.isInfoEnabled()) {
+//            log.info("Topology receiver thread started");
+//        }
 
         if (log.isInfoEnabled()) {
             if (TopologyServiceFilter.getInstance().isActive()) {
@@ -257,13 +257,13 @@ public class LoadBalancerServiceComponent {
         }
 
         // Terminate topology receiver
-        if (topologyEventReceiver != null) {
-            try {
-                topologyEventReceiver.terminate();
-            } catch (Exception e) {
-                log.warn("An error occurred while terminating topology event receiver", e);
-            }
-        }
+//        if (topologyEventReceiver != null) {
+//            try {
+//                topologyEventReceiver.terminate();
+//            } catch (Exception e) {
+//                log.warn("An error occurred while terminating topology event receiver", e);
+//            }
+//        }
 
         // Terminate application signup event receiver
 //        if (applicationSignUpEventReceiver != null) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
index e516271..f16282d 100644
--- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
+++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java
@@ -41,8 +41,9 @@ public class MetadataTopologyEventReceiver {
     private ExecutorService executorService;
 
     public MetadataTopologyEventReceiver() {
-        this.topologyEventReceiver = new TopologyEventReceiver();
-        executorService = StratosThreadPool.getExecutorService(Constants.METADATA_SERVICE_THREAD_POOL_ID, 20);
+        this.topologyEventReceiver = TopologyEventReceiver.getInstance();
+//        //executorService = StratosThreadPool.getExecutorService(Constants
+//                .METADATA_SERVICE_THREAD_POOL_ID, 20);
         addEventListeners();
     }
 
@@ -67,21 +68,21 @@ public class MetadataTopologyEventReceiver {
         });
     }
 
-    public void execute() {
-        topologyEventReceiver.setExecutorService(getExecutorService());
-        topologyEventReceiver.execute();
-
-        if (log.isInfoEnabled()) {
-            log.info("Metadata service topology receiver started.");
-        }
-    }
-
-    public void terminate() {
-        topologyEventReceiver.terminate();
-        if (log.isInfoEnabled()) {
-            log.info("Metadata service topology receiver stopped.");
-        }
-    }
+//    public void execute() {
+//        topologyEventReceiver.setExecutorService(getExecutorService());
+//        topologyEventReceiver.execute();
+//
+//        if (log.isInfoEnabled()) {
+//            log.info("Metadata service topology receiver started.");
+//        }
+//    }
+//
+//    public void terminate() {
+//        topologyEventReceiver.terminate();
+//        if (log.isInfoEnabled()) {
+//            log.info("Metadata service topology receiver stopped.");
+//        }
+//    }
 
     public ExecutorService getExecutorService() {
         return executorService;

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
index 75ddbc7..47fc600 100644
--- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
+++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java
@@ -56,7 +56,7 @@ public class MetadataApiRegistry implements DataStore {
 
     public MetadataApiRegistry() {
         metadataTopologyEventReceiver = new MetadataTopologyEventReceiver();
-        metadataTopologyEventReceiver.execute();
+//        metadataTopologyEventReceiver.execute();
 
         metadataApplicationEventReceiver = new MetadataApplicationEventReceiver();
         metadataApplicationEventReceiver.execute();
@@ -417,9 +417,9 @@ public class MetadataApiRegistry implements DataStore {
         return applicationIdToReadWriteLockMap;
     }
 
-    public void stopTopologyReceiver() {
-        metadataTopologyEventReceiver.terminate();
-    }
+//    public void stopTopologyReceiver() {
+//        metadataTopologyEventReceiver.terminate();
+//    }
 
     public void stopApplicationReceiver() {
         metadataApplicationEventReceiver.terminate();

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 59c70c5..2696271 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -34,26 +34,28 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 /**
  * CEP Topology Receiver for Fault Handling Window Processor.
  */
-public class CEPTopologyEventReceiver extends TopologyEventReceiver {
+public class CEPTopologyEventReceiver {
 
     private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
 
     private FaultHandlingWindowProcessor faultHandler;
+    private TopologyEventReceiver topologyEventReceiver;
 
     public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
         this.faultHandler = faultHandler;
+        this.topologyEventReceiver = TopologyEventReceiver.getInstance();
         addEventListeners();
     }
 
-    @Override
-    public void execute() {
-        super.execute();
-        log.info("CEP topology event receiver thread started");
-    }
+//    @Override
+//    public void execute() {
+//        super.execute();
+//        log.info("CEP topology event receiver thread started");
+//    }
 
     private void addEventListeners() {
         // Load member time stamp map from the topology as a one time task
-        addEventListener(new CompleteTopologyEventListener() {
+        topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
             private boolean initialized;
 
             @Override
@@ -74,7 +76,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver {
         });
 
         // Remove member from the time stamp map when MemberTerminated event is received.
-        addEventListener(new MemberTerminatedEventListener() {
+        topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
@@ -84,7 +86,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver {
         });
 
         // Add member to time stamp map whenever member is activated
-        addEventListener(new MemberActivatedEventListener() {
+        topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 0526f6a..9e98288 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -286,10 +286,10 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
         MemberFaultEventMap
                 .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
 
-        executorService = StratosThreadPool
-                .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
-        cepTopologyEventReceiver.setExecutorService(executorService);
-        cepTopologyEventReceiver.execute();
+//        executorService = StratosThreadPool
+//                .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
+//        cepTopologyEventReceiver.setExecutorService(executorService);
+//        cepTopologyEventReceiver.execute();
 
         //Ordinary scheduling
         window.schedule();
@@ -329,7 +329,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     @Override
     public void destroy() {
         // terminate topology listener thread
-        cepTopologyEventReceiver.terminate();
+//        cepTopologyEventReceiver.terminate();
         window = null;
 
         // Shutdown executor service

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 59c70c5..2696271 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -34,26 +34,28 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 /**
  * CEP Topology Receiver for Fault Handling Window Processor.
  */
-public class CEPTopologyEventReceiver extends TopologyEventReceiver {
+public class CEPTopologyEventReceiver {
 
     private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
 
     private FaultHandlingWindowProcessor faultHandler;
+    private TopologyEventReceiver topologyEventReceiver;
 
     public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
         this.faultHandler = faultHandler;
+        this.topologyEventReceiver = TopologyEventReceiver.getInstance();
         addEventListeners();
     }
 
-    @Override
-    public void execute() {
-        super.execute();
-        log.info("CEP topology event receiver thread started");
-    }
+//    @Override
+//    public void execute() {
+//        super.execute();
+//        log.info("CEP topology event receiver thread started");
+//    }
 
     private void addEventListeners() {
         // Load member time stamp map from the topology as a one time task
-        addEventListener(new CompleteTopologyEventListener() {
+        topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
             private boolean initialized;
 
             @Override
@@ -74,7 +76,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver {
         });
 
         // Remove member from the time stamp map when MemberTerminated event is received.
-        addEventListener(new MemberTerminatedEventListener() {
+        topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
@@ -84,7 +86,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver {
         });
 
         // Add member to time stamp map whenever member is activated
-        addEventListener(new MemberActivatedEventListener() {
+        topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 2fdce19..c3951e4 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -279,10 +279,10 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
         MemberFaultEventMap
                 .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
 
-        executorService = StratosThreadPool
-                .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
-        cepTopologyEventReceiver.setExecutorService(executorService);
-        cepTopologyEventReceiver.execute();
+//        executorService = StratosThreadPool
+//                .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
+//        cepTopologyEventReceiver.setExecutorService(executorService);
+//        cepTopologyEventReceiver.execute();
 
         //Ordinary scheduling
         window.schedule();
@@ -322,7 +322,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     @Override
     public void destroy() {
         // terminate topology listener thread
-        cepTopologyEventReceiver.terminate();
+//        cepTopologyEventReceiver.terminate();
         window = null;
 
         // Shutdown executor service

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index 649430f..f31583c 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -124,9 +124,9 @@ public abstract class PythonAgentIntegrationTest {
         }
 
         ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", testThreadPoolSize);
-        topologyEventReceiver = new TopologyEventReceiver();
-        topologyEventReceiver.setExecutorService(executorService);
-        topologyEventReceiver.execute();
+        topologyEventReceiver = TopologyEventReceiver.getInstance();
+//        topologyEventReceiver.setExecutorService(executorService);
+//        topologyEventReceiver.execute();
 
         instanceStatusEventReceiver = new InstanceStatusEventReceiver();
         instanceStatusEventReceiver.setExecutorService(executorService);

http://git-wip-us.apache.org/repos/asf/stratos/blob/20560136/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
index a0cc928..5a199c7 100644
--- a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
+++ b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java
@@ -108,9 +108,9 @@ public class TopologyHandler {
     }
 
     private void initializeApplicationSignUpEventReceiver() {
-        applicationSignUpEventReceiver = new ApplicationSignUpEventReceiver();
-        applicationSignUpEventReceiver.setExecutorService(executorService);
-        applicationSignUpEventReceiver.execute();
+        applicationSignUpEventReceiver = ApplicationSignUpEventReceiver.getInstance();
+//        applicationSignUpEventReceiver.setExecutorService(executorService);
+//        applicationSignUpEventReceiver.execute();
     }
 
     private void initializeTenantEventReceiver() {
@@ -171,8 +171,8 @@ public class TopologyHandler {
      * Initialize Topology event receiver
      */
     private void initializeTopologyEventReceiver() {
-        topologyEventReceiver = new TopologyEventReceiver();
-        topologyEventReceiver.setExecutorService(executorService);
+        topologyEventReceiver = TopologyEventReceiver.getInstance();
+//        topologyEventReceiver.setExecutorService(executorService);
         topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
@@ -206,7 +206,7 @@ public class TopologyHandler {
                         clusterInstanceInactivateEvent.getClusterId()));
             }
         });
-        topologyEventReceiver.execute();
+        //topologyEventReceiver.execute();
     }
 
     /**