You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2016/01/05 16:47:25 UTC
[16/50] [abbrv] stratos git commit: using ScheduledThreadPoolExecutor in StratosThreadPool
using ScheduledThreadPoolExecutor in StratosThreadPool
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4b15a1b8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4b15a1b8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4b15a1b8
Branch: refs/heads/stratos-4.1.x
Commit: 4b15a1b8fbab3df07d9a4c542ab5286add84380c
Parents: 90008dd
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Mon Dec 7 10:45:30 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Mon Dec 7 18:49:56 2015 +0530
----------------------------------------------------------------------
.../internal/AutoscalerServiceComponent.java | 57 ++++++------
.../monitor/cluster/ClusterMonitor.java | 2 +-
.../component/ParentComponentMonitor.java | 2 +-
.../context/CloudControllerContext.java | 6 +-
.../CloudControllerServiceComponent.java | 21 ++---
.../common/concurrent/locks/ReadWriteLock.java | 5 +-
.../threading/GracefulThreadPoolTerminator.java | 58 ++++++++++++
.../common/threading/StratosThreadPool.java | 96 +++++---------------
.../internal/LoadBalancerServiceComponent.java | 2 -
.../LoadBalancerStatisticsExecutor.java | 1 -
.../StratosManagerServiceComponent.java | 69 +++++++-------
.../mock/iaas/services/impl/MockInstance.java | 8 +-
.../MockHealthStatisticsGenerator.java | 9 +-
13 files changed, 165 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
index 8219f71..9b1a290 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java
@@ -57,10 +57,7 @@ import org.wso2.carbon.utils.ConfigurationContextService;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
* @scr.component name=org.apache.stratos.autoscaler.internal.AutoscalerServiceComponent" immediate="true"
@@ -87,7 +84,7 @@ public class AutoscalerServiceComponent {
private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
private AutoscalerInitializerTopicReceiver autoscalerInitializerTopicReceiver;
private ThreadPoolExecutor executor;
- private ScheduledExecutorService scheduler;
+ private ScheduledThreadPoolExecutor scheduler;
protected void activate(ComponentContext componentContext) throws Exception {
if (log.isDebugEnabled()) {
@@ -263,39 +260,39 @@ public class AutoscalerServiceComponent {
}
// Shutdown executor service
- shutdownExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID);
+ //shutdownExecutorService(AutoscalerConstants.AUTOSCALER_THREAD_POOL_ID);
// Shutdown scheduler
- shutdownScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID);
+ //shutdownScheduledExecutorService(AutoscalerConstants.AUTOSCALER_SCHEDULER_ID);
// Shutdown application monitor executor service
- shutdownExecutorService(AutoscalerConstants.MONITOR_THREAD_POOL_ID);
+ //shutdownExecutorService(AutoscalerConstants.MONITOR_THREAD_POOL_ID);
// Shutdown cluster monitor scheduler executor service
- shutdownScheduledExecutorService(AutoscalerConstants.CLUSTER_MONITOR_SCHEDULER_ID);
+ //shutdownScheduledExecutorService(AutoscalerConstants.CLUSTER_MONITOR_SCHEDULER_ID);
}
- private void shutdownExecutorService(String executorServiceId) {
- ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1);
- if (executor != null) {
- shutdownExecutorService(executor);
- }
- }
-
- private void shutdownScheduledExecutorService(String executorServiceId) {
- ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1);
- if (executorService != null) {
- shutdownExecutorService(executorService);
- }
- }
-
- private void shutdownExecutorService(ExecutorService executorService) {
- try {
- executorService.shutdownNow();
- } catch (Exception e) {
- log.warn("An error occurred while shutting down executor service", e);
- }
- }
+// private void shutdownExecutorService(String executorServiceId) {
+// ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1);
+// if (executor != null) {
+// shutdownExecutorService(executor);
+// }
+// }
+//
+// private void shutdownScheduledExecutorService(String executorServiceId) {
+// ScheduledThreadPoolExecutor scheduledExecutor = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1);
+// if (scheduledExecutor != null) {
+// shutdownExecutorService(scheduledExecutor);
+// }
+// }
+//
+// private void shutdownExecutorService(ExecutorService executorService) {
+// try {
+// executorService.shutdownNow();
+// } catch (Exception e) {
+// log.warn("An error occurred while shutting down executor service", e);
+// }
+// }
protected void setRegistryService(RegistryService registryService) {
if (log.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
index 32bf037..ead9130 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
@@ -86,7 +86,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class ClusterMonitor extends Monitor {
private static final Log log = LogFactory.getLog(ClusterMonitor.class);
- private final ScheduledExecutorService scheduler;
+ private final ScheduledThreadPoolExecutor scheduler;
private final ThreadPoolExecutor executor;
protected boolean hasFaultyMember = false;
protected ClusterContext clusterContext;
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
index 1366a3f..9efdf7c 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
@@ -68,7 +68,7 @@ public abstract class ParentComponentMonitor extends Monitor {
private static final Log log = LogFactory.getLog(ParentComponentMonitor.class);
//Scheduler executor service to execute this monitor in a thread
- private final ScheduledExecutorService scheduler = StratosThreadPool.getScheduledExecutorService(
+ private final ScheduledThreadPoolExecutor scheduler = StratosThreadPool.getScheduledExecutorService(
"autoscaler.monitor.scheduler.thread.pool", 100);
//The monitors dependency tree with all the start-able/kill-able dependencies
protected DependencyTree startupDependencyTree;
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
index 0771d5a..9858410 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
@@ -42,9 +42,7 @@ import org.wso2.carbon.registry.core.exceptions.RegistryException;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Lock;
/**
@@ -136,8 +134,8 @@ public class CloudControllerContext implements Serializable {
/**
* Thread pool used in this task to execute parallel tasks.
*/
- private transient ThreadPoolExecutor executor = StratosThreadPool
- .getExecutorService("cloud.controller.context.thread.pool", 5, 10);
+// private transient ThreadPoolExecutor executor = StratosThreadPool
+// .getExecutorService("cloud.controller.context.thread.pool", 5, 10);
/**
* Map of registered {@link org.apache.stratos.cloud.controller.domain.Cartridge}s
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 710e400..0d02d46 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -47,10 +47,7 @@ import org.wso2.carbon.registry.core.service.RegistryService;
import org.wso2.carbon.registry.core.session.UserRegistry;
import org.wso2.carbon.utils.ConfigurationContextService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
* Registering Cloud Controller Service.
@@ -84,7 +81,7 @@ public class CloudControllerServiceComponent {
private ApplicationEventReceiver applicationEventReceiver;
private InitializerTopicReceiver initializerTopicReceiver;
private ThreadPoolExecutor executor;
- private ScheduledExecutorService scheduler;
+ private ScheduledThreadPoolExecutor scheduler;
protected void activate(final ComponentContext context) {
if (log.isDebugEnabled()) {
@@ -264,18 +261,18 @@ public class CloudControllerServiceComponent {
}
// Shutdown executor service
- shutdownExecutorService(THREAD_POOL_ID);
+ //shutdownExecutorService(THREAD_POOL_ID);
// Shutdown scheduler
shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID);
}
- private void shutdownExecutorService(String executorServiceId) {
- ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1);
- if (executor != null) {
- shutdownExecutorService(executor);
- }
- }
+// private void shutdownExecutorService(String executorServiceId) {
+// ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1);
+// if (executor != null) {
+// shutdownExecutorService(executor);
+// }
+// }
private void shutdownScheduledExecutorService(String executorServiceId) {
ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1);
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/concurrent/locks/ReadWriteLock.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/concurrent/locks/ReadWriteLock.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/concurrent/locks/ReadWriteLock.java
index 6d91330..90c2f22 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/concurrent/locks/ReadWriteLock.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/concurrent/locks/ReadWriteLock.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -62,9 +63,9 @@ public class ReadWriteLock {
readWriteLockMonitorInterval = Integer.getInteger("read.write.lock.monitor.interval", 30000);
threadPoolSize = Integer.getInteger(READ_WRITE_LOCK_MONITOR_THREAD_POOL_SIZE_KEY, 10);
- ScheduledExecutorService scheduledExecutorService = StratosThreadPool.getScheduledExecutorService(
+ ScheduledThreadPoolExecutor scheduledExecutor = StratosThreadPool.getScheduledExecutorService(
READ_WRITE_LOCK_MONITOR_THREAD_POOL, threadPoolSize);
- scheduledExecutorService.scheduleAtFixedRate(new ReadWriteLockMonitor(this),
+ scheduledExecutor.scheduleAtFixedRate(new ReadWriteLockMonitor(this),
readWriteLockMonitorInterval, readWriteLockMonitorInterval, TimeUnit.MILLISECONDS);
if (log.isDebugEnabled()) {
log.debug(String.format("Lock monitor scheduled: [lock-name] %s [interval] %d seconds",
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java
new file mode 100644
index 0000000..70cda66
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.threading;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class GracefulThreadPoolTerminator implements Callable<String> {
+
+ private static final Log log = LogFactory.getLog(GracefulThreadPoolTerminator.class);
+
+ private String threadPoolId;
+ private ThreadPoolExecutor executor;
+
+ public GracefulThreadPoolTerminator (String threadPoolId, ThreadPoolExecutor executor) {
+ this.threadPoolId = threadPoolId;
+ this.executor = executor;
+ }
+
+ @Override
+ public String call() {
+ // try to shut down gracefully
+ executor.shutdown();
+ // wait 10 secs till terminated
+ try {
+ if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ log.info("Thread Pool [id] " + threadPoolId + " did not finish all tasks before " +
+ "timeout, forcefully shutting down");
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ // interrupted, shutdown now
+ executor.shutdownNow();
+ }
+ return threadPoolId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index 50b53bb..4eb8304 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -36,16 +36,16 @@ public class StratosThreadPool {
private static final Log log = LogFactory.getLog(StratosThreadPool.class);
private static Map<String, ThreadPoolExecutor> executorMap = new ConcurrentHashMap<>();
- private static Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
- private static Object executorServiceMapLock = new Object();
- private static Object scheduledServiceMapLock = new Object();
+ private static Map<String, ScheduledThreadPoolExecutor> scheduledExecutorMap = new ConcurrentHashMap<>();
+ private static final Object executorServiceMapLock = new Object();
+ private static final Object scheduledServiceMapLock = new Object();
/**
- * Return the executor service based on the identifier and thread pool size
+ * Return the executor based on the identifier and thread pool size
*
* @param identifier Thread pool identifier name
* @param maxSize Thread pool size
- * @return ExecutorService
+ * @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor getExecutorService(String identifier, int initialSize, int
maxSize) {
@@ -69,23 +69,23 @@ public class StratosThreadPool {
*
* @param identifier Thread pool identifier name
* @param threadPoolSize Thread pool size
- * @return
+ * @return ScheduledThreadPoolExecutor
*/
- public static ScheduledExecutorService getScheduledExecutorService(String identifier, int threadPoolSize) {
- ScheduledExecutorService scheduledExecutorService = scheduledServiceMap.get(identifier);
- if (scheduledExecutorService == null) {
+ public static ScheduledThreadPoolExecutor getScheduledExecutorService(String identifier, int threadPoolSize) {
+ ScheduledThreadPoolExecutor scheduledExecutor = scheduledExecutorMap.get(identifier);
+ if (scheduledExecutor == null) {
synchronized (scheduledServiceMapLock) {
- if (scheduledExecutorService == null) {
- scheduledExecutorService = Executors.newScheduledThreadPool(threadPoolSize,
+ if (scheduledExecutor == null) {
+ scheduledExecutor = new ScheduledThreadPoolExecutor(threadPoolSize,
new StratosThreadFactory(identifier));
- scheduledServiceMap.put(identifier, scheduledExecutorService);
- log.info(String.format("Thread pool created: [type] Scheduled Executor Service [id] %s [size] %d",
+ scheduledExecutorMap.put(identifier, scheduledExecutor);
+ log.info(String.format("Thread pool created: [type] Scheduled Executor [id] %s [size] %d",
identifier, threadPoolSize));
}
}
}
- return scheduledExecutorService;
+ return scheduledExecutor;
}
public static void shutDownAllThreadPoolsGracefully () {
@@ -102,7 +102,8 @@ public class StratosThreadPool {
try {
threadPoolTerminator = Executors.newFixedThreadPool(threadPoolCount);
for (Map.Entry<String, ThreadPoolExecutor> entry : executorMap.entrySet()) {
- threadPoolTerminatorFutures.add(threadPoolTerminator.submit(new GracefulThreadPoolTerminator(entry.getKey(),
+ threadPoolTerminatorFutures.add(threadPoolTerminator.submit(new
+ GracefulThreadPoolTerminator(entry.getKey(),
entry.getValue())));
}
// use the Future to block until shutting down is done
@@ -130,7 +131,7 @@ public class StratosThreadPool {
public static void shutDownAllScheduledExecutorsGracefully () {
- int threadPoolCount = scheduledServiceMap.size();
+ int threadPoolCount = scheduledExecutorMap.size();
if (threadPoolCount == 0) {
log.info("No thread pools found to shut down");
return;
@@ -141,10 +142,9 @@ public class StratosThreadPool {
try {
threadPoolTerminator = Executors.newFixedThreadPool(threadPoolCount);
- for (Map.Entry<String, ScheduledExecutorService> entry : scheduledServiceMap.entrySet()) {
- threadPoolTerminatorFutures.add(threadPoolTerminator.submit(new GracefulScheduledThreadPoolTerminator(entry.getKey(),
+ for (Map.Entry<String, ScheduledThreadPoolExecutor> entry : scheduledExecutorMap.entrySet())
+ threadPoolTerminatorFutures.add(threadPoolTerminator.submit(new GracefulThreadPoolTerminator(entry.getKey(),
entry.getValue())));
- }
// use the Future to block until shutting down is done
for (Future<String> threadPoolTerminatorFuture : threadPoolTerminatorFutures) {
removeScheduledThreadPoolFromCache(threadPoolTerminatorFuture.get());
@@ -154,10 +154,12 @@ public class StratosThreadPool {
log.error("Error in shutting down thread pools", e);
} catch (ExecutionException e) {
log.error("Error in shutting down thread pools", e);
+ } catch (Exception e) {
+ log.error("Error in shutting down thread pools", e);
} finally {
// if there are any remaining thread pools, shut down immediately
- if (!scheduledServiceMap.isEmpty()) {
- for (Map.Entry<String, ScheduledExecutorService> entry : scheduledServiceMap.entrySet()) {
+ if (!scheduledExecutorMap.isEmpty()) {
+ for (Map.Entry<String, ScheduledThreadPoolExecutor> entry : scheduledExecutorMap.entrySet()) {
entry.getValue().shutdownNow();
removeScheduledThreadPoolFromCache(entry.getKey());
}
@@ -176,59 +178,9 @@ public class StratosThreadPool {
}
private static void removeScheduledThreadPoolFromCache(String terminatedPoolId) {
- if (scheduledServiceMap.remove(terminatedPoolId) != null) {
+ if (scheduledExecutorMap.remove(terminatedPoolId) != null) {
log.info("Scheduled Thread pool [id] " + terminatedPoolId + " is successfully shut down" +
" and removed from the cache");
}
}
-
- private static class GracefulThreadPoolTerminator implements Callable {
-
- private String threadPoolId;
- private ThreadPoolExecutor executor;
-
- public GracefulThreadPoolTerminator (String threadPoolId, ThreadPoolExecutor executor) {
- this.threadPoolId = threadPoolId;
- this.executor = executor;
- }
-
- @Override
- public String call() throws Exception {
- log.info("Shutting down thread pool " + threadPoolId);
- // try to shut down gracefully
- executor.shutdown();
- // wait 10 secs till terminated
- if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
- log.info("Thread Pool [id] " + threadPoolId + " did not finish all tasks before " +
- "timeout, forcefully shutting down");
- executor.shutdownNow();
- }
- return threadPoolId;
- }
- }
-
- private static class GracefulScheduledThreadPoolTerminator implements Callable {
-
- private String threadPoolId;
- private ScheduledExecutorService scheduledExecutor;
-
- public GracefulScheduledThreadPoolTerminator (String threadPoolId, ScheduledExecutorService scheduledExecutor) {
- this.threadPoolId = threadPoolId;
- this.scheduledExecutor = scheduledExecutor;
- }
-
- @Override
- public String call() throws Exception {
- log.info("Shutting down scheduled thread pool " + threadPoolId);
- // try to shut down gracefully
- scheduledExecutor.shutdown();
- // wait 10 secs till terminated
- if (!scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
- log.info("Scheduled thread Pool [id] " + threadPoolId + " did not finish all tasks before " +
- "timeout, forcefully shutting down");
- scheduledExecutor.shutdownNow();
- }
- return threadPoolId;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index cb2297a..e7b45f8 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -63,8 +63,6 @@ import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
import java.io.File;
import java.util.Collection;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
/**
* @scr.component name="org.apache.stratos.load.balancer.internal.LoadBalancerServiceComponent" immediate="true"
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java
index e625ec7..486b98b 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsExecutor.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.load.balancer.util.LoadBalancerConstants;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
/**
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 0486d84..7de06ae 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -48,10 +48,7 @@ import org.wso2.carbon.user.core.UserStoreException;
import org.wso2.carbon.user.core.service.RealmService;
import org.wso2.carbon.utils.ConfigurationContextService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
* @scr.component name="org.wso2.carbon.hosting.mgt.internal.StratosManagerServiceComponent"
@@ -94,7 +91,7 @@ public class StratosManagerServiceComponent {
private StratosManagerApplicationEventReceiver applicationEventReceiver;
private StratosManagerInitializerTopicReceiver initializerTopicReceiver;
private ThreadPoolExecutor executor;
- private ScheduledExecutorService scheduler;
+ private ScheduledThreadPoolExecutor scheduler;
protected void activate(final ComponentContext componentContext) throws Exception {
if (log.isDebugEnabled()) {
@@ -339,37 +336,37 @@ public class StratosManagerServiceComponent {
EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName());
- shutdownExecutor(THREAD_POOL_ID);
- shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID);
+ //shutdownExecutor(THREAD_POOL_ID);
+ //shutdownScheduledExecutorService(SCHEDULER_THREAD_POOL_ID);
}
- private void shutdownExecutor(String executorServiceId) {
- ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1);
- if (executor != null) {
- shutdownExecutor(executor);
- }
- }
-
- private void shutdownScheduledExecutorService(String executorServiceId) {
- ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1);
- if (executorService != null) {
- shutdownExecutor(executorService);
- }
- }
-
- private void shutdownExecutor(ThreadPoolExecutor executor) {
- try {
- executor.shutdownNow();
- } catch (Exception e) {
- log.warn("An error occurred while shutting down executor service", e);
- }
- }
-
- private void shutdownExecutor(ExecutorService executorService) {
- try {
- executorService.shutdownNow();
- } catch (Exception e) {
- log.warn("An error occurred while shutting down executor service", e);
- }
- }
+// private void shutdownExecutor(String executorServiceId) {
+// ThreadPoolExecutor executor = StratosThreadPool.getExecutorService(executorServiceId, 1, 1);
+// if (executor != null) {
+// shutdownExecutor(executor);
+// }
+// }
+
+// private void shutdownScheduledExecutorService(String executorServiceId) {
+// ScheduledThreadPoolExecutor scheduledExecutor = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1);
+// if (scheduledExecutor != null) {
+// shutdownExecutor(scheduledExecutor);
+// }
+// }
+
+// private void shutdownExecutor(ThreadPoolExecutor executor) {
+// try {
+// executor.shutdownNow();
+// } catch (Exception e) {
+// log.warn("An error occurred while shutting down executor service", e);
+// }
+// }
+
+// private void shutdownExecutor(ExecutorService executorService) {
+// try {
+// executorService.shutdownNow();
+// } catch (Exception e) {
+// log.warn("An error occurred while shutting down executor service", e);
+// }
+// }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
index df7145e..27a664e 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
@@ -62,9 +62,9 @@ public class MockInstance implements Serializable {
private final MockInstanceContext mockInstanceContext;
private final AtomicBoolean hasGracefullyShutdown = new AtomicBoolean(false);
- private static final ThreadPoolExecutor eventListenerExecutor = StratosThreadPool
- .getExecutorService("mock.iaas.event.listener.thread.pool", 35, 100);
- private static final ScheduledExecutorService healthStatNotifierExecutorService = StratosThreadPool
+// private static final ThreadPoolExecutor eventListenerExecutor = StratosThreadPool
+// .getExecutorService("mock.iaas.event.listener.thread.pool", 35, 100);
+ private static final ScheduledThreadPoolExecutor healthStatNotifierExecutor = StratosThreadPool
.getScheduledExecutorService("mock.iaas.health.statistics.notifier.thread.pool", 100);
public MockInstance(MockInstanceContext mockInstanceContext) {
@@ -95,7 +95,7 @@ public class MockInstance implements Serializable {
log.debug(String.format("Starting health statistics notifier: [member-id] %s",
mockInstanceContext.getMemberId()));
}
- healthStatNotifierScheduledFuture = healthStatNotifierExecutorService
+ healthStatNotifierScheduledFuture = healthStatNotifierExecutor
.scheduleAtFixedRate(mockHealthStatisticsNotifier, 0, HEALTH_STAT_INTERVAL, TimeUnit.SECONDS);
if (log.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/4b15a1b8/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/generator/MockHealthStatisticsGenerator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/generator/MockHealthStatisticsGenerator.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/generator/MockHealthStatisticsGenerator.java
index 1c3da6e..367b4b3 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/generator/MockHealthStatisticsGenerator.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/generator/MockHealthStatisticsGenerator.java
@@ -26,10 +26,7 @@ import org.apache.stratos.mock.iaas.config.MockIaasConfig;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
* Mock health statistics generator.
@@ -39,7 +36,7 @@ public class MockHealthStatisticsGenerator {
private static final Log log = LogFactory.getLog(MockHealthStatisticsGenerator.class);
private static volatile MockHealthStatisticsGenerator instance;
- private static final ScheduledExecutorService scheduledExecutorService =
+ private static final ScheduledThreadPoolExecutor scheduledExecutor =
StratosThreadPool.getScheduledExecutorService("mock.iaas.health.statistics.generator.thread.pool", 10);
// Map<ServiceName, Map<ScalingFactor, ScheduledFuture>>
@@ -84,7 +81,7 @@ public class MockHealthStatisticsGenerator {
if (statisticsPattern.getCartridgeType().equals(serviceName) &&
(statisticsPattern.getSampleDuration() > 0)) {
MockHealthStatisticsUpdater runnable = new MockHealthStatisticsUpdater(statisticsPattern);
- ScheduledFuture<?> task = scheduledExecutorService.scheduleAtFixedRate(runnable, 0,
+ ScheduledFuture<?> task = scheduledExecutor.scheduleAtFixedRate(runnable, 0,
statisticsPattern.getSampleDuration(), TimeUnit.SECONDS);
taskList.put(statisticsPattern.getFactor().toString(), task);
}