You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2015/12/24 14:04:31 UTC

[14/19] stratos git commit: shutting down thread pools from event receivers

shutting down thread pools from event receivers


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/570d74ea
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/570d74ea
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/570d74ea

Branch: refs/heads/stratos-4.1.x
Commit: 570d74ea5543908c2362cc9b3c6224276921de48
Parents: 4c6442a
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Fri Dec 18 07:23:07 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Fri Dec 18 07:23:07 2015 +0530

----------------------------------------------------------------------
 .../common/threading/StratosThreadPool.java     | 30 +++++++++++++++++---
 .../message/receiver/StratosEventReceiver.java  |  1 +
 .../application/ApplicationsEventReceiver.java  |  4 ++-
 .../signup/ApplicationSignUpEventReceiver.java  |  4 ++-
 .../status/ClusterStatusEventReceiver.java      |  4 ++-
 .../mapping/DomainMappingEventReceiver.java     |  4 ++-
 .../health/stat/HealthStatEventReceiver.java    |  4 ++-
 .../initializer/InitializerEventReceiver.java   |  4 ++-
 .../notifier/InstanceNotifierEventReceiver.java |  7 ++---
 .../status/InstanceStatusEventReceiver.java     |  5 ++--
 .../receiver/tenant/TenantEventReceiver.java    |  4 ++-
 .../topology/TopologyEventReceiver.java         |  4 ++-
 12 files changed, 57 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index 687cec2..b28fb40 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -24,10 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.*;
 
 /**
  * Utility class for Stratos thread pool
@@ -84,4 +81,29 @@ public class StratosThreadPool {
         }
         return scheduledExecutorService;
     }
+
+    /**
+     * Shuts down the pool registered under {@code identifier}: orderly first, then forced
+     * if tasks are still running after 10 seconds or the waiting thread is interrupted.
+     */
+    public static void shutdown (String identifier) {
+        ExecutorService executorService = executorServiceMap.get(identifier);
+        if (executorService == null) {
+            log.warn("No executor service found for id " + identifier + ", unable to shut down");
+            return;
+        }
+        executorService.shutdown();
+        try {
+            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+                log.info("Thread Pool [id] " + identifier + " did not finish all tasks before " +
+                        "timeout, forcefully shutting down");
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executorService.shutdownNow();
+            // restore the interrupt flag so callers can still observe the interruption
+            Thread.currentThread().interrupt();
+        }
+        log.info("Successfully shutdown thread pool associated with id: " + identifier);
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
index 5ac89e6..5040371 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService;
 public class StratosEventReceiver {
 
     protected ExecutorService executorService;
+    protected String threadPoolId;
 
     public StratosEventReceiver () {
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
index 69dba01..697d9dd 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
@@ -39,7 +39,8 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{
 
     private ApplicationsEventReceiver() {
         // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("application-event-receiver", 100);
+        this.threadPoolId = "application-event-receiver";
+        this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
         ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue();
         this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue);
         this.messageListener = new ApplicationsEventMessageListener(messageQueue);
@@ -94,6 +95,7 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
+        StratosThreadPool.shutdown(threadPoolId);
     }
 
     public void initializeCompleteApplicationsModel() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index df90cf9..89cf7ea 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -44,7 +44,8 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
 
     private ApplicationSignUpEventReceiver() {
         // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("application-signup-event-receiver", 100);
+        this.threadPoolId = "application-signup-event-receiver";
+        this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
         ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue();
         this.messageDelegator = new ApplicationSignUpEventMessageDelegator(messageQueue);
         this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue);
@@ -116,5 +117,6 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
+        StratosThreadPool.shutdown(threadPoolId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index be42b43..bdbd509 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -40,7 +40,8 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver {
 
     private ClusterStatusEventReceiver() {
         // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100);
+        this.threadPoolId = "clusterstatus-event-receiver";
+        this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
         ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue();
         this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue);
         this.messageListener = new ClusterStatusEventMessageListener(messageQueue);
@@ -90,6 +91,7 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
+        StratosThreadPool.shutdown(threadPoolId);
     }
 
     public boolean isSubscribed() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 6de99c0..7ce3982 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -41,7 +41,8 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
 
     private DomainMappingEventReceiver() {
         // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("domainmapping-event-receiver", 100);
+        this.threadPoolId = "domainmapping-event-receiver";
+        this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
         DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue();
         this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue);
         this.messageListener = new DomainMappingEventMessageListener(messageQueue);
@@ -67,6 +68,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
+        StratosThreadPool.shutdown(threadPoolId);
     }
 
     private void execute() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index a9d2602..1699592 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -40,7 +40,8 @@ public class HealthStatEventReceiver extends StratosEventReceiver {
 
     private HealthStatEventReceiver() {
         // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("healthstat-event-receiver", 100);
+        this.threadPoolId = "healthstat-event-receiver";
+        this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 100);
         HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue();
         this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue);
         this.messageListener = new HealthStatEventMessageListener(messageQueue);
@@ -86,5 +87,6 @@ public class HealthStatEventReceiver extends StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
+        StratosThreadPool.shutdown(threadPoolId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
index 805a8bf..6c948a0 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
@@ -39,7 +39,8 @@ public class InitializerEventReceiver extends StratosEventReceiver {
 
     private InitializerEventReceiver() {
         // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("initializer-event-receiver", 100);
+        this.threadPoolId = "initializer-event-receiver";
+        this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
         InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue();
         this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue);
         this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue);
@@ -84,5 +85,6 @@ public class InitializerEventReceiver extends StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
+        StratosThreadPool.shutdown(threadPoolId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index e0b8e9f..e8f84c2 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -36,11 +36,11 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
     private EventSubscriber eventSubscriber;
     private InstanceNotifierEventMessageListener messageListener;
     private static volatile InstanceNotifierEventReceiver instance;
-    //private boolean terminated;
 
     private InstanceNotifierEventReceiver() {
         // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
+        this.threadPoolId = "instance-notifier-event-receiver";
+        this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
         InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue();
         this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue);
         this.messageListener = new InstanceNotifierEventMessageListener(messageQueue);
@@ -94,7 +94,6 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
     public synchronized void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        //terminated = true;
-        log.info("InstanceNotifierEventReceiver terminated");
+        StratosThreadPool.shutdown(threadPoolId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index a565ea9..233de18 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -39,7 +39,8 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
 
     private InstanceStatusEventReceiver() {
         // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
+        this.threadPoolId = "instance-status_event-reciever";
+        this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
         InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
         this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue);
         this.messageListener = new InstanceStatusEventMessageListener(messageQueue);
@@ -91,6 +92,6 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-       // terminated = true;
+        StratosThreadPool.shutdown(threadPoolId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
index a52cb20..5726d44 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -43,7 +43,8 @@ public class TenantEventReceiver extends StratosEventReceiver {
 
     private TenantEventReceiver() {
         // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("tenant-event-receiver", 100);
+        this.threadPoolId = "tenant-event-receiver";
+        this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
         TenantEventMessageQueue messageQueue = new TenantEventMessageQueue();
         this.messageDelegator = new TenantEventMessageDelegator(messageQueue);
         this.messageListener = new TenantEventMessageListener(messageQueue);
@@ -112,5 +113,6 @@ public class TenantEventReceiver extends StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
+        StratosThreadPool.shutdown(threadPoolId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index bfa3950..776b2b3 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -45,7 +45,8 @@ public class TopologyEventReceiver extends StratosEventReceiver {
 
     private TopologyEventReceiver() {
         // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
+        this.threadPoolId = "topology-event-receiver";
+        this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20);
         TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue();
         this.messageDelegator = new TopologyEventMessageDelegator(messageQueue);
         this.messageListener = new TopologyEventMessageListener(messageQueue);
@@ -95,6 +96,7 @@ public class TopologyEventReceiver extends StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
+        StratosThreadPool.shutdown(threadPoolId);
     }
 
     public void initializeCompleteTopology() {