You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2014/03/14 01:14:51 UTC

[06/10] git commit: updated refs/heads/master to 42d6373

Separate job executor pools to avoid a thread starvation situation.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/8e27120b
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/8e27120b
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/8e27120b

Branch: refs/heads/master
Commit: 8e27120be40725c7e89db8a69b6a976cade8e917
Parents: 6ad245e
Author: Kelven Yang <ke...@gmail.com>
Authored: Thu Mar 6 10:36:39 2014 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Thu Mar 13 16:59:56 2014 -0700

----------------------------------------------------------------------
 .../framework/jobs/AsyncJobManager.java         |  3 ++-
 .../jobs/impl/AsyncJobManagerImpl.java          | 21 ++++++++++++++------
 .../framework/jobs/impl/AsyncJobMonitor.java    |  2 +-
 3 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8e27120b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
index 67733ed..fe5c067 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
@@ -27,7 +27,8 @@ import com.cloud.utils.component.Manager;
 
 public interface AsyncJobManager extends Manager {
 
-    public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
+    public static final String API_JOB_POOL_THREAD_PREFIX = "API-Job-Executor";
+    public static final String WORK_JOB_POOL_THREAD_PREFIX = "Work-Job-Executor";
 
     AsyncJobVO getAsyncJob(long jobId);
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8e27120b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index b9246aa..9b9460c 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -117,7 +117,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     private volatile long _executionRunNumber = 1;
 
     private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
-    private ExecutorService _executor;
+    private ExecutorService _apiJobExecutor;
+    private ExecutorService _workerJobExecutor;
 
     @Override
     public String getConfigComponentName() {
@@ -390,7 +391,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
         if (executeInContext) {
             runnable.run();
         } else {
-            _executor.submit(runnable);
+            if (job.getDispatcher() == null || job.getDispatcher().equalsIgnoreCase("ApiAsyncJobDispatcher"))
+                _apiJobExecutor.submit(runnable);
+            else
+                _workerJobExecutor.submit(runnable);
         }
     }
 
@@ -855,10 +859,14 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             final Properties dbProps = DbProperties.getDbProperties();
             final int cloudMaxActive = Integer.parseInt(dbProps.getProperty("db.cloud.maxActive"));
 
-            int poolSize = (cloudMaxActive * 2) / 3;
+            int apiPoolSize = cloudMaxActive / 2;
+            int workPoolSize = (cloudMaxActive * 2) / 3;
 
-            s_logger.info("Start AsyncJobManager thread pool in size " + poolSize);
-            _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobManager.JOB_POOL_THREAD_PREFIX));
+            s_logger.info("Start AsyncJobManager API executor thread pool in size " + apiPoolSize);
+            _apiJobExecutor = Executors.newFixedThreadPool(apiPoolSize, new NamedThreadFactory(AsyncJobManager.API_JOB_POOL_THREAD_PREFIX));
+
+            s_logger.info("Start AsyncJobManager Work executor thread pool in size " + workPoolSize);
+            _workerJobExecutor = Executors.newFixedThreadPool(workPoolSize, new NamedThreadFactory(AsyncJobManager.WORK_JOB_POOL_THREAD_PREFIX));
         } catch (final Exception e) {
             throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
         }
@@ -941,7 +949,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     @Override
     public boolean stop() {
         _heartbeatScheduler.shutdown();
-        _executor.shutdown();
+        _apiJobExecutor.shutdown();
+        _workerJobExecutor.shutdown();
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8e27120b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
index 0b6f7a5..b1cac3e 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
@@ -115,7 +115,7 @@ public class AsyncJobMonitor extends ManagerBase {
             assert (_activeTasks.get(runNumber) == null);
 
             long threadId = Thread.currentThread().getId();
-            boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobManager.JOB_POOL_THREAD_PREFIX);
+            boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobManager.API_JOB_POOL_THREAD_PREFIX);
             ActiveTaskRecord record = new ActiveTaskRecord(jobId, threadId, fromPoolThread);
             _activeTasks.put(runNumber, record);
             if (fromPoolThread)