You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2014/03/04 02:45:39 UTC

git commit: updated refs/heads/master to 5dd4fb2

Repository: cloudstack
Updated Branches:
  refs/heads/master bbf5a912c -> 5dd4fb22e


Remove cancelled jobs from job monitoring, and correct the miscalculated time unit (minutes to milliseconds) used for job expiration and job cancellation.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/5dd4fb22
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/5dd4fb22
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/5dd4fb22

Branch: refs/heads/master
Commit: 5dd4fb22eff5ec6df3c79bab0ebb99226c954e9b
Parents: bbf5a91
Author: Kelven Yang <ke...@gmail.com>
Authored: Mon Mar 3 16:09:40 2014 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Mon Mar 3 17:44:58 2014 -0800

----------------------------------------------------------------------
 .../jobs/impl/AsyncJobManagerImpl.java          | 15 ++++++----
 .../framework/jobs/impl/AsyncJobMonitor.java    | 30 ++++++++++++++++----
 2 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/5dd4fb22/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 2be2786..b9246aa 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -85,12 +85,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     private static final ConfigKey<Long> JobExpireMinutes = new ConfigKey<Long>(Long.class, "job.expire.minutes", "Advanced", "1440",
         "Time (in minutes) for async-jobs to be kept in system", true, ConfigKey.Scope.Global, 60l);
     private static final ConfigKey<Long> JobCancelThresholdMinutes = new ConfigKey<Long>(Long.class, "job.cancel.threshold.minutes", "Advanced", "60",
-        "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global, 60l);
+            "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global, 240l);
 
     private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
 
     private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3;     // 3 seconds
-    private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60;     // 60 seconds
 
     private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
     private static final int HEARTBEAT_INTERVAL = 2000;
@@ -706,14 +705,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                 try {
                     s_logger.trace("Begin cleanup expired async-jobs");
 
-                    Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 1000);
+                    Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000);
 
                     // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
                     // hopefully this will be fast enough to balance potential growth of job table
                     //1) Expire unfinished jobs that weren't processed yet
                     List<AsyncJobVO> l = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
                     for (AsyncJobVO job : l) {
-                        s_logger.trace("Expunging unfinished job " + job);
+                        s_logger.info("Expunging unfinished job " + job);
+
+                        _jobMonitor.unregisterByJobId(job.getId());
                         expungeAsyncJob(job);
                     }
 
@@ -721,15 +722,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                     List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
                     for (AsyncJobVO job : completedJobs) {
                         s_logger.trace("Expunging completed job " + job);
+
                         expungeAsyncJob(job);
                     }
 
                     // forcefully cancel blocking queue items if they've been staying there for too long
-                    List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 1000, false);
+                    List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 60000, false);
                     if (blockItems != null && blockItems.size() > 0) {
                         for (SyncQueueItemVO item : blockItems) {
                             if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
+                                s_logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long");
                                 completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long");
+
+                                _jobMonitor.unregisterByJobId(item.getContentId());
                             }
 
                             // purge the item and resume queue processing

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/5dd4fb22/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
index 6718181..0b6f7a5 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
@@ -17,6 +17,7 @@
 package org.apache.cloudstack.framework.jobs.impl;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Timer;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -38,8 +39,7 @@ import com.cloud.utils.component.ManagerBase;
 public class AsyncJobMonitor extends ManagerBase {
     public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class);
 
-    @Inject
-    private MessageBus _messageBus;
+    @Inject private MessageBus _messageBus;
 
     private final Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
     private final Timer _timer = new Timer();
@@ -86,15 +86,16 @@ public class AsyncJobMonitor extends ManagerBase {
         synchronized (this) {
             for (Map.Entry<Long, ActiveTaskRecord> entry : _activeTasks.entrySet()) {
                 if (entry.getValue().millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) {
-                    s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for " + entry.getValue().millisSinceLastJobHeartbeat() / 1000 +
-                        " seconds");
+                    s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for "
+                            + entry.getValue().millisSinceLastJobHeartbeat() / 1000 + " seconds");
                 }
             }
         }
     }
 
     @Override
-    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+    public boolean configure(String name, Map<String, Object> params)
+            throws ConfigurationException {
 
         _messageBus.subscribe(AsyncJob.Topics.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this));
         _timer.scheduleAtFixedRate(new ManagedContextTimerTask() {
@@ -141,6 +142,25 @@ public class AsyncJobMonitor extends ManagerBase {
         }
     }
 
+    public void unregisterByJobId(long jobId) {
+        synchronized (this) {
+            Iterator<Map.Entry<Long, ActiveTaskRecord>> it = _activeTasks.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<Long, ActiveTaskRecord> entry = it.next();
+                if (entry.getValue().getJobId() == jobId) {
+                    s_logger.info("Remove Job-" + entry.getValue().getJobId() + " from job monitoring due to job cancelling");
+
+                    if (entry.getValue().isPoolThread())
+                        _activePoolThreads.decrementAndGet();
+                    else
+                        _activeInplaceThreads.decrementAndGet();
+
+                    it.remove();
+                }
+            }
+        }
+    }
+
     public int getActivePoolThreads() {
         return _activePoolThreads.get();
     }