You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2014/01/17 20:55:38 UTC
git commit: updated refs/heads/master to a9733b5
Updated Branches:
refs/heads/master 2e5e403e3 -> a9733b5df
CLOUDSTACK-5731: Use general instance type to categorize VM work jobs to correctly serialize VM operations
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/a9733b5d
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/a9733b5d
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/a9733b5d
Branch: refs/heads/master
Commit: a9733b5df23a0d0f996726eca93f8309d17a920a
Parents: 2e5e403
Author: Kelven Yang <ke...@gmail.com>
Authored: Fri Jan 17 10:41:42 2014 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Fri Jan 17 11:55:14 2014 -0800
----------------------------------------------------------------------
.../src/com/cloud/vm/VmWorkConstants.java | 1 +
.../com/cloud/vm/VirtualMachineManagerImpl.java | 171 ++++++++++++++++---
.../src/com/cloud/vm/VmWorkJobDispatcher.java | 49 +++---
.../jobs/AsyncJobExecutionContext.java | 48 ++++--
.../framework/jobs/dao/VmWorkJobDao.java | 2 +
.../framework/jobs/dao/VmWorkJobDaoImpl.java | 93 ++++++++--
.../com/cloud/storage/VolumeApiServiceImpl.java | 99 +++++++++--
.../vm/snapshot/VMSnapshotManagerImpl.java | 80 ++++++++-
8 files changed, 453 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/engine/components-api/src/com/cloud/vm/VmWorkConstants.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/vm/VmWorkConstants.java b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java
index 20e40b7..4627cfe 100644
--- a/engine/components-api/src/com/cloud/vm/VmWorkConstants.java
+++ b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java
@@ -20,4 +20,5 @@ public interface VmWorkConstants {
public static final String VM_WORK_QUEUE = "VmWorkJobQueue";
public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher";
public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher";
+ public static final String VM_WORK_JOB_PLACEHOLDER = "VmWorkJobPlaceHolder";
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 89a0258..06805e1 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -561,6 +561,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
_executor.scheduleAtFixedRate(new TransitionTask(), 5000, VmJobStateReportInterval.value(), TimeUnit.SECONDS);
_executor.scheduleAtFixedRate(new CleanupTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS);
cancelWorkItems(_nodeId);
+
+ // cleanup left over place holder works
+ _workJobDao.expungeLeftoverWorkJobs(ManagementServerNode.getManagementServerId());
return true;
}
@@ -751,7 +754,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- orchestrateStart(vmUuid, params, planToDeploy, planner);
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+ placeHolder = createPlaceHolderWork(vm.getId());
+ }
+ try {
+ orchestrateStart(vmUuid, params, planToDeploy, planner);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
} else {
Outcome<VirtualMachine> outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy);
@@ -1275,7 +1288,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop);
+
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+ placeHolder = createPlaceHolderWork(vm.getId());
+ }
+ try {
+ orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
+
} else {
Outcome<VirtualMachine> outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop);
@@ -1567,7 +1592,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- orchestrateStorageMigration(vmUuid, destPool);
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+ placeHolder = createPlaceHolderWork(vm.getId());
+ }
+ try {
+ orchestrateStorageMigration(vmUuid, destPool);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
} else {
Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool);
@@ -1649,7 +1684,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- orchestrateMigrate(vmUuid, srcHostId, dest);
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+ placeHolder = createPlaceHolderWork(vm.getId());
+ }
+ try {
+ orchestrateMigrate(vmUuid, srcHostId, dest);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
} else {
Outcome<VirtualMachine> outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest);
@@ -1920,7 +1965,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool);
+
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+ placeHolder = createPlaceHolderWork(vm.getId());
+ }
+ try {
+ orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
+
} else {
Outcome<VirtualMachine> outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool);
@@ -2163,6 +2220,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
s_logger.trace("VM Operation Thread Running");
try {
_workDao.cleanup(VmOpCleanupWait.value());
+
+ // TODO. hard-coded to one hour after job has been completed
+ Date cutDate = new Date(new Date().getTime() - 3600000);
+ _workJobDao.expungeCompletedWorkJobs(cutDate);
} catch (Exception e) {
s_logger.error("VM Operations failed due to ", e);
}
@@ -2199,7 +2260,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- orchestrateReboot(vmUuid, params);
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+ placeHolder = createPlaceHolderWork(vm.getId());
+ }
+ try {
+ orchestrateReboot(vmUuid, params);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
} else {
Outcome<VirtualMachine> outcome = rebootVmThroughJobQueue(vmUuid, params);
@@ -3120,7 +3191,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- return orchestrateAddVmToNetwork(vm, network, requested);
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ placeHolder = createPlaceHolderWork(vm.getId());
+ }
+ try {
+ return orchestrateAddVmToNetwork(vm, network, requested);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
} else {
Outcome<VirtualMachine> outcome = addVmToNetworkThroughJobQueue(vm, network, requested);
@@ -3223,7 +3303,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- return orchestrateRemoveNicFromVm(vm, nic);
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ placeHolder = createPlaceHolderWork(vm.getId());
+ }
+ try {
+ return orchestrateRemoveNicFromVm(vm, nic);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
+
} else {
Outcome<VirtualMachine> outcome = removeNicFromVmThroughJobQueue(vm, nic);
@@ -3462,7 +3552,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId);
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+ placeHolder = createPlaceHolderWork(vm.getId());
+ }
+ try {
+ orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
} else {
Outcome<VirtualMachine> outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId);
@@ -3711,7 +3811,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+ placeHolder = createPlaceHolderWork(vm.getId());
+ }
+ try {
+ return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
} else {
Outcome<VirtualMachine> outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
@@ -4187,7 +4297,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -4240,7 +4350,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setStep(VmWorkJobVO.Step.Prepare);
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -4293,7 +4403,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setStep(VmWorkJobVO.Step.Prepare);
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -4343,7 +4453,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -4397,7 +4507,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -4449,7 +4559,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -4501,7 +4611,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -4551,7 +4661,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -4600,7 +4710,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -4649,7 +4759,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -4700,7 +4810,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -4856,4 +4966,23 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
return _jobHandlerProxy.handleVmWorkJob(work);
}
+
+ private VmWorkJobVO createPlaceHolderWork(long instanceId) {
+ VmWorkJobVO workJob = new VmWorkJobVO("");
+
+ workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
+ workJob.setCmd("");
+ workJob.setCmdInfo("");
+
+ workJob.setAccountId(0);
+ workJob.setUserId(0);
+ workJob.setStep(VmWorkJobVO.Step.Starting);
+ workJob.setVmType(VirtualMachine.Type.Instance);
+ workJob.setVmInstanceId(instanceId);
+ workJob.setInitMsid(ManagementServerNode.getManagementServerId());
+
+ _workJobDao.persist(workJob);
+
+ return workJob;
+ }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
index 285c8a2..31b2d9c 100644
--- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
+++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
@@ -60,9 +60,6 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
String cmd = job.getCmd();
assert (cmd != null);
- if (s_logger.isDebugEnabled())
- s_logger.debug("Run VM work job: " + cmd + ", job origin: " + job.getRelated());
-
Class<?> workClz = null;
try {
workClz = Class.forName(job.getCmd());
@@ -80,27 +77,33 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
return;
}
- if (_handlers == null || _handlers.isEmpty()) {
- s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo()
- + ", job origin: " + job.getRelated());
- _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found");
- return;
- }
-
- VmWorkJobHandler handler = _handlers.get(work.getHandlerName());
-
- if (handler == null) {
- s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd()
- + ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated());
- _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler");
- return;
+ if (s_logger.isDebugEnabled())
+ s_logger.debug("Run VM work job: " + cmd + " for VM " + work.getVmId() + ", job origin: " + job.getRelated());
+ try {
+ if (_handlers == null || _handlers.isEmpty()) {
+ s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo()
+ + ", job origin: " + job.getRelated());
+ _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found");
+ return;
+ }
+
+ VmWorkJobHandler handler = _handlers.get(work.getHandlerName());
+
+ if (handler == null) {
+ s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd()
+ + ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated());
+ _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler");
+ return;
+ }
+
+ CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated());
+
+ Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(work);
+ _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second());
+ } finally {
+ if (s_logger.isDebugEnabled())
+ s_logger.debug("Done with run of VM work job: " + cmd + " for VM " + work.getVmId() + ", job origin: " + job.getRelated());
}
-
- CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated());
-
- Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(work);
- _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second());
-
} catch(Throwable e) {
s_logger.error("Unable to complete " + job + ", job origin:" + job.getRelated(), e);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
index f558e01..20125f4 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
@@ -30,7 +30,9 @@ import com.cloud.exception.ConcurrentOperationException;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.ResourceUnavailableException;
-public class AsyncJobExecutionContext {
+public class AsyncJobExecutionContext {
+ private static final Logger s_logger = Logger.getLogger(AsyncJobExecutionContext.class);
+
private AsyncJob _job;
static private AsyncJobManager s_jobMgr;
@@ -112,7 +114,8 @@ public class AsyncJobExecutionContext {
}
//
- // check failure exception before we disjoin the worker job
+ // check failure exception before we disjoin the worker job, work job usually fails with exception
+ // this will help propogate exception between jobs
// TODO : it is ugly and this will become unnecessary after we switch to full-async mode
//
public void disjoinJob(long joinedJobId) throws InsufficientCapacityException,
@@ -120,21 +123,34 @@ public class AsyncJobExecutionContext {
assert (_job != null);
AsyncJobJoinMapVO record = s_joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
- if (record.getJoinStatus() == JobInfo.Status.FAILED && record.getJoinResult() != null) {
- Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
- if (exception != null && exception instanceof Exception) {
- if (exception instanceof InsufficientCapacityException)
- throw (InsufficientCapacityException)exception;
- else if (exception instanceof ConcurrentOperationException)
- throw (ConcurrentOperationException)exception;
- else if (exception instanceof ResourceUnavailableException)
- throw (ResourceUnavailableException)exception;
- else
- throw new RuntimeException((Exception)exception);
+ s_jobMgr.disjoinJob(_job.getId(), joinedJobId);
+
+ if (record.getJoinStatus() == JobInfo.Status.FAILED) {
+ if (record.getJoinResult() != null) {
+ Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
+ if (exception != null && exception instanceof Exception) {
+ if (exception instanceof InsufficientCapacityException) {
+ s_logger.error("Job " + joinedJobId + " failed with InsufficientCapacityException");
+ throw (InsufficientCapacityException)exception;
+ }
+ else if (exception instanceof ConcurrentOperationException) {
+ s_logger.error("Job " + joinedJobId + " failed with ConcurrentOperationException");
+ throw (ConcurrentOperationException)exception;
+ }
+ else if (exception instanceof ResourceUnavailableException) {
+ s_logger.error("Job " + joinedJobId + " failed with ResourceUnavailableException");
+ throw (ResourceUnavailableException)exception;
+ }
+ else {
+ s_logger.error("Job " + joinedJobId + " failed with exception");
+ throw new RuntimeException((Exception)exception);
+ }
+ }
+ } else {
+ s_logger.error("Job " + joinedJobId + " failed without providing an error object");
+ throw new RuntimeException("Job " + joinedJobId + " failed without providing an error object");
}
}
-
- s_jobMgr.disjoinJob(_job.getId(), joinedJobId);
}
public void completeJoin(JobInfo.Status joinStatus, String joinResult) {
@@ -151,6 +167,8 @@ public class AsyncJobExecutionContext {
public static AsyncJobExecutionContext getCurrentExecutionContext() {
AsyncJobExecutionContext context = s_currentExectionContext.get();
if (context == null) {
+ // TODO, this has security implicitions
+ s_logger.warn("Job is executed without a context, setup psudo job for the executing thread");
context = registerPseudoExecutionContext(CallContext.current().getCallingAccountId(),
CallContext.current().getCallingUserId());
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
index a3dbddf..44e39e4 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
@@ -35,4 +35,6 @@ public interface VmWorkJobDao extends GenericDao<VmWorkJobVO, Long> {
void updateStep(long workJobId, Step step);
void expungeCompletedWorkJobs(Date cutDate);
+
+ void expungeLeftoverWorkJobs(long msid);
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
index 5e0ffb6..cf3e173 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
@@ -16,8 +16,11 @@
// under the License.
package org.apache.cloudstack.framework.jobs.dao;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import java.util.TimeZone;
import javax.annotation.PostConstruct;
@@ -31,13 +34,16 @@ import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
+import com.cloud.utils.db.TransactionLegacy;
+import com.cloud.utils.db.TransactionStatus;
import com.cloud.vm.VirtualMachine;
public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implements VmWorkJobDao {
protected SearchBuilder<VmWorkJobVO> PendingWorkJobSearch;
protected SearchBuilder<VmWorkJobVO> PendingWorkJobByCommandSearch;
- protected SearchBuilder<VmWorkJobVO> ExpungeWorkJobSearch;
public VmWorkJobDaoImpl() {
}
@@ -48,7 +54,6 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
PendingWorkJobSearch.and("jobStatus", PendingWorkJobSearch.entity().getStatus(), Op.EQ);
PendingWorkJobSearch.and("vmType", PendingWorkJobSearch.entity().getVmType(), Op.EQ);
PendingWorkJobSearch.and("vmInstanceId", PendingWorkJobSearch.entity().getVmInstanceId(), Op.EQ);
- PendingWorkJobSearch.and("step", PendingWorkJobSearch.entity().getStep(), Op.NEQ);
PendingWorkJobSearch.done();
PendingWorkJobByCommandSearch = createSearchBuilder();
@@ -58,11 +63,6 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ);
PendingWorkJobByCommandSearch.and("cmd", PendingWorkJobByCommandSearch.entity().getCmd(), Op.EQ);
PendingWorkJobByCommandSearch.done();
-
- ExpungeWorkJobSearch = createSearchBuilder();
- ExpungeWorkJobSearch.and("lastUpdated", ExpungeWorkJobSearch.entity().getLastUpdated(), Op.LT);
- ExpungeWorkJobSearch.and("jobStatus", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ);
- ExpungeWorkJobSearch.done();
}
@Override
@@ -115,11 +115,80 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
}
@Override
- public void expungeCompletedWorkJobs(Date cutDate) {
- SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
- sc.setParameters("lastUpdated", cutDate);
- sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
+ public void expungeCompletedWorkJobs(final Date cutDate) {
+ // current DAO machenism does not support following usage
+ /*
+ SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
+ sc.setParameters("lastUpdated",cutDate);
+ sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
+
+ expunge(sc);
+ */
+ Transaction.execute(new TransactionCallbackNoReturn() {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ TransactionLegacy txn = TransactionLegacy.currentTxn();
+
+ PreparedStatement pstmt = null;
+ try {
+ pstmt = txn.prepareAutoCloseStatement(
+ "DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?)");
+ pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+
+ pstmt.execute();
+ } catch (SQLException e) {
+ } catch (Throwable e) {
+ }
+
+ try {
+ pstmt = txn.prepareAutoCloseStatement(
+ "DELETE FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?");
+ pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+
+ pstmt.execute();
+ } catch (SQLException e) {
+ } catch (Throwable e) {
+ }
+ }
+ });
+ }
- expunge(sc);
+ @Override
+ public void expungeLeftoverWorkJobs(final long msid) {
+ // current DAO machenism does not support following usage
+ /*
+ SearchCriteria<VmWorkJobVO> sc = ExpungePlaceHolderWorkJobSearch.create();
+ sc.setParameters("dispatcher", "VmWorkJobPlaceHolder");
+ sc.setParameters("msid", msid);
+
+ expunge(sc);
+ */
+ Transaction.execute(new TransactionCallbackNoReturn() {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ TransactionLegacy txn = TransactionLegacy.currentTxn();
+
+ PreparedStatement pstmt = null;
+ try {
+ pstmt = txn.prepareAutoCloseStatement(
+ "DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE (job_dispatcher='VmWorkJobPlaceHolder' OR job_dispatcher='VmWorkJobDispatcher') AND job_init_msid=?)");
+ pstmt.setLong(1, msid);
+
+ pstmt.execute();
+ } catch (SQLException e) {
+ } catch (Throwable e) {
+ }
+
+ try {
+ pstmt = txn.prepareAutoCloseStatement(
+ "DELETE FROM async_job WHERE (job_dispatcher='VmWorkJobPlaceHolder' OR job_dispatcher='VmWorkJobDispatcher') AND job_init_msid=?");
+ pstmt.setLong(1, msid);
+
+ pstmt.execute();
+ } catch (SQLException e) {
+ } catch (Throwable e) {
+ }
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/server/src/com/cloud/storage/VolumeApiServiceImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/com/cloud/storage/VolumeApiServiceImpl.java
index 8d475dd..fb35e23 100644
--- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java
+++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.Outcome;
+import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
@@ -73,6 +74,7 @@ import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
import org.apache.cloudstack.storage.image.datastore.ImageStoreEntity;
+import org.apache.cloudstack.utils.identity.ManagementServerNode;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.Answer;
@@ -327,6 +329,9 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
@Inject
protected AsyncJobManager _jobMgr;
+ @Inject
+ protected VmWorkJobDao _workJobDao;
+
VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
// TODO
@@ -911,8 +916,19 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- return orchestrateResizeVolume(volume.getId(), currentSize, newSize,
- newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
+
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ placeHolder = createPlaceHolderWork(userVm.getId());
+ }
+ try {
+ return orchestrateResizeVolume(volume.getId(), currentSize, newSize,
+ newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
+
} else {
Outcome<Volume> outcome = resizeVolumeThroughJobQueue(userVm.getId(), volume.getId(), currentSize, newSize,
newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
@@ -1102,7 +1118,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
+
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ placeHolder = createPlaceHolderWork(command.getVirtualMachineId());
+ }
+ try {
+ return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
+
} else {
Outcome<Volume> outcome = attachVolumeToVmThroughJobQueue(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
@@ -1405,7 +1432,16 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- return orchestrateDetachVolumeFromVM(vmId, volumeId);
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ placeHolder = createPlaceHolderWork(vmId);
+ }
+ try {
+ return orchestrateDetachVolumeFromVM(vmId, volumeId);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
} else {
Outcome<Volume> outcome = detachVolumeFromVmThroughJobQueue(vmId, volumeId);
@@ -1571,7 +1607,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume);
+
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ placeHolder = createPlaceHolderWork(vm.getId());
+ }
+ try {
+ return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
+
} else {
Outcome<Volume> outcome = migrateVolumeThroughJobQueue(vm.getId(), vol.getId(), destPool.getId(), liveMigrateVolume);
@@ -1662,7 +1709,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm);
+
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ placeHolder = createPlaceHolderWork(vm.getId());
+ }
+ try {
+ return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
+
} else {
Outcome<Snapshot> outcome = takeVolumeSnapshotThroughJobQueue(vm.getId(), volumeId, policyId, snapshotId, account.getId(), quiescevm);
@@ -2190,7 +2248,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -2237,7 +2295,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -2281,7 +2339,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -2326,7 +2384,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -2371,7 +2429,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -2428,4 +2486,23 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
return _jobHandlerProxy.handleVmWorkJob(work);
}
+
+ private VmWorkJobVO createPlaceHolderWork(long instanceId) {
+ VmWorkJobVO workJob = new VmWorkJobVO("");
+
+ workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
+ workJob.setCmd("");
+ workJob.setCmdInfo("");
+
+ workJob.setAccountId(0);
+ workJob.setUserId(0);
+ workJob.setStep(VmWorkJobVO.Step.Starting);
+ workJob.setVmType(VirtualMachine.Type.Instance);
+ workJob.setVmInstanceId(instanceId);
+ workJob.setInitMsid(ManagementServerNode.getManagementServerId());
+
+ _workJobDao.persist(workJob);
+
+ return workJob;
+ }
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
index 7d6e0ec..1e3926d 100644
--- a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
+++ b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
@@ -41,10 +41,12 @@ import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.Outcome;
+import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import org.apache.cloudstack.jobs.JobInfo;
+import org.apache.cloudstack.utils.identity.ManagementServerNode;
import com.cloud.event.ActionEvent;
import com.cloud.event.EventTypes;
@@ -124,6 +126,9 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
@Inject
AsyncJobManager _jobMgr;
+ @Inject
+ VmWorkJobDao _workJobDao;
+
VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
int _vmSnapshotMax;
@@ -364,7 +369,17 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm);
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ placeHolder = createPlaceHolderWork(vmId);
+ }
+ try {
+ return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
+
} else {
Outcome<VMSnapshot> outcome = createVMSnapshotThroughJobQueue(vmId, vmSnapshotId, quiescevm);
@@ -452,7 +467,16 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- return orchestrateDeleteVMSnapshot(vmSnapshotId);
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ placeHolder = createPlaceHolderWork(vmSnapshot.getVmId());
+ }
+ try {
+ return orchestrateDeleteVMSnapshot(vmSnapshotId);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
} else {
Outcome<VMSnapshot> outcome = deleteVMSnapshotThroughJobQueue(vmSnapshot.getVmId(), vmSnapshotId);
@@ -558,7 +582,18 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- return orchestrateRevertToVMSnapshot(vmSnapshotId);
+
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ placeHolder = createPlaceHolderWork(vmSnapshotVo.getVmId());
+ }
+ try {
+ return orchestrateRevertToVMSnapshot(vmSnapshotId);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
+
} else {
Outcome<VMSnapshot> outcome = revertToVMSnapshotThroughJobQueue(vmSnapshotVo.getVmId(), vmSnapshotId);
@@ -684,7 +719,17 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
- return orchestrateDeleteAllVMSnapshots(vmId, type);
+ VmWorkJobVO placeHolder = null;
+ if (VmJobEnabled.value()) {
+ placeHolder = createPlaceHolderWork(vmId);
+ }
+ try {
+ return orchestrateDeleteAllVMSnapshots(vmId, type);
+ } finally {
+ if (VmJobEnabled.value())
+ _workJobDao.expunge(placeHolder.getId());
+ }
+
} else {
Outcome<VirtualMachine> outcome = deleteAllVMSnapshotsThroughJobQueue(vmId, type);
@@ -828,7 +873,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -872,7 +917,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -916,7 +961,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -960,7 +1005,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
- workJob.setVmType(vm.getType());
+ workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@@ -1009,4 +1054,23 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
return _jobHandlerProxy.handleVmWorkJob(work);
}
+
+ private VmWorkJobVO createPlaceHolderWork(long instanceId) {
+ VmWorkJobVO workJob = new VmWorkJobVO("");
+
+ workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
+ workJob.setCmd("");
+ workJob.setCmdInfo("");
+
+ workJob.setAccountId(0);
+ workJob.setUserId(0);
+ workJob.setStep(VmWorkJobVO.Step.Starting);
+ workJob.setVmType(VirtualMachine.Type.Instance);
+ workJob.setVmInstanceId(instanceId);
+ workJob.setInitMsid(ManagementServerNode.getManagementServerId());
+
+ _workJobDao.persist(workJob);
+
+ return workJob;
+ }
}