You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2016/01/05 16:47:27 UTC

[18/50] [abbrv] stratos git commit: using sheduled thread pool per *monitors

using sheduled thread pool per *monitors


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/6189945b
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/6189945b
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/6189945b

Branch: refs/heads/stratos-4.1.x
Commit: 6189945b97b8bdacce63f4f6792684e33260812f
Parents: d4b35c0
Author: Isuru Haththotuwa <is...@apache.org>
Authored: Mon Dec 21 20:58:15 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Mon Dec 21 20:58:15 2015 +0530

----------------------------------------------------------------------
 .../applications/topic/ApplicationBuilder.java  |  3 +
 .../AutoscalerTopologyEventReceiver.java        |  2 +
 .../stratos/autoscaler/monitor/Monitor.java     |  2 +
 .../monitor/cluster/ClusterMonitor.java         | 13 +++--
 .../component/ParentComponentMonitor.java       | 11 +++-
 .../services/impl/AutoscalerServiceImpl.java    |  5 ++
 .../autoscaler/util/AutoscalerConstants.java    |  3 +-
 .../stratos/autoscaler/util/AutoscalerUtil.java |  1 +
 .../threading/GracefulThreadPoolTerminator.java |  2 +
 .../common/threading/StratosThreadPool.java     | 60 ++++++++++++++++++--
 10 files changed, 89 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
index 5fd4d5a..ec6b50a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
@@ -329,10 +329,12 @@ public class ApplicationBuilder {
                                 getAliasToActiveChildMonitorsMap().values()) {
                             //destroying the drools
                             monitor1.destroy();
+                            monitor1.cleanup();
                         }
                     }
                     // stopping application thread
                     applicationMonitor.destroy();
+                    applicationMonitor.cleanup();
                     AutoscalerContext.getInstance().removeAppMonitor(applicationId);
                     // Remove network partition algorithm context
                     AutoscalerContext.getInstance().removeNetworkPartitionAlgorithmContext(applicationId);
@@ -454,6 +456,7 @@ public class ApplicationBuilder {
                                 getAliasToActiveChildMonitorsMap().values()) {
                             //destroying the drools
                             monitor1.destroy();
+                            monitor1.cleanup();
                         }
                     }
                     org.apache.stratos.autoscaler.context.partition.network.NetworkPartitionContext networkPartitionContext =

http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index c150a1f..5f123b2 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -204,6 +204,7 @@ public class AutoscalerTopologyEventReceiver {
                 }
                 //changing the status in the monitor, will notify its parent monitor
                 monitor.destroy();
+                monitor.cleanup();
                 monitor.notifyParentMonitor(ClusterStatus.Created, instanceId);
 
             }
@@ -316,6 +317,7 @@ public class AutoscalerTopologyEventReceiver {
                 if (!monitor.hasInstance() && (appMonitor != null && appMonitor.isTerminating())) {
                     //Destroying and Removing the Cluster monitor
                     monitor.destroy();
+                    monitor.cleanup();
                     AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
                 }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java
index c58ec41..2814958 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java
@@ -216,4 +216,6 @@ public abstract class Monitor implements EventHandler, Runnable {
     public enum MonitorType {
         Application, Group, Cluster
     }
+
+    public abstract void cleanup ();
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
index ead9130..ee936db 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
@@ -86,8 +86,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class ClusterMonitor extends Monitor {
 
     private static final Log log = LogFactory.getLog(ClusterMonitor.class);
-    private final ScheduledThreadPoolExecutor scheduler;
-    private final ThreadPoolExecutor executor;
+    private ScheduledThreadPoolExecutor scheduler;
+    private ThreadPoolExecutor executor;
     protected boolean hasFaultyMember = false;
     protected ClusterContext clusterContext;
     protected String serviceType;
@@ -107,11 +107,11 @@ public class ClusterMonitor extends Monitor {
     public ClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree,
                           String deploymentPolicyId) {
 
-        scheduler = StratosThreadPool.getScheduledExecutorService(AutoscalerConstants.CLUSTER_MONITOR_SCHEDULER_ID, 50);
         int threadPoolSize = Integer.getInteger(AutoscalerConstants.MONITOR_THREAD_POOL_SIZE, 100);
         executor = StratosThreadPool.getExecutorService(
                 AutoscalerConstants.MONITOR_THREAD_POOL_ID, ((int)Math.ceil(threadPoolSize/3)), threadPoolSize);
         this.clusterId = cluster.getClusterId();
+        scheduler = StratosThreadPool.getScheduledExecutorService(clusterId, 2);
         readConfigurations();
         this.groupScalingEnabledSubtree = groupScalingEnabledSubtree;
         this.setCluster(new Cluster(cluster));
@@ -147,7 +147,7 @@ public class ClusterMonitor extends Monitor {
         return MonitorType.Cluster;
     }
 
-    public void startScheduler() {
+    public synchronized void startScheduler() {
         schedulerFuture = scheduler.scheduleAtFixedRate(this, 0,
                 getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS);
     }
@@ -1540,4 +1540,9 @@ public class ClusterMonitor extends Monitor {
     public String getDeploymentPolicyId() {
         return deploymentPolicyId;
     }
+
+    public void cleanup () {
+        // shutdown thread pools
+        StratosThreadPool.shutDownScheduledThreadPoolGracefully(clusterId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
index 9efdf7c..24983a9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
@@ -68,8 +68,7 @@ public abstract class ParentComponentMonitor extends Monitor {
     private static final Log log = LogFactory.getLog(ParentComponentMonitor.class);
 
     //Scheduler executor service to execute this monitor in a thread
-    private final ScheduledThreadPoolExecutor scheduler = StratosThreadPool.getScheduledExecutorService(
-            "autoscaler.monitor.scheduler.thread.pool", 100);
+    private final ScheduledThreadPoolExecutor scheduler;
     //The monitors dependency tree with all the start-able/kill-able dependencies
     protected DependencyTree startupDependencyTree;
     //The monitors dependency tree with all the scaling dependencies
@@ -95,6 +94,7 @@ public abstract class ParentComponentMonitor extends Monitor {
         terminatingInstancesMap = new ConcurrentHashMap<String, List<String>>();
         pendingChildMonitorsList = new ArrayList<String>();
         id = component.getUniqueIdentifier();
+        scheduler = StratosThreadPool.getScheduledExecutorService(id, 2);
 
         // Building the startup dependencies for this monitor within the immediate children
         startupDependencyTree = DependencyBuilder.getInstance().buildDependency(component);
@@ -126,7 +126,7 @@ public abstract class ParentComponentMonitor extends Monitor {
     /**
      * Starting the scheduler for the monitor
      */
-    public void startScheduler() {
+    public synchronized void startScheduler() {
         int monitoringIntervalMilliseconds = 60000;
         schedulerFuture = scheduler.scheduleAtFixedRate(this, 0,
                 monitoringIntervalMilliseconds, TimeUnit.MILLISECONDS);
@@ -1067,4 +1067,9 @@ public abstract class ParentComponentMonitor extends Monitor {
     public void removeMonitor(String id) {
         this.aliasToActiveChildMonitorsMap.remove(id);
     }
+
+    public void cleanup () {
+        // shutdown thread pools
+        StratosThreadPool.shutDownScheduledThreadPoolGracefully(id);
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java
index 0943de0..a77c80a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/services/impl/AutoscalerServiceImpl.java
@@ -935,6 +935,7 @@ public class AutoscalerServiceImpl implements AutoscalerService {
                 getAppMonitor(applicationId);
         if (applicationMonitor != null) {
             applicationMonitor.destroy();
+            applicationMonitor.cleanup();
 
             if (applicationMonitor.hasInstance()) {
                 Map<String, Monitor> monitors = applicationMonitor.
@@ -966,6 +967,7 @@ public class AutoscalerServiceImpl implements AutoscalerService {
                             getClusterMonitor(clusterId);
                     if (clusterMonitor != null) {
                         clusterMonitor.destroy();
+                        clusterMonitor.cleanup();
                     } else {
                         if (log.isDebugEnabled()) {
                             log.debug(String.format(
@@ -977,6 +979,8 @@ public class AutoscalerServiceImpl implements AutoscalerService {
                         Collection<ClusterInstance> allClusterInstances = cluster.getClusterInstances();
 
                         if (allClusterInstances.isEmpty() && clusterMonitor != null) {
+                            clusterMonitor.destroy();
+                            clusterMonitor.cleanup();
                             AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
                         }
 
@@ -1007,6 +1011,7 @@ public class AutoscalerServiceImpl implements AutoscalerService {
                 ApplicationContext applicationContext = AutoscalerContext.getInstance().
                         getApplicationContext(applicationId);
                 applicationMonitor.destroy();
+                applicationMonitor.cleanup();
                 AutoscalerContext.getInstance().removeAppMonitor(applicationId);
                 // Remove network partition algorithm context
                 AutoscalerContext.getInstance().removeNetworkPartitionAlgorithmContext(applicationId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
index ef12983..62ae532 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java
@@ -38,7 +38,8 @@ public final class AutoscalerConstants {
     public static final String AUTOSCALER_SCHEDULER_ID = "autoscaler.scheduler.thread.pool";
     public static final String SCHEDULER_THREAD_POOL_SIZE_KEY = "autoscaler.scheduler.thread.pool.size";
     public static final int AUTOSCALER_SCHEDULER_THREAD_POOL_SIZE = 5;
-    public static final int AUTOSCALER_THREAD_POOL_SIZE = 50;
+    //public static final int AUTOSCALER_THREAD_POOL_SIZE = 50;
+    public static final int AUTOSCALER_THREAD_POOL_SIZE = 10;
     public static final String COMPONENTS_CONFIG = CarbonUtils.getCarbonConfigDirPath() +
             File.separator + "stratos-config.xml";
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
index b6ce0ed..e00f4e9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
@@ -1027,6 +1027,7 @@ public class AutoscalerUtil {
         while(monitorsIter.hasNext()) {
             Monitor monitor = monitorsIter.next();
             monitor.destroy();
+            monitor.cleanup();
             Iterator<Instance> instances = monitor.getInstances().iterator();
             while(instances.hasNext()) {
                 Instance instance = instances.next();

http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java
index 70cda66..2ccfc45 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/GracefulThreadPoolTerminator.java
@@ -41,6 +41,7 @@ public class GracefulThreadPoolTerminator implements Callable<String> {
     @Override
     public String call() {
         // try to shut down gracefully
+        log.info("Attempting to gracefully shut down thread pool " +  threadPoolId);
         executor.shutdown();
         // wait 10 secs till terminated
         try {
@@ -53,6 +54,7 @@ public class GracefulThreadPoolTerminator implements Callable<String> {
             // interrupted, shutdown now
             executor.shutdownNow();
         }
+        log.info("Successfully shut down thread pool " +  threadPoolId);
         return threadPoolId;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/6189945b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index 4eb8304..1f4e5c8 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -35,8 +35,8 @@ public class StratosThreadPool {
 
     private static final Log log = LogFactory.getLog(StratosThreadPool.class);
 
-    private static Map<String, ThreadPoolExecutor> executorMap = new ConcurrentHashMap<>();
-    private static Map<String, ScheduledThreadPoolExecutor> scheduledExecutorMap = new ConcurrentHashMap<>();
+    private static volatile Map<String, ThreadPoolExecutor> executorMap = new ConcurrentHashMap<>();
+    private static volatile Map<String, ScheduledThreadPoolExecutor> scheduledExecutorMap = new ConcurrentHashMap<>();
     private static final Object executorServiceMapLock = new Object();
     private static final Object scheduledServiceMapLock = new Object();
 
@@ -47,14 +47,14 @@ public class StratosThreadPool {
      * @param maxSize Thread pool size
      * @return ThreadPoolExecutor
      */
-    public static ThreadPoolExecutor getExecutorService(String identifier, int initialSize, int
-            maxSize) {
+    public static ThreadPoolExecutor getExecutorService(String identifier, int initialSize, int maxSize) {
         ThreadPoolExecutor executor = executorMap.get(identifier);
         if (executor == null) {
             synchronized (executorServiceMapLock) {
                 if (executor == null) {
+                    int taskQueueSize = initialSize > 3 ? (int)Math.ceil(initialSize/3) : 1;
                     executor = new ThreadPoolExecutor(initialSize, maxSize, 60L, TimeUnit.SECONDS,
-                            new LinkedBlockingQueue<Runnable>(), new StratosThreadFactory(identifier));
+                            new LinkedBlockingQueue<Runnable>(taskQueueSize), new StratosThreadFactory(identifier));
                     executorMap.put(identifier, executor);
                     log.info(String.format("Thread pool created: [type] Executor [id] %s " +
                             "[initial size] %d [max size] %d", identifier, initialSize, maxSize));
@@ -88,6 +88,43 @@ public class StratosThreadPool {
         return scheduledExecutor;
     }
 
+    /**
+     * Stops the executor with the specified id in a graceful manner
+     *
+     * @param threadPoolId thread pool id
+     */
+    public static void shutDownThreadPoolGracefully (String threadPoolId) {
+
+        ThreadPoolExecutor executor = executorMap.get(threadPoolId);
+        if (executor == null) {
+            log.warn("No thread pool found for id " + threadPoolId + ", unable to shut down");
+            return;
+        }
+
+        new GracefulThreadPoolTerminator(threadPoolId, executor).call();
+        removeThreadPoolFromCache(threadPoolId);
+    }
+
+    /**
+     * Stops the scheduled executor with the specified id in a graceful manner
+     *
+     * @param threadPoolId thread pool id
+     */
+    public static void shutDownScheduledThreadPoolGracefully (String threadPoolId) {
+
+        ScheduledThreadPoolExecutor scheduledExecutor = scheduledExecutorMap.get(threadPoolId);
+        if (scheduledExecutor == null) {
+            log.warn("No scheduled thread pool found for id " + threadPoolId + ", unable to shut down");
+            return;
+        }
+
+        new GracefulThreadPoolTerminator(threadPoolId, scheduledExecutor).call();
+        removeScheduledThreadPoolFromCache(threadPoolId);
+    }
+
+    /**
+     * Stop all executors in a graceful manner
+     */
     public static void shutDownAllThreadPoolsGracefully () {
 
         int threadPoolCount = executorMap.size();
@@ -129,6 +166,9 @@ public class StratosThreadPool {
         }
     }
 
+    /**
+     * Stop all scheduled executors in a graceful manner
+     */
     public static void shutDownAllScheduledExecutorsGracefully () {
 
         int threadPoolCount = scheduledExecutorMap.size();
@@ -170,6 +210,11 @@ public class StratosThreadPool {
         }
     }
 
+    /**
+     * Removes the thread pool with id terminatedPoolId from the executorMap
+     *
+     * @param terminatedPoolId thread pool id
+     */
     private static void removeThreadPoolFromCache(String terminatedPoolId) {
         if (executorMap.remove(terminatedPoolId) != null) {
             log.info("Thread pool [id] " + terminatedPoolId + " is successfully shut down" +
@@ -177,6 +222,11 @@ public class StratosThreadPool {
         }
     }
 
+    /**
+     * Removes the scheduled thread pool with id terminatedPoolId from the scheduledExecutorMap
+     *
+     * @param terminatedPoolId thread pool id
+     */
     private static void removeScheduledThreadPoolFromCache(String terminatedPoolId) {
         if (scheduledExecutorMap.remove(terminatedPoolId) != null) {
             log.info("Scheduled Thread pool [id] " + terminatedPoolId + " is successfully shut down" +