You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by mc...@apache.org on 2014/01/22 20:27:38 UTC

[13/53] [abbrv] git commit: updated refs/heads/rbac to 33cd1ab

CLOUDSTACK-5731: Use general instance type to categorize VM work jobs to correctly serialize VM operations


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/a9733b5d
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/a9733b5d
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/a9733b5d

Branch: refs/heads/rbac
Commit: a9733b5df23a0d0f996726eca93f8309d17a920a
Parents: 2e5e403
Author: Kelven Yang <ke...@gmail.com>
Authored: Fri Jan 17 10:41:42 2014 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Fri Jan 17 11:55:14 2014 -0800

----------------------------------------------------------------------
 .../src/com/cloud/vm/VmWorkConstants.java       |   1 +
 .../com/cloud/vm/VirtualMachineManagerImpl.java | 171 ++++++++++++++++---
 .../src/com/cloud/vm/VmWorkJobDispatcher.java   |  49 +++---
 .../jobs/AsyncJobExecutionContext.java          |  48 ++++--
 .../framework/jobs/dao/VmWorkJobDao.java        |   2 +
 .../framework/jobs/dao/VmWorkJobDaoImpl.java    |  93 ++++++++--
 .../com/cloud/storage/VolumeApiServiceImpl.java |  99 +++++++++--
 .../vm/snapshot/VMSnapshotManagerImpl.java      |  80 ++++++++-
 8 files changed, 453 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/engine/components-api/src/com/cloud/vm/VmWorkConstants.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/vm/VmWorkConstants.java b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java
index 20e40b7..4627cfe 100644
--- a/engine/components-api/src/com/cloud/vm/VmWorkConstants.java
+++ b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java
@@ -20,4 +20,5 @@ public interface VmWorkConstants {
     public static final String VM_WORK_QUEUE = "VmWorkJobQueue";
     public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher";
     public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher";
+    public static final String VM_WORK_JOB_PLACEHOLDER = "VmWorkJobPlaceHolder";
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 89a0258..06805e1 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -561,6 +561,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         _executor.scheduleAtFixedRate(new TransitionTask(), 5000, VmJobStateReportInterval.value(), TimeUnit.SECONDS);
         _executor.scheduleAtFixedRate(new CleanupTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS);
         cancelWorkItems(_nodeId);
+
+        // cleanup left over place holder works
+        _workJobDao.expungeLeftoverWorkJobs(ManagementServerNode.getManagementServerId());
         return true;
     }
 
@@ -751,7 +754,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            orchestrateStart(vmUuid, params, planToDeploy, planner);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateStart(vmUuid, params, planToDeploy, planner);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
         } else {
             Outcome<VirtualMachine> outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy);
 
@@ -1275,7 +1288,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop);
+
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
         } else {
             Outcome<VirtualMachine> outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop);
 
@@ -1567,7 +1592,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            orchestrateStorageMigration(vmUuid, destPool);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateStorageMigration(vmUuid, destPool);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
         } else {
             Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool);
 
@@ -1649,7 +1684,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            orchestrateMigrate(vmUuid, srcHostId, dest);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateMigrate(vmUuid, srcHostId, dest);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
         } else {
             Outcome<VirtualMachine> outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest);
 
@@ -1920,7 +1965,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool);
+
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
         } else {
             Outcome<VirtualMachine> outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool);
 
@@ -2163,6 +2220,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
             s_logger.trace("VM Operation Thread Running");
             try {
                 _workDao.cleanup(VmOpCleanupWait.value());
+
+                // TODO. hard-coded to one hour after job has been completed
+                Date cutDate = new Date(new Date().getTime() - 3600000);
+                _workJobDao.expungeCompletedWorkJobs(cutDate);
             } catch (Exception e) {
                 s_logger.error("VM Operations failed due to ", e);
             }
@@ -2199,7 +2260,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            orchestrateReboot(vmUuid, params);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateReboot(vmUuid, params);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
         } else {
             Outcome<VirtualMachine> outcome = rebootVmThroughJobQueue(vmUuid, params);
 
@@ -3120,7 +3191,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateAddVmToNetwork(vm, network, requested);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                return orchestrateAddVmToNetwork(vm, network, requested);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
         } else {
             Outcome<VirtualMachine> outcome = addVmToNetworkThroughJobQueue(vm, network, requested);
 
@@ -3223,7 +3303,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateRemoveNicFromVm(vm, nic);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                return orchestrateRemoveNicFromVm(vm, nic);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
         } else {
             Outcome<VirtualMachine> outcome = removeNicFromVmThroughJobQueue(vm, nic);
 
@@ -3462,7 +3552,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
         } else {
             Outcome<VirtualMachine> outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId);
 
@@ -3711,7 +3811,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
         } else {
             Outcome<VirtualMachine> outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
 
@@ -4187,7 +4297,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
                     workJob.setAccountId(callingAccount.getId());
                     workJob.setUserId(callingUser.getId());
                     workJob.setStep(VmWorkJobVO.Step.Starting);
-                    workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
                     workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4240,7 +4350,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
                     workJob.setAccountId(account.getId());
                     workJob.setUserId(user.getId());
                     workJob.setStep(VmWorkJobVO.Step.Prepare);
-                    workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
                     workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4293,7 +4403,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
                     workJob.setAccountId(account.getId());
                     workJob.setUserId(user.getId());
                     workJob.setStep(VmWorkJobVO.Step.Prepare);
-                    workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
                     workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4343,7 +4453,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
                     workJob.setAccountId(account.getId());
                     workJob.setUserId(user.getId());
-                    workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
                     workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4397,7 +4507,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
                     workJob.setAccountId(account.getId());
                     workJob.setUserId(user.getId());
-                    workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
                     workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4449,7 +4559,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
                     workJob.setAccountId(account.getId());
                     workJob.setUserId(user.getId());
-                    workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
                     workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4501,7 +4611,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
                     workJob.setAccountId(account.getId());
                     workJob.setUserId(user.getId());
-                    workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
                     workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4551,7 +4661,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
                     workJob.setAccountId(account.getId());
                     workJob.setUserId(user.getId());
-                    workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
                     workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4600,7 +4710,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
                     workJob.setAccountId(account.getId());
                     workJob.setUserId(user.getId());
-                    workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
                     workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4649,7 +4759,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
                     workJob.setAccountId(account.getId());
                     workJob.setUserId(user.getId());
-                    workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
                     workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4700,7 +4810,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
                     workJob.setAccountId(account.getId());
                     workJob.setUserId(user.getId());
-                    workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
                     workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4856,4 +4966,23 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
         return _jobHandlerProxy.handleVmWorkJob(work);
     }
+
+    private VmWorkJobVO createPlaceHolderWork(long instanceId) { // builds, persists and returns a placeholder work-job record for the given VM instance id
+        VmWorkJobVO workJob = new VmWorkJobVO("");
+
+        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER); // tagged with the placeholder dispatcher name
+        workJob.setCmd(""); // command and command info are left empty
+        workJob.setCmdInfo("");
+
+        workJob.setAccountId(0); // NOTE(review): account/user ids are hard-coded to 0 - presumably a system identity; confirm
+        workJob.setUserId(0);
+        workJob.setStep(VmWorkJobVO.Step.Starting);
+        workJob.setVmType(VirtualMachine.Type.Instance); // same general type this commit sets on the queued work jobs
+        workJob.setVmInstanceId(instanceId);
+        workJob.setInitMsid(ManagementServerNode.getManagementServerId()); // this server's id; start-up calls expungeLeftoverWorkJobs with the same id
+
+        _workJobDao.persist(workJob);
+
+        return workJob; // callers in this commit expunge the record by id in a finally block
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
index 285c8a2..31b2d9c 100644
--- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
+++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
@@ -60,9 +60,6 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
             String cmd = job.getCmd();
             assert (cmd != null);
 
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Run VM work job: " + cmd + ", job origin: " + job.getRelated());
-
             Class<?> workClz = null;
             try {
                 workClz = Class.forName(job.getCmd());
@@ -80,27 +77,33 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
                 return;
             }
 
-            if (_handlers == null || _handlers.isEmpty()) {
-                s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo()
-                        + ", job origin: " + job.getRelated());
-                _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found");
-                return;
-            }
-
-            VmWorkJobHandler handler = _handlers.get(work.getHandlerName());
-
-            if (handler == null) {
-                s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd()
-                        + ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated());
-                _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler");
-                return;
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Run VM work job: " + cmd + " for VM " + work.getVmId() + ", job origin: " + job.getRelated());
+            try {
+                if (_handlers == null || _handlers.isEmpty()) {
+                    s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo()
+                            + ", job origin: " + job.getRelated());
+                    _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found");
+                    return;
+                }
+
+                VmWorkJobHandler handler = _handlers.get(work.getHandlerName());
+
+                if (handler == null) {
+                    s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd()
+                            + ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated());
+                    _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler");
+                    return;
+                }
+
+                CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated());
+
+                Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(work);
+                _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second());
+            } finally {
+                if (s_logger.isDebugEnabled())
+                    s_logger.debug("Done with run of VM work job: " + cmd + " for VM " + work.getVmId() + ", job origin: " + job.getRelated());
             }
-
-            CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated());
-
-            Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(work);
-            _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second());
-
         } catch(Throwable e) {
             s_logger.error("Unable to complete " + job + ", job origin:" + job.getRelated(), e);
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
index f558e01..20125f4 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
@@ -30,7 +30,9 @@ import com.cloud.exception.ConcurrentOperationException;
 import com.cloud.exception.InsufficientCapacityException;
 import com.cloud.exception.ResourceUnavailableException;
 
-public class AsyncJobExecutionContext {
+public class AsyncJobExecutionContext  {
+    private static final Logger s_logger = Logger.getLogger(AsyncJobExecutionContext.class);
+
     private AsyncJob _job;
 
     static private AsyncJobManager s_jobMgr;
@@ -112,7 +114,8 @@ public class AsyncJobExecutionContext {
     }
 
     //
-    // check failure exception before we disjoin the worker job
+    // check failure exception before we disjoin the worker job, work job usually fails with exception
+    // this helps propagate exceptions between jobs
     // TODO : it is ugly and this will become unnecessary after we switch to full-async mode
     //
     public void disjoinJob(long joinedJobId) throws InsufficientCapacityException,
@@ -120,21 +123,34 @@ public class AsyncJobExecutionContext {
         assert (_job != null);
 
         AsyncJobJoinMapVO record = s_joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
-        if (record.getJoinStatus() == JobInfo.Status.FAILED && record.getJoinResult() != null) {
-            Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
-            if (exception != null && exception instanceof Exception) {
-                if (exception instanceof InsufficientCapacityException)
-                    throw (InsufficientCapacityException)exception;
-                else if (exception instanceof ConcurrentOperationException)
-                    throw (ConcurrentOperationException)exception;
-                else if (exception instanceof ResourceUnavailableException)
-                    throw (ResourceUnavailableException)exception;
-                else
-                    throw new RuntimeException((Exception)exception);
+        s_jobMgr.disjoinJob(_job.getId(), joinedJobId);
+
+        if (record.getJoinStatus() == JobInfo.Status.FAILED) {
+            if (record.getJoinResult() != null) {
+                Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
+                if (exception != null && exception instanceof Exception) {
+                    if (exception instanceof InsufficientCapacityException) {
+                        s_logger.error("Job " + joinedJobId + " failed with InsufficientCapacityException");
+                        throw (InsufficientCapacityException)exception;
+                    }
+                    else if (exception instanceof ConcurrentOperationException) {
+                        s_logger.error("Job " + joinedJobId + " failed with ConcurrentOperationException");
+                        throw (ConcurrentOperationException)exception;
+                    }
+                    else if (exception instanceof ResourceUnavailableException) {
+                        s_logger.error("Job " + joinedJobId + " failed with ResourceUnavailableException");
+                        throw (ResourceUnavailableException)exception;
+                    }
+                    else {
+                        s_logger.error("Job " + joinedJobId + " failed with exception");
+                        throw new RuntimeException((Exception)exception);
+                    }
+                }
+            } else {
+                s_logger.error("Job " + joinedJobId + " failed without providing an error object");
+                throw new RuntimeException("Job " + joinedJobId + " failed without providing an error object");
             }
         }
-
-        s_jobMgr.disjoinJob(_job.getId(), joinedJobId);
     }
 
     public void completeJoin(JobInfo.Status joinStatus, String joinResult) {
@@ -151,6 +167,8 @@ public class AsyncJobExecutionContext {
     public static AsyncJobExecutionContext getCurrentExecutionContext() {
         AsyncJobExecutionContext context = s_currentExectionContext.get();
         if (context == null) {
+            // TODO, this has security implications
+            s_logger.warn("Job is executed without a context, setup psudo job for the executing thread");
             context = registerPseudoExecutionContext(CallContext.current().getCallingAccountId(),
                     CallContext.current().getCallingUserId());
         }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
index a3dbddf..44e39e4 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
@@ -35,4 +35,6 @@ public interface VmWorkJobDao extends GenericDao<VmWorkJobVO, Long> {
     void updateStep(long workJobId, Step step);
 
     void expungeCompletedWorkJobs(Date cutDate);
+
+    void expungeLeftoverWorkJobs(long msid);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
index 5e0ffb6..cf3e173 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
@@ -16,8 +16,11 @@
 // under the License.
 package org.apache.cloudstack.framework.jobs.dao;
 
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.Date;
 import java.util.List;
+import java.util.TimeZone;
 
 import javax.annotation.PostConstruct;
 
@@ -31,13 +34,16 @@ import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
+import com.cloud.utils.db.TransactionLegacy;
+import com.cloud.utils.db.TransactionStatus;
 import com.cloud.vm.VirtualMachine;
 
 public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implements VmWorkJobDao {
 
     protected SearchBuilder<VmWorkJobVO> PendingWorkJobSearch;
     protected SearchBuilder<VmWorkJobVO> PendingWorkJobByCommandSearch;
-    protected SearchBuilder<VmWorkJobVO> ExpungeWorkJobSearch;
 
     public VmWorkJobDaoImpl() {
     }
@@ -48,7 +54,6 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
         PendingWorkJobSearch.and("jobStatus", PendingWorkJobSearch.entity().getStatus(), Op.EQ);
         PendingWorkJobSearch.and("vmType", PendingWorkJobSearch.entity().getVmType(), Op.EQ);
         PendingWorkJobSearch.and("vmInstanceId", PendingWorkJobSearch.entity().getVmInstanceId(), Op.EQ);
-        PendingWorkJobSearch.and("step", PendingWorkJobSearch.entity().getStep(), Op.NEQ);
         PendingWorkJobSearch.done();
 
         PendingWorkJobByCommandSearch = createSearchBuilder();
@@ -58,11 +63,6 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
         PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ);
         PendingWorkJobByCommandSearch.and("cmd", PendingWorkJobByCommandSearch.entity().getCmd(), Op.EQ);
         PendingWorkJobByCommandSearch.done();
-
-        ExpungeWorkJobSearch = createSearchBuilder();
-        ExpungeWorkJobSearch.and("lastUpdated", ExpungeWorkJobSearch.entity().getLastUpdated(), Op.LT);
-        ExpungeWorkJobSearch.and("jobStatus", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ);
-        ExpungeWorkJobSearch.done();
     }
 
     @Override
@@ -115,11 +115,80 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
     }
 
     @Override
-    public void expungeCompletedWorkJobs(Date cutDate) {
-        SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
-        sc.setParameters("lastUpdated", cutDate);
-        sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
+    public void expungeCompletedWorkJobs(final Date cutDate) {
+        // current DAO mechanism does not support the following usage
+        /*
+                SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
+                sc.setParameters("lastUpdated",cutDate);
+                sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
+
+                expunge(sc);
+        */
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                TransactionLegacy txn = TransactionLegacy.currentTxn();
+
+                PreparedStatement pstmt = null;
+                try {
+                    pstmt = txn.prepareAutoCloseStatement(
+                            "DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?)");
+                    pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+
+                    pstmt.execute();
+                } catch (SQLException e) {
+                } catch (Throwable e) {
+                }
+
+                try {
+                    pstmt = txn.prepareAutoCloseStatement(
+                            "DELETE FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?");
+                    pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+
+                    pstmt.execute();
+                } catch (SQLException e) {
+                } catch (Throwable e) {
+                }
+            }
+        });
+    }
 
-        expunge(sc);
+    @Override
+    public void expungeLeftoverWorkJobs(final long msid) {
+        // current DAO mechanism does not support the following usage
+        /*
+                SearchCriteria<VmWorkJobVO> sc = ExpungePlaceHolderWorkJobSearch.create();
+                sc.setParameters("dispatcher", "VmWorkJobPlaceHolder");
+                sc.setParameters("msid", msid);
+
+                expunge(sc);
+        */
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                TransactionLegacy txn = TransactionLegacy.currentTxn();
+
+                PreparedStatement pstmt = null;
+                try {
+                    pstmt = txn.prepareAutoCloseStatement(
+                            "DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE (job_dispatcher='VmWorkJobPlaceHolder' OR job_dispatcher='VmWorkJobDispatcher') AND job_init_msid=?)");
+                    pstmt.setLong(1, msid);
+
+                    pstmt.execute();
+                } catch (SQLException e) {
+                } catch (Throwable e) {
+                }
+
+                try {
+                    pstmt = txn.prepareAutoCloseStatement(
+                            "DELETE FROM async_job WHERE (job_dispatcher='VmWorkJobPlaceHolder' OR job_dispatcher='VmWorkJobDispatcher') AND job_init_msid=?");
+                    pstmt.setLong(1, msid);
+
+                    pstmt.execute();
+                } catch (SQLException e) {
+                } catch (Throwable e) {
+                }
+            }
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/server/src/com/cloud/storage/VolumeApiServiceImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/com/cloud/storage/VolumeApiServiceImpl.java
index 8d475dd..fb35e23 100644
--- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java
+++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
 import org.apache.cloudstack.framework.jobs.Outcome;
+import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
 import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
@@ -73,6 +74,7 @@ import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
 import org.apache.cloudstack.storage.image.datastore.ImageStoreEntity;
+import org.apache.cloudstack.utils.identity.ManagementServerNode;
 
 import com.cloud.agent.AgentManager;
 import com.cloud.agent.api.Answer;
@@ -327,6 +329,9 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
     @Inject
     protected AsyncJobManager _jobMgr;
 
+    @Inject
+    protected VmWorkJobDao _workJobDao;
+
     VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
 
     // TODO
@@ -911,8 +916,19 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
             AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
             if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
                 // avoid re-entrance
-                return orchestrateResizeVolume(volume.getId(), currentSize, newSize,
-                        newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
+
+                VmWorkJobVO placeHolder = null;
+                if (VmJobEnabled.value()) {
+                    placeHolder = createPlaceHolderWork(userVm.getId());
+                }
+                try {
+                    return orchestrateResizeVolume(volume.getId(), currentSize, newSize,
+                            newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
+                } finally {
+                    if (VmJobEnabled.value())
+                        _workJobDao.expunge(placeHolder.getId());
+                }
+
             } else {
                 Outcome<Volume> outcome = resizeVolumeThroughJobQueue(userVm.getId(), volume.getId(), currentSize, newSize,
                         newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
@@ -1102,7 +1118,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
+
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(command.getVirtualMachineId());
+            }
+            try {
+                return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
         } else {
             Outcome<Volume> outcome = attachVolumeToVmThroughJobQueue(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
 
@@ -1405,7 +1432,16 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateDetachVolumeFromVM(vmId, volumeId);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vmId);
+            }
+            try {
+                return orchestrateDetachVolumeFromVM(vmId, volumeId);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
         } else {
             Outcome<Volume> outcome = detachVolumeFromVmThroughJobQueue(vmId, volumeId);
 
@@ -1571,7 +1607,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
             AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
             if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
                 // avoid re-entrance
-                return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume);
+
+                VmWorkJobVO placeHolder = null;
+                if (VmJobEnabled.value()) {
+                    placeHolder = createPlaceHolderWork(vm.getId());
+                }
+                try {
+                    return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume);
+                } finally {
+                    if (VmJobEnabled.value())
+                        _workJobDao.expunge(placeHolder.getId());
+                }
+
             } else {
                 Outcome<Volume> outcome = migrateVolumeThroughJobQueue(vm.getId(), vol.getId(), destPool.getId(), liveMigrateVolume);
 
@@ -1662,7 +1709,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
             AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
             if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
                 // avoid re-entrance
-                return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm);
+
+                VmWorkJobVO placeHolder = null;
+                if (VmJobEnabled.value()) {
+                    placeHolder = createPlaceHolderWork(vm.getId());
+                }
+                try {
+                    return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm);
+                } finally {
+                    if (VmJobEnabled.value())
+                        _workJobDao.expunge(placeHolder.getId());
+                }
+
             } else {
                 Outcome<Snapshot> outcome = takeVolumeSnapshotThroughJobQueue(vm.getId(), volumeId, policyId, snapshotId, account.getId(), quiescevm);
 
@@ -2190,7 +2248,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -2237,7 +2295,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -2281,7 +2339,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -2326,7 +2384,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -2371,7 +2429,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -2428,4 +2486,23 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
     public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
         return _jobHandlerProxy.handleVmWorkJob(work);
     }
+
+    private VmWorkJobVO createPlaceHolderWork(long instanceId) {
+        VmWorkJobVO workJob = new VmWorkJobVO("");
+
+        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
+        workJob.setCmd("");
+        workJob.setCmdInfo("");
+
+        workJob.setAccountId(0);
+        workJob.setUserId(0);
+        workJob.setStep(VmWorkJobVO.Step.Starting);
+        workJob.setVmType(VirtualMachine.Type.Instance);
+        workJob.setVmInstanceId(instanceId);
+        workJob.setInitMsid(ManagementServerNode.getManagementServerId());
+
+        _workJobDao.persist(workJob);
+
+        return workJob;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a9733b5d/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
index 7d6e0ec..1e3926d 100644
--- a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
+++ b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
@@ -41,10 +41,12 @@ import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
 import org.apache.cloudstack.framework.jobs.Outcome;
+import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
 import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
 import org.apache.cloudstack.jobs.JobInfo;
+import org.apache.cloudstack.utils.identity.ManagementServerNode;
 
 import com.cloud.event.ActionEvent;
 import com.cloud.event.EventTypes;
@@ -124,6 +126,9 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
     @Inject
     AsyncJobManager _jobMgr;
 
+    @Inject
+    VmWorkJobDao _workJobDao;
+
     VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
 
     int _vmSnapshotMax;
@@ -364,7 +369,17 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vmId);
+            }
+            try {
+                return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
         } else {
             Outcome<VMSnapshot> outcome = createVMSnapshotThroughJobQueue(vmId, vmSnapshotId, quiescevm);
 
@@ -452,7 +467,16 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateDeleteVMSnapshot(vmSnapshotId);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vmSnapshot.getVmId());
+            }
+            try {
+                return orchestrateDeleteVMSnapshot(vmSnapshotId);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
         } else {
             Outcome<VMSnapshot> outcome = deleteVMSnapshotThroughJobQueue(vmSnapshot.getVmId(), vmSnapshotId);
 
@@ -558,7 +582,18 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateRevertToVMSnapshot(vmSnapshotId);
+
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vmSnapshotVo.getVmId());
+            }
+            try {
+                return orchestrateRevertToVMSnapshot(vmSnapshotId);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
         } else {
             Outcome<VMSnapshot> outcome = revertToVMSnapshotThroughJobQueue(vmSnapshotVo.getVmId(), vmSnapshotId);
 
@@ -684,7 +719,17 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateDeleteAllVMSnapshots(vmId, type);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vmId);
+            }
+            try {
+                return orchestrateDeleteAllVMSnapshots(vmId, type);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
         } else {
             Outcome<VirtualMachine> outcome = deleteAllVMSnapshotsThroughJobQueue(vmId, type);
 
@@ -828,7 +873,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -872,7 +917,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -916,7 +961,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -960,7 +1005,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -1009,4 +1054,23 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
     public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
         return _jobHandlerProxy.handleVmWorkJob(work);
     }
+
+    private VmWorkJobVO createPlaceHolderWork(long instanceId) {
+        VmWorkJobVO workJob = new VmWorkJobVO("");
+
+        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
+        workJob.setCmd("");
+        workJob.setCmdInfo("");
+
+        workJob.setAccountId(0);
+        workJob.setUserId(0);
+        workJob.setStep(VmWorkJobVO.Step.Starting);
+        workJob.setVmType(VirtualMachine.Type.Instance);
+        workJob.setVmInstanceId(instanceId);
+        workJob.setInitMsid(ManagementServerNode.getManagementServerId());
+
+        _workJobDao.persist(workJob);
+
+        return workJob;
+    }
 }