You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2015/12/24 14:04:31 UTC
[14/19] stratos git commit: shutting down thread pools from event
receivers
shutting down thread pools from event receivers
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/570d74ea
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/570d74ea
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/570d74ea
Branch: refs/heads/stratos-4.1.x
Commit: 570d74ea5543908c2362cc9b3c6224276921de48
Parents: 4c6442a
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Fri Dec 18 07:23:07 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Fri Dec 18 07:23:07 2015 +0530
----------------------------------------------------------------------
.../common/threading/StratosThreadPool.java | 30 +++++++++++++++++---
.../message/receiver/StratosEventReceiver.java | 1 +
.../application/ApplicationsEventReceiver.java | 4 ++-
.../signup/ApplicationSignUpEventReceiver.java | 4 ++-
.../status/ClusterStatusEventReceiver.java | 4 ++-
.../mapping/DomainMappingEventReceiver.java | 4 ++-
.../health/stat/HealthStatEventReceiver.java | 4 ++-
.../initializer/InitializerEventReceiver.java | 4 ++-
.../notifier/InstanceNotifierEventReceiver.java | 7 ++---
.../status/InstanceStatusEventReceiver.java | 5 ++--
.../receiver/tenant/TenantEventReceiver.java | 4 ++-
.../topology/TopologyEventReceiver.java | 4 ++-
12 files changed, 57 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index 687cec2..b28fb40 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -24,10 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.*;
/**
* Utility class for Stratos thread pool
@@ -84,4 +81,29 @@ public class StratosThreadPool {
}
return scheduledExecutorService;
}
+
+    /**
+     * Shuts down the thread pool registered under the given identifier.
+     * <p>
+     * An orderly shutdown is requested first. If the pool has not terminated within 10 seconds,
+     * or the calling thread is interrupted while waiting, the pool is shut down forcefully with
+     * {@link ExecutorService#shutdownNow()}. When no pool is registered under the identifier, a
+     * warning is logged and the method returns without doing anything else.
+     *
+     * @param identifier id under which the thread pool was registered
+     */
+    public static void shutdown(String identifier) {
+
+        ExecutorService executorService = executorServiceMap.get(identifier);
+        if (executorService == null) {
+            log.warn("No executor service found for id " + identifier + ", unable to shut down");
+            return;
+        }
+
+        // try to shut down gracefully
+        executorService.shutdown();
+        // wait 10 secs till terminated
+        try {
+            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+                log.info("Thread Pool [id] " + identifier + " did not finish all tasks before " +
+                        "timeout, forcefully shutting down");
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            // interrupted while waiting: force the shutdown, then restore the interrupt status
+            // so that code further up the call stack can still see the interruption
+            executorService.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+
+        // NOTE(review): the pool is left in executorServiceMap after shutdown; if lookups reuse
+        // cached entries, a later request for this id would get a terminated pool - confirm
+        log.info("Successfully shutdown thread pool associated with id: " + identifier);
+    }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
index 5ac89e6..5040371 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService;
public class StratosEventReceiver {
protected ExecutorService executorService;
+ protected String threadPoolId;
public StratosEventReceiver () { // intentionally empty: executorService and threadPoolId are not assigned here
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
index 69dba01..697d9dd 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
@@ -39,7 +39,8 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{
private ApplicationsEventReceiver() {
// TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("application-event-receiver", 100);
+ this.threadPoolId = "application-event-receiver";
+ this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue();
this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue);
this.messageListener = new ApplicationsEventMessageListener(messageQueue);
@@ -94,6 +95,7 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{
public void terminate() { // terminates the subscriber and the delegator, then shuts down this receiver's thread pool
eventSubscriber.terminate();
messageDelegator.terminate();
+ StratosThreadPool.shutdown(threadPoolId); // may block: shutdown() waits up to 10 secs for tasks before forcing
}
public void initializeCompleteApplicationsModel() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index df90cf9..89cf7ea 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -44,7 +44,8 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
private ApplicationSignUpEventReceiver() {
// TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("application-signup-event-receiver", 100);
+ this.threadPoolId = "application-signup-event-receiver";
+ this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue();
this.messageDelegator = new ApplicationSignUpEventMessageDelegator(messageQueue);
this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue);
@@ -116,5 +117,6 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
public void terminate() { // terminates the subscriber and the delegator, then shuts down the sign-up receiver's thread pool
eventSubscriber.terminate();
messageDelegator.terminate();
+ StratosThreadPool.shutdown(threadPoolId); // may block: shutdown() waits up to 10 secs for tasks before forcing
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index be42b43..bdbd509 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -40,7 +40,8 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver {
private ClusterStatusEventReceiver() {
// TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100);
+ this.threadPoolId = "clusterstatus-event-receiver";
+ this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue);
this.messageListener = new ClusterStatusEventMessageListener(messageQueue);
@@ -90,6 +91,7 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver {
public void terminate() { // terminates the subscriber and the delegator, then shuts down the cluster status receiver's thread pool
eventSubscriber.terminate();
messageDelegator.terminate();
+ StratosThreadPool.shutdown(threadPoolId); // may block: shutdown() waits up to 10 secs for tasks before forcing
}
public boolean isSubscribed() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 6de99c0..7ce3982 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -41,7 +41,8 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
private DomainMappingEventReceiver() {
// TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("domainmapping-event-receiver", 100);
+ this.threadPoolId = "domainmapping-event-receiver";
+ this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue();
this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue);
this.messageListener = new DomainMappingEventMessageListener(messageQueue);
@@ -67,6 +68,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
public void terminate() { // terminates the subscriber and the delegator, then shuts down the domain mapping receiver's thread pool
eventSubscriber.terminate();
messageDelegator.terminate();
+ StratosThreadPool.shutdown(threadPoolId); // may block: shutdown() waits up to 10 secs for tasks before forcing
}
private void execute() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index a9d2602..1699592 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -40,7 +40,8 @@ public class HealthStatEventReceiver extends StratosEventReceiver {
private HealthStatEventReceiver() {
// TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("healthstat-event-receiver", 100);
+ this.threadPoolId = "healthstat-event-receiver";
+ this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 100);
HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue();
this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue);
this.messageListener = new HealthStatEventMessageListener(messageQueue);
@@ -86,5 +87,6 @@ public class HealthStatEventReceiver extends StratosEventReceiver {
public void terminate() { // terminates the subscriber and the delegator, then shuts down the health stat receiver's thread pool
eventSubscriber.terminate();
messageDelegator.terminate();
+ StratosThreadPool.shutdown(threadPoolId); // may block: shutdown() waits up to 10 secs for tasks before forcing
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
index 805a8bf..6c948a0 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
@@ -39,7 +39,8 @@ public class InitializerEventReceiver extends StratosEventReceiver {
private InitializerEventReceiver() {
// TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("initializer-event-receiver", 100);
+ this.threadPoolId = "initializer-event-receiver";
+ this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue();
this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue);
this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue);
@@ -84,5 +85,6 @@ public class InitializerEventReceiver extends StratosEventReceiver {
public void terminate() { // terminates the subscriber and the delegator, then shuts down the initializer receiver's thread pool
eventSubscriber.terminate();
messageDelegator.terminate();
+ StratosThreadPool.shutdown(threadPoolId); // may block: shutdown() waits up to 10 secs for tasks before forcing
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index e0b8e9f..e8f84c2 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -36,11 +36,11 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
private EventSubscriber eventSubscriber;
private InstanceNotifierEventMessageListener messageListener;
private static volatile InstanceNotifierEventReceiver instance;
- //private boolean terminated;
private InstanceNotifierEventReceiver() {
// TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
+ this.threadPoolId = "instance-notifier-event-receiver";
+ this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue();
this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue);
this.messageListener = new InstanceNotifierEventMessageListener(messageQueue);
@@ -94,7 +94,6 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
public synchronized void terminate() { // terminates the subscriber and the delegator, then shuts down this receiver's thread pool
eventSubscriber.terminate();
messageDelegator.terminate();
- //terminated = true;
- log.info("InstanceNotifierEventReceiver terminated");
+ StratosThreadPool.shutdown(threadPoolId); // NOTE(review): shutdown() can wait up to 10 secs, and this runs while the instance monitor is held
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index a565ea9..233de18 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -39,7 +39,8 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
private InstanceStatusEventReceiver() {
// TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
+ this.threadPoolId = "instance-status_event-reciever";
+ this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue);
this.messageListener = new InstanceStatusEventMessageListener(messageQueue);
@@ -91,6 +92,6 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
public void terminate() { // terminates the subscriber and the delegator, then shuts down the instance status receiver's thread pool
eventSubscriber.terminate();
messageDelegator.terminate();
- // terminated = true;
+ StratosThreadPool.shutdown(threadPoolId); // may block: shutdown() waits up to 10 secs for tasks before forcing
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
index a52cb20..5726d44 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -43,7 +43,8 @@ public class TenantEventReceiver extends StratosEventReceiver {
private TenantEventReceiver() {
// TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("tenant-event-receiver", 100);
+ this.threadPoolId = "tenant-event-receiver";
+ this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
TenantEventMessageQueue messageQueue = new TenantEventMessageQueue();
this.messageDelegator = new TenantEventMessageDelegator(messageQueue);
this.messageListener = new TenantEventMessageListener(messageQueue);
@@ -112,5 +113,6 @@ public class TenantEventReceiver extends StratosEventReceiver {
public void terminate() { // terminates the subscriber and the delegator, then shuts down the tenant receiver's thread pool
eventSubscriber.terminate();
messageDelegator.terminate();
+ StratosThreadPool.shutdown(threadPoolId); // may block: shutdown() waits up to 10 secs for tasks before forcing
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index bfa3950..776b2b3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -45,7 +45,8 @@ public class TopologyEventReceiver extends StratosEventReceiver {
private TopologyEventReceiver() {
// TODO: make pool size configurable
- this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
+ this.threadPoolId = "topology-event-receiver";
+ this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue();
this.messageDelegator = new TopologyEventMessageDelegator(messageQueue);
this.messageListener = new TopologyEventMessageListener(messageQueue);
@@ -95,6 +96,7 @@ public class TopologyEventReceiver extends StratosEventReceiver {
public void terminate() { // terminates the subscriber and the delegator, then shuts down the topology receiver's thread pool
eventSubscriber.terminate();
messageDelegator.terminate();
+ StratosThreadPool.shutdown(threadPoolId); // may block: shutdown() waits up to 10 secs for tasks before forcing
}
public void initializeCompleteTopology() {