You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by al...@apache.org on 2012/11/02 18:53:40 UTC
[2/6] git commit: CS-16611: when expunging an async job, expunge the corresponding sync queue items
CS-16611: When expunging an async job, also expunge its corresponding sync queue items.
Conflicts:
server/src/com/cloud/async/AsyncJobManagerImpl.java
server/src/com/cloud/async/dao/SyncQueueItemDao.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/013102c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/013102c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/013102c0
Branch: refs/heads/master
Commit: 013102c028e5763d28419ab50bc2da9a273c78f9
Parents: fe41325
Author: Alena Prokharchyk <al...@citrix.com>
Authored: Wed Oct 31 16:43:51 2012 -0700
Committer: Alena Prokharchyk <al...@citrix.com>
Committed: Fri Nov 2 10:51:29 2012 -0700
----------------------------------------------------------------------
api/src/com/cloud/async/SyncQueueItem.java | 2 +
.../src/com/cloud/async/AsyncJobManagerImpl.java | 123 ++++++++-------
server/src/com/cloud/async/SyncQueueManager.java | 2 +
.../src/com/cloud/async/SyncQueueManagerImpl.java | 27 +++-
.../src/com/cloud/async/dao/SyncQueueItemDao.java | 3 +-
.../com/cloud/async/dao/SyncQueueItemDaoImpl.java | 23 +++
6 files changed, 116 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/013102c0/api/src/com/cloud/async/SyncQueueItem.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/async/SyncQueueItem.java b/api/src/com/cloud/async/SyncQueueItem.java
index f299481..9f9c379 100644
--- a/api/src/com/cloud/async/SyncQueueItem.java
+++ b/api/src/com/cloud/async/SyncQueueItem.java
@@ -16,7 +16,9 @@
// under the License.
package com.cloud.async;
+
public interface SyncQueueItem {
+ public final String AsyncJobContentType = "AsyncJob";
String getContentType();
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/013102c0/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 6cf95fe..5fb5105 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -270,7 +270,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
Random random = new Random();
for(int i = 0; i < 5; i++) {
- queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId(), queueSizeLimit);
+ queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
if(queue != null) {
break;
}
@@ -598,60 +598,73 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
return new Runnable() {
@Override
public void run() {
- GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
- try {
- if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
- try {
- reallyRun();
- } finally {
- scanLock.unlock();
- }
- }
- } finally {
- scanLock.releaseRef();
- }
- }
-
- private void reallyRun() {
- try {
- s_logger.trace("Begin cleanup expired async-jobs");
-
- Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000);
-
- // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
- // hopefully this will be fast enough to balance potential growth of job table
- List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
- if(l != null && l.size() > 0) {
- for(AsyncJobVO job : l) {
- _jobDao.expunge(job.getId());
- }
- }
-
- // forcely cancel blocking queue items if they've been staying there for too long
- List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false);
- if(blockItems != null && blockItems.size() > 0) {
- for(SyncQueueItemVO item : blockItems) {
- if(item.getContentType().equalsIgnoreCase("AsyncJob")) {
- completeAsyncJob(item.getContentId(), AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
+ GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
+ try {
+ if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
+ try {
+ reallyRun();
+ } finally {
+ scanLock.unlock();
+ }
+ }
+ } finally {
+ scanLock.releaseRef();
+ }
+ }
+
+ public void reallyRun() {
+ try {
+ s_logger.trace("Begin cleanup expired async-jobs");
+
+ Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000);
+
+ // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
+ // hopefully this will be fast enough to balance potential growth of job table
+ List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
+ if(l != null && l.size() > 0) {
+ for(AsyncJobVO job : l) {
+ expungeAsyncJob(job);
+ }
+ }
+
+ // forcefully cancel blocking queue items if they've been staying there for too long
+ List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false);
+ if(blockItems != null && blockItems.size() > 0) {
+ for(SyncQueueItemVO item : blockItems) {
+ if(item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
+ completeAsyncJob(item.getContentId(), AsyncJobResult.STATUS_FAILED, 0,
+ getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
}
-
- // purge the item and resume queue processing
- _queueMgr.purgeItem(item.getId());
- }
- }
-
- s_logger.trace("End cleanup expired async-jobs");
- } catch(Throwable e) {
- s_logger.error("Unexpected exception when trying to execute queue item, ", e);
- } finally {
- StackMaid.current().exitCleanup();
- }
- }
- };
- }
-
- private long getMsid() {
- if(_clusterMgr != null) {
+
+ // purge the item and resume queue processing
+ _queueMgr.purgeItem(item.getId());
+ }
+ }
+
+ s_logger.trace("End cleanup expired async-jobs");
+ } catch(Throwable e) {
+ s_logger.error("Unexpected exception when trying to execute queue item, ", e);
+ } finally {
+ StackMaid.current().exitCleanup();
+ }
+ }
+
+
+ };
+ }
+
+ @DB
+ protected void expungeAsyncJob(AsyncJobVO job) {
+ Transaction txn = Transaction.currentTxn();
+ txn.start();
+ _jobDao.expunge(job.getId());
+ //purge corresponding sync queue item
+ _queueMgr.purgeAsyncJobQueueItemId(job.getId());
+ txn.commit();
+ }
+
+ private long getMsid() {
+ if(_clusterMgr != null) {
return _clusterMgr.getManagementNodeId();
}
@@ -666,7 +679,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
}
String contentType = item.getContentType();
- if(contentType != null && contentType.equals("AsyncJob")) {
+ if(contentType != null && contentType.equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
Long jobId = item.getContentId();
if(jobId != null) {
s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/013102c0/server/src/com/cloud/async/SyncQueueManager.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/SyncQueueManager.java b/server/src/com/cloud/async/SyncQueueManager.java
index b605f1b..a7032da 100644
--- a/server/src/com/cloud/async/SyncQueueManager.java
+++ b/server/src/com/cloud/async/SyncQueueManager.java
@@ -30,4 +30,6 @@ public interface SyncQueueManager extends Manager {
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
+
+ void purgeAsyncJobQueueItemId(long asyncJobId);
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/013102c0/server/src/com/cloud/async/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/SyncQueueManagerImpl.java b/server/src/com/cloud/async/SyncQueueManagerImpl.java
index c3f4955..4d15065 100644
--- a/server/src/com/cloud/async/SyncQueueManagerImpl.java
+++ b/server/src/com/cloud/async/SyncQueueManagerImpl.java
@@ -185,13 +185,16 @@ public class SyncQueueManagerImpl implements SyncQueueManager {
if(itemVO != null) {
SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
- _syncQueueItemDao.expunge(itemVO.getId());
-
- queueVO.setLastUpdated(DateUtil.currentGMTTime());
- //decrement the count
- assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
- queueVO.setQueueSize(queueVO.getQueueSize() - 1);
- _syncQueueDao.update(queueVO.getId(), queueVO);
+ _syncQueueItemDao.expunge(itemVO.getId());
+
+ //if item is active, reset queue information
+ if (itemVO.getLastProcessMsid() != null) {
+ queueVO.setLastUpdated(DateUtil.currentGMTTime());
+ //decrement the count
+ assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
+ queueVO.setQueueSize(queueVO.getQueueSize() - 1);
+ _syncQueueDao.update(queueVO.getId(), queueVO);
+ }
}
txt.commit();
} catch(Exception e) {
@@ -273,5 +276,13 @@ public class SyncQueueManagerImpl implements SyncQueueManager {
private boolean queueReadyToProcess(SyncQueueVO queueVO) {
return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+ }
+
+ @Override
+ public void purgeAsyncJobQueueItemId(long asyncJobId) {
+ Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
+ if (itemId != null) {
+ purgeItem(itemId);
+ }
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/013102c0/server/src/com/cloud/async/dao/SyncQueueItemDao.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/SyncQueueItemDao.java b/server/src/com/cloud/async/dao/SyncQueueItemDao.java
index cd9df2f..6b9da8b 100644
--- a/server/src/com/cloud/async/dao/SyncQueueItemDao.java
+++ b/server/src/com/cloud/async/dao/SyncQueueItemDao.java
@@ -26,4 +26,5 @@ public interface SyncQueueItemDao extends GenericDao<SyncQueueItemVO, Long> {
public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
-}
+ public Long getQueueItemIdByContentIdAndType(long contentId, String contentType);
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/013102c0/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
index ce21298..5e75756 100644
--- a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
+++ b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
@@ -33,13 +33,25 @@ import com.cloud.async.SyncQueueItemVO;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
@Local(value = { SyncQueueItemDao.class })
public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
+ final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch;
+
+ protected SyncQueueItemDaoImpl() {
+ super();
+ queueIdSearch = createSearchBuilder(Long.class);
+ queueIdSearch.and("contentId", queueIdSearch.entity().getContentId(), Op.EQ);
+ queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ);
+ queueIdSearch.selectField(queueIdSearch.entity().getId());
+ queueIdSearch.done();
+ }
@Override
@@ -132,4 +144,15 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
return lockRows(sc, null, true);
return listBy(sc, null);
}
+
+
+ @Override
+ public Long getQueueItemIdByContentIdAndType(long contentId, String contentType) {
+ SearchCriteria<Long> sc = queueIdSearch.create();
+ sc.setParameters("contentId", contentId);
+ sc.setParameters("contentType", contentType);
+ List<Long> id = customSearch(sc, null);
+
+ return id.size() == 0 ? null : id.get(0);
+ }
}