You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2014/12/03 07:56:44 UTC

[7/9] stratos git commit: Adding executor service for threads and remove unnecessary threads

Adding executor service for threads and remove unnecessary threads

Conflicts:
	components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4e733930
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4e733930
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4e733930

Branch: refs/heads/master
Commit: 4e733930ba33a87c78cd398a2573f6b545a8d9f0
Parents: ab1ed3c
Author: gayan <ga...@puppet.gayan.org>
Authored: Mon Dec 1 19:05:20 2014 +0530
Committer: gayan <ga...@puppet.gayan.org>
Committed: Tue Dec 2 16:43:07 2014 +0530

----------------------------------------------------------------------
 .../AutoscalerHealthStatEventReceiver.java      | 19 +++--
 .../internal/AutoscalerServerComponent.java     |  7 +-
 .../stratos/cartridge/agent/CartridgeAgent.java |  4 +-
 .../CloudControllerServiceComponent.java        | 28 ++++++++
 .../application/ApplicationTopicReceiver.java   | 12 ++++
 .../status/ClusterStatusTopicReceiver.java      | 12 ++++
 .../status/InstanceStatusTopicReceiver.java     | 10 +++
 components/org.apache.stratos.common/pom.xml    |  5 ++
 .../apache/stratos/common/util/ConfUtil.java    | 73 ++++++++++++++++++++
 .../internal/ADCManagementServerComponent.java  |  4 +-
 .../applications/ApplicationsEventReceiver.java | 26 ++++---
 .../status/ClusterStatusEventReceiver.java      | 28 ++++----
 .../health/stat/HealthStatEventReceiver.java    | 11 +++
 .../status/InstanceStatusEventReceiver.java     | 19 +++--
 14 files changed, 216 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
index 0e45ee1..9e440c7 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -63,24 +63,25 @@ import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfReque
 import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
+import java.util.concurrent.ExecutorService;
 
 /**
  * A thread for processing topology messages and updating the topology data structure.
  */
-public class AutoscalerHealthStatEventReceiver implements Runnable {
+public class AutoscalerHealthStatEventReceiver {
 
     private static final Log log = LogFactory.getLog(AutoscalerHealthStatEventReceiver.class);
     private boolean terminated = false;
 
     private HealthStatEventReceiver healthStatEventReceiver;
+	private ExecutorService executorService;
 
     public AutoscalerHealthStatEventReceiver() {
 		this.healthStatEventReceiver = new HealthStatEventReceiver();
         addEventListeners();
     }
 
-    @Override
-    public void run() {
+    public void execute() {
         //FIXME this activated before autoscaler deployer activated.
         try {
             Thread.sleep(15000);
@@ -92,11 +93,7 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
             log.info("Autoscaler health stat event receiver thread started");
         }
 
-        // Keep the thread live until terminated
 
-        if(log.isInfoEnabled()) {
-            log.info("Autoscaler health stat event receiver thread terminated");
-        }
     }
 
     private void addEventListeners() {
@@ -519,4 +516,12 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
     public void terminate() {
         this.terminated = true;
     }
+
+	public ExecutorService getExecutorService() {
+		return executorService;
+	}
+
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 2e443de..bb5e167 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -66,13 +66,14 @@ public class AutoscalerServerComponent {
 	private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier";
 	private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
 	private static final String THREAD_POOL_SIZE_KEY = "threadPool.autoscaler.threadPoolSize";
-	private static final String COMPONENTS_CONFIG = "components-config";
+	private static final String COMPONENTS_CONFIG = "stratos-config";
 	private static final int THREAD_POOL_SIZE = 10;
 	private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class);
 
 	private AutoscalerTopologyEventReceiver asTopologyReceiver;
 	private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
 
+
 	protected void activate(ComponentContext componentContext) throws Exception {
 <<<<<<< HEAD
         try {
@@ -214,8 +215,8 @@ public class AutoscalerServerComponent {
 
 			// Start health stat receiver
 			autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
-			Thread healthDelegatorThread = new Thread(autoscalerHealthStatEventReceiver);
-			healthDelegatorThread.start();
+			autoscalerHealthStatEventReceiver.setExecutorService(executorService);
+			autoscalerHealthStatEventReceiver.execute();
 			if (log.isDebugEnabled()) {
 				log.debug("Health statistics receiver thread started");
 			}

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index e275db5..53fd658 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -415,8 +415,8 @@ public class CartridgeAgent implements Runnable {
             }
         });
 
-//        Thread thread = new Thread(topologyEventReceiver);
-//        thread.start();
+	    topologyEventReceiver.execute();
+
         if (log.isDebugEnabled()) {
             log.info("Cartridge Agent topology receiver thread started");
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 6773b4a..a413218 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -23,6 +23,7 @@ package org.apache.stratos.cloud.controller.internal;
 <<<<<<< HEAD
 <<<<<<< HEAD
 
+<<<<<<< HEAD
 import com.hazelcast.core.HazelcastInstance;
 
 =======
@@ -30,17 +31,30 @@ import com.hazelcast.core.HazelcastInstance;
 =======
 
 >>>>>>> ad3e45c... Remove unnessary threads in messaging model
+=======
+import org.apache.commons.configuration.XMLConfiguration;
+>>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary threads
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.context.CloudControllerContext;
 import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationTopicReceiver;
 import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
 import org.apache.stratos.cloud.controller.exception.CloudControllerException;
+<<<<<<< HEAD
 import org.apache.stratos.cloud.controller.services.CloudControllerService;
 import org.apache.stratos.cloud.controller.services.impl.CloudControllerServiceImpl;
 import org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTaskScheduler;
 import org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver;
 import org.apache.stratos.common.clustering.DistributedObjectProvider;
+=======
+import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl;
+import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
+import org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler;
+import org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusTopicReceiver;
+import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.common.util.ConfUtil;
+>>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary threads
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.util.Util;
 import org.osgi.framework.BundleContext;
@@ -52,6 +66,8 @@ import org.wso2.carbon.registry.core.service.RegistryService;
 import org.wso2.carbon.registry.core.session.UserRegistry;
 import org.wso2.carbon.utils.ConfigurationContextService;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * Registering Cloud Controller Service.
  *
@@ -90,10 +106,21 @@ public class CloudControllerServiceComponent {
 	private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
 	private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
 	private ApplicationTopicReceiver applicationTopicReceiver;
+	private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier";
+	private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
+	private static final String THREAD_POOL_SIZE_KEY = "threadPool.autoscaler.threadPoolSize";
+	private static final String COMPONENTS_CONFIG = "stratos-config";
+	private static final int THREAD_POOL_SIZE = 10;
 
 	protected void activate(ComponentContext context) {
 		try {
+
+			XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
+			int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE);
+			String threadIdentifier = conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
+			ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
 			applicationTopicReceiver = new ApplicationTopicReceiver();
+			applicationTopicReceiver.setExecutorService(executorService);
 			applicationTopicReceiver.execute();
 
 			if (log.isInfoEnabled()) {
@@ -106,6 +133,7 @@ public class CloudControllerServiceComponent {
             }
 =======
 			clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
+			clusterStatusTopicReceiver.setExecutorService(executorService);
 			clusterStatusTopicReceiver.execute();
 >>>>>>> ddf277b... Remove unnessary threads in messaging model
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
index d65b7f5..9df4f3a 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
@@ -26,6 +26,8 @@ import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEven
 import org.apache.stratos.messaging.listener.applications.ApplicationTerminatedEventListener;
 import org.apache.stratos.messaging.message.receiver.applications.ApplicationsEventReceiver;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * This is to receive the application topic messages.
  */
@@ -33,6 +35,7 @@ public class ApplicationTopicReceiver {
 	private static final Log log = LogFactory.getLog(ApplicationTopicReceiver.class);
 	private ApplicationsEventReceiver applicationsEventReceiver;
 	private boolean terminated;
+	private ExecutorService executorService;
 
 	public ApplicationTopicReceiver() {
 		this.applicationsEventReceiver = new ApplicationsEventReceiver();
@@ -46,6 +49,7 @@ public class ApplicationTopicReceiver {
 			log.info("Cloud controller application status thread started");
 		}
 		applicationsEventReceiver.execute();
+		applicationsEventReceiver.setExecutorService(executorService);
 
 		if (log.isInfoEnabled()) {
 			log.info("Cloud controller application status thread terminated");
@@ -69,4 +73,12 @@ public class ApplicationTopicReceiver {
 	public void setTerminated(boolean terminated) {
 		this.terminated = terminated;
 	}
+
+	public ExecutorService getExecutorService() {
+		return executorService;
+	}
+
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
index ca6d4ad..d54063c 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -26,14 +26,18 @@ import org.apache.stratos.messaging.event.cluster.status.*;
 import org.apache.stratos.messaging.listener.cluster.status.*;
 import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
 
+import java.util.concurrent.ExecutorService;
+
 public class ClusterStatusTopicReceiver {
 	private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class);
 
 	private ClusterStatusEventReceiver statusEventReceiver;
 	private boolean terminated;
+	private ExecutorService executorService;
 
 	public ClusterStatusTopicReceiver() {
 		this.statusEventReceiver = new ClusterStatusEventReceiver();
+		this.statusEventReceiver.setExecutorService(executorService);
 		addEventListeners();
 	}
 
@@ -101,4 +105,12 @@ public class ClusterStatusTopicReceiver {
 	public void setTerminated(boolean terminated) {
 		this.terminated = terminated;
 	}
+
+	public ExecutorService getExecutorService() {
+		return executorService;
+	}
+
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
index 42aabed..dc21735 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -32,6 +32,8 @@ import org.apache.stratos.messaging.listener.instance.status.InstanceReadyToShut
 import org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener;
 import org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * This will handle the instance status events
  */
@@ -40,6 +42,7 @@ public class InstanceStatusTopicReceiver {
 
 	private InstanceStatusEventReceiver statusEventReceiver;
 	private boolean terminated;
+	private ExecutorService executorService;
 
 	public InstanceStatusTopicReceiver() {
 		this.statusEventReceiver = new InstanceStatusEventReceiver();
@@ -98,4 +101,11 @@ public class InstanceStatusTopicReceiver {
 
 	}
 
+	public ExecutorService getExecutorService() {
+		return executorService;
+	}
+
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.common/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/pom.xml b/components/org.apache.stratos.common/pom.xml
index 6c33f0d..6ed9228 100644
--- a/components/org.apache.stratos.common/pom.xml
+++ b/components/org.apache.stratos.common/pom.xml
@@ -97,6 +97,11 @@
             <version>3.1</version>
         </dependency>
         <dependency>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+            <version>1.9</version>
+        </dependency>
+        <dependency>
             <groupId>org.wso2.carbon</groupId>
             <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
             <version>4.2.0</version>

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/util/ConfUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/util/ConfUtil.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/util/ConfUtil.java
new file mode 100644
index 0000000..7f9d665
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/util/ConfUtil.java
@@ -0,0 +1,73 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements. See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership. The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License. You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied. See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.stratos.common.util;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.utils.CarbonUtils;
+
+import java.io.File;
+
+/**
+ * This class contains utility methods for read Autoscaler configuration file.
+ */
+public class ConfUtil {
+
+	private static final String CONFIG_FILE_NAME ="stratos-config" ;
+	private static Log log = LogFactory.getLog(ConfUtil.class);
+
+    private XMLConfiguration config;
+
+    private static ConfUtil instance = null;
+
+    private ConfUtil(String configFilePath) {
+        log.info("Loading configuration.....");
+        try {
+
+            File confFile;
+            if (configFilePath != null && !configFilePath.isEmpty()) {
+                confFile = new File(configFilePath);
+
+            } else {
+                confFile = new File(CarbonUtils.getCarbonConfigDirPath(),CONFIG_FILE_NAME);
+            }
+
+            config = new XMLConfiguration(confFile);
+        } catch (ConfigurationException e) {
+            log.error("Unable to load autoscaler configuration file",e);
+            config = new XMLConfiguration();  // continue with default values
+        }
+    }
+
+    public static ConfUtil getInstance(String configFilePath) {
+        if (instance == null) {
+            instance = new ConfUtil (configFilePath);
+        }
+        return instance;
+    }
+
+    public XMLConfiguration getConfiguration(){
+        return config;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
index efce585..e4ffccc 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
@@ -68,7 +68,7 @@ import java.util.concurrent.ExecutorService;
 public class ADCManagementServerComponent {
 
 	private static final Log log = LogFactory.getLog(ADCManagementServerComponent.class);
-	private static final String STRATOS_MANAGER = "Stratos_manager";
+	private static final String IDENTIFIER = "Stratos_manager";
 	private static final int THREAD_POOL_SIZE = 20;
 	private StratosManagerTopologyEventReceiver stratosManagerTopologyEventReceiver;
 	private ExecutorService executorService;
@@ -76,7 +76,7 @@ public class ADCManagementServerComponent {
     protected void activate(ComponentContext componentContext) throws Exception {
 		try {
 			CartridgeConfigFileReader.readProperties();
-			executorService=StratosThreadPool.getExecutorService(STRATOS_MANAGER, THREAD_POOL_SIZE);
+			executorService=StratosThreadPool.getExecutorService(IDENTIFIER, THREAD_POOL_SIZE);
 			
             // Schedule complete tenant event synchronizer
             if(log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
index cc86c29..82d8c83 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
@@ -24,6 +24,8 @@ import org.apache.stratos.messaging.broker.subscribe.Subscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.util.Util;
 
+import java.util.concurrent.ExecutorService;
+
 public class ApplicationsEventReceiver {
     private static final Log log = LogFactory.getLog(ApplicationsEventReceiver.class);
 
@@ -31,6 +33,7 @@ public class ApplicationsEventReceiver {
     private ApplicationsEventMessageListener messageListener;
     private Subscriber subscriber;
     private boolean terminated;
+	private ExecutorService executorService;
 
     public ApplicationsEventReceiver() {
         ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue();
@@ -47,27 +50,20 @@ public class ApplicationsEventReceiver {
         try {
             // Start topic subscriber thread
             subscriber = new Subscriber(Util.Topics.APPLICATIONS_TOPIC.getTopicName(), messageListener);
+			executorService.execute(subscriber);
 
-            Thread subscriberThread = new Thread(subscriber);
-            subscriberThread.start();
             if (log.isDebugEnabled()) {
                 log.debug("Application status event message receiver thread started");
             }
 
             // Start Application status event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
+	        executorService.execute(messageDelegator);
+
             if (log.isDebugEnabled()) {
                 log.debug("Application status event message delegator thread started");
             }
 
-            // Keep the thread live until terminated
-            while (!terminated) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
-                }
-            }
+
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("Application status failed", e);
@@ -80,4 +76,12 @@ public class ApplicationsEventReceiver {
         messageDelegator.terminate();
         terminated = true;
     }
+
+	public ExecutorService getExecutorService() {
+		return executorService;
+	}
+
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index 38184aa..a6de430 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -25,6 +25,9 @@ import org.apache.stratos.messaging.broker.subscribe.Subscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.util.Util;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
 /**
  * A thread for receiving instance notifier information from message broker.
  */
@@ -34,6 +37,7 @@ public class ClusterStatusEventReceiver{
     private final ClusterStatusEventMessageListener messageListener;
     private Subscriber subscriber;
     private boolean terminated;
+	private ExecutorService executorService;
 
     public ClusterStatusEventReceiver() {
         ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
@@ -50,27 +54,19 @@ public class ClusterStatusEventReceiver{
         try {
             // Start topic subscriber thread
             subscriber = new Subscriber(Util.Topics.CLUSTER_STATUS_TOPIC.getTopicName(), messageListener);
-//            subscriber.setMessageListener(messageListener);
-            Thread subscriberThread = new Thread(subscriber);
-            subscriberThread.start();
+            executorService.execute(subscriber);
+
             if (log.isDebugEnabled()) {
                 log.debug("InstanceNotifier event message receiver thread started");
             }
 
             // Start instance notifier event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
+	        executorService.execute(messageDelegator);
             if (log.isDebugEnabled()) {
                 log.debug("InstanceNotifier event message delegator thread started");
             }
 
-            // Keep the thread live until terminated
-            while (!terminated) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
-                }
-            }
+
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("InstanceNotifier receiver failed", e);
@@ -87,4 +83,12 @@ public class ClusterStatusEventReceiver{
         messageDelegator.terminate();
         terminated = true;
     }
+
+	public ExecutorService getExecutorService() {
+		return executorService;
+	}
+
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index 14c7346..d324c7e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -25,6 +25,8 @@ import org.apache.stratos.messaging.broker.subscribe.Subscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.util.Util;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * A thread for receiving health stat information from message broker
  */
@@ -35,6 +37,7 @@ public class HealthStatEventReceiver {
 	private final HealthStatEventMessageListener messageListener;
 	private Subscriber subscriber;
 	private boolean terminated;
+	private ExecutorService executorService;
 
 	public HealthStatEventReceiver() {
 		HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue();
@@ -84,4 +87,12 @@ public class HealthStatEventReceiver {
 		messageDelegator.terminate();
 		terminated = true;
 	}
+
+	public ExecutorService getExecutorService() {
+		return executorService;
+	}
+
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index a8f1d96..af9319f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -25,6 +25,8 @@ import org.apache.stratos.messaging.broker.subscribe.Subscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.util.Util;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * A thread for receiving instance notifier information from message broker.
  */
@@ -34,6 +36,7 @@ public class InstanceStatusEventReceiver {
     private final InstanceStatusEventMessageListener messageListener;
     private Subscriber subscriber;
     private boolean terminated;
+	private ExecutorService executorService;
 
     public InstanceStatusEventReceiver() {
         InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
@@ -51,15 +54,13 @@ public class InstanceStatusEventReceiver {
             // Start topic subscriber thread
             subscriber = new Subscriber(Util.Topics.INSTANCE_STATUS_TOPIC.getTopicName(), messageListener);
 //            subscriber.setMessageListener(messageListener);
-            Thread subscriberThread = new Thread(subscriber);
-            subscriberThread.start();
+            executorService.submit(subscriber);
             if (log.isDebugEnabled()) {
                 log.debug("InstanceNotifier event message receiver thread started");
             }
 
-            // Start instance notifier event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
+            // Start instance notifier event message delegate thread
+            executorService.submit(messageDelegator);
             if (log.isDebugEnabled()) {
                 log.debug("InstanceNotifier event message delegator thread started");
             }
@@ -81,4 +82,12 @@ public class InstanceStatusEventReceiver {
         messageDelegator.terminate();
         terminated = true;
     }
+
+	public ExecutorService getExecutorService() {
+		return executorService;
+	}
+
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
 }