You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2015/12/24 15:56:17 UTC

[14/14] stratos git commit: adding comments for StratosEventReceiver abstraction, starting the event reseivers from messaging activator and adding shutdown for tenant, application and signup synchronizers

adding comments for StratosEventReceiver abstraction, starting the event reseivers from messaging activator and adding shutdown for tenant, application and signup synchronizers

Conflicts:
	components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
	components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c90eb9a7
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c90eb9a7
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c90eb9a7

Branch: refs/heads/master
Commit: c90eb9a728ca6b9f2ef33dbd6ceece635ddb77ab
Parents: 905e140
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Thu Dec 24 16:55:07 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 20:12:43 2015 +0530

----------------------------------------------------------------------
 .../CloudControllerServiceComponent.java        |  3 +
 .../common/threading/StratosThreadPool.java     | 35 +++++++++--
 .../StratosManagerServiceComponent.java         |  3 +
 .../internal/MessagingServiceComponent.java     | 21 ++++++-
 .../message/receiver/StratosEventReceiver.java  | 61 +++++++++++++++++++-
 .../application/ApplicationsEventReceiver.java  |  4 +-
 .../ApplicationSignUpEventMessageDelegator.java |  4 ++
 .../signup/ApplicationSignUpEventReceiver.java  |  6 +-
 .../ClusterStatusEventMessageDelegator.java     |  4 ++
 .../status/ClusterStatusEventReceiver.java      |  6 +-
 .../DomainMappingEventMessageDelegator.java     |  4 ++
 .../mapping/DomainMappingEventReceiver.java     |  6 +-
 .../stat/HealthStatEventMessageDelegator.java   |  4 ++
 .../health/stat/HealthStatEventReceiver.java    |  5 +-
 .../InitializerEventMessageDelegator.java       |  4 ++
 .../initializer/InitializerEventReceiver.java   |  7 ++-
 .../InstanceNotifierEventMessageDelegator.java  |  4 ++
 .../notifier/InstanceNotifierEventReceiver.java |  8 +--
 .../InstanceStatusEventMessageDelegator.java    |  4 ++
 .../status/InstanceStatusEventReceiver.java     |  6 +-
 .../tenant/TenantEventMessageDelegator.java     |  4 ++
 .../receiver/tenant/TenantEventReceiver.java    |  6 +-
 .../topology/TopologyEventMessageDelegator.java |  4 ++
 .../topology/TopologyEventReceiver.java         |  6 +-
 24 files changed, 186 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index c30fc63..5b01330 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -229,5 +229,8 @@ public class CloudControllerServiceComponent {
         } catch (Exception e) {
             log.warn("An error occurred while closing cloud controller topology event publisher", e);
         }
+
+        // shutdown TopologyEventSync task
+        StratosThreadPool.shutdown(THREAD_POOL_ID);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index 687cec2..8037ce3 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -24,10 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.*;
 
 /**
  * Utility class for Stratos thread pool
@@ -37,7 +34,7 @@ public class StratosThreadPool {
     private static final Log log = LogFactory.getLog(StratosThreadPool.class);
 
     private static volatile Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<>();
-    private static volatile Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
+    private static volatile Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<>();
     private static Object executorServiceMapLock = new Object();
     private static Object scheduledServiceMapLock = new Object();
 
@@ -84,4 +81,32 @@ public class StratosThreadPool {
         }
         return scheduledExecutorService;
     }
+
+    public static void shutdown (String identifier) {
+
+        ExecutorService executorService = executorServiceMap.get(identifier);
+        if (executorService == null) {
+            log.warn("No executor service found for id " + identifier + ", unable to shut down");
+            return;
+        }
+
+        // try to shut down gracefully
+        executorService.shutdown();
+        // wait 10 secs till terminated
+        try {
+            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+                log.info("Thread Pool [id] " + identifier + " did not finish all tasks before " +
+                        "timeout, forcefully shutting down");
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            // interrupted, shutdown now
+            executorService.shutdownNow();
+        }
+
+        // remove from the map
+        executorServiceMap.remove(identifier);
+
+        log.info("Successfully shutdown thread pool associated with id: " + identifier);
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index aa7cc02..5bd3f76 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -328,5 +328,8 @@ public class StratosManagerServiceComponent {
         // Close event publisher connections to message broker
         EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
         EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName());
+
+        // shut down the scheduled thread pool
+        StratosThreadPool.shutdown(THREAD_POOL_ID);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
index c97125b..b582d56 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
@@ -21,6 +21,8 @@ package org.apache.stratos.messaging.internal;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
 import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
 import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
 import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
@@ -40,8 +42,20 @@ public class MessagingServiceComponent {
     private static final Log log = LogFactory.getLog(MessagingServiceComponent.class);
 
     protected void activate(ComponentContext context) {
+        // activate all message receivers
         try {
-            log.info("Messaging Service bundle activated");
+            ApplicationSignUpEventReceiver.getInstance();
+            ApplicationsEventReceiver.getInstance();
+            ClusterStatusEventReceiver.getInstance();
+            DomainMappingEventReceiver.getInstance();
+            HealthStatEventReceiver.getInstance();
+            InitializerEventReceiver.getInstance();
+            TenantEventReceiver.getInstance();
+            TopologyEventReceiver.getInstance();
+
+            if (log.isDebugEnabled()) {
+                log.debug("Messaging Service bundle activated");
+            }
         } catch (Exception e) {
             log.error("Could not activate Messaging Service component", e);
         }
@@ -58,7 +72,10 @@ public class MessagingServiceComponent {
             InitializerEventReceiver.getInstance().terminate();
             TenantEventReceiver.getInstance().terminate();
             TopologyEventReceiver.getInstance().terminate();
-            log.info("Messaging Service component is deactivated");
+            StratosThreadPool.shutdown(StratosEventReceiver.STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID);
+            if (log.isDebugEnabled()) {
+                log.debug("Messaging Service component is deactivated");
+            }
         } catch (Exception e) {
             log.error("Could not de-activate Messaging Service component", e);
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
index 5ac89e6..8c29816 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
@@ -19,12 +19,71 @@
 
 package org.apache.stratos.messaging.message.receiver;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.listener.EventListener;
+
 import java.util.concurrent.ExecutorService;
 
-public class StratosEventReceiver {
+/**
+ * Abstraction for Event Receivers used in Stratos
+ */
+public abstract class StratosEventReceiver {
+
+    private static final Log log = LogFactory.getLog(StratosEventReceiver.class);
+
+    /**
+     * Thread pool information for all StratosEventReceiver implementations
+     */
+
+    public static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID = "stratos-event-receiver-pool";
+    private static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE = "stratos.event.receiver.pool.size";
 
+    // thread pool id
+    protected String threadPoolId;
+    // executor service used
     protected ExecutorService executorService;
+    // pool size
+    protected static int threadPoolSize = 15;
+
+    static {
+        // check if the thread pool size is given as a system parameter
+        String poolSize = System.getProperty(STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE);
+        if (poolSize != null) {
+            try {
+                threadPoolSize = Integer.parseInt(poolSize);
+            } catch (NumberFormatException e) {
+                log.error("Invalid configuration found for StratosEventReceiver thread pool size", e);
+                threadPoolSize = 15;
+            }
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Number of threads used in pool " + STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID + " : " + threadPoolSize);
+        }
+    }
 
     public StratosEventReceiver () {
+        this.threadPoolId = STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID;
+        this.executorService = StratosThreadPool.getExecutorService(threadPoolId, threadPoolSize);
     }
+
+    /**
+     * Adds an EventListener to this StratosEventReceiver instance
+     *
+     * @param eventListener EventListener instance to add
+     */
+    public abstract void addEventListener(EventListener eventListener);
+
+    /**
+     * Removed an EventListener from this StratosEventReceiver instance
+     *
+     * @param eventListener EventListener instance to remove
+     */
+    public abstract void removeEventListener(EventListener eventListener);
+
+    /**
+     * Terminates this StratosEventReceiver instance
+     */
+    public abstract void terminate();
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
index 69dba01..89dd73e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
@@ -38,8 +38,6 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{
     private static volatile ApplicationsEventReceiver instance;
 
     private ApplicationsEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("application-event-receiver", 100);
         ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue();
         this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue);
         this.messageListener = new ApplicationsEventMessageListener(messageQueue);
@@ -66,7 +64,7 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{
         messageDelegator.removeEventListener(eventListener);
     }
 
-    public void execute() {
+    private void execute() {
         try {
             // Start topic subscriber thread
             eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(),

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
index adf805d..59374bb 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
@@ -46,6 +46,10 @@ class ApplicationSignUpEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index df90cf9..5ad6070 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -43,8 +43,6 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
     private static volatile ApplicationSignUpEventReceiver instance;
 
     private ApplicationSignUpEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("application-signup-event-receiver", 100);
         ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue();
         this.messageDelegator = new ApplicationSignUpEventMessageDelegator(messageQueue);
         this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue);
@@ -67,6 +65,10 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
index 5c9c502..954d9be 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
@@ -46,6 +46,10 @@ class ClusterStatusEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index be42b43..9de351b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -39,8 +39,6 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver {
     private static volatile ClusterStatusEventReceiver instance;
 
     private ClusterStatusEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100);
         ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
         this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue);
         this.messageListener = new ClusterStatusEventMessageListener(messageQueue);
@@ -51,6 +49,10 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     public static ClusterStatusEventReceiver getInstance () {
         if (instance == null) {
             synchronized (ClusterStatusEventReceiver.class) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
index fa783a9..03154f2 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
@@ -46,6 +46,10 @@ class DomainMappingEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 6de99c0..6c88f73 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -40,8 +40,6 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
     private static volatile DomainMappingEventReceiver instance;
 
     private DomainMappingEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("domainmapping-event-receiver", 100);
         DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue();
         this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue);
         this.messageListener = new DomainMappingEventMessageListener(messageQueue);
@@ -52,6 +50,10 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     public static DomainMappingEventReceiver getInstance () {
         if (instance == null) {
             synchronized (DomainMappingEventReceiver.class) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
index 2cde2a9..29fb47b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
@@ -48,6 +48,10 @@ class HealthStatEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index a9d2602..442bdb6 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -39,8 +39,6 @@ public class HealthStatEventReceiver extends StratosEventReceiver {
     private static volatile HealthStatEventReceiver instance;
 
     private HealthStatEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("healthstat-event-receiver", 100);
         HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue();
         this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue);
         this.messageListener = new HealthStatEventMessageListener(messageQueue);
@@ -63,6 +61,9 @@ public class HealthStatEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
 
     private void execute() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
index ffd2ae4..baca350 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
@@ -41,6 +41,10 @@ public class InitializerEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
index 805a8bf..c7e5daf 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
@@ -26,7 +26,6 @@ import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
 import org.apache.stratos.messaging.util.MessagingUtil;
 
-import java.util.concurrent.ExecutorService;
 
 public class InitializerEventReceiver extends StratosEventReceiver {
     private static final Log log = LogFactory.getLog(InitializerEventReceiver.class);
@@ -38,8 +37,6 @@ public class InitializerEventReceiver extends StratosEventReceiver {
     //private ExecutorService executorService;
 
     private InitializerEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("initializer-event-receiver", 100);
         InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue();
         this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue);
         this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue);
@@ -62,6 +59,10 @@ public class InitializerEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
index 73ef9fe..b695db7 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
@@ -46,6 +46,10 @@ class InstanceNotifierEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index e0b8e9f..5bcd75a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -39,8 +39,6 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
     //private boolean terminated;
 
     private InstanceNotifierEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
         InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue();
         this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue);
         this.messageListener = new InstanceNotifierEventMessageListener(messageQueue);
@@ -63,6 +61,10 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread
@@ -94,7 +96,5 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
     public synchronized void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        //terminated = true;
-        log.info("InstanceNotifierEventReceiver terminated");
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
index 9f754b0..e5df65e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
@@ -46,6 +46,10 @@ class InstanceStatusEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index a565ea9..3d9f793 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -38,8 +38,6 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
     private static volatile InstanceStatusEventReceiver instance;
 
     private InstanceStatusEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
         InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
         this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue);
         this.messageListener = new InstanceStatusEventMessageListener(messageQueue);
@@ -62,6 +60,9 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
 
     private void execute() {
         try {
@@ -91,6 +92,5 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-       // terminated = true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
index c735d9b..cd8724c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
@@ -48,6 +48,10 @@ class TenantEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
index a52cb20..e30d3ab 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -42,8 +42,6 @@ public class TenantEventReceiver extends StratosEventReceiver {
     private static volatile TenantEventReceiver instance;
 
     private TenantEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("tenant-event-receiver", 100);
         TenantEventMessageQueue messageQueue = new TenantEventMessageQueue();
         this.messageDelegator = new TenantEventMessageDelegator(messageQueue);
         this.messageListener = new TenantEventMessageListener(messageQueue);
@@ -66,6 +64,10 @@ public class TenantEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 8508d91..d2664f4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -47,6 +47,10 @@ class TopologyEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index bfa3950..4f1f254 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -44,8 +44,6 @@ public class TopologyEventReceiver extends StratosEventReceiver {
     private static volatile TopologyEventReceiver instance;
 
     private TopologyEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
         TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue();
         this.messageDelegator = new TopologyEventMessageDelegator(messageQueue);
         this.messageListener = new TopologyEventMessageListener(messageQueue);
@@ -68,6 +66,10 @@ public class TopologyEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread