You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by wi...@apache.org on 2014/11/11 13:03:54 UTC
[15/50] [abbrv] git commit: updated
refs/heads/statscollector-graphite to e06a814
CLOUDSTACK-7864: CPVM continues to be in Stopped state after a failure to start because of a management server restart.
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/3a2f6ffd
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/3a2f6ffd
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/3a2f6ffd
Branch: refs/heads/statscollector-graphite
Commit: 3a2f6ffd4993a1fbd132bdd43c9e697a22b86d32
Parents: 27d6bff
Author: Min Chen <mi...@citrix.com>
Authored: Fri Nov 7 11:47:16 2014 -0800
Committer: Min Chen <mi...@citrix.com>
Committed: Fri Nov 7 16:37:05 2014 -0800
----------------------------------------------------------------------
.../framework/jobs/dao/AsyncJobDao.java | 2 ++
.../framework/jobs/dao/AsyncJobDaoImpl.java | 20 ++++++++++++++++++++
.../jobs/impl/AsyncJobManagerImpl.java | 14 ++++++++++++--
3 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/3a2f6ffd/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
index 170da50..169cae9 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDao.java
@@ -39,4 +39,6 @@ public interface AsyncJobDao extends GenericDao<AsyncJobVO, Long> {
void resetJobProcess(long msid, int jobResultCode, String jobResultMessage);
List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int limit);
+
+ List<AsyncJobVO> getResetJobs(long msid);
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/3a2f6ffd/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
index 612573f..0d024eb 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
@@ -186,4 +186,24 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
s_logger.warn("Unable to reset job status for management server " + msid, e);
}
}
+
+ /**
+ * Lists jobs from {@code pendingAsyncJobSearch} with status IN_PROGRESS whose executing MS is {@code msid}, or whose executing MS is null and whose initiating MS is {@code msid}.
+ * NOTE(review): presumably sorted by "created" ascending and including removed rows; because of the IN_PROGRESS filter, a call made after the job status has been reset may match nothing - confirm call order.
+ */
+ @Override
+ public List<AsyncJobVO> getResetJobs(long msid) {
+ SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
+ sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
+ // construct query: (job_executing_msid=msid OR (job_executing_msid IS NULL AND job_init_msid=msid))
+ SearchCriteria<AsyncJobVO> msQuery = createSearchCriteria();
+ msQuery.addOr("executingMsid", SearchCriteria.Op.EQ, msid);
+ SearchCriteria<AsyncJobVO> initMsQuery = createSearchCriteria();
+ initMsQuery.addAnd("executingMsid", SearchCriteria.Op.NULL);
+ initMsQuery.addAnd("initMsid", SearchCriteria.Op.EQ, msid);
+ msQuery.addOr("initMsid", SearchCriteria.Op.SC, initMsQuery);
+ sc.addAnd("executingMsid", SearchCriteria.Op.SC, msQuery);
+ Filter filter = new Filter(AsyncJobVO.class, "created", true, null, null);
+ return listIncludingRemovedBy(sc, filter);
+ }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/3a2f6ffd/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 548182a..91516d5 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -232,7 +232,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
s_logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus + ", resultCode: " + resultCode + ", result: " +
resultObject);
}
-
+ // still purge item from queue to avoid any blocking
+ _queueMgr.purgeAsyncJobQueueItemId(jobId);
return;
}
@@ -240,7 +241,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
if (s_logger.isDebugEnabled()) {
s_logger.debug("job-" + jobId + " is already completed.");
}
-
+ // still purge item from queue to avoid any blocking
+ _queueMgr.purgeAsyncJobQueueItemId(jobId);
return;
}
@@ -547,6 +549,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
// guard final clause as well
try {
if (job.getSyncSource() != null) {
+ // purge the queue item once more here to make sure it is removed even if an uncaught exception occurred
+ _queueMgr.purgeItem(job.getSyncSource().getId());
checkQueue(job.getSyncSource().getQueueId());
}
@@ -976,6 +980,12 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
_queueMgr.cleanupActiveQueueItems(msid, true);
// reset job status for all jobs running on this ms node
_jobDao.resetJobProcess(msid, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), "job cancelled because of management server restart or shutdown");
+ // purge the queue items of the jobs cancelled above, which may not have been picked up by any MS node yet
+ List<AsyncJobVO> cancelJobs = _jobDao.getResetJobs(msid);
+ for (AsyncJobVO job : cancelJobs){
+ _queueMgr.purgeAsyncJobQueueItemId(job.getId());
+ }
+
}
});
} catch (Throwable e) {