You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by al...@apache.org on 2012/11/02 18:53:40 UTC

[2/6] git commit: CS-16611: when expunge Async job, expunge corresponding sync queue items

CS-16611: when expunge Async job, expunge corresponding sync queue items

Conflicts:

	server/src/com/cloud/async/AsyncJobManagerImpl.java
	server/src/com/cloud/async/dao/SyncQueueItemDao.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/013102c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/013102c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/013102c0

Branch: refs/heads/master
Commit: 013102c028e5763d28419ab50bc2da9a273c78f9
Parents: fe41325
Author: Alena Prokharchyk <al...@citrix.com>
Authored: Wed Oct 31 16:43:51 2012 -0700
Committer: Alena Prokharchyk <al...@citrix.com>
Committed: Fri Nov 2 10:51:29 2012 -0700

----------------------------------------------------------------------
 api/src/com/cloud/async/SyncQueueItem.java         |    2 +
 .../src/com/cloud/async/AsyncJobManagerImpl.java   |  123 ++++++++-------
 server/src/com/cloud/async/SyncQueueManager.java   |    2 +
 .../src/com/cloud/async/SyncQueueManagerImpl.java  |   27 +++-
 .../src/com/cloud/async/dao/SyncQueueItemDao.java  |    3 +-
 .../com/cloud/async/dao/SyncQueueItemDaoImpl.java  |   23 +++
 6 files changed, 116 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/013102c0/api/src/com/cloud/async/SyncQueueItem.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/async/SyncQueueItem.java b/api/src/com/cloud/async/SyncQueueItem.java
index f299481..9f9c379 100644
--- a/api/src/com/cloud/async/SyncQueueItem.java
+++ b/api/src/com/cloud/async/SyncQueueItem.java
@@ -16,7 +16,9 @@
 // under the License.
 package com.cloud.async;
 
+
 public interface SyncQueueItem {
+    public final String AsyncJobContentType = "AsyncJob";
 
     String getContentType();
 

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/013102c0/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index 6cf95fe..5fb5105 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -270,7 +270,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
 		Random random = new Random();
 
         for(int i = 0; i < 5; i++) {
-            queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId(), queueSizeLimit);
+            queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
             if(queue != null) {
                 break;
             }
@@ -598,60 +598,73 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
 		return new Runnable() {
 			@Override
             public void run() {
-				GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
-				try {
-					if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
-						try {
-							reallyRun();
-						} finally {
-							scanLock.unlock();
-						}
-					}
-				} finally {
-					scanLock.releaseRef();
-				}
-			}
-			
-			private void reallyRun() {
-				try {
-					s_logger.trace("Begin cleanup expired async-jobs");
-					
-					Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000);
-					
-					// limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
-					// hopefully this will be fast enough to balance potential growth of job table
-					List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
-					if(l != null && l.size() > 0) {
-						for(AsyncJobVO job : l) {
-							_jobDao.expunge(job.getId());
-						}
-					}
-					
-					// forcely cancel blocking queue items if they've been staying there for too long
-				    List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false);
-				    if(blockItems != null && blockItems.size() > 0) {
-				        for(SyncQueueItemVO item : blockItems) {
-				            if(item.getContentType().equalsIgnoreCase("AsyncJob")) {
-                                completeAsyncJob(item.getContentId(), AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
+                GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
+                try {
+                    if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
+                        try {
+                            reallyRun();
+                        } finally {
+                            scanLock.unlock();
+                        }
+                    }
+                } finally {
+                    scanLock.releaseRef();
+                }
+            }
+            
+            public void reallyRun() {
+                try {
+                    s_logger.trace("Begin cleanup expired async-jobs");
+
+                    Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000);
+
+                    // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
+                    // hopefully this will be fast enough to balance potential growth of job table
+                    List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
+                    if(l != null && l.size() > 0) {
+                        for(AsyncJobVO job : l) {
+                            expungeAsyncJob(job);
+                        }
+                    }
+
+                    // forcefully cancel blocking queue items if they've been staying there for too long
+                    List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false);
+                    if(blockItems != null && blockItems.size() > 0) {
+                        for(SyncQueueItemVO item : blockItems) {
+                            if(item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
+                                completeAsyncJob(item.getContentId(), AsyncJobResult.STATUS_FAILED, 0,
+                                        getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
                             }
-				        
-				            // purge the item and resume queue processing
-				            _queueMgr.purgeItem(item.getId());
-				        }
-				    }
-					
-					s_logger.trace("End cleanup expired async-jobs");
-				} catch(Throwable e) {
-					s_logger.error("Unexpected exception when trying to execute queue item, ", e);
-				} finally {
-					StackMaid.current().exitCleanup();
-				}
-			}
-		};
-	}
-	
-	private long getMsid() {
-		if(_clusterMgr != null) {
+
+                            // purge the item and resume queue processing
+                            _queueMgr.purgeItem(item.getId());
+                        }
+                    }
+
+                    s_logger.trace("End cleanup expired async-jobs");
+                } catch(Throwable e) {
+                    s_logger.error("Unexpected exception when trying to execute queue item, ", e);
+                } finally {
+                    StackMaid.current().exitCleanup();
+                }
+            }
+
+           
+        };
+    }
+    
+    @DB
+    protected void expungeAsyncJob(AsyncJobVO job) {
+        Transaction txn = Transaction.currentTxn();
+        txn.start();
+        _jobDao.expunge(job.getId());
+        //purge corresponding sync queue item
+        _queueMgr.purgeAsyncJobQueueItemId(job.getId());
+        txn.commit();
+    }
+
+    private long getMsid() {
+        if(_clusterMgr != null) {
             return _clusterMgr.getManagementNodeId();
         }
 		
@@ -666,7 +679,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
                 }
 
                 String contentType = item.getContentType();
-                if(contentType != null && contentType.equals("AsyncJob")) {
+                if(contentType != null && contentType.equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
                     Long jobId = item.getContentId();
                     if(jobId != null) {
                         s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/013102c0/server/src/com/cloud/async/SyncQueueManager.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/SyncQueueManager.java b/server/src/com/cloud/async/SyncQueueManager.java
index b605f1b..a7032da 100644
--- a/server/src/com/cloud/async/SyncQueueManager.java
+++ b/server/src/com/cloud/async/SyncQueueManager.java
@@ -30,4 +30,6 @@ public interface SyncQueueManager extends Manager {
     
 	public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
     public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
+
+    void purgeAsyncJobQueueItemId(long asyncJobId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/013102c0/server/src/com/cloud/async/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/SyncQueueManagerImpl.java b/server/src/com/cloud/async/SyncQueueManagerImpl.java
index c3f4955..4d15065 100644
--- a/server/src/com/cloud/async/SyncQueueManagerImpl.java
+++ b/server/src/com/cloud/async/SyncQueueManagerImpl.java
@@ -185,13 +185,16 @@ public class SyncQueueManagerImpl implements SyncQueueManager {
 			if(itemVO != null) {
 				SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
 				
-				_syncQueueItemDao.expunge(itemVO.getId());
-				
-				queueVO.setLastUpdated(DateUtil.currentGMTTime());
-                //decrement the count
-				assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
-                queueVO.setQueueSize(queueVO.getQueueSize() - 1);
-				_syncQueueDao.update(queueVO.getId(), queueVO);
+				_syncQueueItemDao.expunge(itemVO.getId());
+				
+				//if item is active, reset queue information
+				if (itemVO.getLastProcessMsid() != null) {
+				    queueVO.setLastUpdated(DateUtil.currentGMTTime());
+	                //decrement the count
+	                assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
+	                queueVO.setQueueSize(queueVO.getQueueSize() - 1);
+	                _syncQueueDao.update(queueVO.getId(), queueVO);
+				}
 			}
     		txt.commit();
     	} catch(Exception e) {
@@ -273,5 +276,13 @@ public class SyncQueueManagerImpl implements SyncQueueManager {
     
     private boolean queueReadyToProcess(SyncQueueVO queueVO) {
         return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+    }
+    
+    @Override
+    public void purgeAsyncJobQueueItemId(long asyncJobId) {
+        Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
+        if (itemId != null) {
+            purgeItem(itemId);
+        }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/013102c0/server/src/com/cloud/async/dao/SyncQueueItemDao.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/SyncQueueItemDao.java b/server/src/com/cloud/async/dao/SyncQueueItemDao.java
index cd9df2f..6b9da8b 100644
--- a/server/src/com/cloud/async/dao/SyncQueueItemDao.java
+++ b/server/src/com/cloud/async/dao/SyncQueueItemDao.java
@@ -26,4 +26,5 @@ public interface SyncQueueItemDao extends GenericDao<SyncQueueItemVO, Long> {
 	public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
 	public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
 	public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
-}
+	public Long getQueueItemIdByContentIdAndType(long contentId, String contentType);
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/013102c0/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
index ce21298..5e75756 100644
--- a/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
+++ b/server/src/com/cloud/async/dao/SyncQueueItemDaoImpl.java
@@ -33,13 +33,25 @@ import com.cloud.async.SyncQueueItemVO;
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.db.Filter;
 import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.GenericSearchBuilder;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Op;
 import com.cloud.utils.db.Transaction;
 
 @Local(value = { SyncQueueItemDao.class })
 public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
     private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
+    final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch;
+    
+    protected SyncQueueItemDaoImpl() {
+        super();
+        queueIdSearch = createSearchBuilder(Long.class);
+        queueIdSearch.and("contentId", queueIdSearch.entity().getContentId(), Op.EQ);
+        queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ);
+        queueIdSearch.selectField(queueIdSearch.entity().getId());
+        queueIdSearch.done();
+    }
 
 
     @Override
@@ -132,4 +144,15 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
             return lockRows(sc, null, true);
         return listBy(sc, null);
     }
+
+
+    @Override
+    public Long getQueueItemIdByContentIdAndType(long contentId, String contentType) {
+        SearchCriteria<Long> sc = queueIdSearch.create();
+        sc.setParameters("contentId", contentId);
+        sc.setParameters("contentType", contentType);
+        List<Long> id = customSearch(sc, null);
+
+        return id.size() == 0 ? null : id.get(0);
+    }
 }