You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2014/12/03 07:56:44 UTC
[7/9] stratos git commit: Adding executor service for threads and
remove unnecessary threads
Adding executor service for threads and remove unnecessary threads
Conflicts:
components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4e733930
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4e733930
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4e733930
Branch: refs/heads/master
Commit: 4e733930ba33a87c78cd398a2573f6b545a8d9f0
Parents: ab1ed3c
Author: gayan <ga...@puppet.gayan.org>
Authored: Mon Dec 1 19:05:20 2014 +0530
Committer: gayan <ga...@puppet.gayan.org>
Committed: Tue Dec 2 16:43:07 2014 +0530
----------------------------------------------------------------------
.../AutoscalerHealthStatEventReceiver.java | 19 +++--
.../internal/AutoscalerServerComponent.java | 7 +-
.../stratos/cartridge/agent/CartridgeAgent.java | 4 +-
.../CloudControllerServiceComponent.java | 28 ++++++++
.../application/ApplicationTopicReceiver.java | 12 ++++
.../status/ClusterStatusTopicReceiver.java | 12 ++++
.../status/InstanceStatusTopicReceiver.java | 10 +++
components/org.apache.stratos.common/pom.xml | 5 ++
.../apache/stratos/common/util/ConfUtil.java | 73 ++++++++++++++++++++
.../internal/ADCManagementServerComponent.java | 4 +-
.../applications/ApplicationsEventReceiver.java | 26 ++++---
.../status/ClusterStatusEventReceiver.java | 28 ++++----
.../health/stat/HealthStatEventReceiver.java | 11 +++
.../status/InstanceStatusEventReceiver.java | 19 +++--
14 files changed, 216 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
index 0e45ee1..9e440c7 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -63,24 +63,25 @@ import org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfReque
import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import java.util.concurrent.ExecutorService;
/**
* A thread for processing topology messages and updating the topology data structure.
*/
-public class AutoscalerHealthStatEventReceiver implements Runnable {
+public class AutoscalerHealthStatEventReceiver {
private static final Log log = LogFactory.getLog(AutoscalerHealthStatEventReceiver.class);
private boolean terminated = false;
private HealthStatEventReceiver healthStatEventReceiver;
+ private ExecutorService executorService;
public AutoscalerHealthStatEventReceiver() {
this.healthStatEventReceiver = new HealthStatEventReceiver();
addEventListeners();
}
- @Override
- public void run() {
+ public void execute() {
//FIXME this activated before autoscaler deployer activated.
try {
Thread.sleep(15000);
@@ -92,11 +93,7 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
log.info("Autoscaler health stat event receiver thread started");
}
- // Keep the thread live until terminated
- if(log.isInfoEnabled()) {
- log.info("Autoscaler health stat event receiver thread terminated");
- }
}
private void addEventListeners() {
@@ -519,4 +516,12 @@ public class AutoscalerHealthStatEventReceiver implements Runnable {
public void terminate() {
this.terminated = true;
}
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 2e443de..bb5e167 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -66,13 +66,14 @@ public class AutoscalerServerComponent {
private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier";
private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
private static final String THREAD_POOL_SIZE_KEY = "threadPool.autoscaler.threadPoolSize";
- private static final String COMPONENTS_CONFIG = "components-config";
+ private static final String COMPONENTS_CONFIG = "stratos-config";
private static final int THREAD_POOL_SIZE = 10;
private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class);
private AutoscalerTopologyEventReceiver asTopologyReceiver;
private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
+
protected void activate(ComponentContext componentContext) throws Exception {
<<<<<<< HEAD
try {
@@ -214,8 +215,8 @@ public class AutoscalerServerComponent {
// Start health stat receiver
autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
- Thread healthDelegatorThread = new Thread(autoscalerHealthStatEventReceiver);
- healthDelegatorThread.start();
+ autoscalerHealthStatEventReceiver.setExecutorService(executorService);
+ autoscalerHealthStatEventReceiver.execute();
if (log.isDebugEnabled()) {
log.debug("Health statistics receiver thread started");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index e275db5..53fd658 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -415,8 +415,8 @@ public class CartridgeAgent implements Runnable {
}
});
-// Thread thread = new Thread(topologyEventReceiver);
-// thread.start();
+ topologyEventReceiver.execute();
+
if (log.isDebugEnabled()) {
log.info("Cartridge Agent topology receiver thread started");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 6773b4a..a413218 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -23,6 +23,7 @@ package org.apache.stratos.cloud.controller.internal;
<<<<<<< HEAD
<<<<<<< HEAD
+<<<<<<< HEAD
import com.hazelcast.core.HazelcastInstance;
=======
@@ -30,17 +31,30 @@ import com.hazelcast.core.HazelcastInstance;
=======
>>>>>>> ad3e45c... Remove unnessary threads in messaging model
+=======
+import org.apache.commons.configuration.XMLConfiguration;
+>>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary threads
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cloud.controller.context.CloudControllerContext;
import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationTopicReceiver;
import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
import org.apache.stratos.cloud.controller.exception.CloudControllerException;
+<<<<<<< HEAD
import org.apache.stratos.cloud.controller.services.CloudControllerService;
import org.apache.stratos.cloud.controller.services.impl.CloudControllerServiceImpl;
import org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTaskScheduler;
import org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver;
import org.apache.stratos.common.clustering.DistributedObjectProvider;
+=======
+import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl;
+import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
+import org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler;
+import org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusTopicReceiver;
+import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.common.util.ConfUtil;
+>>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary threads
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.util.Util;
import org.osgi.framework.BundleContext;
@@ -52,6 +66,8 @@ import org.wso2.carbon.registry.core.service.RegistryService;
import org.wso2.carbon.registry.core.session.UserRegistry;
import org.wso2.carbon.utils.ConfigurationContextService;
+import java.util.concurrent.ExecutorService;
+
/**
* Registering Cloud Controller Service.
*
@@ -90,10 +106,21 @@ public class CloudControllerServiceComponent {
private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
private ApplicationTopicReceiver applicationTopicReceiver;
+ private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier";
+ private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
+ private static final String THREAD_POOL_SIZE_KEY = "threadPool.autoscaler.threadPoolSize";
+ private static final String COMPONENTS_CONFIG = "stratos-config";
+ private static final int THREAD_POOL_SIZE = 10;
protected void activate(ComponentContext context) {
try {
+
+ XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
+ int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE);
+ String threadIdentifier = conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
+ ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
applicationTopicReceiver = new ApplicationTopicReceiver();
+ applicationTopicReceiver.setExecutorService(executorService);
applicationTopicReceiver.execute();
if (log.isInfoEnabled()) {
@@ -106,6 +133,7 @@ public class CloudControllerServiceComponent {
}
=======
clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
+ clusterStatusTopicReceiver.setExecutorService(executorService);
clusterStatusTopicReceiver.execute();
>>>>>>> ddf277b... Remove unnessary threads in messaging model
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
index d65b7f5..9df4f3a 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
@@ -26,6 +26,8 @@ import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEven
import org.apache.stratos.messaging.listener.applications.ApplicationTerminatedEventListener;
import org.apache.stratos.messaging.message.receiver.applications.ApplicationsEventReceiver;
+import java.util.concurrent.ExecutorService;
+
/**
* This is to receive the application topic messages.
*/
@@ -33,6 +35,7 @@ public class ApplicationTopicReceiver {
private static final Log log = LogFactory.getLog(ApplicationTopicReceiver.class);
private ApplicationsEventReceiver applicationsEventReceiver;
private boolean terminated;
+ private ExecutorService executorService;
public ApplicationTopicReceiver() {
this.applicationsEventReceiver = new ApplicationsEventReceiver();
@@ -46,6 +49,7 @@ public class ApplicationTopicReceiver {
log.info("Cloud controller application status thread started");
}
applicationsEventReceiver.execute();
+ applicationsEventReceiver.setExecutorService(executorService);
if (log.isInfoEnabled()) {
log.info("Cloud controller application status thread terminated");
@@ -69,4 +73,12 @@ public class ApplicationTopicReceiver {
public void setTerminated(boolean terminated) {
this.terminated = terminated;
}
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
index ca6d4ad..d54063c 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -26,14 +26,18 @@ import org.apache.stratos.messaging.event.cluster.status.*;
import org.apache.stratos.messaging.listener.cluster.status.*;
import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
+import java.util.concurrent.ExecutorService;
+
public class ClusterStatusTopicReceiver {
private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class);
private ClusterStatusEventReceiver statusEventReceiver;
private boolean terminated;
+ private ExecutorService executorService;
public ClusterStatusTopicReceiver() {
this.statusEventReceiver = new ClusterStatusEventReceiver();
+ this.statusEventReceiver.setExecutorService(executorService);
addEventListeners();
}
@@ -101,4 +105,12 @@ public class ClusterStatusTopicReceiver {
public void setTerminated(boolean terminated) {
this.terminated = terminated;
}
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
index 42aabed..dc21735 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -32,6 +32,8 @@ import org.apache.stratos.messaging.listener.instance.status.InstanceReadyToShut
import org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener;
import org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver;
+import java.util.concurrent.ExecutorService;
+
/**
* This will handle the instance status events
*/
@@ -40,6 +42,7 @@ public class InstanceStatusTopicReceiver {
private InstanceStatusEventReceiver statusEventReceiver;
private boolean terminated;
+ private ExecutorService executorService;
public InstanceStatusTopicReceiver() {
this.statusEventReceiver = new InstanceStatusEventReceiver();
@@ -98,4 +101,11 @@ public class InstanceStatusTopicReceiver {
}
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.common/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/pom.xml b/components/org.apache.stratos.common/pom.xml
index 6c33f0d..6ed9228 100644
--- a/components/org.apache.stratos.common/pom.xml
+++ b/components/org.apache.stratos.common/pom.xml
@@ -97,6 +97,11 @@
<version>3.1</version>
</dependency>
<dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>1.9</version>
+ </dependency>
+ <dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
<version>4.2.0</version>
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/util/ConfUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/util/ConfUtil.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/util/ConfUtil.java
new file mode 100644
index 0000000..7f9d665
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/util/ConfUtil.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.stratos.common.util;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.utils.CarbonUtils;
+
+import java.io.File;
+
+/**
+ * This class contains utility methods for read Autoscaler configuration file.
+ */
+public class ConfUtil {
+
+ private static final String CONFIG_FILE_NAME ="stratos-config" ;
+ private static Log log = LogFactory.getLog(ConfUtil.class);
+
+ private XMLConfiguration config;
+
+ private static ConfUtil instance = null;
+
+ private ConfUtil(String configFilePath) {
+ log.info("Loading configuration.....");
+ try {
+
+ File confFile;
+ if (configFilePath != null && !configFilePath.isEmpty()) {
+ confFile = new File(configFilePath);
+
+ } else {
+ confFile = new File(CarbonUtils.getCarbonConfigDirPath(),CONFIG_FILE_NAME);
+ }
+
+ config = new XMLConfiguration(confFile);
+ } catch (ConfigurationException e) {
+ log.error("Unable to load autoscaler configuration file",e);
+ config = new XMLConfiguration(); // continue with default values
+ }
+ }
+
+ public static ConfUtil getInstance(String configFilePath) {
+ if (instance == null) {
+ instance = new ConfUtil (configFilePath);
+ }
+ return instance;
+ }
+
+ public XMLConfiguration getConfiguration(){
+ return config;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
index efce585..e4ffccc 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
@@ -68,7 +68,7 @@ import java.util.concurrent.ExecutorService;
public class ADCManagementServerComponent {
private static final Log log = LogFactory.getLog(ADCManagementServerComponent.class);
- private static final String STRATOS_MANAGER = "Stratos_manager";
+ private static final String IDENTIFIER = "Stratos_manager";
private static final int THREAD_POOL_SIZE = 20;
private StratosManagerTopologyEventReceiver stratosManagerTopologyEventReceiver;
private ExecutorService executorService;
@@ -76,7 +76,7 @@ public class ADCManagementServerComponent {
protected void activate(ComponentContext componentContext) throws Exception {
try {
CartridgeConfigFileReader.readProperties();
- executorService=StratosThreadPool.getExecutorService(STRATOS_MANAGER, THREAD_POOL_SIZE);
+ executorService=StratosThreadPool.getExecutorService(IDENTIFIER, THREAD_POOL_SIZE);
// Schedule complete tenant event synchronizer
if(log.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
index cc86c29..82d8c83 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
@@ -24,6 +24,8 @@ import org.apache.stratos.messaging.broker.subscribe.Subscriber;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.util.Util;
+import java.util.concurrent.ExecutorService;
+
public class ApplicationsEventReceiver {
private static final Log log = LogFactory.getLog(ApplicationsEventReceiver.class);
@@ -31,6 +33,7 @@ public class ApplicationsEventReceiver {
private ApplicationsEventMessageListener messageListener;
private Subscriber subscriber;
private boolean terminated;
+ private ExecutorService executorService;
public ApplicationsEventReceiver() {
ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue();
@@ -47,27 +50,20 @@ public class ApplicationsEventReceiver {
try {
// Start topic subscriber thread
subscriber = new Subscriber(Util.Topics.APPLICATIONS_TOPIC.getTopicName(), messageListener);
+ executorService.execute(subscriber);
- Thread subscriberThread = new Thread(subscriber);
- subscriberThread.start();
if (log.isDebugEnabled()) {
log.debug("Application status event message receiver thread started");
}
// Start Application status event message delegator thread
- Thread receiverThread = new Thread(messageDelegator);
- receiverThread.start();
+ executorService.execute(messageDelegator);
+
if (log.isDebugEnabled()) {
log.debug("Application status event message delegator thread started");
}
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
+
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Application status failed", e);
@@ -80,4 +76,12 @@ public class ApplicationsEventReceiver {
messageDelegator.terminate();
terminated = true;
}
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index 38184aa..a6de430 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -25,6 +25,9 @@ import org.apache.stratos.messaging.broker.subscribe.Subscriber;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.util.Util;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
/**
* A thread for receiving instance notifier information from message broker.
*/
@@ -34,6 +37,7 @@ public class ClusterStatusEventReceiver{
private final ClusterStatusEventMessageListener messageListener;
private Subscriber subscriber;
private boolean terminated;
+ private ExecutorService executorService;
public ClusterStatusEventReceiver() {
ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
@@ -50,27 +54,19 @@ public class ClusterStatusEventReceiver{
try {
// Start topic subscriber thread
subscriber = new Subscriber(Util.Topics.CLUSTER_STATUS_TOPIC.getTopicName(), messageListener);
-// subscriber.setMessageListener(messageListener);
- Thread subscriberThread = new Thread(subscriber);
- subscriberThread.start();
+ executorService.execute(subscriber);
+
if (log.isDebugEnabled()) {
log.debug("InstanceNotifier event message receiver thread started");
}
// Start instance notifier event message delegator thread
- Thread receiverThread = new Thread(messageDelegator);
- receiverThread.start();
+ executorService.execute(messageDelegator);
if (log.isDebugEnabled()) {
log.debug("InstanceNotifier event message delegator thread started");
}
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
+
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("InstanceNotifier receiver failed", e);
@@ -87,4 +83,12 @@ public class ClusterStatusEventReceiver{
messageDelegator.terminate();
terminated = true;
}
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index 14c7346..d324c7e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -25,6 +25,8 @@ import org.apache.stratos.messaging.broker.subscribe.Subscriber;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.util.Util;
+import java.util.concurrent.ExecutorService;
+
/**
* A thread for receiving health stat information from message broker
*/
@@ -35,6 +37,7 @@ public class HealthStatEventReceiver {
private final HealthStatEventMessageListener messageListener;
private Subscriber subscriber;
private boolean terminated;
+ private ExecutorService executorService;
public HealthStatEventReceiver() {
HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue();
@@ -84,4 +87,12 @@ public class HealthStatEventReceiver {
messageDelegator.terminate();
terminated = true;
}
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index a8f1d96..af9319f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -25,6 +25,8 @@ import org.apache.stratos.messaging.broker.subscribe.Subscriber;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.util.Util;
+import java.util.concurrent.ExecutorService;
+
/**
* A thread for receiving instance notifier information from message broker.
*/
@@ -34,6 +36,7 @@ public class InstanceStatusEventReceiver {
private final InstanceStatusEventMessageListener messageListener;
private Subscriber subscriber;
private boolean terminated;
+ private ExecutorService executorService;
public InstanceStatusEventReceiver() {
InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
@@ -51,15 +54,13 @@ public class InstanceStatusEventReceiver {
// Start topic subscriber thread
subscriber = new Subscriber(Util.Topics.INSTANCE_STATUS_TOPIC.getTopicName(), messageListener);
// subscriber.setMessageListener(messageListener);
- Thread subscriberThread = new Thread(subscriber);
- subscriberThread.start();
+ executorService.submit(subscriber);
if (log.isDebugEnabled()) {
log.debug("InstanceNotifier event message receiver thread started");
}
- // Start instance notifier event message delegator thread
- Thread receiverThread = new Thread(messageDelegator);
- receiverThread.start();
+ // Start instance notifier event message delegate thread
+ executorService.submit(messageDelegator);
if (log.isDebugEnabled()) {
log.debug("InstanceNotifier event message delegator thread started");
}
@@ -81,4 +82,12 @@ public class InstanceStatusEventReceiver {
messageDelegator.terminate();
terminated = true;
}
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}