You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2016/01/05 16:47:27 UTC
[18/50] [abbrv] stratos git commit: using scheduled thread pool per
*monitors
using scheduled thread pool per *monitors
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/6189945b
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/6189945b
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/6189945b
Branch: refs/heads/stratos-4.1.x
Commit: 6189945b97b8bdacce63f4f6792684e33260812f
Parents: d4b35c0
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Mon Dec 21 20:58:15 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Mon Dec 21 20:58:15 2015 +0530
----------------------------------------------------------------------
.../applications/topic/ApplicationBuilder.java | 3 +
.../AutoscalerTopologyEventReceiver.java | 2 +
.../stratos/autoscaler/monitor/Monitor.java | 2 +
.../monitor/cluster/ClusterMonitor.java | 13 +++--
.../component/ParentComponentMonitor.java | 11 +++-
.../services/impl/AutoscalerServiceImpl.java | 5 ++
.../autoscaler/util/AutoscalerConstants.java | 3 +-
.../stratos/autoscaler/util/AutoscalerUtil.java | 1 +
.../threading/GracefulThreadPoolTerminator.java | 2 +
.../common/threading/StratosThreadPool.java | 60 ++++++++++++++++++--
10 files changed, 89 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
index 5fd4d5a..ec6b50a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
@@ -329,10 +329,12 @@ public class ApplicationBuilder {
getAliasToActiveChildMonitorsMap().values()) {
//destroying the drools
monitor1.destroy();
+ monitor1.cleanup();
}
}
// stopping application thread
applicationMonitor.destroy();
+ applicationMonitor.cleanup();
AutoscalerContext.getInstance().removeAppMonitor(applicationId);
// Remove network partition algorithm context
AutoscalerContext.getInstance().removeNetworkPartitionAlgorithmContext(applicationId);
@@ -454,6 +456,7 @@ public class ApplicationBuilder {
getAliasToActiveChildMonitorsMap().values()) {
//destroying the drools
monitor1.destroy();
+ monitor1.cleanup();
}
}
org.apache.stratos.autoscaler.context.partition.network.NetworkPartitionContext networkPartitionContext =
http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index c150a1f..5f123b2 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -204,6 +204,7 @@ public class AutoscalerTopologyEventReceiver {
}
//changing the status in the monitor, will notify its parent monitor
monitor.destroy();
+ monitor.cleanup();
monitor.notifyParentMonitor(ClusterStatus.Created, instanceId);
}
@@ -316,6 +317,7 @@ public class AutoscalerTopologyEventReceiver {
if (!monitor.hasInstance() && (appMonitor != null && appMonitor.isTerminating())) {
//Destroying and Removing the Cluster monitor
monitor.destroy();
+ monitor.cleanup();
AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java
index c58ec41..2814958 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java
@@ -216,4 +216,6 @@ public abstract class Monitor implements EventHandler, Runnable {
public enum MonitorType {
Application, Group, Cluster
}
+
+ public abstract void cleanup ();
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
index ead9130..ee936db 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
@@ -86,8 +86,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class ClusterMonitor extends Monitor {
private static final Log log = LogFactory.getLog(ClusterMonitor.class);
- private final ScheduledThreadPoolExecutor scheduler;
- private final ThreadPoolExecutor executor;
+ private ScheduledThreadPoolExecutor scheduler;
+ private ThreadPoolExecutor executor;
protected boolean hasFaultyMember = false;
protected ClusterContext clusterContext;
protected String serviceType;
@@ -107,11 +107,11 @@ public class ClusterMonitor extends Monitor {
public ClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree,
String deploymentPolicyId) {
- scheduler = StratosThreadPool.getScheduledExecutorService(AutoscalerConstants.CLUSTER_MONITOR_SCHEDULER_ID, 50);
int threadPoolSize = Integer.getInteger(AutoscalerConstants.MONITOR_THREAD_POOL_SIZE, 100);
executor = StratosThreadPool.getExecutorService(
AutoscalerConstants.MONITOR_THREAD_POOL_ID, ((int)Math.ceil(threadPoolSize/3)), threadPoolSize);
this.clusterId = cluster.getClusterId();
+ scheduler = StratosThreadPool.getScheduledExecutorService(clusterId, 2);
readConfigurations();
this.groupScalingEnabledSubtree = groupScalingEnabledSubtree;
this.setCluster(new Cluster(cluster));
@@ -147,7 +147,7 @@ public class ClusterMonitor extends Monitor {
return MonitorType.Cluster;
}
- public void startScheduler() {
+ public synchronized void startScheduler() {
schedulerFuture = scheduler.scheduleAtFixedRate(this, 0,
getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
}
@@ -1540,4 +1540,9 @@ public class ClusterMonitor extends Monitor {
public String getDeploymentPolicyId() {
return deploymentPolicyId;
}
+
+ public void cleanup () {
+ // shutdown thread pools
+ StratosThreadPool.shutDownScheduledThreadPoolGracefully(clusterId);
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
index 9efdf7c..24983a9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
@@ -68,8 +68,7 @@ public abstract class ParentComponentMonitor extends Monitor {
private static final Log log = LogFactory.getLog(ParentComponentMonitor.class);
//Scheduler executor service to execute this monitor in a thread
- private final ScheduledThreadPoolExecutor scheduler = StratosThreadPool.getScheduledExecutorService(
- "autoscaler.monitor.scheduler.thread.pool", 100);
+ private final ScheduledThreadPoolExecutor scheduler;
//The monitors dependency tree with all the start-able/kill-able dependencies
protected DependencyTree startupDependencyTree;
//The monitors dependency tree with all the scaling dependencies
@@ -95,6 +94,7 @@ public abstract class ParentComponentMonitor extends Monitor {
terminatingInstancesMap = new ConcurrentHashMap<String, List<String>>();
pendingChildMonitorsList = new ArrayList<String>();
id = component.getUniqueIdentifier();
+ scheduler = StratosThreadPool.getScheduledExecutorService(id, 2);
// Building the startup dependencies for this monitor within the immediate children
startupDependencyTree = DependencyBuilder.getInstance().buildDependency(component);
@@ -126,7 +126,7 @@ public abstract class ParentComponentMonitor extends Monitor {
/**
* Starting the scheduler for the monitor
*/
- public void startScheduler() {
+ public synchronized void startScheduler() {
int monitoringIntervalMilliseconds = 60000;
schedulerFuture = scheduler.scheduleAtFixedRate(this, 0,
monitoringIntervalMilliseconds, TimeUnit.MILLISECONDS);
@@ -1067,4 +1067,9 @@ public abstract class ParentComponentMonitor extends Monitor {
public void removeMonitor(String id) {
this.aliasToActiveChildMonitorsMap.remove(id);
}
+
+ public void cleanup () {
+ // shutdown thread pools
+ StratosThreadPool.shutDownScheduledThreadPoolGracefully(id);
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java
index 0943de0..a77c80a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java
@@ -935,6 +935,7 @@ public class AutoscalerServiceImpl implements AutoscalerService {
getAppMonitor(applicationId);
if (applicationMonitor != null) {
applicationMonitor.destroy();
+ applicationMonitor.cleanup();
if (applicationMonitor.hasInstance()) {
Map<String, Monitor> monitors = applicationMonitor.
@@ -966,6 +967,7 @@ public class AutoscalerServiceImpl implements AutoscalerService {
getClusterMonitor(clusterId);
if (clusterMonitor != null) {
clusterMonitor.destroy();
+ clusterMonitor.cleanup();
} else {
if (log.isDebugEnabled()) {
log.debug(String.format(
@@ -977,6 +979,8 @@ public class AutoscalerServiceImpl implements AutoscalerService {
Collection<ClusterInstance> allClusterInstances = cluster.getClusterInstances();
if (allClusterInstances.isEmpty() && clusterMonitor != null) {
+ clusterMonitor.destroy();
+ clusterMonitor.cleanup();
AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
}
@@ -1007,6 +1011,7 @@ public class AutoscalerServiceImpl implements AutoscalerService {
ApplicationContext applicationContext = AutoscalerContext.getInstance().
getApplicationContext(applicationId);
applicationMonitor.destroy();
+ applicationMonitor.cleanup();
AutoscalerContext.getInstance().removeAppMonitor(applicationId);
// Remove network partition algorithm context
AutoscalerContext.getInstance().removeNetworkPartitionAlgorithmContext(applicationId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
index ef12983..62ae532 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
@@ -38,7 +38,8 @@ public final class AutoscalerConstants {
public static final String AUTOSCALER_SCHEDULER_ID = "autoscaler.scheduler.thread.pool";
public static final String SCHEDULER_THREAD_POOL_SIZE_KEY = "autoscaler.scheduler.thread.pool.size";
public static final int AUTOSCALER_SCHEDULER_THREAD_POOL_SIZE = 5;
- public static final int AUTOSCALER_THREAD_POOL_SIZE = 50;
+ //public static final int AUTOSCALER_THREAD_POOL_SIZE = 50;
+ public static final int AUTOSCALER_THREAD_POOL_SIZE = 10;
public static final String COMPONENTS_CONFIG = CarbonUtils.getCarbonConfigDirPath() +
File.separator + "stratos-config.xml";
http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
index b6ce0ed..e00f4e9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
@@ -1027,6 +1027,7 @@ public class AutoscalerUtil {
while(monitorsIter.hasNext()) {
Monitor monitor = monitorsIter.next();
monitor.destroy();
+ monitor.cleanup();
Iterator<Instance> instances = monitor.getInstances().iterator();
while(instances.hasNext()) {
Instance instance = instances.next();
http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java
index 70cda66..2ccfc45 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java
@@ -41,6 +41,7 @@ public class GracefulThreadPoolTerminator implements Callable<String> {
@Override
public String call() {
// try to shut down gracefully
+ log.info("Attempting to gracefully shut down thread pool " + threadPoolId);
executor.shutdown();
// wait 10 secs till terminated
try {
@@ -53,6 +54,7 @@ public class GracefulThreadPoolTerminator implements Callable<String> {
// interrupted, shutdown now
executor.shutdownNow();
}
+ log.info("Successfully shut down thread pool " + threadPoolId);
return threadPoolId;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index 4eb8304..1f4e5c8 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -35,8 +35,8 @@ public class StratosThreadPool {
private static final Log log = LogFactory.getLog(StratosThreadPool.class);
- private static Map<String, ThreadPoolExecutor> executorMap = new ConcurrentHashMap<>();
- private static Map<String, ScheduledThreadPoolExecutor> scheduledExecutorMap = new ConcurrentHashMap<>();
+ private static volatile Map<String, ThreadPoolExecutor> executorMap = new ConcurrentHashMap<>();
+ private static volatile Map<String, ScheduledThreadPoolExecutor> scheduledExecutorMap = new ConcurrentHashMap<>();
private static final Object executorServiceMapLock = new Object();
private static final Object scheduledServiceMapLock = new Object();
@@ -47,14 +47,14 @@ public class StratosThreadPool {
* @param maxSize Thread pool size
* @return ThreadPoolExecutor
*/
- public static ThreadPoolExecutor getExecutorService(String identifier, int initialSize, int
- maxSize) {
+ public static ThreadPoolExecutor getExecutorService(String identifier, int initialSize, int maxSize) {
ThreadPoolExecutor executor = executorMap.get(identifier);
if (executor == null) {
synchronized (executorServiceMapLock) {
if (executor == null) {
+ int taskQueueSize = initialSize > 3 ? (int)Math.ceil(initialSize/3) : 1;
executor = new ThreadPoolExecutor(initialSize, maxSize, 60L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), new StratosThreadFactory(identifier));
+ new LinkedBlockingQueue<Runnable>(taskQueueSize), new StratosThreadFactory(identifier));
executorMap.put(identifier, executor);
log.info(String.format("Thread pool created: [type] Executor [id] %s " +
"[initial size] %d [max size] %d", identifier, initialSize, maxSize));
@@ -88,6 +88,43 @@ public class StratosThreadPool {
return scheduledExecutor;
}
+ /**
+ * Stops the executor with the specified id in a graceful manner
+ *
+ * @param threadPoolId thread pool id
+ */
+ public static void shutDownThreadPoolGracefully (String threadPoolId) {
+
+ ThreadPoolExecutor executor = executorMap.get(threadPoolId);
+ if (executor == null) {
+ log.warn("No thread pool found for id " + threadPoolId + ", unable to shut down");
+ return;
+ }
+
+ new GracefulThreadPoolTerminator(threadPoolId, executor).call();
+ removeThreadPoolFromCache(threadPoolId);
+ }
+
+ /**
+ * Stops the scheduled executor with the specified id in a graceful manner
+ *
+ * @param threadPoolId thread pool id
+ */
+ public static void shutDownScheduledThreadPoolGracefully (String threadPoolId) {
+
+ ScheduledThreadPoolExecutor scheduledExecutor = scheduledExecutorMap.get(threadPoolId);
+ if (scheduledExecutor == null) {
+ log.warn("No scheduled thread pool found for id " + threadPoolId + ", unable to shut down");
+ return;
+ }
+
+ new GracefulThreadPoolTerminator(threadPoolId, scheduledExecutor).call();
+ removeScheduledThreadPoolFromCache(threadPoolId);
+ }
+
+ /**
+ * Stop all executors in a graceful manner
+ */
public static void shutDownAllThreadPoolsGracefully () {
int threadPoolCount = executorMap.size();
@@ -129,6 +166,9 @@ public class StratosThreadPool {
}
}
+ /**
+ * Stop all scheduled executors in a graceful manner
+ */
public static void shutDownAllScheduledExecutorsGracefully () {
int threadPoolCount = scheduledExecutorMap.size();
@@ -170,6 +210,11 @@ public class StratosThreadPool {
}
}
+ /**
+ * Removes the thread pool with id terminatedPoolId from the executorMap
+ *
+ * @param terminatedPoolId thread pool id
+ */
private static void removeThreadPoolFromCache(String terminatedPoolId) {
if (executorMap.remove(terminatedPoolId) != null) {
log.info("Thread pool [id] " + terminatedPoolId + " is successfully shut down" +
@@ -177,6 +222,11 @@ public class StratosThreadPool {
}
}
+ /**
+ * Removes the scheduled thread pool with id terminatedPoolId from the scheduledExecutorMap
+ *
+ * @param terminatedPoolId thread pool id
+ */
private static void removeScheduledThreadPoolFromCache(String terminatedPoolId) {
if (scheduledExecutorMap.remove(terminatedPoolId) != null) {
log.info("Scheduled Thread pool [id] " + terminatedPoolId + " is successfully shut down" +