You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/06/27 23:37:08 UTC
[2/2] git commit: updated refs/heads/vmsync to 0233044
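Deadlock fix: drop the lock-table lock (SYNC_LOCK_NAME) taken around join completion and the wakeup scan, move the wakeup logic into AsyncJobManagerImpl, and rename AsyncJob.Contants to AsyncJob.Constants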
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/0233044b
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/0233044b
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/0233044b
Branch: refs/heads/vmsync
Commit: 0233044b20bab0e0f7822660aa9e08bf530409ee
Parents: c6ba4a2
Author: Alex Huang <al...@gmail.com>
Authored: Thu Jun 27 14:37:59 2013 -0700
Committer: Alex Huang <al...@gmail.com>
Committed: Thu Jun 27 14:37:59 2013 -0700
----------------------------------------------------------------------
.../cloudstack/framework/jobs/AsyncJob.java | 2 +-
.../framework/jobs/AsyncJobManager.java | 2 -
.../jobs/dao/AsyncJobJoinMapDaoImpl.java | 2 +-
.../jobs/impl/AsyncJobManagerImpl.java | 157 ++++++++++++-------
4 files changed, 103 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
index 2ed75a9..61fb396 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
@@ -32,7 +32,7 @@ public interface AsyncJob extends JobInfo {
public static final String JOB_STATE = "job.state";
}
- public static interface Contants {
+ public static interface Constants {
// Although we may have detailed masks for each individual wakeup event, i.e.
// periodical timer, matched topic from message bus, it seems that we don't
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
index 440188a..bc06101 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
@@ -37,8 +37,6 @@ public interface AsyncJobManager extends Manager {
void completeAsyncJob(long jobId, JobInfo.Status jobStatus, int resultCode, String result);
- List<Long> wakeupByJoinedJobCompletion(long joinedJobId);
-
void updateAsyncJobStatus(long jobId, int processStatus, String resultObject);
void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
index ba6cfbc..fa3b14b 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
@@ -169,7 +169,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " +
"(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
pstmt = txn.prepareStatement(sql);
- pstmt.setInt(1, AsyncJob.Contants.SIGNAL_MASK_WAKEUP);
+ pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
pstmt.setString(3, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
pstmt.executeUpdate();
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0233044b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 2351a10..9b6aa97 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -19,12 +19,17 @@ package org.apache.cloudstack.framework.jobs.impl;
import java.io.File;
import java.io.FileInputStream;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
@@ -241,7 +246,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
for(Long id : wakeupList) {
// TODO, we assume that all jobs in this category is API job only
AsyncJobVO jobToWakeup = _jobDao.findById(id);
- if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0)
+ if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
scheduleExecution(jobToWakeup, false);
}
@@ -353,8 +358,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
// TODO
// this is a temporary solution to solve strange MySQL deadlock issue,
// completeJoin() causes deadlock happens at async_job table
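+ // The temporary workaround (holding the SYNC_LOCK_NAME lock-table lock around completeJoin()) has been removed; the reworked wakeup handling is expected to avoid the deadlock below, but this has not been confirmed.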
-/*
+/*
------------------------
LATEST DETECTED DEADLOCK
------------------------
@@ -387,25 +393,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
0: len 8; hex 0000000000000009; asc ;; 1: len 6; hex 000005d8b0d7; asc ;; 2: len 7; hex 00000009280110; asc ( ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc {"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 62313065306432342d336233352d343663622d386361622d623933623562; asc b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 39353664383563632d383336622d346663612d623738622d646238343739; asc 956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
*** WE ROLL BACK TRANSACTION (2)
-*/
+*/
- //
- // TODO
- // ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC is a hard-coded time out value, this value
- // should actually be in sync with mysql settings
- //
- // TODO
- // how to handle failures from locking?
-
- if(_jobDao.lockInLockTable(AsyncJob.Contants.SYNC_LOCK_NAME, ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC)) {
- try {
_joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid());
- } finally {
- _jobDao.unlockFromLockTable(AsyncJob.Contants.SYNC_LOCK_NAME);
- }
- } else {
- s_logger.error("If this happens, it means too bad");
- }
}
@Override
@@ -524,7 +514,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
s_logger.debug("Executing " + job);
}
- if ((getAndResetPendingSignals(job) & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0) {
+ if ((getAndResetPendingSignals(job) & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0) {
AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job);
if(jobDispatcher != null) {
jobDispatcher.runJob(job);
@@ -709,19 +699,13 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
}
- if(_jobDao.lockInLockTable(AsyncJob.Contants.SYNC_LOCK_NAME, ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC)) {
- try {
- List<Long> standaloneWakeupJobs = _joinMapDao.wakeupScan();
+ List<Long> standaloneWakeupJobs = wakeupScan();
for(Long jobId : standaloneWakeupJobs) {
// TODO, we assume that all jobs in this category is API job only
AsyncJobVO job = _jobDao.findById(jobId);
- if (job != null && (job.getPendingSignals() & AsyncJob.Contants.SIGNAL_MASK_WAKEUP) != 0)
+ if (job != null && (job.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
scheduleExecution(job, false);
}
- } finally {
- _jobDao.unlockFromLockTable(AsyncJob.Contants.SYNC_LOCK_NAME);
- }
- }
} catch(Throwable e) {
s_logger.error("Unexpected exception when trying to execute queue item, ", e);
} finally {
@@ -830,6 +814,92 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
}
+    /**
+     * Wakes up the jobs that are joined on {@code joinedJobId}.
+     * <p>
+     * Within one transaction this sets {@link AsyncJob.Constants#SIGNAL_MASK_WAKEUP} as the pending signal
+     * on every joined job and clears the last-process msid/number on the sync queue items whose content id
+     * is one of those jobs. If either bulk update fails, the transaction is rolled back.
+     *
+     * @param joinedJobId id of the job that the waiting jobs joined on
+     * @return the ids returned by {@code _joinMapDao.findJobsToWake(joinedJobId)}
+     */
+    @DB
+    protected List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
+        SearchCriteria<Long> joinJobSC = JoinJobSearch.create("joinJobId", joinedJobId);
+
+        List<Long> result = _joinMapDao.customSearch(joinJobSC, null);
+        if (!result.isEmpty()) {
+            // NOTE(review): presumably sorted so rows are always touched in the same order across
+            // concurrent callers (lock ordering) — confirm.
+            Collections.sort(result);
+            Long[] ids = result.toArray(new Long[result.size()]);
+
+            SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", ids);
+            SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
+
+            Transaction txn = Transaction.currentTxn();
+            txn.start();
+            boolean committed = false;
+            try {
+                AsyncJobVO job = _jobDao.createForUpdate();
+                job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+                _jobDao.update(job, jobsSC);
+
+                SyncQueueItemVO item = _queueItemDao.createForUpdate();
+                item.setLastProcessNumber(null);
+                item.setLastProcessMsid(null);
+                _queueItemDao.update(item, queueItemsSC);
+
+                txn.commit();
+                committed = true;
+            } finally {
+                // Never leave the job/queue-item updates half-applied if either update throws.
+                if (!committed) {
+                    txn.rollback();
+                }
+            }
+        }
+        return _joinMapDao.findJobsToWake(joinedJobId);
+    }
+
+    /**
+     * Periodic scan that wakes up jobs whose join entry has reached its next wake-up time.
+     * <p>
+     * In a single transaction, this handles every async_job_join_map row whose next_wakeup is before
+     * "now" and whose expiration is after "now". "Now" is one GMT timestamp formatted as a string. For
+     * those rows it:
+     * <ol>
+     * <li>sets the wake-up signal on the corresponding async_job rows,</li>
+     * <li>clears queue_proc_msid / queue_proc_number on the matching sync_queue_item rows,</li>
+     * <li>collects the job ids that have no sync_queue_item row (the "standalone" jobs),</li>
+     * <li>advances next_wakeup by wakeup_interval seconds.</li>
+     * </ol>
+     * On an {@link SQLException}, the error is logged, the transaction is rolled back and an empty list is
+     * returned, because nothing was committed.
+     *
+     * @return ids of woken-up jobs that are not backed by a sync queue item; empty if the scan failed
+     */
+    @DB
+    protected List<Long> wakeupScan() {
+        List<Long> standaloneList = new ArrayList<Long>();
+
+        // Every statement compares against the same cut-off instant, so format it once.
+        Date cutDate = DateUtil.currentGMTTime();
+        String cutDateStr = DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate);
+
+        Transaction txn = Transaction.currentTxn();
+        PreparedStatement pstmt = null;
+        try {
+            txn.start();
+
+            //
+            // performance sensitive processing, do it in plain SQL
+            //
+            String sql = "UPDATE async_job SET job_pending_signals=? WHERE id IN " +
+                    "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setInt(1, AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+            pstmt.setString(2, cutDateStr);
+            pstmt.setString(3, cutDateStr);
+            pstmt.executeUpdate();
+            pstmt.close();
+            pstmt = null;
+
+            sql = "UPDATE sync_queue_item SET queue_proc_msid=NULL, queue_proc_number=NULL WHERE content_id IN " +
+                    "(SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ?)";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setString(1, cutDateStr);
+            pstmt.setString(2, cutDateStr);
+            pstmt.executeUpdate();
+            pstmt.close();
+            pstmt = null;
+
+            // NOTE(review): "NOT IN (subquery)" matches no rows at all if sync_queue_item.content_id can be
+            // NULL; confirm the column is NOT NULL (or switch to NOT EXISTS).
+            sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setString(1, cutDateStr);
+            pstmt.setString(2, cutDateStr);
+            ResultSet rs = pstmt.executeQuery();
+            while (rs.next()) {
+                standaloneList.add(rs.getLong(1));
+            }
+            rs.close();
+            pstmt.close();
+            pstmt = null;
+
+            // update for next wake-up
+            sql = "UPDATE async_job_join_map SET next_wakeup=DATE_ADD(next_wakeup, INTERVAL wakeup_interval SECOND) WHERE next_wakeup < ? AND expiration > ?";
+            pstmt = txn.prepareStatement(sql);
+            pstmt.setString(1, cutDateStr);
+            pstmt.setString(2, cutDateStr);
+            pstmt.executeUpdate();
+            pstmt.close();
+            pstmt = null;
+
+            txn.commit();
+        } catch (SQLException e) {
+            s_logger.error("Unexpected exception", e);
+            // Nothing was committed, so do not report any job as woken up.
+            txn.rollback();
+            standaloneList.clear();
+        } finally {
+            // pstmt is only non-null here if a step failed. Closing it also closes any ResultSet it
+            // still owns (JDBC Statement.close contract).
+            if (pstmt != null) {
+                try {
+                    pstmt.close();
+                } catch (SQLException ignored) {
+                    // best-effort cleanup; must not mask the failure that got us here
+                }
+            }
+        }
+
+        return standaloneList;
+    }
+
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_jobExpireSeconds = _configDepot.get(JobExpireMinutes).setMultiplier(60);
@@ -855,6 +925,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
JoinJobSearch.selectField(JoinJobSearch.entity().getJobId());
JoinJobSearch.done();
+ JoinJobTimeSearch
+
JobIdsSearch = _jobDao.createSearchBuilder();
JobIdsSearch.and(JobIdsSearch.entity().getId(), Op.IN, "ids").done();
@@ -877,34 +949,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@Override
- @DB
- public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
- SearchCriteria<Long> joinJobSC = JoinJobSearch.create("joinJobId", joinedJobId);
-
- List<Long> result = _joinMapDao.customSearch(joinJobSC, null);
- if (result.size() != 0) {
- Collections.sort(result);
- Long[] ids = result.toArray(new Long[result.size()]);
-
- SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", ids);
- SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
-
- Transaction txn = Transaction.currentTxn();
- txn.start();
- AsyncJobVO job = _jobDao.createForUpdate();
- job.setPendingSignals(1);
- _jobDao.update(job, jobsSC);
-
- SyncQueueItemVO item = _queueItemDao.createForUpdate();
- item.setLastProcessNumber(null);
- item.setLastProcessMsid(null);
- _queueItemDao.update(item, queueItemsSC);
- txn.commit();
- }
- return _joinMapDao.findJobsToWake(joinedJobId);
- }
-
- @Override
public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
}
@@ -960,6 +1004,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private SearchBuilder<AsyncJobVO> JobIdsSearch;
private SearchBuilder<SyncQueueItemVO> QueueJobIdsSearch;
private GenericSearchBuilder<AsyncJobJoinMapVO, Long> JoinJobIdsSearch;
+ private GenericSearchBuilder<AsyncJobJoinMapVO, Long> JoinJobTimeSearch;
protected AsyncJobManagerImpl() {