You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2014/01/21 01:53:51 UTC
[1/3] CLOUDSTACK-5358: Bring back concurrency control in sync-queue management
Updated Branches:
refs/heads/4.3 cd8501e26 -> 8db0d83d1
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
index 61670bf..7b6eed7 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java
@@ -23,9 +23,14 @@ import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
import com.cloud.utils.db.GenericDao;
public interface SyncQueueItemDao extends GenericDao<SyncQueueItemVO, Long> {
- public SyncQueueItemVO getNextQueueItem(long queueId);
- public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
- public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
- public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
- public Long getQueueItemIdByContentIdAndType(long contentId, String contentType);
+ public SyncQueueItemVO getNextQueueItem(long queueId);
+ public int getActiveQueueItemCount(long queueId);
+
+ public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
+
+ public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
+
+ public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
+
+ public Long getQueueItemIdByContentIdAndType(long contentId, String contentType);
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
index 2f04a7c..41f1419 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
@@ -36,6 +36,7 @@ import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Func;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.TransactionLegacy;
@@ -43,7 +44,8 @@ import com.cloud.utils.db.TransactionLegacy;
public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch;
-
+ final GenericSearchBuilder<SyncQueueItemVO, Integer> queueActiveItemSearch;
+
public SyncQueueItemDaoImpl() {
super();
queueIdSearch = createSearchBuilder(Long.class);
@@ -51,37 +53,52 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ);
queueIdSearch.selectFields(queueIdSearch.entity().getId());
queueIdSearch.done();
+
+ queueActiveItemSearch = createSearchBuilder(Integer.class);
+ queueActiveItemSearch.and("queueId", queueActiveItemSearch.entity().getQueueId(), Op.EQ);
+ queueActiveItemSearch.and("processNumber", queueActiveItemSearch.entity().getLastProcessNumber(), Op.NNULL);
+ queueActiveItemSearch.select(null, Func.COUNT, queueActiveItemSearch.entity().getId());
+ queueActiveItemSearch.done();
}
- @Override
- public SyncQueueItemVO getNextQueueItem(long queueId) {
-
- SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
+ @Override
+ public SyncQueueItemVO getNextQueueItem(long queueId) {
+
+ SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
sb.and("queueId", sb.entity().getQueueId(), SearchCriteria.Op.EQ);
- sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(), SearchCriteria.Op.NULL);
+ sb.and("lastProcessNumber", sb.entity().getLastProcessNumber(), SearchCriteria.Op.NULL);
sb.done();
-
- SearchCriteria<SyncQueueItemVO> sc = sb.create();
- sc.setParameters("queueId", queueId);
-
- Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L);
+
+ SearchCriteria<SyncQueueItemVO> sc = sb.create();
+ sc.setParameters("queueId", queueId);
+
+ Filter filter = new Filter(SyncQueueItemVO.class, "created", true, 0L, 1L);
List<SyncQueueItemVO> l = listBy(sc, filter);
if(l != null && l.size() > 0)
- return l.get(0);
-
- return null;
- }
-
- @Override
- public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
- List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
-
- String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
- " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
+ return l.get(0);
+
+ return null;
+ }
+
+ @Override
+ public int getActiveQueueItemCount(long queueId) {
+ SearchCriteria<Integer> sc = queueActiveItemSearch.create();
+ sc.setParameters("queueId", queueId);
+
+ List<Integer> count = customSearch(sc, null);
+ return count.get(0);
+ }
+
+ @Override
+ public List<SyncQueueItemVO> getNextQueueItems(int maxItems) {
+ List<SyncQueueItemVO> l = new ArrayList<SyncQueueItemVO>();
+
+ String sql = "SELECT i.id, i.queue_id, i.content_type, i.content_id, i.created " +
+ " FROM sync_queue AS q JOIN sync_queue_item AS i ON q.id = i.queue_id " +
" WHERE i.queue_proc_number IS NULL " +
- " GROUP BY q.id " +
- " ORDER BY i.id " +
- " LIMIT 0, ?";
+ " GROUP BY q.id " +
+ " ORDER BY i.id " +
+ " LIMIT 0, ?";
TransactionLegacy txn = TransactionLegacy.currentTxn();
PreparedStatement pstmt = null;
@@ -90,54 +107,54 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
pstmt.setInt(1, maxItems);
ResultSet rs = pstmt.executeQuery();
while(rs.next()) {
- SyncQueueItemVO item = new SyncQueueItemVO();
- item.setId(rs.getLong(1));
- item.setQueueId(rs.getLong(2));
- item.setContentType(rs.getString(3));
- item.setContentId(rs.getLong(4));
- item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5)));
- l.add(item);
+ SyncQueueItemVO item = new SyncQueueItemVO();
+ item.setId(rs.getLong(1));
+ item.setQueueId(rs.getLong(2));
+ item.setContentType(rs.getString(3));
+ item.setContentId(rs.getLong(4));
+ item.setCreated(DateUtil.parseDateString(TimeZone.getTimeZone("GMT"), rs.getString(5)));
+ l.add(item);
}
} catch (SQLException e) {
- s_logger.error("Unexpected sql excetpion, ", e);
+ s_logger.error("Unexpected sql excetpion, ", e);
} catch (Throwable e) {
- s_logger.error("Unexpected excetpion, ", e);
+ s_logger.error("Unexpected excetpion, ", e);
}
- return l;
- }
-
- @Override
- public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
- SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
+ return l;
+ }
+
+ @Override
+ public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive) {
+ SearchBuilder<SyncQueueItemVO> sb = createSearchBuilder();
sb.and("lastProcessMsid", sb.entity().getLastProcessMsid(),
- SearchCriteria.Op.EQ);
+ SearchCriteria.Op.EQ);
sb.done();
-
- SearchCriteria<SyncQueueItemVO> sc = sb.create();
- sc.setParameters("lastProcessMsid", msid);
-
- Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null);
-
- if(exclusive)
- return lockRows(sc, filter, true);
+
+ SearchCriteria<SyncQueueItemVO> sc = sb.create();
+ sc.setParameters("lastProcessMsid", msid);
+
+ Filter filter = new Filter(SyncQueueItemVO.class, "created", true, null, null);
+
+ if (exclusive)
+ return lockRows(sc, filter, true);
return listBy(sc, filter);
- }
+ }
@Override
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive) {
Date cutTime = DateUtil.currentGMTTime();
-
+
SearchBuilder<SyncQueueItemVO> sbItem = createSearchBuilder();
sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT);
-
+
sbItem.done();
-
+
SearchCriteria<SyncQueueItemVO> sc = sbItem.create();
sc.setParameters("lastProcessTime2", new Date(cutTime.getTime() - thresholdMs));
-
+
if(exclusive)
return lockRows(sc, null, true);
return listBy(sc, null);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index a77f864..63c365b 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -24,7 +24,6 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
@@ -88,12 +87,12 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
- private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
- private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds
+ private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
+ private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds
private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
private static final int HEARTBEAT_INTERVAL = 2000;
- private static final int GC_INTERVAL = 10000; // 10 seconds
+ private static final int GC_INTERVAL = 10000; // 10 seconds
@Inject
private SyncQueueItemDao _queueItemDao;
@@ -362,38 +361,38 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
// I removed the temporary solution already. I think my changes should fix the deadlock.
/*
- ------------------------
- LATEST DETECTED DEADLOCK
- ------------------------
- 130625 20:03:10
- *** (1) TRANSACTION:
- TRANSACTION 0 98087127, ACTIVE 0 sec, process no 1489, OS thread id 139837829175040 fetching rows, thread declared inside InnoDB 494
- mysql tables in use 2, locked 1
- LOCK WAIT 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1
- MySQL thread id 28408, query id 368571321 localhost 127.0.0.1 cloud preparing
- UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 9)
- *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
- RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087127 lock_mode X locks rec but not gap waiting
- Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0
- 0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
-
- *** (2) TRANSACTION:
- TRANSACTION 0 98087128, ACTIVE 0 sec, process no 1489, OS thread id 139837671909120 fetching rows, thread declared inside InnoDB 492
- mysql tables in use 2, locked 1
- 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1
- MySQL thread id 28406, query id 368571323 localhost 127.0.0.1 cloud preparing
- UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 8)
- *** (2) HOLDS THE LOCK(S):
- RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap
- Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0
- 0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
-
- *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
- RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap waiting
- Record lock, heap no 10 PHYSICAL RECORD: n_fields 26; compact format; info bits 0
- 0: len 8; hex 0000000000000009; asc ;; 1: len 6; hex 000005d8b0d7; asc ;; 2: len 7; hex 00000009280110; asc ( ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc {"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 62313065306432342d336233352d343663622d386361622d623933623562; asc b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 39353664383563632d383336622d346663612d623738622d646238343739; asc 956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
-
- *** WE ROLL BACK TRANSACTION (2)
+ ------------------------
+ LATEST DETECTED DEADLOCK
+ ------------------------
+ 130625 20:03:10
+ *** (1) TRANSACTION:
+ TRANSACTION 0 98087127, ACTIVE 0 sec, process no 1489, OS thread id 139837829175040 fetching rows, thread declared inside InnoDB 494
+ mysql tables in use 2, locked 1
+ LOCK WAIT 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1
+ MySQL thread id 28408, query id 368571321 localhost 127.0.0.1 cloud preparing
+ UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 9)
+ *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
+ RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087127 lock_mode X locks rec but not gap waiting
+ Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0
+ 0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
+
+ *** (2) TRANSACTION:
+ TRANSACTION 0 98087128, ACTIVE 0 sec, process no 1489, OS thread id 139837671909120 fetching rows, thread declared inside InnoDB 492
+ mysql tables in use 2, locked 1
+ 3 lock struct(s), heap size 368, 2 row lock(s), undo log entries 1
+ MySQL thread id 28406, query id 368571323 localhost 127.0.0.1 cloud preparing
+ UPDATE async_job SET job_pending_signals=1 WHERE id IN (SELECT job_id FROM async_job_join_map WHERE join_job_id = 8)
+ *** (2) HOLDS THE LOCK(S):
+ RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap
+ Record lock, heap no 9 PHYSICAL RECORD: n_fields 26; compact format; info bits 0
+ 0: len 8; hex 0000000000000008; asc ;; 1: len 6; hex 000005d8b0d8; asc ;; 2: len 7; hex 00000009270110; asc ' ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2232222c22706879736963616c6e6574776f726b6964223a; asc {"id":"2","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 66376466396532362d323139622d346338652d393231332d393766653636; asc f7df9e26-219b-4c8e-9213-97fe66;...(truncated); 21: len 30; hex 36623238306364362d663436652d343563322d383833642d333863616439; asc 6b280cd6-f46e-45c2-883d-38cad9;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
+
+ *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
+ RECORD LOCKS space id 0 page no 1275 n bits 80 index `PRIMARY` of table `cloud`.`async_job` trx id 0 98087128 lock_mode X locks rec but not gap waiting
+ Record lock, heap no 10 PHYSICAL RECORD: n_fields 26; compact format; info bits 0
+ 0: len 8; hex 0000000000000009; asc ;; 1: len 6; hex 000005d8b0d7; asc ;; 2: len 7; hex 00000009280110; asc ( ;; 3: len 8; hex 0000000000000002; asc ;; 4: len 8; hex 0000000000000002; asc ;; 5: SQL NULL; 6: SQL NULL; 7: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e636f6d6d; asc org.apache.cloudstack.api.comm;...(truncated); 8: len 30; hex 7b226964223a2233222c22706879736963616c6e6574776f726b6964223a; asc {"id":"3","physicalnetworkid":;...(truncated); 9: len 4; hex 80000000; asc ;; 10: len 4; hex 80000001; asc ;; 11: len 4; hex 80000000; asc ;; 12: len 4; hex 80000000; asc ;; 13: len 30; hex 6f72672e6170616368652e636c6f7564737461636b2e6170692e72657370; asc org.apache.cloudstack.api.resp;...(truncated); 14: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 15: len 8; hex 80001a6f7bb0d0a8; asc o{ ;; 16: len 8; hex 8000124f06cfd5b6; asc O ;; 17: len 8; hex 8000124f06cfd5b6; asc O ;; 18: SQL NULL; 19: SQL NULL; 20: len 30; hex 62313065306432342d336233352d343663622d386361622d623933623562; asc b10e0d24-3b35-46cb-8cab-b93b5b;...(truncated); 21: len 30; hex 39353664383563632d383336622d346663612d623738622d646238343739; asc 956d85cc-836b-4fca-b78b-db8479;...(truncated); 22: SQL NULL; 23: len 21; hex 4170694173796e634a6f6244697370617463686572; asc ApiAsyncJobDispatcher;; 24: SQL NULL; 25: len 4; hex 80000000; asc ;;
+
+ *** WE ROLL BACK TRANSACTION (2)
*/
_joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid());
@@ -406,23 +405,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
SyncQueueVO queue = null;
-
- // to deal with temporary DB exceptions like DB deadlock/Lock-wait time out cased rollbacks
- // we retry five times until we throw an exception
- Random random = new Random();
-
- for (int i = 0; i < 5; i++) {
- queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
- if (queue != null) {
- break;
- }
-
- try {
- Thread.sleep(1000 + random.nextInt(5000));
- } catch (InterruptedException e) {
- }
- }
-
+ queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
if (queue == null)
throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
index 7fb0245..9d3bf80 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java
@@ -23,6 +23,7 @@ import java.util.List;
import javax.inject.Inject;
import org.apache.log4j.Logger;
+
import org.apache.cloudstack.framework.jobs.dao.SyncQueueDao;
import org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDao;
@@ -146,18 +147,18 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
processNumber = new Long(1);
else
processNumber = processNumber + 1;
-
+
Date dt = DateUtil.currentGMTTime();
queueVO.setLastProcessNumber(processNumber);
queueVO.setLastUpdated(dt);
queueVO.setQueueSize(queueVO.getQueueSize() + 1);
_syncQueueDao.update(queueVO.getId(), queueVO);
-
+
itemVO.setLastProcessMsid(msid);
itemVO.setLastProcessNumber(processNumber);
itemVO.setLastProcessTime(dt);
_syncQueueItemDao.update(item.getId(), itemVO);
-
+
resultList.add(item);
}
}
@@ -183,9 +184,9 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
if(itemVO != null) {
SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
-
+
_syncQueueItemDao.expunge(itemVO.getId());
-
+
// if item is active, reset queue information
if (itemVO.getLastProcessMsid() != null) {
queueVO.setLastUpdated(DateUtil.currentGMTTime());
@@ -239,18 +240,15 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage
}
private boolean queueReadyToProcess(SyncQueueVO queueVO) {
- return true;
-
- //
- // TODO
- //
- // Need to disable concurrency disable at queue level due to the need to support
- // job wake-up dispatching task
- //
- // Concurrency control is better done at higher level and leave the job scheduling/serializing simpler
- //
-
- // return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
+ int nActiveItems = _syncQueueItemDao.getActiveQueueItemCount(queueVO.getId());
+ if (nActiveItems < queueVO.getQueueSizeLimit())
+ return true;
+
+ if (s_logger.isDebugEnabled())
+ s_logger.debug("Queue (queue id, sync type, sync id) - (" + queueVO.getId()
+ + "," + queueVO.getSyncObjType() + ", " + queueVO.getSyncObjId()
+ + ") is reaching concurrency limit " + queueVO.getQueueSizeLimit());
+ return false;
}
@Override
[3/3] git commit: updated refs/heads/4.3 to 8db0d83
Posted by ke...@apache.org.
CLOUDSTACK-5358: Bring back concurrency control in sync-queue management
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/8db0d83d
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/8db0d83d
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/8db0d83d
Branch: refs/heads/4.3
Commit: 8db0d83d1aff178845076a7ff9429d69ada81364
Parents: cd8501e
Author: Kelven Yang <ke...@gmail.com>
Authored: Mon Jan 20 16:53:17 2014 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Mon Jan 20 16:53:33 2014 -0800
----------------------------------------------------------------------
.../com/cloud/vm/VirtualMachineManagerImpl.java | 1302 +++++++++---------
.../framework/jobs/dao/SyncQueueItemDao.java | 15 +-
.../jobs/dao/SyncQueueItemDaoImpl.java | 125 +-
.../jobs/impl/AsyncJobManagerImpl.java | 89 +-
.../jobs/impl/SyncQueueManagerImpl.java | 32 +-
5 files changed, 785 insertions(+), 778 deletions(-)
----------------------------------------------------------------------
[2/3] CLOUDSTACK-5358: Bring back concurrency control in sync-queue management
Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8db0d83d/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 981b447..df51a3c 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -347,7 +347,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
Long.class, "vm.job.timeout", "600000",
"Time in milliseconds to wait before attempting to cancel a job", false);
static final ConfigKey<Integer> VmJobStateReportInterval = new ConfigKey<Integer>("Advanced",
- Integer.class, "vm.job.report.interval", "60",
+ Integer.class, "vm.job.report.interval", "60",
"Interval to send application level pings to make sure the connection is still working", false);
ScheduledExecutorService _executor = null;
@@ -740,18 +740,18 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlanner planner)
- throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
+ throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
advanceStart(vmUuid, params, null, planner);
}
@Override
public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy, DeploymentPlanner planner) throws InsufficientCapacityException,
- ConcurrentOperationException, ResourceUnavailableException {
+ ConcurrentOperationException, ResourceUnavailableException {
- AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
- // avoid re-entrance
+ // avoid re-entrance
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
@@ -763,16 +763,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
- } else {
- Outcome<VirtualMachine> outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy);
+ } else {
+ Outcome<VirtualMachine> outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy);
- try {
- VirtualMachine vm = outcome.get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Operation is interrupted", e);
- } catch (java.util.concurrent.ExecutionException e) {
- throw new RuntimeException("Execution excetion", e);
- }
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
@@ -780,8 +780,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobResult;
- }
- }
+ }
+ }
}
@@ -789,7 +789,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
public void orchestrateStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy, DeploymentPlanner planner)
throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
- CallContext cctxt = CallContext.current();
+ CallContext cctxt = CallContext.current();
Account account = cctxt.getCallingAccount();
User caller = cctxt.getCallingUser();
@@ -1287,11 +1287,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop)
- throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
+ throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
- AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
- // avoid re-entrance
+ // avoid re-entrance
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
@@ -1305,16 +1305,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
_workJobDao.expunge(placeHolder.getId());
}
- } else {
- Outcome<VirtualMachine> outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop);
+ } else {
+ Outcome<VirtualMachine> outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop);
- try {
- VirtualMachine vm = outcome.get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Operation is interrupted", e);
- } catch (java.util.concurrent.ExecutionException e) {
- throw new RuntimeException("Execution excetion", e);
- }
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
@@ -1324,8 +1324,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof OperationTimedoutException)
throw (OperationTimedoutException)jobResult;
- }
- }
+ }
+ }
}
private void orchestrateStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
@@ -1593,9 +1593,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public void storageMigration(String vmUuid, StoragePool destPool) {
- AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
- // avoid re-entrance
+ // avoid re-entrance
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
@@ -1607,23 +1607,23 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
- } else {
- Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool);
+ } else {
+ Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool);
- try {
- VirtualMachine vm = outcome.get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Operation is interrupted", e);
- } catch (java.util.concurrent.ExecutionException e) {
- throw new RuntimeException("Execution excetion", e);
- }
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
if (jobResult instanceof RuntimeException)
throw (RuntimeException)jobResult;
- }
- }
+ }
+ }
}
private void orchestrateStorageMigration(String vmUuid, StoragePool destPool) {
@@ -1683,11 +1683,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public void migrate(String vmUuid, long srcHostId, DeployDestination dest)
- throws ResourceUnavailableException, ConcurrentOperationException {
+ throws ResourceUnavailableException, ConcurrentOperationException {
- AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
- // avoid re-entrance
+ // avoid re-entrance
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
@@ -1699,27 +1699,27 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
- } else {
- Outcome<VirtualMachine> outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest);
+ } else {
+ Outcome<VirtualMachine> outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest);
- try {
- VirtualMachine vm = outcome.get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Operation is interrupted", e);
- } catch (java.util.concurrent.ExecutionException e) {
- throw new RuntimeException("Execution excetion", e);
- }
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
- if(jobResult != null) {
- if(jobResult instanceof ResourceUnavailableException)
- throw (ResourceUnavailableException)jobResult;
- else if(jobResult instanceof ConcurrentOperationException)
- throw (ConcurrentOperationException)jobResult;
- else if(jobResult instanceof RuntimeException)
- throw (RuntimeException)jobResult;
- }
- }
+ if (jobResult != null) {
+ if (jobResult instanceof ResourceUnavailableException)
+ throw (ResourceUnavailableException)jobResult;
+ else if (jobResult instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)jobResult;
+ else if (jobResult instanceof RuntimeException)
+ throw (RuntimeException)jobResult;
+ }
+ }
}
private void orchestrateMigrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException {
@@ -1964,11 +1964,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map<Volume, StoragePool> volumeToPool)
- throws ResourceUnavailableException, ConcurrentOperationException {
+ throws ResourceUnavailableException, ConcurrentOperationException {
- AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
- // avoid re-entrance
+ // avoid re-entrance
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
@@ -1982,29 +1982,29 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
_workJobDao.expunge(placeHolder.getId());
}
- } else {
+ } else {
Outcome<VirtualMachine> outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool);
- try {
+ try {
VirtualMachine vm = outcome.get();
- } catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException("Operation is interrupted", e);
- } catch (java.util.concurrent.ExecutionException e) {
+ } catch (java.util.concurrent.ExecutionException e) {
throw new RuntimeException("Execution excetion", e);
- }
+ }
Object jobException = _jobMgr.unmarshallResultObject(outcome.getJob());
- if(jobException != null) {
- if(jobException instanceof ResourceUnavailableException)
+ if (jobException != null) {
+ if (jobException instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)jobException;
- else if(jobException instanceof ConcurrentOperationException)
- throw (ConcurrentOperationException)jobException;
+ else if (jobException instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)jobException;
}
- }
+ }
}
private void orchestrateMigrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map<Volume, StoragePool> volumeToPool) throws ResourceUnavailableException,
- ConcurrentOperationException {
+ ConcurrentOperationException {
VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
@@ -2259,11 +2259,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public void advanceReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params)
- throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
+ throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
- AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
- // avoid re-entrance
+ // avoid re-entrance
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
@@ -2275,16 +2275,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
- } else {
- Outcome<VirtualMachine> outcome = rebootVmThroughJobQueue(vmUuid, params);
+ } else {
+ Outcome<VirtualMachine> outcome = rebootVmThroughJobQueue(vmUuid, params);
- try {
- VirtualMachine vm = outcome.get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Operation is interrupted", e);
- } catch (java.util.concurrent.ExecutionException e) {
- throw new RuntimeException("Execution excetion", e);
- }
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
@@ -2294,8 +2294,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw (ConcurrentOperationException)jobResult;
else if (jobResult instanceof InsufficientCapacityException)
throw (InsufficientCapacityException)jobResult;
- }
- }
+ }
+ }
}
private void orchestrateReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ConcurrentOperationException,
@@ -2948,9 +2948,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
if(VmJobEnabled.value()) {
- if(ping.getHostVmStateReport() != null && ping.getHostVmStateReport().size() > 0) {
- _syncMgr.processHostVmStatePingReport(agentId, ping.getHostVmStateReport());
- }
+ if (ping.getHostVmStateReport() != null && ping.getHostVmStateReport().size() > 0) {
+ _syncMgr.processHostVmStatePingReport(agentId, ping.getHostVmStateReport());
+ }
}
// take the chance to scan VMs that are stuck in transitional states
@@ -2979,10 +2979,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
if(s_logger.isDebugEnabled())
- s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId());
+ s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId());
if(VmJobEnabled.value()) {
- _syncMgr.resetHostSyncState(agent.getId());
+ _syncMgr.resetHostSyncState(agent.getId());
}
if (forRebalance) {
@@ -3193,11 +3193,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested)
- throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException {
+ throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException {
- AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
- // avoid re-entrance
+ // avoid re-entrance
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(vm.getId());
@@ -3208,16 +3208,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
- } else {
+ } else {
Outcome<VirtualMachine> outcome = addVmToNetworkThroughJobQueue(vm, network, requested);
- try {
+ try {
outcome.get();
- } catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException("Operation is interrupted", e);
- } catch (java.util.concurrent.ExecutionException e) {
+ } catch (java.util.concurrent.ExecutionException e) {
throw new RuntimeException("Execution excetion", e);
- }
+ }
Object jobException = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobException != null) {
@@ -3234,7 +3234,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
throw new RuntimeException("Unexpected job execution result");
- }
+ }
}
private NicProfile orchestrateAddVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException, ResourceUnavailableException,
@@ -3305,11 +3305,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public boolean removeNicFromVm(VirtualMachine vm, Nic nic)
- throws ConcurrentOperationException, ResourceUnavailableException {
+ throws ConcurrentOperationException, ResourceUnavailableException {
- AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
- // avoid re-entrance
+ // avoid re-entrance
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(vm.getId());
@@ -3321,16 +3321,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
_workJobDao.expunge(placeHolder.getId());
}
- } else {
+ } else {
Outcome<VirtualMachine> outcome = removeNicFromVmThroughJobQueue(vm, nic);
- try {
+ try {
outcome.get();
- } catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException("Operation is interrupted", e);
- } catch (java.util.concurrent.ExecutionException e) {
+ } catch (java.util.concurrent.ExecutionException e) {
throw new RuntimeException("Execution excetion", e);
- }
+ }
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
@@ -3344,8 +3344,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
return (Boolean)jobResult;
}
- throw new RuntimeException("Job failed with un-handled exception");
- }
+ throw new RuntimeException("Job failed with un-handled exception");
+ }
}
private boolean orchestrateRemoveNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException {
@@ -3408,8 +3408,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
@DB
public boolean removeVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException {
- // TODO will serialize on the VM object later to resolve operation conflicts
- return orchestrateRemoveVmFromNetwork(vm, network, broadcastUri);
+ // TODO will serialize on the VM object later to resolve operation conflicts
+ return orchestrateRemoveVmFromNetwork(vm, network, broadcastUri);
}
@DB
@@ -3552,10 +3552,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId)
- throws ResourceUnavailableException, ConcurrentOperationException {
- AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ throws ResourceUnavailableException, ConcurrentOperationException {
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
- // avoid re-entrance
+ // avoid re-entrance
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
@@ -3567,16 +3567,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
- } else {
- Outcome<VirtualMachine> outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId);
+ } else {
+ Outcome<VirtualMachine> outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId);
- try {
- VirtualMachine vm = outcome.get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Operation is interrupted", e);
- } catch (java.util.concurrent.ExecutionException e) {
- throw new RuntimeException("Execution excetion", e);
- }
+ try {
+ VirtualMachine vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
@@ -3584,14 +3584,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
throw (ResourceUnavailableException)jobResult;
else if (jobResult instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)jobResult;
- }
- }
+ }
+ }
}
private void orchestrateMigrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId)
- throws ResourceUnavailableException, ConcurrentOperationException {
+ throws ResourceUnavailableException, ConcurrentOperationException {
- VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+ VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
s_logger.info("Migrating " + vm + " to " + dest);
vm.getServiceOfferingId();
@@ -3775,7 +3775,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
public boolean unplugNic(Network network, NicTO nic, VirtualMachineTO vm, ReservationContext context, DeployDestination dest) throws ConcurrentOperationException,
- ResourceUnavailableException {
+ ResourceUnavailableException {
boolean result = true;
VMInstanceVO router = _vmDao.findById(vm.getId());
@@ -3809,12 +3809,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering,
- boolean reconfiguringOnExistingHost)
+ boolean reconfiguringOnExistingHost)
throws ResourceUnavailableException, InsufficientServerCapacityException, ConcurrentOperationException {
- AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+ AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
- // avoid re-entrance
+ // avoid re-entrance
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
@@ -3826,17 +3826,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
- } else {
- Outcome<VirtualMachine> outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
+ } else {
+ Outcome<VirtualMachine> outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
- VirtualMachine vm = null;
- try {
- vm = outcome.get();
- } catch (InterruptedException e) {
- throw new RuntimeException("Operation is interrupted", e);
- } catch (java.util.concurrent.ExecutionException e) {
- throw new RuntimeException("Execution excetion", e);
- }
+ VirtualMachine vm = null;
+ try {
+ vm = outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
if (jobResult != null) {
@@ -3853,7 +3853,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
return (VMInstanceVO)vm;
- }
+ }
}
private VMInstanceVO orchestrateReConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, boolean reconfiguringOnExistingHost) throws ResourceUnavailableException,
@@ -3937,278 +3937,282 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@MessageHandler(topic = Topics.VM_POWER_STATE)
private void HandlePowerStateReport(String subject, String senderAddress, Object args) {
- assert(args != null);
- Long vmId = (Long)args;
+ assert (args != null);
+ Long vmId = (Long)args;
- List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vmId);
- if(pendingWorkJobs.size() == 0) {
- // there is no pending operation job
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vmId);
+ if (pendingWorkJobs.size() == 0) {
+ // there is no pending operation job
VMInstanceVO vm = _vmDao.findById(vmId);
- if(vm != null) {
- switch(vm.getPowerState()) {
- case PowerOn :
- handlePowerOnReportWithNoPendingJobsOnVM(vm);
- break;
+ if (vm != null) {
+ switch (vm.getPowerState()) {
+ case PowerOn:
+ handlePowerOnReportWithNoPendingJobsOnVM(vm);
+ break;
- case PowerOff :
- handlePowerOffReportWithNoPendingJobsOnVM(vm);
- break;
+ case PowerOff:
+ handlePowerOffReportWithNoPendingJobsOnVM(vm);
+ break;
- // PowerUnknown shouldn't be reported, it is a derived
+ // PowerUnknown shouldn't be reported, it is a derived
// VM power state from host state (host un-reachable)
- case PowerUnknown :
- default :
- assert(false);
- break;
- }
- } else {
- s_logger.warn("VM " + vmId + " no longer exists when processing VM state report");
- }
- } else {
+ case PowerUnknown:
+ default:
+ assert (false);
+ break;
+ }
+ } else {
+ s_logger.warn("VM " + vmId + " no longer exists when processing VM state report");
+ }
+ } else {
// reset VM power state tracking so that we won't lost signal when VM has
// been translated to
_vmDao.resetVmPowerStateTracking(vmId);
- }
+ }
}
private void handlePowerOnReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
- //
- // 1) handle left-over transitional VM states
- // 2) handle out of band VM live migration
- // 3) handle out of sync stationary states, marking VM from Stopped to Running with
- // alert messages
- //
- switch(vm.getState()) {
- case Starting :
- try {
- stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
- } catch(NoTransitionException e) {
- s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
- }
-
- // we need to alert admin or user about this risky state transition
- _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
- VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset");
- break;
-
- case Running :
- try {
- if(vm.getHostId() != null && vm.getHostId().longValue() != vm.getPowerHostId().longValue())
- s_logger.info("Detected out of band VM migration from host " + vm.getHostId() + " to host " + vm.getPowerHostId());
- stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
- } catch(NoTransitionException e) {
- s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
- }
- break;
-
- case Stopping :
- case Stopped :
- try {
- stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
- } catch(NoTransitionException e) {
- s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
- }
- _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
- VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Running) from out-of-context transition. VM network environment may need to be reset");
- break;
-
- case Destroyed :
- case Expunging :
- s_logger.info("Receive power on report when VM is in destroyed or expunging state. vm: "
- + vm.getId() + ", state: " + vm.getState());
- break;
-
- case Migrating :
- try {
- stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
- } catch(NoTransitionException e) {
- s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
- }
- break;
-
- case Error :
- default :
- s_logger.info("Receive power on report when VM is in error or unexpected state. vm: "
- + vm.getId() + ", state: " + vm.getState());
- break;
- }
+ //
+ // 1) handle left-over transitional VM states
+ // 2) handle out of band VM live migration
+ // 3) handle out of sync stationary states, marking VM from Stopped to Running with
+ // alert messages
+ //
+ switch (vm.getState()) {
+ case Starting:
+ try {
+ stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
+ } catch (NoTransitionException e) {
+ s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+ }
+
+ // we need to alert admin or user about this risky state transition
+ _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+ VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName()
+ + ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset");
+ break;
+
+ case Running:
+ try {
+ if (vm.getHostId() != null && vm.getHostId().longValue() != vm.getPowerHostId().longValue())
+ s_logger.info("Detected out of band VM migration from host " + vm.getHostId() + " to host " + vm.getPowerHostId());
+ stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
+ } catch (NoTransitionException e) {
+ s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+ }
+ break;
+
+ case Stopping:
+ case Stopped:
+ try {
+ stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
+ } catch (NoTransitionException e) {
+ s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+ }
+ _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+ VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState()
+ + " -> Running) from out-of-context transition. VM network environment may need to be reset");
+ break;
+
+ case Destroyed:
+ case Expunging:
+ s_logger.info("Receive power on report when VM is in destroyed or expunging state. vm: "
+ + vm.getId() + ", state: " + vm.getState());
+ break;
+
+ case Migrating:
+ try {
+ stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
+ } catch (NoTransitionException e) {
+ s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+ }
+ break;
+
+ case Error:
+ default:
+ s_logger.info("Receive power on report when VM is in error or unexpected state. vm: "
+ + vm.getId() + ", state: " + vm.getState());
+ break;
+ }
}
private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
- // 1) handle left-over transitional VM states
- // 2) handle out of sync stationary states, schedule force-stop to release resources
- //
- switch(vm.getState()) {
- case Starting :
- case Stopping :
+ // 1) handle left-over transitional VM states
+ // 2) handle out of sync stationary states, schedule force-stop to release resources
+ //
+ switch (vm.getState()) {
+ case Starting:
+ case Stopping:
case Running:
- case Stopped :
- case Migrating :
- try {
- stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId());
- } catch(NoTransitionException e) {
- s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
- }
- _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
- VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Stopped) from out-of-context transition.");
- // TODO: we need to forcely release all resource allocation
- break;
-
- case Destroyed :
- case Expunging :
- break;
-
- case Error :
- default :
- break;
- }
+ case Stopped:
+ case Migrating:
+ try {
+ stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId());
+ } catch (NoTransitionException e) {
+ s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+ }
+ _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+ VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState()
+ + " -> Stopped) from out-of-context transition.");
+ // TODO: we need to forcibly release all resource allocation
+ break;
+
+ case Destroyed:
+ case Expunging:
+ break;
+
+ case Error:
+ default:
+ break;
+ }
}
private void scanStalledVMInTransitionStateOnUpHost(long hostId) {
- //
- // Check VM that is stuck in Starting, Stopping, Migrating states, we won't check
- // VMs in expunging state (this need to be handled specially)
- //
- // checking condition
- // 1) no pending VmWork job
- // 2) on hostId host and host is UP
- //
- // When host is UP, soon or later we will get a report from the host about the VM,
- // however, if VM is missing from the host report (it may happen in out of band changes
- // or from designed behave of XS/KVM), the VM may not get a chance to run the state-sync logic
- //
- // Therefor, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP
- // and a VM stalls for status update, we will consider them to be powered off
- // (which is relatively safe to do so)
-
- long stallThresholdInMs = VmJobStateReportInterval.value() + (VmJobStateReportInterval.value() >> 1);
- Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - stallThresholdInMs);
- List<Long> mostlikelyStoppedVMs = listStalledVMInTransitionStateOnUpHost(hostId, cutTime);
- for(Long vmId : mostlikelyStoppedVMs) {
- VMInstanceVO vm = _vmDao.findById(vmId);
- assert(vm != null);
- handlePowerOffReportWithNoPendingJobsOnVM(vm);
- }
-
- List<Long> vmsWithRecentReport = listVMInTransitionStateWithRecentReportOnUpHost(hostId, cutTime);
- for(Long vmId : vmsWithRecentReport) {
- VMInstanceVO vm = _vmDao.findById(vmId);
- assert(vm != null);
- if(vm.getPowerState() == PowerState.PowerOn)
- handlePowerOnReportWithNoPendingJobsOnVM(vm);
- else
- handlePowerOffReportWithNoPendingJobsOnVM(vm);
- }
+ //
+ // Check VM that is stuck in Starting, Stopping, Migrating states, we won't check
+ // VMs in expunging state (this needs to be handled specially)
+ //
+ // checking condition
+ // 1) no pending VmWork job
+ // 2) on hostId host and host is UP
+ //
+ // When host is UP, sooner or later we will get a report from the host about the VM,
+ // however, if VM is missing from the host report (it may happen in out of band changes
+ // or by the designed behavior of XS/KVM), the VM may not get a chance to run the state-sync logic
+ //
+ // Therefore, we will scan those VMs on UP host based on last update timestamp, if the host is UP
+ // and a VM stalls for status update, we will consider them to be powered off
+ // (which is relatively safe to do so)
+
+ long stallThresholdInMs = VmJobStateReportInterval.value() + (VmJobStateReportInterval.value() >> 1);
+ Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - stallThresholdInMs);
+ List<Long> mostlikelyStoppedVMs = listStalledVMInTransitionStateOnUpHost(hostId, cutTime);
+ for (Long vmId : mostlikelyStoppedVMs) {
+ VMInstanceVO vm = _vmDao.findById(vmId);
+ assert (vm != null);
+ handlePowerOffReportWithNoPendingJobsOnVM(vm);
+ }
+
+ List<Long> vmsWithRecentReport = listVMInTransitionStateWithRecentReportOnUpHost(hostId, cutTime);
+ for (Long vmId : vmsWithRecentReport) {
+ VMInstanceVO vm = _vmDao.findById(vmId);
+ assert (vm != null);
+ if (vm.getPowerState() == PowerState.PowerOn)
+ handlePowerOnReportWithNoPendingJobsOnVM(vm);
+ else
+ handlePowerOffReportWithNoPendingJobsOnVM(vm);
+ }
}
private void scanStalledVMInTransitionStateOnDisconnectedHosts() {
- Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - VmOpWaitInterval.value()*1000);
- List<Long> stuckAndUncontrollableVMs = listStalledVMInTransitionStateOnDisconnectedHosts(cutTime);
- for(Long vmId : stuckAndUncontrollableVMs) {
- VMInstanceVO vm = _vmDao.findById(vmId);
+ Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - VmOpWaitInterval.value() * 1000);
+ List<Long> stuckAndUncontrollableVMs = listStalledVMInTransitionStateOnDisconnectedHosts(cutTime);
+ for (Long vmId : stuckAndUncontrollableVMs) {
+ VMInstanceVO vm = _vmDao.findById(vmId);
- // We now only alert administrator about this situation
- _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
- VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState() + " state and its host is unreachable for too long");
- }
+ // We now only alert administrator about this situation
+ _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+ VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState()
+ + " state and its host is unreachable for too long");
+ }
}
// VMs that are in a transitional state without a recent power state report
private List<Long> listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) {
- String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
+ String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
"AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " +
- "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
- "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
-
- List<Long> l = new ArrayList<Long>();
- TransactionLegacy txn = null;
- try {
- txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
-
- PreparedStatement pstmt = null;
- try {
- pstmt = txn.prepareAutoCloseStatement(sql);
-
- pstmt.setLong(1, hostId);
- pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
- pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
- ResultSet rs = pstmt.executeQuery();
- while(rs.next()) {
- l.add(rs.getLong(1));
- }
- } catch (SQLException e) {
- } catch (Throwable e) {
- }
-
- } finally {
- if(txn != null)
- txn.close();
- }
+ "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
+ "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
+
+ List<Long> l = new ArrayList<Long>();
+ TransactionLegacy txn = null;
+ try {
+ txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
+
+ PreparedStatement pstmt = null;
+ try {
+ pstmt = txn.prepareAutoCloseStatement(sql);
+
+ pstmt.setLong(1, hostId);
+ pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+ pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
+ ResultSet rs = pstmt.executeQuery();
+ while (rs.next()) {
+ l.add(rs.getLong(1));
+ }
+ } catch (SQLException e) {
+ } catch (Throwable e) {
+ }
+
+ } finally {
+ if (txn != null)
+ txn.close();
+ }
return l;
}
// VMs that are in a transitional state and have recently had a power state update
private List<Long> listVMInTransitionStateWithRecentReportOnUpHost(long hostId, Date cutTime) {
- String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
+ String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
"AND h.id = ? AND i.power_state_update_time > ? AND i.host_id = h.id " +
- "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
- "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
-
- List<Long> l = new ArrayList<Long>();
- TransactionLegacy txn = null;
- try {
- txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
- PreparedStatement pstmt = null;
- try {
- pstmt = txn.prepareAutoCloseStatement(sql);
-
- pstmt.setLong(1, hostId);
- pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
- pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
- ResultSet rs = pstmt.executeQuery();
- while(rs.next()) {
- l.add(rs.getLong(1));
- }
- } catch (SQLException e) {
- } catch (Throwable e) {
- }
- return l;
- } finally {
- if(txn != null)
- txn.close();
- }
+ "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
+ "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
+
+ List<Long> l = new ArrayList<Long>();
+ TransactionLegacy txn = null;
+ try {
+ txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
+ PreparedStatement pstmt = null;
+ try {
+ pstmt = txn.prepareAutoCloseStatement(sql);
+
+ pstmt.setLong(1, hostId);
+ pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+ pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
+ ResultSet rs = pstmt.executeQuery();
+ while (rs.next()) {
+ l.add(rs.getLong(1));
+ }
+ } catch (SQLException e) {
+ } catch (Throwable e) {
+ }
+ return l;
+ } finally {
+ if (txn != null)
+ txn.close();
+ }
}
private List<Long> listStalledVMInTransitionStateOnDisconnectedHosts(Date cutTime) {
- String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " +
+ String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " +
"AND i.power_state_update_time < ? AND i.host_id = h.id " +
- "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
- "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
-
- List<Long> l = new ArrayList<Long>();
- TransactionLegacy txn = null;
- try {
- txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
- PreparedStatement pstmt = null;
- try {
- pstmt = txn.prepareAutoCloseStatement(sql);
-
- pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
- pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal());
- ResultSet rs = pstmt.executeQuery();
- while(rs.next()) {
- l.add(rs.getLong(1));
- }
- } catch (SQLException e) {
- } catch (Throwable e) {
- }
- return l;
- } finally {
- if(txn != null)
- txn.close();
- }
+ "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
+ "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
+
+ List<Long> l = new ArrayList<Long>();
+ TransactionLegacy txn = null;
+ try {
+ txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
+ PreparedStatement pstmt = null;
+ try {
+ pstmt = txn.prepareAutoCloseStatement(sql);
+
+ pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+ pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal());
+ ResultSet rs = pstmt.executeQuery();
+ while (rs.next()) {
+ l.add(rs.getLong(1));
+ }
+ } catch (SQLException e) {
+ } catch (Throwable e) {
+ }
+ return l;
+ } finally {
+ if (txn != null)
+ txn.close();
+ }
}
//
@@ -4244,9 +4248,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() {
@Override
public boolean checkCondition() {
- AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
- assert(jobVo != null);
- if(jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS)
+ AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
+ assert (jobVo != null);
+ if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS)
return true;
return false;
@@ -4266,58 +4270,58 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
// no time for this at current iteration
//
public Outcome<VirtualMachine> startVmThroughJobQueue(final String vmUuid,
- final Map<VirtualMachineProfile.Param, Object> params,
- final DeploymentPlan planToDeploy) {
+ final Map<VirtualMachineProfile.Param, Object> params,
+ final DeploymentPlan planToDeploy) {
- final CallContext context = CallContext.current();
+ final CallContext context = CallContext.current();
final User callingUser = context.getCallingUser();
final Account callingAccount = context.getCallingAccount();
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Object[] result = Transaction.execute(new TransactionCallback<Object[]>() {
- @Override
+ @Override
public Object[] doInTransaction(TransactionStatus status) {
- VmWorkJobVO workJob = null;
+ VmWorkJobVO workJob = null;
- _vmDao.lockRow(vm.getId(), true);
- List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance,
- vm.getId(), VmWorkStart.class.getName());
+ _vmDao.lockRow(vm.getId(), true);
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance,
+ vm.getId(), VmWorkStart.class.getName());
- if (pendingWorkJobs.size() > 0) {
- assert (pendingWorkJobs.size() == 1);
- workJob = pendingWorkJobs.get(0);
- } else {
- workJob = new VmWorkJobVO(context.getContextId());
+ if (pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+ workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
- workJob.setCmd(VmWorkStart.class.getName());
+ workJob.setCmd(VmWorkStart.class.getName());
- workJob.setAccountId(callingAccount.getId());
- workJob.setUserId(callingUser.getId());
- workJob.setStep(VmWorkJobVO.Step.Starting);
+ workJob.setAccountId(callingAccount.getId());
+ workJob.setUserId(callingUser.getId());
+ workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(VirtualMachine.Type.Instance);
- workJob.setVmInstanceId(vm.getId());
+ workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
- // save work context info (there are some duplications)
+ // save work context info (there are some duplications)
VmWorkStart workInfo = new VmWorkStart(callingUser.getId(), callingAccount.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER);
- workInfo.setPlan(planToDeploy);
- workInfo.setParams(params);
- workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+ workInfo.setPlan(planToDeploy);
+ workInfo.setParams(params);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
- }
+ }
return new Object[] {workJob, new Long(workJob.getId())};
- }
- });
+ }
+ });
final long jobId = (Long)result[1];
- AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmStateSyncOutcome((VmWorkJobVO)result[0],
- VirtualMachine.PowerState.PowerOn, vm.getId(), null);
+ VirtualMachine.PowerState.PowerOn, vm.getId(), null);
}
public Outcome<VirtualMachine> stopVmThroughJobQueue(final String vmUuid, final boolean cleanup) {
@@ -4328,51 +4332,51 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Object[] result = Transaction.execute(new TransactionCallback<Object[]>() {
- @Override
+ @Override
public Object[] doInTransaction(TransactionStatus status) {
- _vmDao.lockRow(vm.getId(), true);
+ _vmDao.lockRow(vm.getId(), true);
- List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- vm.getType(), vm.getId(),
- VmWorkStop.class.getName());
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ vm.getType(), vm.getId(),
+ VmWorkStop.class.getName());
- VmWorkJobVO workJob = null;
- if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
- assert (pendingWorkJobs.size() == 1);
- workJob = pendingWorkJobs.get(0);
- } else {
- workJob = new VmWorkJobVO(context.getContextId());
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+ workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
- workJob.setCmd(VmWorkStop.class.getName());
+ workJob.setCmd(VmWorkStop.class.getName());
- workJob.setAccountId(account.getId());
- workJob.setUserId(user.getId());
- workJob.setStep(VmWorkJobVO.Step.Prepare);
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setStep(VmWorkJobVO.Step.Prepare);
workJob.setVmType(VirtualMachine.Type.Instance);
- workJob.setVmInstanceId(vm.getId());
+ workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
- // save work context info (there are some duplications)
+ // save work context info (there are some duplications)
VmWorkStop workInfo = new VmWorkStop(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, cleanup);
- workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
- }
+ }
return new Object[] {workJob, new Long(workJob.getId())};
- }
- });
+ }
+ });
final long jobId = (Long)result[1];
- AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmStateSyncOutcome((VmWorkJobVO)result[0],
- VirtualMachine.PowerState.PowerOff, vm.getId(), null);
+ VirtualMachine.PowerState.PowerOff, vm.getId(), null);
}
public Outcome<VirtualMachine> rebootVmThroughJobQueue(final String vmUuid,
- final Map<VirtualMachineProfile.Param, Object> params) {
+ final Map<VirtualMachineProfile.Param, Object> params) {
final CallContext context = CallContext.current();
final Account account = context.getCallingAccount();
@@ -4381,47 +4385,47 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Object[] result = Transaction.execute(new TransactionCallback<Object[]>() {
- @Override
+ @Override
public Object[] doInTransaction(TransactionStatus status) {
- _vmDao.lockRow(vm.getId(), true);
+ _vmDao.lockRow(vm.getId(), true);
- List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
- VmWorkReboot.class.getName());
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkReboot.class.getName());
- VmWorkJobVO workJob = null;
- if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
- assert (pendingWorkJobs.size() == 1);
- workJob = pendingWorkJobs.get(0);
- } else {
- workJob = new VmWorkJobVO(context.getContextId());
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
+ workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
- workJob.setCmd(VmWorkReboot.class.getName());
+ workJob.setCmd(VmWorkReboot.class.getName());
- workJob.setAccountId(account.getId());
- workJob.setUserId(user.getId());
- workJob.setStep(VmWorkJobVO.Step.Prepare);
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
+ workJob.setStep(VmWorkJobVO.Step.Prepare);
workJob.setVmType(VirtualMachine.Type.Instance);
- workJob.setVmInstanceId(vm.getId());
+ workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
- // save work context info (there are some duplications)
+ // save work context info (there are some duplications)
VmWorkReboot workInfo = new VmWorkReboot(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, params);
- workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
- }
+ }
return new Object[] {workJob, new Long(workJob.getId())};
- }
- });
+ }
+ });
final long jobId = (Long)result[1];
- AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0],
- vm.getId());
+ vm.getId());
}
public Outcome<VirtualMachine> migrateVmThroughJobQueue(final String vmUuid, final long srcHostId, final DeployDestination dest) {
@@ -4432,52 +4436,52 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Object[] result = Transaction.execute(new TransactionCallback<Object[]>() {
- @Override
+ @Override
public Object[] doInTransaction(TransactionStatus status) {
- _vmDao.lockRow(vm.getId(), true);
+ _vmDao.lockRow(vm.getId(), true);
- List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
- VmWorkMigrate.class.getName());
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkMigrate.class.getName());
- VmWorkJobVO workJob = null;
- if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
- assert (pendingWorkJobs.size() == 1);
- workJob = pendingWorkJobs.get(0);
- } else {
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
- workJob = new VmWorkJobVO(context.getContextId());
+ workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
- workJob.setCmd(VmWorkMigrate.class.getName());
+ workJob.setCmd(VmWorkMigrate.class.getName());
- workJob.setAccountId(account.getId());
- workJob.setUserId(user.getId());
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
workJob.setVmType(VirtualMachine.Type.Instance);
- workJob.setVmInstanceId(vm.getId());
+ workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
- // save work context info (there are some duplications)
+ // save work context info (there are some duplications)
VmWorkMigrate workInfo = new VmWorkMigrate(user.getId(), account.getId(), vm.getId(), VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest);
- workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
- }
+ }
return new Object[] {workJob, new Long(workJob.getId())};
- }
- });
+ }
+ });
final long jobId = (Long)result[1];
- AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmStateSyncOutcome((VmWorkJobVO)result[0],
- VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId());
+ VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId());
}
public Outcome<VirtualMachine> migrateVmWithStorageThroughJobQueue(
- final String vmUuid, final long srcHostId, final long destHostId,
- final Map<Volume, StoragePool> volumeToPool) {
+ final String vmUuid, final long srcHostId, final long destHostId,
+ final Map<Volume, StoragePool> volumeToPool) {
final CallContext context = CallContext.current();
final User user = context.getCallingUser();
@@ -4486,52 +4490,52 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Object[] result = Transaction.execute(new TransactionCallback<Object[]>() {
- @Override
+ @Override
public Object[] doInTransaction(TransactionStatus status) {
- _vmDao.lockRow(vm.getId(), true);
+ _vmDao.lockRow(vm.getId(), true);
- List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
- VmWorkMigrateWithStorage.class.getName());
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkMigrateWithStorage.class.getName());
- VmWorkJobVO workJob = null;
- if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
- assert (pendingWorkJobs.size() == 1);
- workJob = pendingWorkJobs.get(0);
- } else {
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
- workJob = new VmWorkJobVO(context.getContextId());
+ workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
- workJob.setCmd(VmWorkMigrate.class.getName());
+ workJob.setCmd(VmWorkMigrate.class.getName());
- workJob.setAccountId(account.getId());
- workJob.setUserId(user.getId());
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
workJob.setVmType(VirtualMachine.Type.Instance);
- workJob.setVmInstanceId(vm.getId());
+ workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
- // save work context info (there are some duplications)
- VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(),
+ // save work context info (there are some duplications)
+ VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(),
VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, destHostId, volumeToPool);
- workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
- }
+ }
return new Object[] {workJob, new Long(workJob.getId())};
- }
- });
+ }
+ });
final long jobId = (Long)result[1];
- AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmStateSyncOutcome((VmWorkJobVO)result[0],
- VirtualMachine.PowerState.PowerOn, vm.getId(), destHostId);
+ VirtualMachine.PowerState.PowerOn, vm.getId(), destHostId);
}
public Outcome<VirtualMachine> migrateVmForScaleThroughJobQueue(
- final String vmUuid, final long srcHostId, final DeployDestination dest, final Long newSvcOfferingId) {
+ final String vmUuid, final long srcHostId, final DeployDestination dest, final Long newSvcOfferingId) {
final CallContext context = CallContext.current();
final User user = context.getCallingUser();
@@ -4540,52 +4544,52 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Object[] result = Transaction.execute(new TransactionCallback<Object[]>() {
- @Override
+ @Override
public Object[] doInTransaction(TransactionStatus status) {
- _vmDao.lockRow(vm.getId(), true);
+ _vmDao.lockRow(vm.getId(), true);
- List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
- VmWorkMigrateForScale.class.getName());
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkMigrateForScale.class.getName());
- VmWorkJobVO workJob = null;
- if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
- assert (pendingWorkJobs.size() == 1);
- workJob = pendingWorkJobs.get(0);
- } else {
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
- workJob = new VmWorkJobVO(context.getContextId());
+ workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
- workJob.setCmd(VmWorkMigrate.class.getName());
+ workJob.setCmd(VmWorkMigrate.class.getName());
- workJob.setAccountId(account.getId());
- workJob.setUserId(user.getId());
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
workJob.setVmType(VirtualMachine.Type.Instance);
- workJob.setVmInstanceId(vm.getId());
+ workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
- // save work context info (there are some duplications)
- VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(),
+ // save work context info (there are some duplications)
+ VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(),
VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, srcHostId, dest, newSvcOfferingId);
- workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
- }
+ }
return new Object[] {workJob, new Long(workJob.getId())};
- }
- });
+ }
+ });
final long jobId = (Long)result[1];
- AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId());
}
public Outcome<VirtualMachine> migrateVmStorageThroughJobQueue(
- final String vmUuid, final StoragePool destPool) {
+ final String vmUuid, final StoragePool destPool) {
final CallContext context = CallContext.current();
final User user = context.getCallingUser();
@@ -4594,205 +4598,205 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
Object[] result = Transaction.execute(new TransactionCallback<Object[]>() {
- @Override
+ @Override
public Object[] doInTransaction(TransactionStatus status) {
- _vmDao.lockRow(vm.getId(), true);
+ _vmDao.lockRow(vm.getId(), true);
- List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
- VmWorkStorageMigration.class.getName());
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkStorageMigration.class.getName());
- VmWorkJobVO workJob = null;
- if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
- assert (pendingWorkJobs.size() == 1);
- workJob = pendingWorkJobs.get(0);
- } else {
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
- workJob = new VmWorkJobVO(context.getContextId());
+ workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
- workJob.setCmd(VmWorkStorageMigration.class.getName());
+ workJob.setCmd(VmWorkStorageMigration.class.getName());
- workJob.setAccountId(account.getId());
- workJob.setUserId(user.getId());
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
workJob.setVmType(VirtualMachine.Type.Instance);
- workJob.setVmInstanceId(vm.getId());
+ workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
- // save work context info (there are some duplications)
- VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(),
+ // save work context info (there are some duplications)
+ VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(),
VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool.getId());
- workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
- }
+ }
return new Object[] {workJob, new Long(workJob.getId())};
- }
- });
+ }
+ });
final long jobId = (Long)result[1];
- AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId());
}
public Outcome<VirtualMachine> addVmToNetworkThroughJobQueue(
- final VirtualMachine vm, final Network network, final NicProfile requested) {
+ final VirtualMachine vm, final Network network, final NicProfile requested) {
final CallContext context = CallContext.current();
final User user = context.getCallingUser();
final Account account = context.getCallingAccount();
Object[] result = Transaction.execute(new TransactionCallback<Object[]>() {
- @Override
+ @Override
public Object[] doInTransaction(TransactionStatus status) {
- _vmDao.lockRow(vm.getId(), true);
+ _vmDao.lockRow(vm.getId(), true);
- List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
- VmWorkAddVmToNetwork.class.getName());
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkAddVmToNetwork.class.getName());
- VmWorkJobVO workJob = null;
- if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
- assert (pendingWorkJobs.size() == 1);
- workJob = pendingWorkJobs.get(0);
- } else {
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
- workJob = new VmWorkJobVO(context.getContextId());
+ workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
- workJob.setCmd(VmWorkAddVmToNetwork.class.getName());
+ workJob.setCmd(VmWorkAddVmToNetwork.class.getName());
- workJob.setAccountId(account.getId());
- workJob.setUserId(user.getId());
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
workJob.setVmType(VirtualMachine.Type.Instance);
- workJob.setVmInstanceId(vm.getId());
+ workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
- // save work context info (there are some duplications)
- VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(),
+ // save work context info (there are some duplications)
+ VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(),
VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network.getId(), requested);
- workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
- }
+ }
return new Object[] {workJob, new Long(workJob.getId())};
- }
- });
+ }
+ });
final long jobId = (Long)result[1];
- AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId());
}
public Outcome<VirtualMachine> removeNicFromVmThroughJobQueue(
- final VirtualMachine vm, final Nic nic) {
+ final VirtualMachine vm, final Nic nic) {
final CallContext context = CallContext.current();
final User user = context.getCallingUser();
final Account account = context.getCallingAccount();
- Object[] result = Transaction.execute(new TransactionCallback<Object[]> () {
- @Override
+ Object[] result = Transaction.execute(new TransactionCallback<Object[]>() {
+ @Override
public Object[] doInTransaction(TransactionStatus status) {
- _vmDao.lockRow(vm.getId(), true);
+ _vmDao.lockRow(vm.getId(), true);
- List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
- VmWorkRemoveNicFromVm.class.getName());
+ List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+ VirtualMachine.Type.Instance, vm.getId(),
+ VmWorkRemoveNicFromVm.class.getName());
- VmWorkJobVO workJob = null;
- if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
- assert (pendingWorkJobs.size() == 1);
- workJob = pendingWorkJobs.get(0);
- } else {
+ VmWorkJobVO workJob = null;
+ if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+ assert (pendingWorkJobs.size() == 1);
+ workJob = pendingWorkJobs.get(0);
+ } else {
- workJob = new VmWorkJobVO(context.getContextId());
+ workJob = new VmWorkJobVO(context.getContextId());
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
- workJob.setCmd(VmWorkRemoveNicFromVm.class.getName());
+ workJob.setCmd(VmWorkRemoveNicFromVm.class.getName());
- workJob.setAccountId(account.getId());
- workJob.setUserId(user.getId());
+ workJob.setAccountId(account.getId());
+ workJob.setUserId(user.getId());
workJob.setVmType(VirtualMachine.Type.Instance);
- workJob.setVmInstanceId(vm.getId());
+ workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
- // save work context info (there are some duplications)
- VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(),
+ // save work context info (there are some duplications)
+ VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(),
VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, nic.getId());
- workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
- }
- return new Object[] { workJob, new Long(workJob.getId()) };
- }
- });
+ }
+ return new Object[] {workJob, new Long(workJob.getId())};
+ }
+ });
- final long jobId = (Long)result[1];
- AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+ final long jobId = (Long)result[1];
+ AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
return new VmJobVirtualMachineOutcome((VmWorkJobVO)result[0], vm.getId());
}
public Outcome<VirtualMachine> removeVmFromNetworkThroughJobQueue(
- final VirtualMachine vm, final Network network, final URI broadcastUri) {
+ final VirtualMachine vm, final Network network, final URI broadcastUri) {
final CallContext context = CallContext.current();
final User user = context.getCallingUser();
final Account account = context.getCallingAccount();
Object[] result = Transaction.execute(new TransactionCallback<Object[]>() {
- @Override
+ @Override
public Object[] doInTransaction(TransactionStatus status) {
- _vmDao.lockRow(vm.getId(), true);
+ _vmDao.lockRow(vm.getId(), true);
- List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
- VirtualMachine.Type.Instance, vm.getId(),
- VmWorkRemoveVmF
<TRUNCATED>