You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/06/27 23:37:08 UTC

[2/2] git commit: updated refs/heads/vmsync to 0233044

Deadlock fix


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/0233044b
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/0233044b
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/0233044b

Branch: refs/heads/vmsync
Commit: 0233044b20bab0e0f7822660aa9e08bf530409ee
Parents: c6ba4a2
Author: Alex Huang <al...@gmail.com>
Authored: Thu Jun 27 14:37:59 2013 -0700
Committer: Alex Huang <al...@gmail.com>
Committed: Thu Jun 27 14:37:59 2013 -0700

----------------------------------------------------------------------
 .../cloudstack/framework/jobs/AsyncJob.java     |   2 +-
 .../framework/jobs/AsyncJobManager.java         |   2 -
 .../jobs/dao/AsyncJobJoinMapDaoImpl.java        |   2 +-
 .../jobs/impl/AsyncJobManagerImpl.java          | 157 ++++++++++++-------
 4 files changed, 103 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
index 2ed75a9..61fb396 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
@@ -32,7 +32,7 @@ public interface AsyncJob extends JobInfo {
         public static final String JOB_STATE = "job.state";
     }
     
-    public static interface Contants {
+    public static interface Constants {
 
     	// Although we may have detailed masks for each individual wakeup event, i.e.
         // periodical timer, matched topic from message bus, it seems that we don't

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
index 440188a..bc06101 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
@@ -37,8 +37,6 @@ public interface AsyncJobManager extends Manager {
 
     void completeAsyncJob(long jobId, JobInfo.Status jobStatus, int resultCode, String result);
 
-    List<Long> wakeupByJoinedJobCompletion(long joinedJobId);
-
     void updateAsyncJobStatus(long jobId, int processStatus, String resultObject);
     void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
     void logJobJournal(long jobId, AsyncJob.JournalType journalType, String

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
index ba6cfbc..fa3b14b 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
@@ -169,7 +169,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
 			String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " +
 					"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
 			pstmt = txn.prepareStatement(sql);
-			pstmt.setInt(1, AsyncJob.Contants.SIGNAL_MASK_WAKEUP);
+			pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
 	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
 	        pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
 	        pstmt.executeUpdate();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 2351a10..9b6aa97 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -19,12 +19,17 @@ package org.apache.cloudstack.framework.jobs.impl;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.TimeZone;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -241,7 +246,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             for(Long id : wakeupList) {
             	// TODO, we assume that all jobs in this category is API job only
             	AsyncJobVO jobToWakeup = _jobDao.findById(id);
-                if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0)
+                if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
             	    scheduleExecution(jobToWakeup, false);
             }
              
@@ -353,8 +358,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     	// TODO
     	// this is a temporary solution to solve strange MySQL deadlock issue,
     	// completeJoin() causes deadlock happens at async_job table
+        // NOTE: the temporary lock-table workaround described above has been removed; the changes in this commit are expected to fix the deadlock.
 
-/*    	
+/*
     	------------------------
     	LATEST DETECTED DEADLOCK
     	------------------------
@@ -387,25 +393,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     	0: len 8; hex 0000000000000009; asc         ;; 1: len 6; hex 000005d8b0d7; asc       ;; 2: len 7; hex 00000009280110; asc     (  ;; 3: len 8; hex 0000000000000002; asc         ;; 4: len 8; hex 0000000000000002; asc         ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc {"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc     ;; 10: len 4; hex 80000001; asc     ;; 11: len 4; hex 80000000; asc     ;; 12: len 4; hex 80000000; asc     ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc    o{   ;; 15: len 8; hex 80001a6f7bb0d0a8; asc    o{   ;; 16: len 8; hex 8000124f06cfd5b6; asc    O    ;; 17: len 8; hex 8000124f06cfd5b6; asc    O    ;; 18: SQL NULL; 19: SQ
 L NULL; 20: len 30; hex 62313065306432342d336233352d343663622d386361622d623933623562; asc b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 39353664383563632d383336622d346663612d623738622d646238343739; asc 956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc     ;;
 
     	*** WE ROLL BACK TRANSACTION (2)
-*/    	
+*/
     	
-    	//
-    	// TODO
-    	// ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC is a hard-coded time out value, this value
-    	// should actually be in sync with mysql settings
-    	//
-    	// TODO
-    	// how to handle failures from locking?
-    	
-    	if(_jobDao.lockInLockTable(AsyncJob.Contants.SYNC_LOCK_NAME, ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC)) {
-    		try {
     			_joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid());
-    		} finally {
-    			_jobDao.unlockFromLockTable(AsyncJob.Contants.SYNC_LOCK_NAME);
-    		}
-    	} else {
-    		s_logger.error("If this happens, it means too bad");
-    	}
     }
     
     @Override
@@ -524,7 +514,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                         s_logger.debug("Executing " + job);
                     }
 
-                    if ((getAndResetPendingSignals(job) & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0) {
+                    if ((getAndResetPendingSignals(job) & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0) {
                     	AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job);
                     	if(jobDispatcher != null) {
                     		jobDispatcher.runJob(job);
@@ -709,19 +699,13 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                         }
                     }
               
-                	if(_jobDao.lockInLockTable(AsyncJob.Contants.SYNC_LOCK_NAME, ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC)) {
-                		try {
-		                    List<Long> standaloneWakeupJobs = _joinMapDao.wakeupScan();
+                    List<Long> standaloneWakeupJobs = wakeupScan();
 		                    for(Long jobId : standaloneWakeupJobs) {
 		                    	// TODO, we assume that all jobs in this category is API job only
 		                    	AsyncJobVO job = _jobDao.findById(jobId);
-		                        if (job != null && (job.getPendingSignals() & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0)
+		                        if (job != null && (job.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
 		                    	    scheduleExecution(job, false);
 		                    }
-                		} finally {
-                			_jobDao.unlockFromLockTable(AsyncJob.Contants.SYNC_LOCK_NAME);
-                		}
-                	}
                 } catch(Throwable e) {
                     s_logger.error("Unexpected exception when trying to execute queue item, ", e);
                 } finally {
@@ -830,6 +814,92 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
         }
     }
 
+    @DB
+    protected List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
+        SearchCriteria<Long> joinJobSC = JoinJobSearch.create("joinJobId", joinedJobId);
+
+        List<Long> result = _joinMapDao.customSearch(joinJobSC, null);
+        if (result.size() != 0) {
+            Collections.sort(result);
+            Long[] ids = result.toArray(new Long[result.size()]);
+
+            SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", ids);
+            SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
+
+            Transaction txn = Transaction.currentTxn();
+            txn.start();
+            AsyncJobVO job = _jobDao.createForUpdate();
+            job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+            _jobDao.update(job, jobsSC);
+
+            SyncQueueItemVO item = _queueItemDao.createForUpdate();
+            item.setLastProcessNumber(null);
+            item.setLastProcessMsid(null);
+            _queueItemDao.update(item, queueItemsSC);
+            txn.commit();
+        }
+        return _joinMapDao.findJobsToWake(joinedJobId);
+    }
+
+    @DB
+    protected List<Long> wakeupScan() {
+        List<Long> standaloneList = new ArrayList<Long>();
+
+        Date cutDate = DateUtil.currentGMTTime();
+
+        Transaction txn = Transaction.currentTxn();
+        PreparedStatement pstmt = null;
+        try {
+            txn.start();
+
+
+            //
+            // performance sensitive processing, do it in plain SQL
+            //
+            String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " +
+                    "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+            pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.executeUpdate();
+            pstmt.close();
+
+            sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
+                    "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.executeUpdate();
+            pstmt.close();
+
+            sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            ResultSet rs = pstmt.executeQuery();
+            while (rs.next()) {
+                standaloneList.add(rs.getLong(1));
+            }
+            rs.close();
+            pstmt.close();
+
+            // update for next wake-up
+            sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ?";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+            pstmt.executeUpdate();
+            pstmt.close();
+
+            txn.commit();
+        } catch (SQLException e) {
+            s_logger.error("Unexpected exception", e);
+        }
+
+        return standaloneList;
+    }
+
     @Override
     public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
         _jobExpireSeconds = _configDepot.get(JobExpireMinutes).setMultiplier(60);
@@ -855,6 +925,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
         JoinJobSearch.selectField(JoinJobSearch.entity().getJobId());
         JoinJobSearch.done();
 
+        JoinJobTimeSearch
+
         JobIdsSearch = _jobDao.createSearchBuilder();
         JobIdsSearch.and(JobIdsSearch.entity().getId(), Op.IN, "ids").done();
 
@@ -877,34 +949,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     }
 
     @Override
-    @DB
-    public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
-        SearchCriteria<Long> joinJobSC = JoinJobSearch.create("joinJobId", joinedJobId);
-
-        List<Long> result = _joinMapDao.customSearch(joinJobSC, null);
-        if (result.size() != 0) {
-            Collections.sort(result);
-            Long[] ids = result.toArray(new Long[result.size()]);
-
-            SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", ids);
-            SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
-
-            Transaction txn = Transaction.currentTxn();
-            txn.start();
-            AsyncJobVO job = _jobDao.createForUpdate();
-            job.setPendingSignals(1);
-            _jobDao.update(job, jobsSC);
-
-            SyncQueueItemVO item = _queueItemDao.createForUpdate();
-            item.setLastProcessNumber(null);
-            item.setLastProcessMsid(null);
-            _queueItemDao.update(item, queueItemsSC);
-            txn.commit();
-        }
-        return _joinMapDao.findJobsToWake(joinedJobId);
-    }
-
-    @Override
     public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
     }
 
@@ -960,6 +1004,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     private SearchBuilder<AsyncJobVO> JobIdsSearch;
     private SearchBuilder<SyncQueueItemVO> QueueJobIdsSearch;
     private GenericSearchBuilder<AsyncJobJoinMapVO, Long> JoinJobIdsSearch;
+    private GenericSearchBuilder<AsyncJobJoinMapVO, Long> JoinJobTimeSearch;
 
     protected AsyncJobManagerImpl() {