You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by mc...@apache.org on 2014/03/06 23:07:04 UTC
[21/50] [abbrv] git commit: updated refs/heads/rbac to 48e08fe
Remove cancelled jobs from job monitoring, and correct the miscalculated time unit (minute-valued settings were being multiplied as if they were seconds) in job expiry and job cancellation.
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/5dd4fb22
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/5dd4fb22
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/5dd4fb22
Branch: refs/heads/rbac
Commit: 5dd4fb22eff5ec6df3c79bab0ebb99226c954e9b
Parents: bbf5a91
Author: Kelven Yang <ke...@gmail.com>
Authored: Mon Mar 3 16:09:40 2014 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Mon Mar 3 17:44:58 2014 -0800
----------------------------------------------------------------------
.../jobs/impl/AsyncJobManagerImpl.java | 15 ++++++----
.../framework/jobs/impl/AsyncJobMonitor.java | 30 ++++++++++++++++----
2 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/5dd4fb22/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 2be2786..b9246aa 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -85,12 +85,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private static final ConfigKey<Long> JobExpireMinutes = new ConfigKey<Long>(Long.class, "job.expire.minutes", "Advanced", "1440",
"Time (in minutes) for async-jobs to be kept in system", true, ConfigKey.Scope.Global, 60l);
private static final ConfigKey<Long> JobCancelThresholdMinutes = new ConfigKey<Long>(Long.class, "job.cancel.threshold.minutes", "Advanced", "60",
- "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global, 60l);
+ "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global, 240l);
private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
- private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds
private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
private static final int HEARTBEAT_INTERVAL = 2000;
@@ -706,14 +705,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
try {
s_logger.trace("Begin cleanup expired async-jobs");
- Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 1000);
+ Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000);
// limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
// hopefully this will be fast enough to balance potential growth of job table
//1) Expire unfinished jobs that weren't processed yet
List<AsyncJobVO> l = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
for (AsyncJobVO job : l) {
- s_logger.trace("Expunging unfinished job " + job);
+ s_logger.info("Expunging unfinished job " + job);
+
+ _jobMonitor.unregisterByJobId(job.getId());
expungeAsyncJob(job);
}
@@ -721,15 +722,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
for (AsyncJobVO job : completedJobs) {
s_logger.trace("Expunging completed job " + job);
+
expungeAsyncJob(job);
}
// forcefully cancel blocking queue items if they've been staying there for too long
- List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 1000, false);
+ List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 60000, false);
if (blockItems != null && blockItems.size() > 0) {
for (SyncQueueItemVO item : blockItems) {
if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
+ s_logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long");
completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long");
+
+ _jobMonitor.unregisterByJobId(item.getContentId());
}
// purge the item and resume queue processing
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/5dd4fb22/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
index 6718181..0b6f7a5 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java
@@ -17,6 +17,7 @@
package org.apache.cloudstack.framework.jobs.impl;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,8 +39,7 @@ import com.cloud.utils.component.ManagerBase;
public class AsyncJobMonitor extends ManagerBase {
public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class);
- @Inject
- private MessageBus _messageBus;
+ @Inject private MessageBus _messageBus;
private final Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
private final Timer _timer = new Timer();
@@ -86,15 +86,16 @@ public class AsyncJobMonitor extends ManagerBase {
synchronized (this) {
for (Map.Entry<Long, ActiveTaskRecord> entry : _activeTasks.entrySet()) {
if (entry.getValue().millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) {
- s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for " + entry.getValue().millisSinceLastJobHeartbeat() / 1000 +
- " seconds");
+ s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for "
+ + entry.getValue().millisSinceLastJobHeartbeat() / 1000 + " seconds");
}
}
}
}
@Override
- public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+ public boolean configure(String name, Map<String, Object> params)
+ throws ConfigurationException {
_messageBus.subscribe(AsyncJob.Topics.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this));
_timer.scheduleAtFixedRate(new ManagedContextTimerTask() {
@@ -141,6 +142,25 @@ public class AsyncJobMonitor extends ManagerBase {
}
}
+ /**
+ * Stops monitoring the given job by removing every active-task record whose job id equals
+ * {@code jobId}, and decrements the matching active-thread counter for each record removed
+ * ({@code _activePoolThreads} when the record is a pool thread, otherwise
+ * {@code _activeInplaceThreads}).
+ *
+ * <p>AsyncJobManagerImpl calls this when it expunges an expired unfinished job and when it
+ * force-fails a job that has been blocking a sync queue for too long. Once the records are
+ * gone, the periodic inactivity check no longer warns about those jobs.
+ *
+ * @param jobId id of the job whose task records should be dropped
+ */
+ public void unregisterByJobId(long jobId) {
+ // Same monitor the inactivity-check loop holds while it iterates _activeTasks.
+ synchronized (this) {
+ // Matching is on the record's job id, not the map key (presumably the key identifies
+ // the task/thread rather than the job -- confirm), so every entry is scanned.
+ // Iterator.remove() allows deleting while iterating the HashMap without a
+ // ConcurrentModificationException.
+ Iterator<Map.Entry<Long, ActiveTaskRecord>> it = _activeTasks.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Long, ActiveTaskRecord> entry = it.next();
+ // No early exit: every record matching jobId is removed.
+ if (entry.getValue().getJobId() == jobId) {
+ s_logger.info("Remove Job-" + entry.getValue().getJobId() + " from job monitoring due to job cancelling");
+
+ // Keep the active-thread counters consistent with the records being removed.
+ if (entry.getValue().isPoolThread())
+ _activePoolThreads.decrementAndGet();
+ else
+ _activeInplaceThreads.decrementAndGet();
+
+ it.remove();
+ }
+ }
+ }
+ }
+
public int getActivePoolThreads() {
return _activePoolThreads.get();
}