You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2014/01/17 19:42:32 UTC

git commit: updated refs/heads/4.3 to be0d688

Updated Branches:
  refs/heads/4.3 f1b157114 -> be0d68851


CLOUDSTACK-5731: Use general instance type to categorize VM work jobs to correctly serialize VM operations


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/be0d6885
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/be0d6885
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/be0d6885

Branch: refs/heads/4.3
Commit: be0d688511577cdb227de8bca9314f71fe1ab51e
Parents: f1b1571
Author: Kelven Yang <ke...@gmail.com>
Authored: Fri Jan 17 10:41:42 2014 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Fri Jan 17 10:42:04 2014 -0800

----------------------------------------------------------------------
 .../src/com/cloud/vm/VmWorkConstants.java       |   1 +
 .../com/cloud/vm/VirtualMachineManagerImpl.java | 171 ++++++++++++++++---
 .../src/com/cloud/vm/VmWorkJobDispatcher.java   |  49 +++---
 .../jobs/AsyncJobExecutionContext.java          |  48 ++++--
 .../framework/jobs/dao/VmWorkJobDao.java        |   4 +-
 .../framework/jobs/dao/VmWorkJobDaoImpl.java    | 121 ++++++++++---
 .../com/cloud/storage/VolumeApiServiceImpl.java |  99 +++++++++--
 .../vm/snapshot/VMSnapshotManagerImpl.java      |  80 ++++++++-
 8 files changed, 468 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/engine/components-api/src/com/cloud/vm/VmWorkConstants.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/vm/VmWorkConstants.java b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java
index 20e40b7..4627cfe 100644
--- a/engine/components-api/src/com/cloud/vm/VmWorkConstants.java
+++ b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java
@@ -20,4 +20,5 @@ public interface VmWorkConstants {
     public static final String VM_WORK_QUEUE = "VmWorkJobQueue";
     public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher";
     public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher";
+    public static final String VM_WORK_JOB_PLACEHOLDER = "VmWorkJobPlaceHolder";
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index e90ff1f..0393e39 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -558,6 +558,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         _executor.scheduleAtFixedRate(new TransitionTask(), 5000, VmJobStateReportInterval.value(), TimeUnit.SECONDS);
         _executor.scheduleAtFixedRate(new CleanupTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS);
         cancelWorkItems(_nodeId);
+
+        // clean up leftover placeholder work jobs
+        _workJobDao.expungeLeftoverWorkJobs(ManagementServerNode.getManagementServerId());
         return true;
     }
 
@@ -749,7 +752,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
     		// avoid re-entrance
-    		orchestrateStart(vmUuid, params, planToDeploy, planner);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateStart(vmUuid, params, planToDeploy, planner);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
     	} else {
     	    Outcome<VirtualMachine> outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy);
 
@@ -1275,7 +1288,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
     		// avoid re-entrance
-    		orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop);
+
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
     	} else {
     	    Outcome<VirtualMachine> outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop);
 
@@ -1567,7 +1592,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
     		// avoid re-entrance
-    		orchestrateStorageMigration(vmUuid, destPool);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateStorageMigration(vmUuid, destPool);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
     	} else {
     	    Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool);
 
@@ -1649,7 +1684,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
     		// avoid re-entrance
-    		orchestrateMigrate(vmUuid, srcHostId, dest);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateMigrate(vmUuid, srcHostId, dest);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
     	} else {
     	    Outcome<VirtualMachine> outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest);
 
@@ -1920,7 +1965,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
     		// avoid re-entrance
-    		orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool);
+
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
     	} else {
             Outcome<VirtualMachine> outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool);
 
@@ -2163,6 +2220,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
             s_logger.trace("VM Operation Thread Running");
             try {
                 _workDao.cleanup(VmOpCleanupWait.value());
+
+                // TODO: cutoff is hard-coded to one hour after the job has completed
+                Date cutDate = new Date(new Date().getTime() - 3600000);
+                _workJobDao.expungeCompletedWorkJobs(cutDate);
             } catch (Exception e) {
                 s_logger.error("VM Operations failed due to ", e);
             }
@@ -2199,7 +2260,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
     		// avoid re-entrance
-    		orchestrateReboot(vmUuid, params);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateReboot(vmUuid, params);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
     	} else {
     	    Outcome<VirtualMachine> outcome = rebootVmThroughJobQueue(vmUuid, params);
 
@@ -3123,7 +3194,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
     		// avoid re-entrance
-    		return orchestrateAddVmToNetwork(vm, network,requested);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                return orchestrateAddVmToNetwork(vm, network, requested);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
     	} else {
             Outcome<VirtualMachine> outcome = addVmToNetworkThroughJobQueue(vm, network, requested);
 
@@ -3226,7 +3306,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
     		// avoid re-entrance
-    		return orchestrateRemoveNicFromVm(vm, nic);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                return orchestrateRemoveNicFromVm(vm, nic);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
     	} else {
             Outcome<VirtualMachine> outcome = removeNicFromVmThroughJobQueue(vm, nic);
 
@@ -3462,7 +3552,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
     		// avoid re-entrance
-    		orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
     	} else {
     	    Outcome<VirtualMachine> outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId);
 
@@ -3711,7 +3811,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
     		// avoid re-entrance
-    		return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                VirtualMachine vm = _vmDao.findByUuid(vmUuid);
+                placeHolder = createPlaceHolderWork(vm.getId());
+            }
+            try {
+                return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
     	} else {
     	    Outcome<VirtualMachine> outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
 
@@ -4182,7 +4292,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     	            workJob.setAccountId(callingAccount.getId());
     	            workJob.setUserId(callingUser.getId());
     	            workJob.setStep(VmWorkJobVO.Step.Starting);
-    	            workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
     	            workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4235,7 +4345,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 		            workJob.setAccountId(account.getId());
 		            workJob.setUserId(user.getId());
 		            workJob.setStep(VmWorkJobVO.Step.Prepare);
-		            workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
 		            workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4288,7 +4398,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 		            workJob.setAccountId(account.getId());
 		            workJob.setUserId(user.getId());
 		            workJob.setStep(VmWorkJobVO.Step.Prepare);
-		            workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
 		            workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4340,7 +4450,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
 		            workJob.setAccountId(account.getId());
 		            workJob.setUserId(user.getId());
-		            workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
 		            workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4394,7 +4504,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
 		            workJob.setAccountId(account.getId());
 		            workJob.setUserId(user.getId());
-		            workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
 		            workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4448,7 +4558,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
 		            workJob.setAccountId(account.getId());
 		            workJob.setUserId(user.getId());
-		            workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
 		            workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4502,7 +4612,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
 		            workJob.setAccountId(account.getId());
 		            workJob.setUserId(user.getId());
-		            workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
 		            workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4554,7 +4664,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
 		            workJob.setAccountId(account.getId());
 		            workJob.setUserId(user.getId());
-		            workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
 		            workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4605,7 +4715,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
 		            workJob.setAccountId(account.getId());
 		            workJob.setUserId(user.getId());
-		            workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
 		            workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4656,7 +4766,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
 		            workJob.setAccountId(account.getId());
 		            workJob.setUserId(user.getId());
-		            workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
 		            workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4709,7 +4819,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
 		            workJob.setAccountId(account.getId());
 		            workJob.setUserId(user.getId());
-		            workJob.setVmType(vm.getType());
+                    workJob.setVmType(VirtualMachine.Type.Instance);
 		            workJob.setVmInstanceId(vm.getId());
                     workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -4866,4 +4976,23 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
         return _jobHandlerProxy.handleVmWorkJob(work);
     }
+
+    private VmWorkJobVO createPlaceHolderWork(long instanceId) {
+        VmWorkJobVO workJob = new VmWorkJobVO("");
+
+        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
+        workJob.setCmd("");
+        workJob.setCmdInfo("");
+
+        workJob.setAccountId(0);
+        workJob.setUserId(0);
+        workJob.setStep(VmWorkJobVO.Step.Starting);
+        workJob.setVmType(VirtualMachine.Type.Instance);
+        workJob.setVmInstanceId(instanceId);
+        workJob.setInitMsid(ManagementServerNode.getManagementServerId());
+
+        _workJobDao.persist(workJob);
+
+        return workJob;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
index bea09b1..40b7784 100644
--- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
+++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
@@ -59,9 +59,6 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
         	String cmd = job.getCmd();
         	assert(cmd != null);
 
-        	if(s_logger.isDebugEnabled())
-                s_logger.debug("Run VM work job: " + cmd + ", job origin: " + job.getRelated());
-
         	Class<?> workClz = null;
         	try {
         		workClz = Class.forName(job.getCmd());
@@ -79,27 +76,33 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
         		return;
             }
 
-            if (_handlers == null || _handlers.isEmpty()) {
-                s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo()
-                        + ", job origin: " + job.getRelated());
-                _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found");
-                return;
-            }
-
-            VmWorkJobHandler handler = _handlers.get(work.getHandlerName());
-
-            if (handler == null) {
-                s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd()
-                        + ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated());
-                _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler");
-                return;
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Run VM work job: " + cmd + " for VM " + work.getVmId() + ", job origin: " + job.getRelated());
+            try {
+                if (_handlers == null || _handlers.isEmpty()) {
+                    s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo()
+                            + ", job origin: " + job.getRelated());
+                    _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found");
+                    return;
+                }
+
+                VmWorkJobHandler handler = _handlers.get(work.getHandlerName());
+
+                if (handler == null) {
+                    s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd()
+                            + ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated());
+                    _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler");
+                    return;
+                }
+
+                CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated());
+
+                Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(work);
+                _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second());
+            } finally {
+                if (s_logger.isDebugEnabled())
+                    s_logger.debug("Done with run of VM work job: " + cmd + " for VM " + work.getVmId() + ", job origin: " + job.getRelated());
             }
-
-            CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated());
-
-            Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(work);
-            _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second());
-
         } catch(Throwable e) {
             s_logger.error("Unable to complete " + job + ", job origin:" + job.getRelated(), e);
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
index e9bdae2..966e638 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java
@@ -31,6 +31,8 @@ import com.cloud.exception.InsufficientCapacityException;
 import com.cloud.exception.ResourceUnavailableException;
 
 public class AsyncJobExecutionContext  {
+    private static final Logger s_logger = Logger.getLogger(AsyncJobExecutionContext.class);
+
     private AsyncJob _job;
 
     static private AsyncJobManager _jobMgr;
@@ -112,7 +114,8 @@ public class AsyncJobExecutionContext  {
     }
 
     //
-	// check failure exception before we disjoin the worker job
+    // check failure exception before we disjoin the worker job, work job usually fails with exception
+    // this will help propagate exceptions between jobs
 	// TODO : it is ugly and this will become unnecessary after we switch to full-async mode
 	//
     public void disjoinJob(long joinedJobId) throws InsufficientCapacityException,
@@ -120,21 +123,34 @@ public class AsyncJobExecutionContext  {
     	assert(_job != null);
 
     	AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
-    	if(record.getJoinStatus() == JobInfo.Status.FAILED && record.getJoinResult() != null) {
-    		Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
-    		if(exception != null && exception instanceof Exception) {
-    			if(exception instanceof InsufficientCapacityException)
-    				throw (InsufficientCapacityException)exception;
-    			else if(exception instanceof ConcurrentOperationException)
-    				throw (ConcurrentOperationException)exception;
-    			else if(exception instanceof ResourceUnavailableException)
-    				throw (ResourceUnavailableException)exception;
-    			else
-    				throw new RuntimeException((Exception)exception);
-    		}
+        _jobMgr.disjoinJob(_job.getId(), joinedJobId);
+
+        if (record.getJoinStatus() == JobInfo.Status.FAILED) {
+            if (record.getJoinResult() != null) {
+                Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
+                if (exception != null && exception instanceof Exception) {
+                    if (exception instanceof InsufficientCapacityException) {
+                        s_logger.error("Job " + joinedJobId + " failed with InsufficientCapacityException");
+                        throw (InsufficientCapacityException)exception;
+                    }
+                    else if (exception instanceof ConcurrentOperationException) {
+                        s_logger.error("Job " + joinedJobId + " failed with ConcurrentOperationException");
+                        throw (ConcurrentOperationException)exception;
+                    }
+                    else if (exception instanceof ResourceUnavailableException) {
+                        s_logger.error("Job " + joinedJobId + " failed with ResourceUnavailableException");
+                        throw (ResourceUnavailableException)exception;
+                    }
+                    else {
+                        s_logger.error("Job " + joinedJobId + " failed with exception");
+                        throw new RuntimeException((Exception)exception);
+                    }
+                }
+            } else {
+                s_logger.error("Job " + joinedJobId + " failed without providing an error object");
+                throw new RuntimeException("Job " + joinedJobId + " failed without providing an error object");
+            }
     	}
-
-    	_jobMgr.disjoinJob(_job.getId(), joinedJobId);
     }
 
     public void completeJoin(JobInfo.Status joinStatus, String joinResult) {
@@ -151,6 +167,8 @@ public class AsyncJobExecutionContext  {
 	public static AsyncJobExecutionContext getCurrentExecutionContext() {
 		AsyncJobExecutionContext context = s_currentExectionContext.get();
         if (context == null) {
+            // TODO: this has security implications
+            s_logger.warn("Job is executed without a context, setup psudo job for the executing thread");
             context = registerPseudoExecutionContext(CallContext.current().getCallingAccountId(),
                     CallContext.current().getCallingUserId());
         }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
index dfb063f..bd6289e 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java
@@ -29,7 +29,9 @@ public interface VmWorkJobDao extends GenericDao<VmWorkJobVO, Long> {
 	VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId);
 	List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId);
 	List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd);
-	
+
 	void updateStep(long workJobId, Step step);
 	void expungeCompletedWorkJobs(Date cutDate);
+
+    void expungeLeftoverWorkJobs(long msid);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
index 77515a7..1ee7b25 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java
@@ -16,8 +16,11 @@
 // under the License.
 package org.apache.cloudstack.framework.jobs.dao;
 
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.Date;
 import java.util.List;
+import java.util.TimeZone;
 
 import javax.annotation.PostConstruct;
 
@@ -31,24 +34,26 @@ import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
+import com.cloud.utils.db.TransactionLegacy;
+import com.cloud.utils.db.TransactionStatus;
 import com.cloud.vm.VirtualMachine;
 
 public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implements VmWorkJobDao {
 
     protected SearchBuilder<VmWorkJobVO> PendingWorkJobSearch;
     protected SearchBuilder<VmWorkJobVO> PendingWorkJobByCommandSearch;
-    protected SearchBuilder<VmWorkJobVO> ExpungeWorkJobSearch;
-	
+
 	public VmWorkJobDaoImpl() {
 	}
-	
+
 	@PostConstruct
 	public void init() {
 		PendingWorkJobSearch = createSearchBuilder();
 		PendingWorkJobSearch.and("jobStatus", PendingWorkJobSearch.entity().getStatus(), Op.EQ);
 		PendingWorkJobSearch.and("vmType", PendingWorkJobSearch.entity().getVmType(), Op.EQ);
 		PendingWorkJobSearch.and("vmInstanceId", PendingWorkJobSearch.entity().getVmInstanceId(), Op.EQ);
-		PendingWorkJobSearch.and("step", PendingWorkJobSearch.entity().getStep(), Op.NEQ);
 		PendingWorkJobSearch.done();
 
 		PendingWorkJobByCommandSearch = createSearchBuilder();
@@ -58,54 +63,49 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
 		PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ);
 		PendingWorkJobByCommandSearch.and("cmd", PendingWorkJobByCommandSearch.entity().getCmd(), Op.EQ);
 		PendingWorkJobByCommandSearch.done();
-		
-		ExpungeWorkJobSearch = createSearchBuilder();
-		ExpungeWorkJobSearch.and("lastUpdated", ExpungeWorkJobSearch.entity().getLastUpdated(), Op.LT);
-		ExpungeWorkJobSearch.and("jobStatus", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ);
-		ExpungeWorkJobSearch.done();
 	}
-	
+
 	@Override
     public VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId) {
-		
+
 		SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create();
 		sc.setParameters("jobStatus", JobInfo.	Status.IN_PROGRESS);
 		sc.setParameters("vmType", type);
 		sc.setParameters("vmInstanceId", instanceId);
-		
+
 		Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null);
 		List<VmWorkJobVO> result = this.listBy(sc, filter);
 		if(result != null && result.size() > 0)
 			return result.get(0);
-		
+
 		return null;
 	}
-	
+
 	@Override
     public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId) {
-		
+
 		SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create();
 		sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
 		sc.setParameters("vmType", type);
 		sc.setParameters("vmInstanceId", instanceId);
-		
+
 		Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null);
 		return this.listBy(sc, filter);
 	}
 
 	@Override
     public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd) {
-		
+
 		SearchCriteria<VmWorkJobVO> sc = PendingWorkJobByCommandSearch.create();
 		sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
 		sc.setParameters("vmType", type);
 		sc.setParameters("vmInstanceId", instanceId);
 		sc.setParameters("cmd", jobCmd);
-		
+
 		Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null);
 		return this.listBy(sc, filter);
 	}
-	
+
 	@Override
     public void updateStep(long workJobId, Step step) {
 		VmWorkJobVO jobVo = findById(workJobId);
@@ -113,13 +113,82 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
 		jobVo.setLastUpdated(DateUtil.currentGMTTime());
 		update(workJobId, jobVo);
 	}
-	
+
 	@Override
-    public void expungeCompletedWorkJobs(Date cutDate) {
-		SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
-		sc.setParameters("lastUpdated",cutDate);
-        sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
-		
-		expunge(sc);
+    public void expungeCompletedWorkJobs(final Date cutDate) {
+        // current DAO mechanism does not support the following usage
+        /*
+        		SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
+        		sc.setParameters("lastUpdated",cutDate);
+                sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
+
+        		expunge(sc);
+        */
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                TransactionLegacy txn = TransactionLegacy.currentTxn();
+
+                PreparedStatement pstmt = null;
+                try {
+                    pstmt = txn.prepareAutoCloseStatement(
+                            "DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?)");
+                    pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+
+                    pstmt.execute();
+                } catch (SQLException e) {
+                } catch (Throwable e) {
+                }
+
+                try {
+                    pstmt = txn.prepareAutoCloseStatement(
+                            "DELETE FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?");
+                    pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
+
+                    pstmt.execute();
+                } catch (SQLException e) {
+                } catch (Throwable e) {
+                }
+            }
+        });
 	}
+
+    @Override
+    public void expungeLeftoverWorkJobs(final long msid) {
+        // current DAO mechanism does not support the following usage
+        /*
+                SearchCriteria<VmWorkJobVO> sc = ExpungePlaceHolderWorkJobSearch.create();
+                sc.setParameters("dispatcher", "VmWorkJobPlaceHolder");
+                sc.setParameters("msid", msid);
+
+                expunge(sc);
+        */
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                TransactionLegacy txn = TransactionLegacy.currentTxn();
+
+                PreparedStatement pstmt = null;
+                try {
+                    pstmt = txn.prepareAutoCloseStatement(
+                            "DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE (job_dispatcher='VmWorkJobPlaceHolder' OR job_dispatcher='VmWorkJobDispatcher') AND job_init_msid=?)");
+                    pstmt.setLong(1, msid);
+
+                    pstmt.execute();
+                } catch (SQLException e) {
+                } catch (Throwable e) {
+                }
+
+                try {
+                    pstmt = txn.prepareAutoCloseStatement(
+                            "DELETE FROM async_job WHERE (job_dispatcher='VmWorkJobPlaceHolder' OR job_dispatcher='VmWorkJobDispatcher') AND job_init_msid=?");
+                    pstmt.setLong(1, msid);
+
+                    pstmt.execute();
+                } catch (SQLException e) {
+                } catch (Throwable e) {
+                }
+            }
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/server/src/com/cloud/storage/VolumeApiServiceImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/com/cloud/storage/VolumeApiServiceImpl.java
index 0c89541..337aea1 100644
--- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java
+++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
 import org.apache.cloudstack.framework.jobs.Outcome;
+import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
 import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
@@ -73,6 +74,7 @@ import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
 import org.apache.cloudstack.storage.image.datastore.ImageStoreEntity;
+import org.apache.cloudstack.utils.identity.ManagementServerNode;
 
 import com.cloud.agent.AgentManager;
 import com.cloud.agent.api.Answer;
@@ -324,6 +326,9 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
     @Inject
     protected AsyncJobManager _jobMgr;
 
+    @Inject
+    protected VmWorkJobDao _workJobDao;
+
     VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
 
     // TODO
@@ -910,8 +915,19 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
             AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
             if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
                 // avoid re-entrance
-                return orchestrateResizeVolume(volume.getId(), currentSize, newSize,
-                        newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
+
+                VmWorkJobVO placeHolder = null;
+                if (VmJobEnabled.value()) {
+                    placeHolder = createPlaceHolderWork(userVm.getId());
+                }
+                try {
+                    return orchestrateResizeVolume(volume.getId(), currentSize, newSize,
+                            newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
+                } finally {
+                    if (VmJobEnabled.value())
+                        _workJobDao.expunge(placeHolder.getId());
+                }
+
             } else {
                 Outcome<Volume> outcome = resizeVolumeThroughJobQueue(userVm.getId(), volume.getId(), currentSize, newSize,
                         newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
@@ -1101,7 +1117,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
+
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(command.getVirtualMachineId());
+            }
+            try {
+                return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
         } else {
             Outcome<Volume> outcome = attachVolumeToVmThroughJobQueue(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
 
@@ -1412,7 +1439,16 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateDetachVolumeFromVM(vmId, volumeId);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vmId);
+            }
+            try {
+                return orchestrateDetachVolumeFromVM(vmId, volumeId);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
         } else {
             Outcome<Volume> outcome = detachVolumeFromVmThroughJobQueue(vmId, volumeId);
 
@@ -1577,7 +1613,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
             AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
             if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
                 // avoid re-entrance
-                return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume);
+
+                VmWorkJobVO placeHolder = null;
+                if (VmJobEnabled.value()) {
+                    placeHolder = createPlaceHolderWork(vm.getId());
+                }
+                try {
+                    return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume);
+                } finally {
+                    if (VmJobEnabled.value())
+                        _workJobDao.expunge(placeHolder.getId());
+                }
+
             } else {
                 Outcome<Volume> outcome = migrateVolumeThroughJobQueue(vm.getId(), vol.getId(), destPool.getId(), liveMigrateVolume);
 
@@ -1668,7 +1715,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
             AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
             if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
                 // avoid re-entrance
-                return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm);
+
+                VmWorkJobVO placeHolder = null;
+                if (VmJobEnabled.value()) {
+                    placeHolder = createPlaceHolderWork(vm.getId());
+                }
+                try {
+                    return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm);
+                } finally {
+                    if (VmJobEnabled.value())
+                        _workJobDao.expunge(placeHolder.getId());
+                }
+
             } else {
                 Outcome<Snapshot> outcome = takeVolumeSnapshotThroughJobQueue(vm.getId(), volumeId, policyId, snapshotId, account.getId(), quiescevm);
 
@@ -2197,7 +2255,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -2244,7 +2302,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -2289,7 +2347,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -2334,7 +2392,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -2379,7 +2437,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -2436,4 +2494,23 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
     public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
         return _jobHandlerProxy.handleVmWorkJob(work);
     }
+
+    private VmWorkJobVO createPlaceHolderWork(long instanceId) {
+        VmWorkJobVO workJob = new VmWorkJobVO("");
+
+        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
+        workJob.setCmd("");
+        workJob.setCmdInfo("");
+
+        workJob.setAccountId(0);
+        workJob.setUserId(0);
+        workJob.setStep(VmWorkJobVO.Step.Starting);
+        workJob.setVmType(VirtualMachine.Type.Instance);
+        workJob.setVmInstanceId(instanceId);
+        workJob.setInitMsid(ManagementServerNode.getManagementServerId());
+
+        _workJobDao.persist(workJob);
+
+        return workJob;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be0d6885/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
index 5bf248d..e273e1a 100644
--- a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
+++ b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
@@ -41,10 +41,12 @@ import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
 import org.apache.cloudstack.framework.jobs.Outcome;
+import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
 import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
 import org.apache.cloudstack.jobs.JobInfo;
+import org.apache.cloudstack.utils.identity.ManagementServerNode;
 
 import com.cloud.event.ActionEvent;
 import com.cloud.event.EventTypes;
@@ -124,6 +126,9 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
     @Inject
     AsyncJobManager _jobMgr;
 
+    @Inject
+    VmWorkJobDao _workJobDao;
+
     VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
 
     int _vmSnapshotMax;
@@ -367,7 +372,17 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vmId);
+            }
+            try {
+                return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
         } else {
             Outcome<VMSnapshot> outcome = createVMSnapshotThroughJobQueue(vmId, vmSnapshotId, quiescevm);
 
@@ -455,7 +470,16 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateDeleteVMSnapshot(vmSnapshotId);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vmSnapshot.getVmId());
+            }
+            try {
+                return orchestrateDeleteVMSnapshot(vmSnapshotId);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
         } else {
             Outcome<VMSnapshot> outcome = deleteVMSnapshotThroughJobQueue(vmSnapshot.getVmId(), vmSnapshotId);
 
@@ -564,7 +588,18 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateRevertToVMSnapshot(vmSnapshotId);
+
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vmSnapshotVo.getVmId());
+            }
+            try {
+                return orchestrateRevertToVMSnapshot(vmSnapshotId);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
         } else {
             Outcome<VMSnapshot> outcome = revertToVMSnapshotThroughJobQueue(vmSnapshotVo.getVmId(), vmSnapshotId);
 
@@ -694,7 +729,17 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
-            return orchestrateDeleteAllVMSnapshots(vmId, type);
+            VmWorkJobVO placeHolder = null;
+            if (VmJobEnabled.value()) {
+                placeHolder = createPlaceHolderWork(vmId);
+            }
+            try {
+                return orchestrateDeleteAllVMSnapshots(vmId, type);
+            } finally {
+                if (VmJobEnabled.value())
+                    _workJobDao.expunge(placeHolder.getId());
+            }
+
         } else {
             Outcome<VirtualMachine> outcome = deleteAllVMSnapshotsThroughJobQueue(vmId, type);
 
@@ -838,7 +883,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -882,7 +927,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -926,7 +971,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -970,7 +1015,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
                 workJob.setAccountId(callingAccount.getId());
                 workJob.setUserId(callingUser.getId());
                 workJob.setStep(VmWorkJobVO.Step.Starting);
-                workJob.setVmType(vm.getType());
+                workJob.setVmType(VirtualMachine.Type.Instance);
                 workJob.setVmInstanceId(vm.getId());
                 workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
 
@@ -1019,4 +1064,23 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
     public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
         return _jobHandlerProxy.handleVmWorkJob(work);
     }
+
+    private VmWorkJobVO createPlaceHolderWork(long instanceId) {
+        VmWorkJobVO workJob = new VmWorkJobVO("");
+
+        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
+        workJob.setCmd("");
+        workJob.setCmdInfo("");
+
+        workJob.setAccountId(0);
+        workJob.setUserId(0);
+        workJob.setStep(VmWorkJobVO.Step.Starting);
+        workJob.setVmType(VirtualMachine.Type.Instance);
+        workJob.setVmInstanceId(instanceId);
+        workJob.setInitMsid(ManagementServerNode.getManagementServerId());
+
+        _workJobDao.persist(workJob);
+
+        return workJob;
+    }
 }