You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2015/12/24 14:04:26 UTC

[09/19] stratos git commit: fixing conflicts in StratosThreadPool

fixing conflicts in StratosThreadPool


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/82980e64
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/82980e64
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/82980e64

Branch: refs/heads/stratos-4.1.x
Commit: 82980e64a3b935a93e9cfad0916c32cc8bebc76b
Parents: 23741d8
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Fri Dec 4 05:22:40 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 17 11:39:26 2015 +0530

----------------------------------------------------------------------
 .../stratos/cartridge/agent/CartridgeAgent.java |  30 ++---
 .../agent/CartridgeAgentEventListeners.java     |  46 ++++----
 .../agent/test/JavaCartridgeAgentTest.java      |  10 +-
 .../CloudControllerServiceComponent.java        |   4 +-
 .../status/InstanceStatusTopicReceiver.java     |  24 ++--
 .../common/threading/StratosThreadPool.java     |   4 +-
 .../StratosManagerServiceComponent.java         |   4 +-
 ...ratosManagerInstanceStatusEventReceiver.java |  22 ++--
 .../notifier/InstanceNotifierEventReceiver.java | 109 +++++++++++++------
 .../status/InstanceStatusEventReceiver.java     |  44 +++++---
 .../mock/iaas/services/impl/MockInstance.java   |  32 +++---
 .../tests/PythonAgentIntegrationTest.java       |   6 +-
 .../integration/tests/adc/GitHookTestCase.java  |  22 ++--
 13 files changed, 208 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index b0bf326..c498caa 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -90,10 +90,10 @@ public class CartridgeAgent implements Runnable {
         }
 
         // Start instance notifier listener thread
-        registerInstanceNotifierEventListeners();
-        if (log.isInfoEnabled()) {
-            log.info("Cartridge agent registerInstanceNotifierEventListeners done");
-        }
+//        registerInstanceNotifierEventListeners();
+//        if (log.isInfoEnabled()) {
+//            log.info("Cartridge agent registerInstanceNotifierEventListeners done");
+//        }
 
         // Start tenant event receiver thread
         /*
@@ -174,17 +174,17 @@ public class CartridgeAgent implements Runnable {
         logPublisherManager.stop();
     }
 
-    protected void registerInstanceNotifierEventListeners() {
-        if (log.isDebugEnabled()) {
-            log.debug("SsubscribeToTopicsAndRegisterListeners before");
-        }
-
-        eventListenerns.startInstanceNotifierReceiver();
-
-        if (log.isDebugEnabled()) {
-            log.debug("SsubscribeToTopicsAndRegisterListeners after");
-        }
-    }
+//    protected void registerInstanceNotifierEventListeners() {
+//        if (log.isDebugEnabled()) {
+//            log.debug("SsubscribeToTopicsAndRegisterListeners before");
+//        }
+//
+//        eventListenerns.startInstanceNotifierReceiver();
+//
+//        if (log.isDebugEnabled()) {
+//            log.debug("SsubscribeToTopicsAndRegisterListeners after");
+//        }
+//    }
 
 //    protected void registerTopologyEventListeners() {
 //        if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
index ffa3750..5954b76 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java
@@ -65,8 +65,8 @@ public class CartridgeAgentEventListeners {
     private ApplicationSignUpEventReceiver applicationsEventReceiver;
 
     private ExtensionHandler extensionHandler;
-    private static final ExecutorService eventListenerExecutorService =
-            StratosThreadPool.getExecutorService("cartridge.agent.event.listener.thread.pool", 10);
+//    private static final ExecutorService eventListenerExecutorService =
+//            StratosThreadPool.getExecutorService("cartridge.agent.event.listener.thread.pool", 10);
 
     public CartridgeAgentEventListeners() {
         if (log.isDebugEnabled()) {
@@ -77,7 +77,7 @@ public class CartridgeAgentEventListeners {
         this.topologyEventReceiver = TopologyEventReceiver.getInstance();
         //this.topologyEventReceiver.setExecutorService(eventListenerExecutorService);
 
-        this.instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
+        this.instanceNotifierEventReceiver = InstanceNotifierEventReceiver.getInstance();
 
         this.tenantEventReceiver = TenantEventReceiver.getInstance();
 //        this.tenantEventReceiver.setExecutorService(eventListenerExecutorService);
@@ -113,23 +113,23 @@ public class CartridgeAgentEventListeners {
 //
 //    }
 
-    public void startInstanceNotifierReceiver() {
-
-        if (log.isDebugEnabled()) {
-            log.debug("Starting cartridge agent instance notifier event message receiver");
-        }
-
-        eventListenerExecutorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                instanceNotifierEventReceiver.execute();
-            }
-        });
-
-        if (log.isDebugEnabled()) {
-            log.debug("Cartridge agent Instance notifier event message receiver started, waiting for event messages ...");
-        }
-    }
+//    public void startInstanceNotifierReceiver() {
+//
+//        if (log.isDebugEnabled()) {
+//            log.debug("Starting cartridge agent instance notifier event message receiver");
+//        }
+//
+//        eventListenerExecutorService.submit(new Runnable() {
+//            @Override
+//            public void run() {
+//                instanceNotifierEventReceiver.execute();
+//            }
+//        });
+//
+//        if (log.isDebugEnabled()) {
+//            log.debug("Cartridge agent Instance notifier event message receiver started, waiting for event messages ...");
+//        }
+//    }
 
 //    public void startTenantEventReceiver() {
 //
@@ -521,9 +521,9 @@ public class CartridgeAgentEventListeners {
      * Terminate load balancer topology receiver thread.
      */
 
-    public void terminate() {
-        topologyEventReceiver.terminate();
-    }
+//    public void terminate() {
+//        topologyEventReceiver.terminate();
+//    }
 
 }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
index a501507..903fa58 100644
--- a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
+++ b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java
@@ -126,9 +126,9 @@ public class JavaCartridgeAgentTest {
         //topologyEventReceiver.setExecutorService(executorService);
         //topologyEventReceiver.execute();
 
-        instanceStatusEventReceiver = new InstanceStatusEventReceiver();
-        instanceStatusEventReceiver.setExecutorService(executorService);
-        instanceStatusEventReceiver.execute();
+        instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance();
+//        instanceStatusEventReceiver.setExecutorService(executorService);
+//        instanceStatusEventReceiver.execute();
 
         instanceStarted = false;
         instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() {
@@ -191,8 +191,8 @@ public class JavaCartridgeAgentTest {
         } catch (Exception ignore) {
         }
 
-        this.instanceStatusEventReceiver.terminate();
-        this.topologyEventReceiver.terminate();
+        //this.instanceStatusEventReceiver.terminate();
+      //  this.topologyEventReceiver.terminate();
 
         this.instanceActivated = false;
         this.instanceStarted = false;

http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index c4c0336..267d5a8 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -162,8 +162,8 @@ public class CloudControllerServiceComponent {
         }
 
         instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
-        instanceStatusTopicReceiver.setExecutorService(executorService);
-        instanceStatusTopicReceiver.execute();
+//        instanceStatusTopicReceiver.setExecutorService(executorService);
+//        instanceStatusTopicReceiver.execute();
 
         if (log.isInfoEnabled()) {
             log.info("Instance status event receiver thread started");

http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
index e212fea..bfa205b 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -46,21 +46,21 @@ public class InstanceStatusTopicReceiver {
     private ExecutorService executorService;
 
     public InstanceStatusTopicReceiver() {
-        this.statusEventReceiver = new InstanceStatusEventReceiver();
+        this.statusEventReceiver = InstanceStatusEventReceiver.getInstance();
         addEventListeners();
     }
 
-    public void execute() {
-        statusEventReceiver.setExecutorService(executorService);
-        statusEventReceiver.execute();
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller application status thread started");
-        }
-
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller application status thread terminated");
-        }
-    }
+//    public void execute() {
+//        statusEventReceiver.setExecutorService(executorService);
+//        statusEventReceiver.execute();
+//        if (log.isInfoEnabled()) {
+//            log.info("Cloud controller application status thread started");
+//        }
+//
+//        if (log.isInfoEnabled()) {
+//            log.info("Cloud controller application status thread terminated");
+//        }
+//    }
 
     private void addEventListeners() {
         statusEventReceiver.addEventListener(new InstanceActivatedEventListener() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index c0ae8ae..687cec2 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -36,8 +36,8 @@ public class StratosThreadPool {
 
     private static final Log log = LogFactory.getLog(StratosThreadPool.class);
 
-    private static Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<String, ExecutorService>();
-    private static Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
+    private static volatile Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<>();
+    private static volatile Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
     private static Object executorServiceMapLock = new Object();
     private static Object scheduledServiceMapLock = new Object();
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 573c19d..c4d68ae 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -202,8 +202,8 @@ public class StratosManagerServiceComponent {
      */
     private void initializeInstanceStatusEventReceiver() {
         instanceStatusEventReceiver = new StratosManagerInstanceStatusEventReceiver();
-        instanceStatusEventReceiver.setExecutorService(executorService);
-        instanceStatusEventReceiver.execute();
+//        instanceStatusEventReceiver.setExecutorService(executorService);
+//        instanceStatusEventReceiver.execute();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java
index 1da448e..ab92d1b 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java
@@ -37,32 +37,34 @@ import java.util.List;
 /**
  * Stratos manager instance status event receiver.
  */
-public class StratosManagerInstanceStatusEventReceiver extends InstanceStatusEventReceiver {
+public class StratosManagerInstanceStatusEventReceiver  {
 
     private static final Log log = LogFactory.getLog(StratosManagerInstanceStatusEventReceiver.class);
 
     private ApplicationSignUpHandler signUpManager;
     private ArtifactDistributionCoordinator artifactDistributionCoordinator;
+    private InstanceStatusEventReceiver instanceStatusEventReceiver;
 
 
     public StratosManagerInstanceStatusEventReceiver() {
         signUpManager = new ApplicationSignUpHandler();
         artifactDistributionCoordinator = new ArtifactDistributionCoordinator();
+        instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance();
 
         addEventListeners();
     }
 
-    @Override
-    public void execute() {
-        super.execute();
-
-        if (log.isInfoEnabled()) {
-            log.info("Stratos manager instance status event receiver thread started");
-        }
-    }
+//    @Override
+//    public void execute() {
+//        super.execute();
+//
+//        if (log.isInfoEnabled()) {
+//            log.info("Stratos manager instance status event receiver thread started");
+//        }
+//    }
 
     private void addEventListeners() {
-        addEventListener(new InstanceStartedEventListener() {
+        instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() {
             @Override
             protected void onEvent(Event event) {
                 InstanceStartedEvent instanceStartedEvent = (InstanceStartedEvent) event;

http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index 4ad6572..cfc7f11 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -21,64 +21,107 @@ package org.apache.stratos.messaging.message.receiver.instance.notifier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
 import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
 import org.apache.stratos.messaging.util.MessagingUtil;
 
 /**
  * A thread for receiving instance notifier information from message broker.
  */
-public class InstanceNotifierEventReceiver {
+public class InstanceNotifierEventReceiver extends StratosEventReceiver {
     private static final Log log = LogFactory.getLog(InstanceNotifierEventReceiver.class);
     private final InstanceNotifierEventMessageDelegator messageDelegator;
     private EventSubscriber eventSubscriber;
-    private boolean terminated;
+    private InstanceNotifierEventMessageListener messageListener;
+    private static volatile InstanceNotifierEventReceiver instance;
+    //private boolean terminated;
 
-    public InstanceNotifierEventReceiver() {
+    private InstanceNotifierEventReceiver() {
+        // TODO: make pool size configurable
+        this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
         InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue();
         this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue);
-        InstanceNotifierEventMessageListener messageListener = new InstanceNotifierEventMessageListener(messageQueue);
+        messageListener = new InstanceNotifierEventMessageListener(messageQueue);
         // Start topic subscriber thread
         eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(),
                 messageListener);
+        execute();
+    }
+
+    public static InstanceNotifierEventReceiver getInstance () {
+        if (instance == null) {
+            synchronized (InstanceNotifierEventReceiver.class) {
+                if (instance == null) {
+                    instance = new InstanceNotifierEventReceiver();
+                }
+            }
+        }
+
+        return instance;
     }
 
     public void addEventListener(EventListener eventListener) {
         messageDelegator.addEventListener(eventListener);
     }
 
-    public void execute() {
-        synchronized (this) {
-            if (terminated) {
-                log.info("InstanceNotifierEventReceiver has been terminated. Event subscriber will not be created.");
-                return;
+//    public void execute() {
+//        synchronized (this) {
+//            if (terminated) {
+//                log.info("InstanceNotifierEventReceiver has been terminated. Event subscriber will not be created.");
+//                return;
+//            }
+//            try {
+//                Thread subscriberThread = new Thread(eventSubscriber);
+//                subscriberThread.start();
+//                if (log.isDebugEnabled()) {
+//                    log.debug("InstanceNotifier event message receiver thread started");
+//                }
+//
+//                // Start instance notifier event message delegator thread
+//                Thread receiverThread = new Thread(messageDelegator);
+//                receiverThread.start();
+//                if (log.isDebugEnabled()) {
+//                    log.debug("InstanceNotifier event message delegator thread started");
+//                }
+//            } catch (Exception e) {
+//                if (log.isErrorEnabled()) {
+//                    log.error("InstanceNotifier receiver failed", e);
+//                }
+//            }
+//        }
+//        log.info("InstanceNotifierEventReceiver started");
+//
+//        // Keep the thread live until terminated
+//        while (!terminated) {
+//            try {
+//                Thread.sleep(2000);
+//            } catch (InterruptedException ignore) {
+//            }
+//        }
+//    }
+
+    private void execute() {
+        try {
+            // Start topic subscriber thread
+            eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(),
+                    messageListener);
+            executorService.execute(eventSubscriber);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Instance Notifier event message receiver thread started");
             }
-            try {
-                Thread subscriberThread = new Thread(eventSubscriber);
-                subscriberThread.start();
-                if (log.isDebugEnabled()) {
-                    log.debug("InstanceNotifier event message receiver thread started");
-                }
 
-                // Start instance notifier event message delegator thread
-                Thread receiverThread = new Thread(messageDelegator);
-                receiverThread.start();
-                if (log.isDebugEnabled()) {
-                    log.debug("InstanceNotifier event message delegator thread started");
-                }
-            } catch (Exception e) {
-                if (log.isErrorEnabled()) {
-                    log.error("InstanceNotifier receiver failed", e);
-                }
+            // Start instance notifier event message delegator thread
+            executorService.execute(messageDelegator);
+            if (log.isDebugEnabled()) {
+                log.debug("Instance Notifier  event message delegator thread started");
             }
-        }
-        log.info("InstanceNotifierEventReceiver started");
 
-        // Keep the thread live until terminated
-        while (!terminated) {
-            try {
-                Thread.sleep(2000);
-            } catch (InterruptedException ignore) {
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Instance Notifier receiver failed", e);
             }
         }
     }
@@ -90,7 +133,7 @@ public class InstanceNotifierEventReceiver {
     public synchronized void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        terminated = true;
+        //terminated = true;
         log.info("InstanceNotifierEventReceiver terminated");
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index 41f444e..a2a1623 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -21,27 +21,41 @@ package org.apache.stratos.messaging.message.receiver.instance.status;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
 import org.apache.stratos.messaging.listener.EventListener;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
 import org.apache.stratos.messaging.util.MessagingUtil;
 
-import java.util.concurrent.ExecutorService;
-
 /**
  * A thread for receiving instance notifier information from message broker.
  */
-public class InstanceStatusEventReceiver {
+public class InstanceStatusEventReceiver extends StratosEventReceiver {
     private static final Log log = LogFactory.getLog(InstanceStatusEventReceiver.class);
     private final InstanceStatusEventMessageDelegator messageDelegator;
     private final InstanceStatusEventMessageListener messageListener;
     private EventSubscriber eventSubscriber;
-    private boolean terminated;
-    private ExecutorService executorService;
+    private static volatile InstanceStatusEventReceiver instance;
 
-    public InstanceStatusEventReceiver() {
+    private InstanceStatusEventReceiver() {
+        // TODO: make pool size configurable
+        this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
         InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
         this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue);
         this.messageListener = new InstanceStatusEventMessageListener(messageQueue);
+        execute();
+    }
+
+    public static InstanceStatusEventReceiver getInstance () {
+        if (instance == null) {
+            synchronized (InstanceStatusEventReceiver.class) {
+                if (instance == null) {
+                    instance = new InstanceStatusEventReceiver();
+                }
+            }
+        }
+
+        return instance;
     }
 
     public void addEventListener(EventListener eventListener) {
@@ -49,7 +63,7 @@ public class InstanceStatusEventReceiver {
     }
 
 
-    public void execute() {
+    private void execute() {
         try {
             // Start topic subscriber thread
             eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_STATUS_TOPIC.getTopicName(), messageListener);
@@ -77,14 +91,14 @@ public class InstanceStatusEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        terminated = true;
-    }
-
-    public ExecutorService getExecutorService() {
-        return executorService;
+       // terminated = true;
     }
 
-    public void setExecutorService(ExecutorService executorService) {
-        this.executorService = executorService;
-    }
+//    public ExecutorService getExecutorService() {
+//        return executorService;
+//    }
+//
+//    public void setExecutorService(ExecutorService executorService) {
+//        this.executorService = executorService;
+//    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
index c752f9e..9886335 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
@@ -161,7 +161,7 @@ public class MockInstance implements Serializable {
     }
 
     private void startInstanceNotifierEventReceiver() {
-        instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
+        instanceNotifierEventReceiver = InstanceNotifierEventReceiver.getInstance();
         instanceNotifierEventReceiver.addEventListener(new InstanceCleanupClusterEventListener() {
             @Override
             protected void onEvent(Event event) {
@@ -185,17 +185,17 @@ public class MockInstance implements Serializable {
         });
         // TODO: Fix InstanceNotifierEventReceiver to use executor service
         // do not remove this since execute() is a blocking call
-        eventListenerExecutorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                instanceNotifierEventReceiver.execute();
-            }
-        });
-        if (log.isDebugEnabled()) {
-            log.debug(String.format(
-                    "Mock instance instance notifier event message receiver started for mock member [member-id] %s",
-                    mockInstanceContext.getMemberId()));
-        }
+//        eventListenerExecutorService.submit(new Runnable() {
+//            @Override
+//            public void run() {
+//                instanceNotifierEventReceiver.execute();
+//            }
+//        });
+//        if (log.isDebugEnabled()) {
+//            log.debug(String.format(
+//                    "Mock instance instance notifier event message receiver started for mock member [member-id] %s",
+//                    mockInstanceContext.getMemberId()));
+//        }
     }
 
     private void handleMemberTermination() {
@@ -213,9 +213,9 @@ public class MockInstance implements Serializable {
         healthStatNotifierScheduledFuture.cancel(true);
     }
 
-    private void stopInstanceNotifierReceiver() {
-        instanceNotifierEventReceiver.terminate();
-    }
+//    private void stopInstanceNotifierReceiver() {
+//        instanceNotifierEventReceiver.terminate();
+//    }
 
     public MockInstanceContext getMockInstanceContext() {
         return mockInstanceContext;
@@ -223,7 +223,7 @@ public class MockInstance implements Serializable {
 
     public synchronized void terminate() {
         if (MemberStatus.Initialized.equals(memberStatus)) {
-            stopInstanceNotifierReceiver();
+            //stopInstanceNotifierReceiver();
             stopHealthStatisticsPublisher();
             memberStatus = MemberStatus.Terminated;
             if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index f31583c..308dde0 100644
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -128,9 +128,9 @@ public abstract class PythonAgentIntegrationTest {
 //        topologyEventReceiver.setExecutorService(executorService);
 //        topologyEventReceiver.execute();
 
-        instanceStatusEventReceiver = new InstanceStatusEventReceiver();
-        instanceStatusEventReceiver.setExecutorService(executorService);
-        instanceStatusEventReceiver.execute();
+        instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance();
+//        instanceStatusEventReceiver.setExecutorService(executorService);
+//        instanceStatusEventReceiver.execute();
 
         instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() {
             @Override

http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java
----------------------------------------------------------------------
diff --git a/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java b/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java
index 3f708db..7412540 100644
--- a/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java
+++ b/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java
@@ -56,19 +56,19 @@ public class GitHookTestCase extends StratosIntegrationTest {
     private static final String appPolicyId = "application-policy-git-hook-test";
     private static final String GIT_HOOK_ARTIFACT_FILENAME = "hook-req.json";
     private static final int ARTIFACT_UPDATED_EXPECTED_COUNT = 2;
-    private ExecutorService eventListenerExecutorService = StratosThreadPool
-            .getExecutorService("stratos.integration.test.git.thread.pool", 5);
+//    private ExecutorService eventListenerExecutorService = StratosThreadPool
+//            .getExecutorService("stratos.integration.test.git.thread.pool", 5);
 
     @Test(timeOut = DEFAULT_TEST_TIMEOUT)
     public void sendRepoNotify() throws Exception {
         deployArtifacts();
-        final InstanceNotifierEventReceiver instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
-        eventListenerExecutorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                instanceNotifierEventReceiver.execute();
-            }
-        });
+        final InstanceNotifierEventReceiver instanceNotifierEventReceiver = InstanceNotifierEventReceiver.getInstance();
+//        eventListenerExecutorService.submit(new Runnable() {
+//            @Override
+//            public void run() {
+//                instanceNotifierEventReceiver.execute();
+//            }
+//        });
 
         ArtifactUpdateEventListener artifactUpdateEventListener = new ArtifactUpdateEventListener() {
             @Override
@@ -86,8 +86,8 @@ public class GitHookTestCase extends StratosIntegrationTest {
             Thread.sleep(1000);
         }
         TopologyHandler.getInstance().assertApplicationActiveStatus(applicationId);
-        instanceNotifierEventReceiver.terminate();
-        eventListenerExecutorService.shutdownNow();
+        //instanceNotifierEventReceiver.terminate();
+       // eventListenerExecutorService.shutdownNow();
         undeployArtifacts();
     }