You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2014/12/03 07:56:43 UTC
[6/9] stratos git commit: Remove unnessary threads in messaging model
Remove unnessary threads in messaging model
Conflicts:
components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/ab1ed3c2
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/ab1ed3c2
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/ab1ed3c2
Branch: refs/heads/master
Commit: ab1ed3c242db69dbfef6f23a75311414393b79cb
Parents: 959c435
Author: gayan <ga...@puppet.gayan.org>
Authored: Mon Dec 1 17:40:11 2014 +0530
Committer: gayan <ga...@puppet.gayan.org>
Committed: Tue Dec 2 16:41:34 2014 +0530
----------------------------------------------------------------------
.../stratos/cartridge/agent/CartridgeAgent.java | 3 +-
.../internal/LoadBalancerServiceComponent.java | 57 ++++++++++++++++++++
.../LoadBalancerTenantEventReceiver.java | 16 ++----
.../LoadBalancerTopologyEventReceiver.java | 20 ++-----
.../internal/ADCManagementServerComponent.java | 5 +-
.../receiver/tenant/TenantEventReceiver.java | 14 ++---
6 files changed, 74 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/ab1ed3c2/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index 379df28..e275db5 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -538,8 +538,7 @@ public class CartridgeAgent implements Runnable {
}
});
- Thread tenantEventReceiverThread = new Thread(tenantEventReceiver);
- tenantEventReceiverThread.start();
+ tenantEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Tenant event message receiver thread started");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/ab1ed3c2/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index aa5e2ef..3aa77a8 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -60,6 +60,7 @@ import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
import java.io.File;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
/**
* @scr.component name="org.apache.stratos.load.balancer.internal.LoadBalancerServiceComponent" immediate="true"
@@ -88,6 +89,9 @@ public class LoadBalancerServiceComponent {
private LoadBalancerTopologyEventReceiver topologyReceiver;
private LoadBalancerTenantEventReceiver tenantReceiver;
private LoadBalancerStatisticsNotifier statisticsNotifier;
+ private static final String STRATOS_MANAGER = "Stratos_manager";
+ private static final int THREAD_POOL_SIZE = 20;
+ private ExecutorService executorService;
protected void activate(ComponentContext ctxt) {
try {
@@ -116,13 +120,64 @@ public class LoadBalancerServiceComponent {
TopologyFilterConfigurator.configure(configuration);
if (configuration.isMultiTenancyEnabled()) {
+<<<<<<< HEAD
// Start tenant event receiver
startTenantEventReceiver();
+=======
+
+ tenantReceiver = new LoadBalancerTenantEventReceiver();
+ tenantReceiver.execute();
+
+ if (log.isInfoEnabled()) {
+ log.info("Tenant receiver thread started");
+ }
+>>>>>>> ae876c1... Remove unnessary threads in messaging model
}
if (configuration.isTopologyEventListenerEnabled()) {
// Start topology receiver
+<<<<<<< HEAD
startTopologyEventReceiver();
+=======
+ topologyReceiver = new LoadBalancerTopologyEventReceiver();
+ topologyReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Topology receiver thread started");
+ }
+
+ if (log.isInfoEnabled()) {
+ if (TopologyServiceFilter.getInstance().isActive()) {
+ StringBuilder sb = new StringBuilder();
+ for (String serviceName : TopologyServiceFilter.getInstance().getIncludedServiceNames()) {
+ if (sb.length() > 0) {
+ sb.append(", ");
+ }
+ sb.append(serviceName);
+ }
+ log.info(String.format("Service filter activated: [services] %s", sb.toString()));
+ }
+ if (TopologyClusterFilter.getInstance().isActive()) {
+ StringBuilder sb = new StringBuilder();
+ for (String clusterId : TopologyClusterFilter.getInstance().getIncludedClusterIds()) {
+ if (sb.length() > 0) {
+ sb.append(", ");
+ }
+ sb.append(clusterId);
+ }
+ log.info(String.format("Cluster filter activated: [clusters] %s", sb.toString()));
+ }
+ if (TopologyMemberFilter.getInstance().isActive()) {
+ StringBuilder sb = new StringBuilder();
+ for (String clusterId : TopologyMemberFilter.getInstance().getIncludedLbClusterIds()) {
+ if (sb.length() > 0) {
+ sb.append(", ");
+ }
+ sb.append(clusterId);
+ }
+ log.info(String.format("Member filter activated: [lb-cluster-ids] %s", sb.toString()));
+ }
+ }
+>>>>>>> ae876c1... Remove unnessary threads in messaging model
}
if(configuration.isCepStatsPublisherEnabled()) {
@@ -220,6 +275,8 @@ public class LoadBalancerServiceComponent {
topologyReceiver.terminate();
// Terminate statistics notifier
statisticsNotifier.terminate();
+
+
}
/**
http://git-wip-us.apache.org/repos/asf/stratos/blob/ab1ed3c2/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTenantEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTenantEventReceiver.java
index 04e9d5f..e729dd5 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTenantEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTenantEventReceiver.java
@@ -37,7 +37,7 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
* Load balancer tenant receiver updates load balancer context according to
* incoming tenant events.
*/
-public class LoadBalancerTenantEventReceiver implements Runnable {
+public class LoadBalancerTenantEventReceiver{
private static final Log log = LogFactory.getLog(LoadBalancerTenantEventReceiver.class);
@@ -192,18 +192,10 @@ public class LoadBalancerTenantEventReceiver implements Runnable {
}
}
- @Override
- public void run() {
- Thread tenantReceiverThread = new Thread(tenantEventReceiver);
- tenantReceiverThread.start();
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
+ public void execute() {
+ tenantEventReceiver.execute();
+
if (log.isInfoEnabled()) {
log.info("Load balancer tenant receiver thread terminated");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/ab1ed3c2/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java
index 83e0c18..657d18e 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java
@@ -39,7 +39,7 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
* Load balancer topology receiver updates load balancer context according to
* incoming topology events.
*/
-public class LoadBalancerTopologyEventReceiver implements Runnable {
+public class LoadBalancerTopologyEventReceiver {
private static final Log log = LogFactory.getLog(LoadBalancerTopologyEventReceiver.class);
@@ -51,24 +51,14 @@ public class LoadBalancerTopologyEventReceiver implements Runnable {
addEventListeners();
}
- @Override
- public void run() {
- // Thread thread = new Thread(topologyEventReceiver);
- // thread.start();
+ public void execute() {
+
+ topologyEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Load balancer topology receiver thread started");
}
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
- if (log.isInfoEnabled()) {
- log.info("Load balancer topology receiver thread terminated");
- }
+
}
private void addEventListeners() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/ab1ed3c2/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
index 797f1a1..efce585 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
@@ -68,8 +68,8 @@ import java.util.concurrent.ExecutorService;
public class ADCManagementServerComponent {
private static final Log log = LogFactory.getLog(ADCManagementServerComponent.class);
- public static final String STRATOS_MANAGER = "Stratos_manager";
- public static final int THREAD_POOL_SIZE = 20;
+ private static final String STRATOS_MANAGER = "Stratos_manager";
+ private static final int THREAD_POOL_SIZE = 20;
private StratosManagerTopologyEventReceiver stratosManagerTopologyEventReceiver;
private ExecutorService executorService;
@@ -211,6 +211,7 @@ public class ADCManagementServerComponent {
EventPublisherPool.close(Util.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
EventPublisherPool.close(Util.Topics.TENANT_TOPIC.getTopicName());
+ executorService.shutdownNow();
//terminate Stratos Manager Topology Receiver
stratosManagerTopologyEventReceiver.terminate();
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/ab1ed3c2/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
index d8fc663..4eabc96 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -29,7 +29,7 @@ import org.apache.stratos.messaging.util.Util;
* A thread for receiving tenant information from message broker and
* build tenant information in tenant manager.
*/
-public class TenantEventReceiver implements Runnable {
+public class TenantEventReceiver{
private static final Log log = LogFactory.getLog(TenantEventReceiver.class);
private TenantEventMessageDelegator messageDelegator;
private TenantEventMessageListener messageListener;
@@ -46,8 +46,8 @@ public class TenantEventReceiver implements Runnable {
messageDelegator.addEventListener(eventListener);
}
- @Override
- public void run() {
+
+ public void execute() {
try {
// Start topic subscriber thread
subscriber = new Subscriber(Util.Topics.TENANT_TOPIC.getTopicName(), messageListener);
@@ -65,13 +65,7 @@ public class TenantEventReceiver implements Runnable {
log.debug("Tenant event message delegator thread started");
}
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
+
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Tenant receiver failed", e);