You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2013/12/21 01:46:11 UTC

[2/2] git commit: updated refs/heads/4.3 to 497feab

CLOUDSTACK-669: convert VM snapshot orchestration flows so that they are serialized with other VM operations


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/497feab8
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/497feab8
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/497feab8

Branch: refs/heads/4.3
Commit: 497feab84119fabc31768f8a52772fd9e308818f
Parents: bef63e2
Author: Kelven Yang <ke...@gmail.com>
Authored: Fri Dec 20 16:45:08 2013 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Fri Dec 20 16:45:27 2013 -0800

----------------------------------------------------------------------
 ...spring-engine-orchestration-core-context.xml |   1 +
 .../com/cloud/vm/VirtualMachineManagerImpl.java | 132 ++--
 .../framework/jobs/AsyncJobManager.java         |  38 +-
 .../jobs/impl/AsyncJobManagerImpl.java          |  16 +
 .../com/cloud/storage/VmWorkMigrateVolume.java  |  46 ++
 .../com/cloud/storage/VmWorkResizeVolume.java   |  61 ++
 .../com/cloud/storage/VolumeApiServiceImpl.java | 243 +++++++-
 .../vm/snapshot/VMSnapshotManagerImpl.java      | 608 +++++++++++++++++--
 .../vm/snapshot/VmWorkCreateVMSnapshot.java     |  41 ++
 .../vm/snapshot/VmWorkDeleteAllVMSnapshots.java |  35 ++
 .../vm/snapshot/VmWorkDeleteVMSnapshot.java     |  35 ++
 .../vm/snapshot/VmWorkRevertToVMSnapshot.java   |  35 ++
 12 files changed, 1136 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/497feab8/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
----------------------------------------------------------------------
diff --git a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
index 2e35ae5..7445102 100644
--- a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
+++ b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
@@ -81,6 +81,7 @@
             <map>
                 <entry key="VirtualMachineManagerImpl" value-ref="clusteredVirtualMachineManagerImpl" />
                 <entry key="VolumeApiServiceImpl" value-ref="volumeApiServiceImpl" />
+                <entry key="VMSnapshotManagerImpl" value-ref="vMSnapshotManagerImpl" />
             </map>
         </property>
     </bean>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/497feab8/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index e9d3a39..d9226ec 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -42,7 +42,6 @@ import javax.naming.ConfigurationException;
 
 import org.apache.log4j.Logger;
 
-import com.cloud.deploy.DeploymentPlanner;
 import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
@@ -119,6 +118,7 @@ import com.cloud.dc.dao.HostPodDao;
 import com.cloud.deploy.DataCenterDeployment;
 import com.cloud.deploy.DeployDestination;
 import com.cloud.deploy.DeploymentPlan;
+import com.cloud.deploy.DeploymentPlanner;
 import com.cloud.deploy.DeploymentPlanner.ExcludeList;
 import com.cloud.deploy.DeploymentPlanningManager;
 import com.cloud.domain.dao.DomainDao;
@@ -722,12 +722,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 				throw new RuntimeException("Execution excetion", e);
 			}
 
-	    	Throwable jobException = retrieveExecutionException(outcome.getJob());
-	    	if(jobException != null) {
-		    	if(jobException instanceof ConcurrentOperationException)
-		    		throw (ConcurrentOperationException)jobException;
-		    	else if(jobException instanceof ResourceUnavailableException)
-		    		throw (ResourceUnavailableException)jobException;
+            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+            if (jobResult != null) {
+                if (jobResult instanceof ConcurrentOperationException)
+                    throw (ConcurrentOperationException)jobResult;
+                else if (jobResult instanceof ResourceUnavailableException)
+                    throw (ResourceUnavailableException)jobResult;
 	    	}
     	}
     }
@@ -736,7 +736,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     @Override
     public void orchestrateStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy, DeploymentPlanner planner)
             throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
-        
+
     	CallContext cctxt = CallContext.current();
         Account account = cctxt.getCallingAccount();
         User caller = cctxt.getCallingUser();
@@ -847,7 +847,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
                     }
                 }
 
-                Account owner = _entityMgr.findById(Account.class, vm.getAccountId());              
+                Account owner = _entityMgr.findById(Account.class, vm.getAccountId());
                 VirtualMachineProfileImpl vmProfile = new VirtualMachineProfileImpl(vm, template, offering, owner, params);
                 DeployDestination dest = null;
                 try {
@@ -1246,14 +1246,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 				throw new RuntimeException("Execution excetion", e);
 			}
 
-	    	Throwable jobException = retrieveExecutionException(outcome.getJob());
-	    	if(jobException != null) {
-	    		if(jobException instanceof AgentUnavailableException)
-	    			throw (AgentUnavailableException)jobException;
-	    		else if(jobException instanceof ConcurrentOperationException)
-		    		throw (ConcurrentOperationException)jobException;
-		    	else if(jobException instanceof OperationTimedoutException)
-		    		throw (OperationTimedoutException)jobException;
+            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+            if (jobResult != null) {
+                if (jobResult instanceof AgentUnavailableException)
+                    throw (AgentUnavailableException)jobResult;
+                else if (jobResult instanceof ConcurrentOperationException)
+                    throw (ConcurrentOperationException)jobResult;
+                else if (jobResult instanceof OperationTimedoutException)
+                    throw (OperationTimedoutException)jobResult;
 	    	}
     	}
     }
@@ -1536,10 +1536,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 				throw new RuntimeException("Execution excetion", e);
 			}
 
-	    	Throwable jobException = retrieveExecutionException(outcome.getJob());
-	    	if(jobException != null) {
-		    	if(jobException instanceof RuntimeException)
-		    		throw (RuntimeException)jobException;
+            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+            if (jobResult != null) {
+                if (jobResult instanceof RuntimeException)
+                    throw (RuntimeException)jobResult;
 	    	}
     	}
     }
@@ -1618,14 +1618,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 				throw new RuntimeException("Execution excetion", e);
 			}
 
-	    	Throwable jobException = retrieveExecutionException(outcome.getJob());
-	    	if(jobException != null) {
-		    	if(jobException instanceof ResourceUnavailableException)
-	    			throw (ResourceUnavailableException)jobException;
-	    		else if(jobException instanceof ConcurrentOperationException)
-		    		throw (ConcurrentOperationException)jobException;
-	    		else if(jobException instanceof RuntimeException)
-		    		throw (RuntimeException)jobException;
+            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+	    	if(jobResult != null) {
+		    	if(jobResult instanceof ResourceUnavailableException)
+	    			throw (ResourceUnavailableException)jobResult;
+	    		else if(jobResult instanceof ConcurrentOperationException)
+		    		throw (ConcurrentOperationException)jobResult;
+	    		else if(jobResult instanceof RuntimeException)
+		    		throw (RuntimeException)jobResult;
 	    	}
     	}
     }
@@ -1889,7 +1889,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
                 throw new RuntimeException("Execution excetion", e);
 			}
 
-	    	Throwable jobException = retrieveExecutionException(outcome.getJob());
+            Object jobException = _jobMgr.unmarshallResultObject(outcome.getJob());
 	    	if(jobException != null) {
 	    	    if(jobException instanceof ResourceUnavailableException)
                     throw (ResourceUnavailableException)jobException;
@@ -2168,14 +2168,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 				throw new RuntimeException("Execution excetion", e);
 			}
 
-	    	Throwable jobException = retrieveExecutionException(outcome.getJob());
-	    	if(jobException != null) {
-	    		if(jobException instanceof ResourceUnavailableException)
-	    			throw (ResourceUnavailableException)jobException;
-	    		else if(jobException instanceof ConcurrentOperationException)
-		    		throw (ConcurrentOperationException)jobException;
-	    		else if(jobException instanceof InsufficientCapacityException)
-	    			throw (InsufficientCapacityException)jobException;
+            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+            if (jobResult != null) {
+                if (jobResult instanceof ResourceUnavailableException)
+                    throw (ResourceUnavailableException)jobResult;
+                else if (jobResult instanceof ConcurrentOperationException)
+                    throw (ConcurrentOperationException)jobResult;
+                else if (jobResult instanceof InsufficientCapacityException)
+                    throw (InsufficientCapacityException)jobResult;
 	    	}
     	}
     }
@@ -3097,7 +3097,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 	    		NicProfile nic = (NicProfile)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult());
 	    		return nic;
 	    	} else {
-		    	Throwable jobException = retrieveExecutionException(outcome.getJob());
+                Object jobException = _jobMgr.unmarshallResultObject(outcome.getJob());
 		    	if(jobException != null) {
 		    	    if(jobException instanceof ResourceUnavailableException)
 	                    throw (ResourceUnavailableException)jobException;
@@ -3204,14 +3204,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 	    		Boolean result = (Boolean)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult());
 	    		return result;
 	    	} else {
-		    	Throwable jobException = retrieveExecutionException(outcome.getJob());
-		    	if(jobException != null) {
-		    	    if(jobException instanceof ResourceUnavailableException)
-	                    throw (ResourceUnavailableException)jobException;
-		    		else if(jobException instanceof ConcurrentOperationException)
-			    	    throw (ConcurrentOperationException)jobException;
-		    		else if(jobException instanceof RuntimeException)
-		    			throw (RuntimeException)jobException;
+                Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+                if (jobResult != null) {
+                    if (jobResult instanceof ResourceUnavailableException)
+                        throw (ResourceUnavailableException)jobResult;
+                    else if (jobResult instanceof ConcurrentOperationException)
+                        throw (ConcurrentOperationException)jobResult;
+                    else if (jobResult instanceof RuntimeException)
+                        throw (RuntimeException)jobResult;
 	            }
 
 		    	throw new RuntimeException("Job failed with un-handled exception");
@@ -3439,12 +3439,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 				throw new RuntimeException("Execution excetion", e);
 			}
 
-	    	Throwable jobException = retrieveExecutionException(outcome.getJob());
-	    	if(jobException != null) {
-	    		if(jobException instanceof ResourceUnavailableException)
-	    			throw (ResourceUnavailableException)jobException;
-	    		else if(jobException instanceof ConcurrentOperationException)
-		    		throw (ConcurrentOperationException)jobException;
+            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+            if (jobResult != null) {
+                if (jobResult instanceof ResourceUnavailableException)
+                    throw (ResourceUnavailableException)jobResult;
+                else if (jobResult instanceof ConcurrentOperationException)
+                    throw (ConcurrentOperationException)jobResult;
 	    	}
     	}
     }
@@ -3693,12 +3693,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 	    	if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) {
 	    		return _entityMgr.findById(VMInstanceVO.class, vm.getId());
 	    	} else {
-		    	Throwable jobException = retrieveExecutionException(outcome.getJob());
-		    	if(jobException != null) {
-		    		if(jobException instanceof ResourceUnavailableException)
-		    			throw (ResourceUnavailableException)jobException;
-		    		else if(jobException instanceof ConcurrentOperationException)
-			    		throw (ConcurrentOperationException)jobException;
+                Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+                if (jobResult != null) {
+                    if (jobResult instanceof ResourceUnavailableException)
+                        throw (ResourceUnavailableException)jobResult;
+                    else if (jobResult instanceof ConcurrentOperationException)
+                        throw (ConcurrentOperationException)jobResult;
 		    	}
 
 		    	throw new RuntimeException("Failed with un-handled exception");
@@ -4111,20 +4111,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         }
     }
 
-    public Throwable retrieveExecutionException(AsyncJob job) {
-    	assert(job != null);
-        assert (job.getDispatcher().equals(VmWorkConstants.VM_WORK_JOB_DISPATCHER));
-
-    	AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
-    	if(jobVo != null && jobVo.getResult() != null) {
-    		Object obj = JobSerializerHelper.fromSerializedString(job.getResult());
-
-    		if(obj != null && obj instanceof Throwable)
-    			return (Throwable)obj;
-    	}
-    	return null;
-    }
-
     //
     // TODO build a common pattern to reduce code duplication in following methods
     // no time for this at current iteration

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/497feab8/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
index bc06101..6d5358e 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobManager.java
@@ -16,6 +16,7 @@
 // under the License.
 package org.apache.cloudstack.framework.jobs;
 
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
@@ -25,13 +26,13 @@ import com.cloud.utils.Predicate;
 import com.cloud.utils.component.Manager;
 
 public interface AsyncJobManager extends Manager {
-    
+
 	public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
 
     AsyncJobVO getAsyncJob(long jobId);
-	
+
 	List<? extends AsyncJob> findInstancePendingAsyncJobs(String instanceType, Long accountId);
-	
+
 	long submitAsyncJob(AsyncJob job);
 	long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId);
 
@@ -41,11 +42,11 @@ public interface AsyncJobManager extends Manager {
     void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
     void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
     	journalText, String journalObjJson);
-    
+
 	/**
 	 * A running thread inside management server can have a 1:1 linked pseudo job.
 	 * This is to help make some legacy code work without too dramatic changes.
-	 * 
+	 *
 	 * All pseudo jobs should be expunged upon management start event
 	 *
 	 * @return pseudo job for the thread
@@ -56,21 +57,21 @@ public interface AsyncJobManager extends Manager {
      * Used by upper level job to wait for completion of a down-level job (usually VmWork jobs)
      * in synchronous way. Caller needs to use waitAndCheck() to check the completion status
      * of the down-level job
-     * 
+     *
      * Due to the amount of legacy code that relies on synchronous-call semantics, this form of joinJob
      * is used mostly
-     * 
-     * 
+     *
+     *
      * @param jobId upper job that is going to wait the completion of a down-level job
      * @param joinJobId down-level job
 	 */
 	void joinJob(long jobId, long joinJobId);
-	
+
     /**
      * Used by upper level job to wait for completion of a down-level job (usually VmWork jobs)
      * in asynchronous way, it will cause upper job to cease current execution, upper job will be
      * rescheduled to execute periodically or on wakeup events detected from message bus
-     * 
+     *
      * @param jobId upper job that is going to wait the completion of a down-level job
      * @param joinJobId down-level job
      * @Param wakeupHandler	wake-up handler
@@ -81,18 +82,18 @@ public interface AsyncJobManager extends Manager {
      */
     void joinJob(long jobId, long joinJobId, String wakeupHandler, String wakupDispatcher,
     		String[] wakeupTopicsOnMessageBus, long wakeupIntervalInMilliSeconds, long timeoutInMilliSeconds);
-    
+
     /**
      * Dis-join two related jobs
-     * 
+     *
      * @param jobId
      * @param joinedJobId
      */
     void disjoinJob(long jobId, long joinedJobId);
-    
+
     /**
      * Used by down-level job to notify its completion to upper level jobs
-     * 
+     *
      * @param joinJobId down-level job for upper level job to join with
      * @param joinStatus AsyncJobConstants status code to indicate success or failure of the
      * 					down-level job
@@ -102,14 +103,14 @@ public interface AsyncJobManager extends Manager {
      * 					object-stream based serialization instead of GSON
      */
     void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult);
-   
+
     void releaseSyncSource();
     void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit);
-    
+
     /**
      * This method will be deprecated after all code has been migrated to fully-asynchronous mode
      * that uses async-feature of joinJob/disjoinJob
-     * 
+     *
      * @param wakupTopicsOnMessageBus topic on message bus to wakeup the wait
      * @param checkIntervalInMilliSeconds time to break out wait for checking predicate condition
      * @param timeoutInMiliseconds time out to break out the whole wait process
@@ -122,4 +123,7 @@ public interface AsyncJobManager extends Manager {
 
     AsyncJob queryJob(long jobId, boolean updatePollTime);
 
+    String marshallResultObject(Serializable obj);
+
+    Object unmarshallResultObject(AsyncJob job);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/497feab8/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 19e5182..9a0c749 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.cloudstack.framework.jobs.impl;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
@@ -652,6 +653,21 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
         return false;
     }
 
+    @Override
+    public String marshallResultObject(Serializable obj) {
+        if (obj != null)
+            return JobSerializerHelper.toObjectSerializedString(obj);
+
+        return null;
+    }
+
+    @Override
+    public Object unmarshallResultObject(AsyncJob job) {
+        if(job.getResult() != null)
+            return JobSerializerHelper.fromObjectSerializedString(job.getResult());
+        return null;
+    }
+
     private void checkQueue(long queueId) {
         while (true) {
             try {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/497feab8/server/src/com/cloud/storage/VmWorkMigrateVolume.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/VmWorkMigrateVolume.java b/server/src/com/cloud/storage/VmWorkMigrateVolume.java
new file mode 100644
index 0000000..c83e02d
--- /dev/null
+++ b/server/src/com/cloud/storage/VmWorkMigrateVolume.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.storage;
+
+import com.cloud.vm.VmWork;
+
+public class VmWorkMigrateVolume extends VmWork {
+    private static final long serialVersionUID = -565778516928408602L;
+
+    private long volumeId;
+    private long destPoolId;
+    private boolean liveMigrate;
+
+    public VmWorkMigrateVolume(long userId, long accountId, long vmId, String handlerName, long volumeId, long destPoolId, boolean liveMigrate) {
+        super(userId, accountId, vmId, handlerName);
+        this.volumeId = volumeId;
+        this.destPoolId = destPoolId;
+        this.liveMigrate = liveMigrate;
+    }
+
+    public long getVolumeId() {
+        return volumeId;
+    }
+
+    public long getDestPoolId() {
+        return destPoolId;
+    }
+
+    public boolean isLiveMigrate() {
+        return liveMigrate;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/497feab8/server/src/com/cloud/storage/VmWorkResizeVolume.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/VmWorkResizeVolume.java b/server/src/com/cloud/storage/VmWorkResizeVolume.java
new file mode 100644
index 0000000..3ccaecd
--- /dev/null
+++ b/server/src/com/cloud/storage/VmWorkResizeVolume.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.storage;
+
+import com.cloud.vm.VmWork;
+
+public class VmWorkResizeVolume extends VmWork {
+    private static final long serialVersionUID = 6112366316907642498L;
+
+    private long volumeId;
+    private long currentSize;
+    private long newSize;
+    private Long newServiceOfferingId;
+    private boolean shrinkOk;
+
+    public VmWorkResizeVolume(long userId, long accountId, long vmId, String handlerName,
+            long volumeId, long currentSize, long newSize, Long newServiceOfferingId, boolean shrinkOk) {
+
+        super(userId, accountId, vmId, handlerName);
+
+        this.volumeId = volumeId;
+        this.currentSize = currentSize;
+        this.newSize = newSize;
+        this.newServiceOfferingId = newServiceOfferingId;
+        this.shrinkOk = shrinkOk;
+    }
+
+    public long getVolumeId() {
+        return volumeId;
+    }
+
+    public long getCurrentSize() {
+        return currentSize;
+    }
+
+    public long getNewSize() {
+        return newSize;
+    }
+
+    public Long getNewServiceOfferingId() {
+        return newServiceOfferingId;
+    }
+
+    public boolean isShrinkOk() {
+        return shrinkOk;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/497feab8/server/src/com/cloud/storage/VolumeApiServiceImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/com/cloud/storage/VolumeApiServiceImpl.java
index e8cbbfe..a305f0d 100644
--- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java
+++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java
@@ -901,6 +901,43 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
             _resourceLimitMgr.checkResourceLimit(_accountMgr.getAccount(volume.getAccountId()), ResourceType.primary_storage, new Long(newSize - currentSize));
         }
 
+        if (userVm != null) {
+            // serialize VM operation
+            AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+            if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
+                // avoid re-entrance
+                return orchestrateResizeVolume(volume.getId(), currentSize, newSize,
+                        newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
+            } else {
+                Outcome<Volume> outcome = resizeVolumeThroughJobQueue(userVm.getId(), volume.getId(), currentSize, newSize,
+                        newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
+
+                Volume vol = null;
+                try {
+                    vol = outcome.get();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException("Operation is interrupted", e);
+                } catch (java.util.concurrent.ExecutionException e) {
+                    throw new RuntimeException("Execution excetion", e);
+                }
+
+                Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+                if (jobResult != null) {
+                    if (jobResult instanceof ConcurrentOperationException)
+                        throw (ConcurrentOperationException)jobResult;
+                    else if (jobResult instanceof Throwable)
+                        throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
+                }
+                return volume;
+            }
+        }
+        return orchestrateResizeVolume(volume.getId(), currentSize, newSize,
+                newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
+    }
+
+    private VolumeVO orchestrateResizeVolume(long volumeId, long currentSize, long newSize, Long newDiskOfferingId, boolean shrinkOk) {
+        VolumeVO volume = _volsDao.findById(volumeId);
+        UserVmVO userVm = _userVmDao.findById(volume.getInstanceId());
         /*
          * get a list of hosts to send the commands to, try the system the
          * associated vm is running on first, then the last known place it ran.
@@ -938,8 +975,8 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
 
             volume = _volsDao.findById(volume.getId());
 
-            if (newDiskOffering != null) {
-                volume.setDiskOfferingId(cmd.getNewDiskOfferingId());
+            if (newDiskOfferingId != null) {
+                volume.setDiskOfferingId(newDiskOfferingId);
             }
             _volsDao.update(volume.getId(), volume);
             // Log usage event for volumes belonging user VM's only
@@ -1073,12 +1110,12 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 throw new RuntimeException("Execution excetion", e);
             }
 
-            Throwable jobException = retrieveExecutionException(outcome.getJob());
-            if (jobException != null) {
-                if (jobException instanceof ConcurrentOperationException)
-                    throw (ConcurrentOperationException)jobException;
-                else
-                    throw new RuntimeException("Unexpected exception", jobException);
+            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+            if (jobResult != null) {
+                if (jobResult instanceof ConcurrentOperationException)
+                    throw (ConcurrentOperationException)jobResult;
+                else if (jobResult instanceof Throwable)
+                    throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
             }
             return vol;
         }
@@ -1386,12 +1423,12 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 throw new RuntimeException("Execution excetion", e);
             }
 
-            Throwable jobException = retrieveExecutionException(outcome.getJob());
-            if (jobException != null) {
-                if (jobException instanceof ConcurrentOperationException)
-                    throw (ConcurrentOperationException)jobException;
-                else
-                    throw new RuntimeException("Unexpected exception", jobException);
+            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+            if (jobResult != null) {
+                if (jobResult instanceof ConcurrentOperationException)
+                    throw (ConcurrentOperationException)jobResult;
+                else if (jobResult instanceof Throwable)
+                    throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
             }
             return vol;
         }
@@ -1520,6 +1557,48 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
             throw new InvalidParameterValueException("Migration of volume from local storage pool is not supported");
         }
 
+        if (vm != null) {
+            // serialize VM operation
+            AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+            if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
+                // avoid re-entrance
+                return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume);
+            } else {
+                Outcome<Volume> outcome = migrateVolumeThroughJobQueue(vm.getId(), vol.getId(), destPool.getId(), liveMigrateVolume);
+
+                try {
+                    outcome.get();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException("Operation is interrupted", e);
+                } catch (java.util.concurrent.ExecutionException e) {
+                    throw new RuntimeException("Execution excetion", e);
+                }
+
+                Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+                if (jobResult != null) {
+                    if (jobResult instanceof ConcurrentOperationException)
+                        throw (ConcurrentOperationException)jobResult;
+                    else if (jobResult instanceof Throwable)
+                        throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
+                }
+
+                // retrieve the migrated new volume from job result
+                if (jobResult != null && jobResult instanceof Long) {
+                    return _entityMgr.findById(VolumeVO.class, ((Long)jobResult).longValue());
+                }
+                return null;
+            }
+        }
+
+        return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume);
+    }
+
+    private Volume orchestrateMigrateVolume(long volumeId, long destPoolId, boolean liveMigrateVolume) {
+        VolumeVO vol = _volsDao.findById(volumeId);
+        assert (vol != null);
+        StoragePool destPool = (StoragePool)dataStoreMgr.getDataStore(destPoolId, DataStoreRole.Primary);
+        assert (destPool != null);
+
         Volume newVol = null;
         if (liveMigrateVolume) {
             newVol = liveMigrateVolume(vol, destPool);
@@ -1946,20 +2025,6 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
         _storagePoolAllocators = storagePoolAllocators;
     }
 
-    public Throwable retrieveExecutionException(AsyncJob job) {
-        assert (job != null);
-        assert (job.getDispatcher().equals(VmWorkConstants.VM_WORK_JOB_DISPATCHER));
-
-        AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
-        if (jobVo != null && jobVo.getResult() != null) {
-            Object obj = JobSerializerHelper.fromSerializedString(job.getResult());
-
-            if (obj != null && obj instanceof Throwable)
-                return (Throwable)obj;
-        }
-        return null;
-    }
-
     public class VmJobSyncOutcome extends OutcomeImpl<Volume> {
         private long _volumeId;
 
@@ -2074,6 +2139,98 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 volumeId);
     }
 
+    public Outcome<Volume> resizeVolumeThroughJobQueue(final Long vmId, final long volumeId,
+            final long currentSize, final long newSize, final Long newServiceOfferingId, final boolean shrinkOk) {
+        // Creates a VmWorkResizeVolume work job for the given VM, submits it to the VM work queue, joins the current job to it, and returns an Outcome wrapping that job and volumeId.
+        final CallContext context = CallContext.current();
+        final User callingUser = context.getCallingUser();
+        final Account callingAccount = context.getCallingAccount();
+
+        final VMInstanceVO vm = _vmInstanceDao.findById(vmId); // NOTE(review): vm is dereferenced below without a null check - confirm callers guarantee the VM exists
+
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                VmWorkJobVO workJob = null;
+
+                _vmInstanceDao.lockRow(vm.getId(), true); // presumably locks the VM row while the job is created and submitted - confirm lockRow semantics
+                workJob = new VmWorkJobVO(context.getContextId());
+
+                workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
+                workJob.setCmd(VmWorkResizeVolume.class.getName());
+
+                workJob.setAccountId(callingAccount.getId());
+                workJob.setUserId(callingUser.getId());
+                workJob.setStep(VmWorkJobVO.Step.Starting);
+                workJob.setVmType(vm.getType());
+                workJob.setVmInstanceId(vm.getId());
+
+                // save work context info; the user, account and VM ids duplicate values already set on workJob above
+                VmWorkResizeVolume workInfo = new VmWorkResizeVolume(callingUser.getId(), callingAccount.getId(), vm.getId(),
+                        VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, currentSize, newSize, newServiceOfferingId, shrinkOk);
+                workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+                _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
+
+                // Transaction syntax sugar has a cost here: the callback returns nothing, so the job and its id are handed back to the enclosing method through CallContext parameters
+                context.putContextParameter("workJob", workJob);
+                context.putContextParameter("jobId", new Long(workJob.getId()));
+            }
+        });
+
+        final long jobId = (Long)context.getContextParameter("jobId"); // read back the id stored by the transaction callback above
+        AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); // join the current job execution context to the submitted work job
+
+        return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
+                volumeId);
+    }
+
+    public Outcome<Volume> migrateVolumeThroughJobQueue(final Long vmId, final long volumeId,
+            final long destPoolId, final boolean liveMigrate) {
+        // Creates a VmWorkMigrateVolume work job for the given VM, submits it to the VM work queue, joins the current job to it, and returns an Outcome wrapping that job and volumeId.
+        final CallContext context = CallContext.current();
+        final User callingUser = context.getCallingUser();
+        final Account callingAccount = context.getCallingAccount();
+
+        final VMInstanceVO vm = _vmInstanceDao.findById(vmId); // NOTE(review): vm is dereferenced below without a null check - confirm callers guarantee the VM exists
+
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                VmWorkJobVO workJob = null;
+
+                _vmInstanceDao.lockRow(vm.getId(), true); // presumably locks the VM row while the job is created and submitted - confirm lockRow semantics
+                workJob = new VmWorkJobVO(context.getContextId());
+
+                workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
+                workJob.setCmd(VmWorkMigrateVolume.class.getName());
+
+                workJob.setAccountId(callingAccount.getId());
+                workJob.setUserId(callingUser.getId());
+                workJob.setStep(VmWorkJobVO.Step.Starting);
+                workJob.setVmType(vm.getType());
+                workJob.setVmInstanceId(vm.getId());
+
+                // save work context info; the user, account and VM ids duplicate values already set on workJob above
+                VmWorkMigrateVolume workInfo = new VmWorkMigrateVolume(callingUser.getId(), callingAccount.getId(), vm.getId(),
+                        VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, destPoolId, liveMigrate);
+                workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+                _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
+
+                // Transaction syntax sugar has a cost here: the callback returns nothing, so the job and its id are handed back to the enclosing method through CallContext parameters
+                context.putContextParameter("workJob", workJob);
+                context.putContextParameter("jobId", new Long(workJob.getId()));
+            }
+        });
+
+        final long jobId = (Long)context.getContextParameter("jobId"); // read back the id stored by the transaction callback above
+        AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId); // join the current job execution context to the submitted work job
+
+        return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"),
+                volumeId);
+    }
+
     @Override
     public Pair<JobInfo.Status, String> handleVmWorkJob(AsyncJob job, VmWork work) throws Exception {
         VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
@@ -2111,6 +2268,36 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                         + ", volId: " + detachWork.getVolumeId());
 
             return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
+        } else if (work instanceof VmWorkResizeVolume) {
+            VmWorkResizeVolume resizeWork = (VmWorkResizeVolume)work;
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Execute Resize-Volume within VM work job context. vmId: " + resizeWork.getVmId()
+                        + ", volId: " + resizeWork.getVolumeId() + ", size " + resizeWork.getCurrentSize() + " -> " + resizeWork.getNewSize());
+
+            orchestrateResizeVolume(resizeWork.getVolumeId(), resizeWork.getCurrentSize(), resizeWork.getNewSize(),
+                    resizeWork.getNewServiceOfferingId(), resizeWork.isShrinkOk());
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Done executing Resize-Volume within VM work job context. vmId: " + resizeWork.getVmId()
+                        + ", volId: " + resizeWork.getVolumeId() + ", size " + resizeWork.getCurrentSize() + " -> " + resizeWork.getNewSize());
+
+            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
+
+        } else if (work instanceof VmWorkMigrateVolume) {
+            VmWorkMigrateVolume migrateWork = (VmWorkMigrateVolume)work;
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Execute Migrate-Volume within VM work job context. vmId: " + migrateWork.getVmId()
+                        + ", volId: " + migrateWork.getVolumeId() + ", destPoolId: " + migrateWork.getDestPoolId() + ", live: " + migrateWork.isLiveMigrate());
+
+            Volume newVol = orchestrateMigrateVolume(migrateWork.getVolumeId(), migrateWork.getDestPoolId(), migrateWork.isLiveMigrate());
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Done executing Migrate-Volume within VM work job context. vmId: " + migrateWork.getVmId()
+                        + ", volId: " + migrateWork.getVolumeId() + ", destPoolId: " + migrateWork.getDestPoolId() + ", live: " + migrateWork.isLiveMigrate());
+
+            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Long(newVol.getId())));
         } else {
             RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd());
             String exceptionJson = JobSerializerHelper.toSerializedString(e);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/497feab8/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
index a35a27d..aec1506 100644
--- a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
+++ b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
@@ -27,14 +27,25 @@ import javax.ejb.Local;
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
+import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
+
 import org.apache.cloudstack.api.command.user.vmsnapshot.ListVMSnapshotCmd;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.engine.subsystem.api.storage.StorageStrategyFactory;
 import org.apache.cloudstack.engine.subsystem.api.storage.VMSnapshotOptions;
 import org.apache.cloudstack.engine.subsystem.api.storage.VMSnapshotStrategy;
+import org.apache.cloudstack.framework.config.ConfigKey;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.framework.jobs.Outcome;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
+import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
+import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
+import org.apache.cloudstack.jobs.JobInfo;
 
 import com.cloud.event.ActionEvent;
 import com.cloud.event.EventTypes;
@@ -54,15 +65,22 @@ import com.cloud.storage.dao.SnapshotDao;
 import com.cloud.storage.dao.VolumeDao;
 import com.cloud.user.Account;
 import com.cloud.user.AccountManager;
+import com.cloud.user.User;
 import com.cloud.user.dao.AccountDao;
 import com.cloud.uservm.UserVm;
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Predicate;
 import com.cloud.utils.Ternary;
 import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.db.EntityManager;
 import com.cloud.utils.db.Filter;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
+import com.cloud.utils.db.TransactionStatus;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VMInstanceVO;
@@ -70,14 +88,24 @@ import com.cloud.vm.VirtualMachine;
 import com.cloud.vm.VirtualMachine.State;
 import com.cloud.vm.VirtualMachineManager;
 import com.cloud.vm.VirtualMachineProfile;
+import com.cloud.vm.VmWork;
+import com.cloud.vm.VmWorkConstants;
+import com.cloud.vm.VmWorkJobHandler;
+import com.cloud.vm.VmWorkSerializer;
 import com.cloud.vm.dao.UserVmDao;
+import com.cloud.vm.dao.VMInstanceDao;
 import com.cloud.vm.snapshot.dao.VMSnapshotDao;
 
 @Component
 @Local(value = { VMSnapshotManager.class, VMSnapshotService.class })
-public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotManager, VMSnapshotService {
+public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotManager, VMSnapshotService, VmWorkJobHandler {
     private static final Logger s_logger = Logger.getLogger(VMSnapshotManagerImpl.class);
+
+    public static final String VM_WORK_JOB_HANDLER = VMSnapshotManagerImpl.class.getSimpleName();
+
     String _name;
+    @Inject
+    VMInstanceDao _vmInstanceDao;
     @Inject VMSnapshotDao _vmSnapshotDao;
     @Inject VolumeDao _volumeDao;
     @Inject AccountDao _accountDao;
@@ -91,9 +119,21 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
     @Inject
     StorageStrategyFactory storageStrategyFactory;
 
+    @Inject
+    EntityManager _entityMgr;
+    @Inject
+    AsyncJobManager _jobMgr;
+
     int _vmSnapshotMax;
     int _wait;
 
+    // TODO
+    static final ConfigKey<Boolean> VmJobEnabled = new ConfigKey<Boolean>("Advanced",
+            Boolean.class, "vm.job.enabled", "false",
+            "True to enable new VM sync model. false to use the old way", false);
+    static final ConfigKey<Long> VmJobCheckInterval = new ConfigKey<Long>("Advanced",
+            Long.class, "vm.job.check.interval", "3000",
+            "Interval in milliseconds to check if the job is complete", false);
 
     @Override
     public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
@@ -104,7 +144,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         }
 
         _vmSnapshotMax = NumbersUtil.parseInt(_configDao.getValue("vmsnapshot.max"), VMSNAPSHOTMAX);
-        
+
         String value = _configDao.getValue("vmsnapshot.create.wait");
         _wait = NumbersUtil.parseInt(value, 1800);
 
@@ -129,7 +169,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         boolean listAll = cmd.listAll();
         Long id = cmd.getId();
         Long vmId = cmd.getVmId();
-        
+
         String state = cmd.getState();
         String keyword = cmd.getKeyword();
         String name = cmd.getVmSnapshotName();
@@ -197,13 +237,13 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         }
 
         return _vmSnapshotDao.search(sc, searchFilter);
-    
+
     }
 
     protected Account getCaller(){
         return CallContext.current().getCallingAccount();
     }
-    
+
     @Override
     public VMSnapshot allocVMSnapshot(Long vmId, String vsDisplayName, String vsDescription, Boolean snapshotMemory)
             throws ResourceAllocationException {
@@ -225,7 +265,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
             throw new InvalidParameterValueException("Creating VM snapshot failed due to length of VM snapshot vsDisplayName should not exceed 255");
         if(vsDescription != null && vsDescription.length()>255)
             throw new InvalidParameterValueException("Creating VM snapshot failed due to length of VM snapshot vsDescription should not exceed 255");
-        
+
         // VM snapshot display name must be unique for a VM
         String timeString = DateUtil.getDateDisplayString(DateUtil.GMT_TIMEZONE, new Date(), DateUtil.YYYYMMDD_FORMAT);
         String vmSnapshotName = userVmVo.getInstanceName() + "_VS_" + timeString;
@@ -235,21 +275,21 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         if(_vmSnapshotDao.findByName(vmId,vsDisplayName) != null){
             throw new InvalidParameterValueException("Creating VM snapshot failed due to VM snapshot with name" + vsDisplayName + "  already exists");
         }
-        
+
         // check VM state
         if (userVmVo.getState() != VirtualMachine.State.Running && userVmVo.getState() != VirtualMachine.State.Stopped) {
             throw new InvalidParameterValueException("Creating vm snapshot failed due to VM:" + vmId + " is not in the running or Stopped state");
         }
-        
+
         if(snapshotMemory && userVmVo.getState() == VirtualMachine.State.Stopped){
             throw new InvalidParameterValueException("Can not snapshot memory when VM is in stopped state");
         }
-        
+
         // for KVM, only allow snapshot with memory when VM is in running state
         if(userVmVo.getHypervisorType() == HypervisorType.KVM && userVmVo.getState() == State.Running && !snapshotMemory){
             throw new InvalidParameterValueException("KVM VM does not allow to take a disk-only snapshot when VM is in running state");
         }
-        
+
         // check access
         _accountMgr.checkAccess(caller, null, true, userVmVo);
 
@@ -269,16 +309,16 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
                         "There is other active volume snapshot tasks on the instance to which the volume is attached, please try again later.");
             }
         }
-        
+
         // check if there are other active VM snapshot tasks
         if (hasActiveVMSnapshotTasks(vmId)) {
             throw new CloudRuntimeException("There is other active vm snapshot tasks on the instance, please try again later");
         }
-        
+
         VMSnapshot.Type vmSnapshotType = VMSnapshot.Type.Disk;
         if(snapshotMemory && userVmVo.getState() == VirtualMachine.State.Running)
             vmSnapshotType = VMSnapshot.Type.DiskAndMemory;
-        
+
         try {
             VMSnapshotVO vmSnapshotVo = new VMSnapshotVO(userVmVo.getAccountId(), userVmVo.getDomainId(), vmId, vsDescription, vmSnapshotName,
                     vsDisplayName, userVmVo.getServiceOfferingId(), vmSnapshotType, null);
@@ -320,6 +360,46 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         if(vmSnapshot == null){
             throw new CloudRuntimeException("VM snapshot id: " + vmSnapshotId + " can not be found");
         }
+
+        // serialize VM operation
+        AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+        if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
+            // avoid re-entrance
+            return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm);
+        } else {
+            Outcome<VMSnapshot> outcome = createVMSnapshotThroughJobQueue(vmId, vmSnapshotId, quiescevm);
+
+            VMSnapshot result = null;
+            try {
+                result = outcome.get();
+            } catch (InterruptedException e) {
+                throw new RuntimeException("Operation is interrupted", e);
+            } catch (java.util.concurrent.ExecutionException e) {
+                throw new RuntimeException("Execution excetion", e);
+            }
+
+            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+            if (jobResult != null) {
+                if (jobResult instanceof ConcurrentOperationException)
+                    throw (ConcurrentOperationException)jobResult;
+                else if (jobResult instanceof Throwable)
+                    throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
+            }
+
+            return result;
+        }
+    }
+
+    private VMSnapshot orchestrateCreateVMSnapshot(Long vmId, Long vmSnapshotId, Boolean quiescevm) {
+        UserVmVO userVm = _userVMDao.findById(vmId);
+        if (userVm == null) {
+            throw new InvalidParameterValueException("Create vm to snapshot failed due to vm: " + vmId + " is not found");
+        }
+        VMSnapshotVO vmSnapshot = _vmSnapshotDao.findById(vmSnapshotId);
+        if (vmSnapshot == null) {
+            throw new CloudRuntimeException("VM snapshot id: " + vmSnapshotId + " can not be found");
+        }
+
         VMSnapshotOptions options = new VMSnapshotOptions(quiescevm);
         vmSnapshot.setOptions(options);
         try {
@@ -327,13 +407,13 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
             VMSnapshot snapshot = strategy.takeVMSnapshot(vmSnapshot);
             return snapshot;
         } catch (Exception e) {
-            s_logger.debug("Failed to create vm snapshot: " + vmSnapshotId ,e);
+            s_logger.debug("Failed to create vm snapshot: " + vmSnapshotId, e);
             return null;
         }
     }
 
     public VMSnapshotManagerImpl() {
-        
+
     }
 
     @Override
@@ -354,12 +434,12 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         }
 
         _accountMgr.checkAccess(caller, null, true, vmSnapshot);
-        
+
         // check VM snapshot states, only allow to delete vm snapshots in created and error state
         if (VMSnapshot.State.Ready != vmSnapshot.getState() && VMSnapshot.State.Expunging != vmSnapshot.getState() && VMSnapshot.State.Error != vmSnapshot.getState()) {
             throw new InvalidParameterValueException("Can't delete the vm snapshotshot " + vmSnapshotId + " due to it is not in Created or Error, or Expunging State");
         }
-        
+
         // check if there are other active VM snapshot tasks
         if (hasActiveVMSnapshotTasks(vmSnapshot.getVmId())) {
             List<VMSnapshotVO> expungingSnapshots = _vmSnapshotDao.listByInstanceId(vmSnapshot.getVmId(), VMSnapshot.State.Expunging);
@@ -369,9 +449,65 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
                 throw new InvalidParameterValueException("There is other active vm snapshot tasks on the instance, please try again later");
         }
 
-        if(vmSnapshot.getState() == VMSnapshot.State.Allocated){
+        // serialize VM operation
+        AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+        if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
+            // avoid re-entrance
+            return orchestrateDeleteVMSnapshot(vmSnapshotId);
+        } else {
+            Outcome<VMSnapshot> outcome = deleteVMSnapshotThroughJobQueue(vmSnapshot.getVmId(), vmSnapshotId);
+
+            VMSnapshot result = null;
+            try {
+                result = outcome.get();
+            } catch (InterruptedException e) {
+                throw new RuntimeException("Operation is interrupted", e);
+            } catch (java.util.concurrent.ExecutionException e) {
+                throw new RuntimeException("Execution excetion", e);
+            }
+
+            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+            if (jobResult != null) {
+                if (jobResult instanceof ConcurrentOperationException)
+                    throw (ConcurrentOperationException)jobResult;
+                else if (jobResult instanceof Throwable)
+                    throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
+            }
+
+            if (jobResult instanceof Boolean)
+                return ((Boolean)jobResult).booleanValue();
+
+            return false;
+        }
+    }
+
+    public boolean orchestrateDeleteVMSnapshot(Long vmSnapshotId) {
+        Account caller = getCaller();
+
+        VMSnapshotVO vmSnapshot = _vmSnapshotDao.findById(vmSnapshotId);
+        if (vmSnapshot == null) {
+            throw new InvalidParameterValueException("unable to find the vm snapshot with id " + vmSnapshotId);
+        }
+
+        _accountMgr.checkAccess(caller, null, true, vmSnapshot);
+
+        // check VM snapshot states, only allow to delete vm snapshots in created and error state
+        if (VMSnapshot.State.Ready != vmSnapshot.getState() && VMSnapshot.State.Expunging != vmSnapshot.getState() && VMSnapshot.State.Error != vmSnapshot.getState()) {
+            throw new InvalidParameterValueException("Can't delete the vm snapshotshot " + vmSnapshotId + " due to it is not in Created or Error, or Expunging State");
+        }
+
+        // check if there are other active VM snapshot tasks
+        if (hasActiveVMSnapshotTasks(vmSnapshot.getVmId())) {
+            List<VMSnapshotVO> expungingSnapshots = _vmSnapshotDao.listByInstanceId(vmSnapshot.getVmId(), VMSnapshot.State.Expunging);
+            if (expungingSnapshots.size() > 0 && expungingSnapshots.get(0).getId() == vmSnapshot.getId())
+                s_logger.debug("Target VM snapshot already in expunging state, go on deleting it: " + vmSnapshot.getDisplayName());
+            else
+                throw new InvalidParameterValueException("There is other active vm snapshot tasks on the instance, please try again later");
+        }
+
+        if (vmSnapshot.getState() == VMSnapshot.State.Allocated) {
             return _vmSnapshotDao.remove(vmSnapshot.getId());
-        } else{
+        } else {
             try {
                 VMSnapshotStrategy strategy = findVMSnapshotStrategy(vmSnapshot);
                 return strategy.deleteVMSnapshot(vmSnapshot);
@@ -400,12 +536,83 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
                     + vmSnapshotId + " failed due to vm: " + vmId
                     + " is not found");
         }
-        
+
+        // check if there are other active VM snapshot tasks
+        if (hasActiveVMSnapshotTasks(vmId)) {
+            throw new InvalidParameterValueException("There is other active vm snapshot tasks on the instance, please try again later");
+        }
+
+        Account caller = getCaller();
+        _accountMgr.checkAccess(caller, null, true, vmSnapshotVo);
+
+        // VM should be in running or stopped states
+        if (userVm.getState() != VirtualMachine.State.Running
+                && userVm.getState() != VirtualMachine.State.Stopped) {
+            throw new InvalidParameterValueException(
+                    "VM Snapshot reverting failed due to vm is not in the state of Running or Stopped.");
+        }
+
+        // if snapshot is not created, error out
+        if (vmSnapshotVo.getState() != VMSnapshot.State.Ready) {
+            throw new InvalidParameterValueException(
+                    "VM Snapshot reverting failed due to vm snapshot is not in the state of Created.");
+        }
+
+        // serialize VM operation
+        AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+        if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
+            // avoid re-entrance
+            return orchestrateRevertToVMSnapshot(vmSnapshotId);
+        } else {
+            Outcome<VMSnapshot> outcome = revertToVMSnapshotThroughJobQueue(vmSnapshotVo.getVmId(), vmSnapshotId);
+
+            VMSnapshot result = null;
+            try {
+                result = outcome.get();
+            } catch (InterruptedException e) {
+                throw new RuntimeException("Operation is interrupted", e);
+            } catch (java.util.concurrent.ExecutionException e) {
+                throw new RuntimeException("Execution excetion", e);
+            }
+
+            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+            if (jobResult != null) {
+                if (jobResult instanceof ConcurrentOperationException)
+                    throw (ConcurrentOperationException)jobResult;
+                else if (jobResult instanceof InsufficientCapacityException)
+                    throw (InsufficientCapacityException)jobResult;
+                else if (jobResult instanceof ResourceUnavailableException)
+                    throw (ResourceUnavailableException)jobResult;
+                else if (jobResult instanceof Throwable)
+                    throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
+            }
+
+            return userVm;
+        }
+    }
+
+    public UserVm orchestrateRevertToVMSnapshot(Long vmSnapshotId) throws InsufficientCapacityException, ResourceUnavailableException, ConcurrentOperationException {
+
+        // check if VM snapshot exists in DB
+        VMSnapshotVO vmSnapshotVo = _vmSnapshotDao.findById(vmSnapshotId);
+        if (vmSnapshotVo == null) {
+            throw new InvalidParameterValueException(
+                    "unable to find the vm snapshot with id " + vmSnapshotId);
+        }
+        Long vmId = vmSnapshotVo.getVmId();
+        UserVmVO userVm = _userVMDao.findById(vmId);
+        // check if VM exists
+        if (userVm == null) {
+            throw new InvalidParameterValueException("Revert vm to snapshot: "
+                    + vmSnapshotId + " failed due to vm: " + vmId
+                    + " is not found");
+        }
+
         // check if there are other active VM snapshot tasks
         if (hasActiveVMSnapshotTasks(vmId)) {
             throw new InvalidParameterValueException("There is other active vm snapshot tasks on the instance, please try again later");
         }
-        
+
         Account caller = getCaller();
         _accountMgr.checkAccess(caller, null, true, vmSnapshotVo);
 
@@ -415,7 +622,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
             throw new InvalidParameterValueException(
                     "VM Snapshot reverting failed due to vm is not in the state of Running or Stopped.");
         }
-        
+
         // if snapshot is not created, error out
         if (vmSnapshotVo.getState() != VMSnapshot.State.Ready) {
             throw new InvalidParameterValueException(
@@ -424,19 +631,19 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
 
         UserVmVO vm = null;
         Long hostId = null;
-        
+
         // start or stop VM first, if revert from stopped state to running state, or from running to stopped
-        if(userVm.getState() == VirtualMachine.State.Stopped && vmSnapshotVo.getType() == VMSnapshot.Type.DiskAndMemory){
+        if (userVm.getState() == VirtualMachine.State.Stopped && vmSnapshotVo.getType() == VMSnapshot.Type.DiskAndMemory) {
             try {
                 _itMgr.advanceStart(userVm.getUuid(), new HashMap<VirtualMachineProfile.Param, Object>(), null);
                 vm = _userVMDao.findById(userVm.getId());
-        	    hostId = vm.getHostId();
-        	} catch (Exception e) {
-        	    s_logger.error("Start VM " + userVm.getInstanceName() + " before reverting failed due to " + e.getMessage());
-        	    throw new CloudRuntimeException(e.getMessage());
-        	}
-        }else {
-            if(userVm.getState() == VirtualMachine.State.Running && vmSnapshotVo.getType() == VMSnapshot.Type.Disk){
+                hostId = vm.getHostId();
+            } catch (Exception e) {
+                s_logger.error("Start VM " + userVm.getInstanceName() + " before reverting failed due to " + e.getMessage());
+                throw new CloudRuntimeException(e.getMessage());
+            }
+        } else {
+            if (userVm.getState() == VirtualMachine.State.Running && vmSnapshotVo.getType() == VMSnapshot.Type.Disk) {
                 try {
                     _itMgr.advanceStop(userVm.getUuid(), true);
                 } catch (Exception e) {
@@ -481,6 +688,38 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
 
     @Override
     public boolean deleteAllVMSnapshots(long vmId, VMSnapshot.Type type) {
+        // Entry point for deleting the VM snapshots of a VM. Depending on the VmJobEnabled
+        // setting and on who dispatched the current job, the work is either executed inline
+        // or queued as a VM work job and waited for (serialize VM operation).
+        AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+        if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
+            // avoid re-entrance
+            return orchestrateDeleteAllVMSnapshots(vmId, type);
+        } else {
+            Outcome<VirtualMachine> outcome = deleteAllVMSnapshotsThroughJobQueue(vmId, type);
+
+            try {
+                outcome.get();
+            } catch (InterruptedException e) {
+                // restore the interrupt flag so code further up the stack can still observe it
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Operation is interrupted", e);
+            } catch (java.util.concurrent.ExecutionException e) {
+                throw new RuntimeException("Execution exception", e);
+            }
+
+            // the job result is either an exception raised by the work job or its Boolean result
+            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
+            if (jobResult != null) {
+                if (jobResult instanceof ConcurrentOperationException)
+                    throw (ConcurrentOperationException)jobResult;
+                else if (jobResult instanceof Throwable)
+                    throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
+            }
+
+            if (jobResult instanceof Boolean)
+                return (Boolean)jobResult;
+
+            // no usable result object: report failure
+            return false;
+        }
+    }
+
+    private boolean orchestrateDeleteAllVMSnapshots(long vmId, VMSnapshot.Type type) {
         boolean result = true;
         List<VMSnapshotVO> listVmSnapshots = _vmSnapshotDao.findByVm(vmId);
         if (listVmSnapshots == null || listVmSnapshots.isEmpty()) {
@@ -488,7 +727,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         }
         for (VMSnapshotVO snapshot : listVmSnapshots) {
             VMSnapshotVO target = _vmSnapshotDao.findById(snapshot.getId());
-            if(type != null && target.getType() != type)
+            if (type != null && target.getType() != type)
                 continue;
             VMSnapshotStrategy strategy = findVMSnapshotStrategy(target);
             if (!strategy.deleteVMSnapshot(target)) {
@@ -502,11 +741,11 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
     @Override
     public boolean syncVMSnapshot(VMInstanceVO vm, Long hostId) {
         try{
-            
+
             UserVmVO userVm = _userVMDao.findById(vm.getId());
             if(userVm == null)
                 return false;
-            
+
             List<VMSnapshotVO> vmSnapshotsInExpungingStates = _vmSnapshotDao.listByInstanceId(vm.getId(), VMSnapshot.State.Expunging, VMSnapshot.State.Reverting, VMSnapshot.State.Creating);
             for (VMSnapshotVO vmSnapshotVO : vmSnapshotsInExpungingStates) {
                 VMSnapshotStrategy strategy = findVMSnapshotStrategy(vmSnapshotVO);
@@ -527,5 +766,300 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         }
         return false;
     }
- 
+
+    /**
+     * Outcome of a VM work job that operates on a single VM snapshot.
+     *
+     * The completion predicate reports true as soon as the job record is no longer
+     * IN_PROGRESS (or cannot be found); {@link #retrieve()} then loads the VM snapshot
+     * record by the id given at construction time.
+     */
+    public class VmJobSyncOutcome extends OutcomeImpl<VMSnapshot> {
+        private long _vmSnapshotId;
+
+        /**
+         * @param job the submitted work job this outcome is tied to
+         * @param vmSnapshotId id of the VM snapshot returned by {@link #retrieve()}
+         */
+        public VmJobSyncOutcome(final AsyncJob job, final long vmSnapshotId) {
+            super(VMSnapshot.class, job, VmJobCheckInterval.value(), new Predicate() {
+                @Override
+                public boolean checkCondition() {
+                    final AsyncJobVO currentJob = _entityMgr.findById(AsyncJobVO.class, job.getId());
+                    assert (currentJob != null);
+                    // a missing job record is treated the same as a finished job
+                    return currentJob == null || currentJob.getStatus() != JobInfo.Status.IN_PROGRESS;
+                }
+            }, AsyncJob.Topics.JOB_STATE);
+            _vmSnapshotId = vmSnapshotId;
+        }
+
+        @Override
+        protected VMSnapshot retrieve() {
+            return _vmSnapshotDao.findById(_vmSnapshotId);
+        }
+    }
+
+    /**
+     * Outcome of a VM work job whose result is the virtual machine itself.
+     *
+     * The completion predicate reports true as soon as the job record is no longer
+     * IN_PROGRESS (or cannot be found); {@link #retrieve()} then loads the VM instance
+     * by the id given at construction time.
+     */
+    public class VmJobSyncVirtualMachineOutcome extends OutcomeImpl<VirtualMachine> {
+        long vmId;
+
+        /**
+         * @param job the submitted work job this outcome is tied to
+         * @param vmId id of the VM returned by {@link #retrieve()}
+         */
+        public VmJobSyncVirtualMachineOutcome(final AsyncJob job, final long vmId) {
+            super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() {
+                @Override
+                public boolean checkCondition() {
+                    AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
+                    assert (jobVo != null);
+                    // a missing job record is treated the same as a finished job
+                    if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS)
+                        return true;
+
+                    return false;
+                }
+            }, AsyncJob.Topics.JOB_STATE);
+            // remember the VM id; without this assignment retrieve() would always look up id 0
+            this.vmId = vmId;
+        }
+
+        @Override
+        protected VirtualMachine retrieve() {
+            return _vmInstanceDao.findById(vmId);
+        }
+    }
+
+    /**
+     * Submits a "create VM snapshot" VM work job to the VM work queue and joins the
+     * current job execution context to it.
+     *
+     * @param vmId id of the VM the work job is queued for
+     * @param vmSnapshotId id of the VM snapshot carried in the work item
+     * @param quiesceVm quiesce flag carried in the work item
+     * @return an outcome tied to the submitted job that retrieves the VM snapshot record
+     */
+    public Outcome<VMSnapshot> createVMSnapshotThroughJobQueue(final Long vmId, final Long vmSnapshotId, final boolean quiesceVm) {
+        final CallContext context = CallContext.current();
+        final User callingUser = context.getCallingUser();
+        final Account callingAccount = context.getCallingAccount();
+        final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
+
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                // lock the VM row before building and submitting the job
+                _vmInstanceDao.lockRow(vm.getId(), true);
+
+                final VmWorkJobVO job = new VmWorkJobVO(context.getContextId());
+                job.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
+                job.setCmd(VmWorkCreateVMSnapshot.class.getName());
+                job.setStep(VmWorkJobVO.Step.Starting);
+                job.setAccountId(callingAccount.getId());
+                job.setUserId(callingUser.getId());
+                job.setVmType(vm.getType());
+                job.setVmInstanceId(vm.getId());
+
+                // the serialized work item repeats some of the job fields
+                final VmWorkCreateVMSnapshot work = new VmWorkCreateVMSnapshot(callingUser.getId(), callingAccount.getId(), vm.getId(),
+                        VMSnapshotManagerImpl.VM_WORK_JOB_HANDLER, vmSnapshotId, quiesceVm);
+                job.setCmdInfo(VmWorkSerializer.serialize(work));
+
+                _jobMgr.submitAsyncJob(job, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
+
+                // the callback has no return value, so the job is handed back through the call context
+                context.putContextParameter("workJob", job);
+                context.putContextParameter("jobId", Long.valueOf(job.getId()));
+            }
+        });
+
+        final long jobId = (Long)context.getContextParameter("jobId");
+        AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+        final VmWorkJobVO submittedJob = (VmWorkJobVO)context.getContextParameter("workJob");
+        return new VmJobSyncOutcome(submittedJob, vmSnapshotId);
+    }
+
+    /**
+     * Submits a "delete VM snapshot" VM work job to the VM work queue and joins the
+     * current job execution context to it.
+     *
+     * @param vmId id of the VM the work job is queued for
+     * @param vmSnapshotId id of the VM snapshot carried in the work item
+     * @return an outcome tied to the submitted job that retrieves the VM snapshot record
+     */
+    public Outcome<VMSnapshot> deleteVMSnapshotThroughJobQueue(final Long vmId, final Long vmSnapshotId) {
+        final CallContext context = CallContext.current();
+        final User callingUser = context.getCallingUser();
+        final Account callingAccount = context.getCallingAccount();
+        final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
+
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                // lock the VM row before building and submitting the job
+                _vmInstanceDao.lockRow(vm.getId(), true);
+
+                final VmWorkJobVO job = new VmWorkJobVO(context.getContextId());
+                job.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
+                job.setCmd(VmWorkDeleteVMSnapshot.class.getName());
+                job.setStep(VmWorkJobVO.Step.Starting);
+                job.setAccountId(callingAccount.getId());
+                job.setUserId(callingUser.getId());
+                job.setVmType(vm.getType());
+                job.setVmInstanceId(vm.getId());
+
+                // the serialized work item repeats some of the job fields
+                final VmWorkDeleteVMSnapshot work = new VmWorkDeleteVMSnapshot(callingUser.getId(), callingAccount.getId(), vm.getId(),
+                        VMSnapshotManagerImpl.VM_WORK_JOB_HANDLER, vmSnapshotId);
+                job.setCmdInfo(VmWorkSerializer.serialize(work));
+
+                _jobMgr.submitAsyncJob(job, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
+
+                // the callback has no return value, so the job is handed back through the call context
+                context.putContextParameter("workJob", job);
+                context.putContextParameter("jobId", Long.valueOf(job.getId()));
+            }
+        });
+
+        final long jobId = (Long)context.getContextParameter("jobId");
+        AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+        final VmWorkJobVO submittedJob = (VmWorkJobVO)context.getContextParameter("workJob");
+        return new VmJobSyncOutcome(submittedJob, vmSnapshotId);
+    }
+
+    /**
+     * Submits a "revert to VM snapshot" VM work job to the VM work queue and joins the
+     * current job execution context to it.
+     *
+     * @param vmId id of the VM the work job is queued for
+     * @param vmSnapshotId id of the VM snapshot carried in the work item
+     * @return an outcome tied to the submitted job that retrieves the VM snapshot record
+     */
+    public Outcome<VMSnapshot> revertToVMSnapshotThroughJobQueue(final Long vmId, final Long vmSnapshotId) {
+        final CallContext context = CallContext.current();
+        final User callingUser = context.getCallingUser();
+        final Account callingAccount = context.getCallingAccount();
+        final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
+
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                // lock the VM row before building and submitting the job
+                _vmInstanceDao.lockRow(vm.getId(), true);
+
+                final VmWorkJobVO job = new VmWorkJobVO(context.getContextId());
+                job.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
+                job.setCmd(VmWorkRevertToVMSnapshot.class.getName());
+                job.setStep(VmWorkJobVO.Step.Starting);
+                job.setAccountId(callingAccount.getId());
+                job.setUserId(callingUser.getId());
+                job.setVmType(vm.getType());
+                job.setVmInstanceId(vm.getId());
+
+                // the serialized work item repeats some of the job fields
+                final VmWorkRevertToVMSnapshot work = new VmWorkRevertToVMSnapshot(callingUser.getId(), callingAccount.getId(), vm.getId(),
+                        VMSnapshotManagerImpl.VM_WORK_JOB_HANDLER, vmSnapshotId);
+                job.setCmdInfo(VmWorkSerializer.serialize(work));
+
+                _jobMgr.submitAsyncJob(job, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
+
+                // the callback has no return value, so the job is handed back through the call context
+                context.putContextParameter("workJob", job);
+                context.putContextParameter("jobId", Long.valueOf(job.getId()));
+            }
+        });
+
+        final long jobId = (Long)context.getContextParameter("jobId");
+        AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+        final VmWorkJobVO submittedJob = (VmWorkJobVO)context.getContextParameter("workJob");
+        return new VmJobSyncOutcome(submittedJob, vmSnapshotId);
+    }
+
+    /**
+     * Submits a "delete all VM snapshots" VM work job to the VM work queue and joins the
+     * current job execution context to it.
+     *
+     * @param vmId id of the VM the work job is queued for
+     * @param type snapshot type carried in the work item
+     * @return an outcome tied to the submitted job that retrieves the VM instance
+     */
+    public Outcome<VirtualMachine> deleteAllVMSnapshotsThroughJobQueue(final Long vmId, final VMSnapshot.Type type) {
+        final CallContext context = CallContext.current();
+        final User callingUser = context.getCallingUser();
+        final Account callingAccount = context.getCallingAccount();
+        final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
+
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                // lock the VM row before building and submitting the job
+                _vmInstanceDao.lockRow(vm.getId(), true);
+
+                final VmWorkJobVO job = new VmWorkJobVO(context.getContextId());
+                job.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
+                job.setCmd(VmWorkDeleteAllVMSnapshots.class.getName());
+                job.setStep(VmWorkJobVO.Step.Starting);
+                job.setAccountId(callingAccount.getId());
+                job.setUserId(callingUser.getId());
+                job.setVmType(vm.getType());
+                job.setVmInstanceId(vm.getId());
+
+                // the serialized work item repeats some of the job fields
+                final VmWorkDeleteAllVMSnapshots work = new VmWorkDeleteAllVMSnapshots(callingUser.getId(), callingAccount.getId(), vm.getId(),
+                        VMSnapshotManagerImpl.VM_WORK_JOB_HANDLER, type);
+                job.setCmdInfo(VmWorkSerializer.serialize(work));
+
+                _jobMgr.submitAsyncJob(job, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
+
+                // the callback has no return value, so the job is handed back through the call context
+                context.putContextParameter("workJob", job);
+                context.putContextParameter("jobId", Long.valueOf(job.getId()));
+            }
+        });
+
+        final long jobId = (Long)context.getContextParameter("jobId");
+        AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+
+        final VmWorkJobVO submittedJob = (VmWorkJobVO)context.getContextParameter("workJob");
+        return new VmJobSyncVirtualMachineOutcome(submittedJob, vmId);
+    }
+
+    /**
+     * Executes a VM snapshot work item inside a VM work job by dispatching on the concrete
+     * {@link VmWork} type to the matching orchestrate method.
+     *
+     * @param job the work job being executed (only used to report an unsupported command)
+     * @param work the deserialized work item
+     * @return SUCCEEDED with the serialized result (VM snapshot id for create, Boolean for
+     *         delete / delete-all, null for revert), or FAILED with a serialized exception
+     *         when the work type is not supported
+     * @throws Exception whatever the invoked orchestrate method throws
+     */
+    @Override
+    public Pair<JobInfo.Status, String> handleVmWorkJob(AsyncJob job, VmWork work) throws Exception {
+
+        if (work instanceof VmWorkCreateVMSnapshot) {
+            VmWorkCreateVMSnapshot createWork = (VmWorkCreateVMSnapshot)work;
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Execute Create-VM-Snapshot within VM work job context. vmId: " + createWork.getVmId()
+                        + ", VM snapshotId: " + createWork.getVmSnapshotId() + ", quiesce: " + createWork.isQuiesceVm());
+
+            VMSnapshot vmSnapshot = orchestrateCreateVMSnapshot(createWork.getVmId(), createWork.getVmSnapshotId(), createWork.isQuiesceVm());
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Done executing Create-VM-Snapshot within VM work job context. vmId: " + createWork.getVmId()
+                        + ", VM snapshotId: " + createWork.getVmSnapshotId() + ", quiesce: " + createWork.isQuiesceVm());
+
+            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(Long.valueOf(vmSnapshot.getId())));
+        } else if (work instanceof VmWorkDeleteVMSnapshot) {
+            VmWorkDeleteVMSnapshot deleteWork = (VmWorkDeleteVMSnapshot)work;
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Execute Delete-VM-Snapshot within VM work job context. vmId: " + deleteWork.getVmId()
+                        + ", VM snapshotId: " + deleteWork.getVmSnapshotId());
+
+            boolean result = orchestrateDeleteVMSnapshot(deleteWork.getVmSnapshotId());
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Done executing Delete-VM-Snapshot within VM work job context. vmId: " + deleteWork.getVmId()
+                        + ", VM snapshotId: " + deleteWork.getVmSnapshotId());
+
+            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(Boolean.valueOf(result)));
+
+        } else if (work instanceof VmWorkRevertToVMSnapshot) {
+            VmWorkRevertToVMSnapshot revertWork = (VmWorkRevertToVMSnapshot)work;
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Execute Revert-VM-Snapshot within VM work job context. vmId: " + revertWork.getVmId()
+                        + ", VM snapshotId: " + revertWork.getVmSnapshotId());
+
+            orchestrateRevertToVMSnapshot(revertWork.getVmSnapshotId());
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Done executing Revert-VM-Snapshot within VM work job context. vmId: " + revertWork.getVmId()
+                        + ", VM snapshotId: " + revertWork.getVmSnapshotId());
+
+            // revert carries no result payload
+            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
+
+        } else if (work instanceof VmWorkDeleteAllVMSnapshots) {
+            VmWorkDeleteAllVMSnapshots deleteAllWork = (VmWorkDeleteAllVMSnapshots)work;
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Execute Delete-All-VM-Snapshot within VM work job context. vmId: " + deleteAllWork.getVmId());
+
+            boolean result = orchestrateDeleteAllVMSnapshots(deleteAllWork.getVmId(), deleteAllWork.getSnapshotType());
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Done executing Delete-All-VM-Snapshot within VM work job context. vmId: " + deleteAllWork.getVmId());
+
+            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(Boolean.valueOf(result)));
+
+        } else {
+
+            // unknown work type: report failure with the exception serialized as the job result
+            RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd());
+            String exceptionJson = JobSerializerHelper.toSerializedString(e);
+            s_logger.error("Serialize exception object into json: " + exceptionJson);
+            return new Pair<JobInfo.Status, String>(JobInfo.Status.FAILED, exceptionJson);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/497feab8/server/src/com/cloud/vm/snapshot/VmWorkCreateVMSnapshot.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/snapshot/VmWorkCreateVMSnapshot.java b/server/src/com/cloud/vm/snapshot/VmWorkCreateVMSnapshot.java
new file mode 100644
index 0000000..3371802
--- /dev/null
+++ b/server/src/com/cloud/vm/snapshot/VmWorkCreateVMSnapshot.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm.snapshot;
+
+import com.cloud.vm.VmWork;
+
+/**
+ * VM work item for a "create VM snapshot" request. On top of the common {@link VmWork}
+ * fields it carries the id of the VM snapshot and the quiesce flag.
+ */
+public class VmWorkCreateVMSnapshot extends VmWork {
+    private static final long serialVersionUID = 124386202146049838L;
+
+    // id of the VM snapshot this work item refers to
+    private Long vmSnapshotId;
+    // quiesce flag supplied by the submitter
+    private boolean quiesceVm;
+
+    /**
+     * @param userId passed through to {@link VmWork}
+     * @param accountId passed through to {@link VmWork}
+     * @param vmId passed through to {@link VmWork}
+     * @param handlerName passed through to {@link VmWork}
+     * @param vmSnapshotId id of the VM snapshot this work item refers to
+     * @param quiesceVm quiesce flag to carry along
+     */
+    public VmWorkCreateVMSnapshot(long userId, long accountId, long vmId, String handlerName, Long vmSnapshotId, boolean quiesceVm) {
+        super(userId, accountId, vmId, handlerName);
+        this.quiesceVm = quiesceVm;
+        this.vmSnapshotId = vmSnapshotId;
+    }
+
+    /** @return the quiesce flag given at construction time */
+    public boolean isQuiesceVm() {
+        return quiesceVm;
+    }
+
+    /** @return the VM snapshot id given at construction time */
+    public Long getVmSnapshotId() {
+        return vmSnapshotId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/497feab8/server/src/com/cloud/vm/snapshot/VmWorkDeleteAllVMSnapshots.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/snapshot/VmWorkDeleteAllVMSnapshots.java b/server/src/com/cloud/vm/snapshot/VmWorkDeleteAllVMSnapshots.java
new file mode 100644
index 0000000..ce20dfc
--- /dev/null
+++ b/server/src/com/cloud/vm/snapshot/VmWorkDeleteAllVMSnapshots.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm.snapshot;
+
+import com.cloud.vm.VmWork;
+
+/**
+ * VM work item for a "delete all VM snapshots" request. On top of the common
+ * {@link VmWork} fields it carries the snapshot type supplied by the submitter.
+ */
+public class VmWorkDeleteAllVMSnapshots extends VmWork {
+    private static final long serialVersionUID = -6010083039865471888L;
+
+    // snapshot type supplied by the submitter
+    private VMSnapshot.Type type;
+
+    /**
+     * @param userId passed through to {@link VmWork}
+     * @param accountId passed through to {@link VmWork}
+     * @param vmId passed through to {@link VmWork}
+     * @param handlerName passed through to {@link VmWork}
+     * @param type snapshot type to carry along
+     */
+    public VmWorkDeleteAllVMSnapshots(long userId, long accountId, long vmId, String handlerName, VMSnapshot.Type type) {
+        super(userId, accountId, vmId, handlerName);
+        this.type = type;
+    }
+
+    /** @return the snapshot type given at construction time */
+    public VMSnapshot.Type getSnapshotType() {
+        return type;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/497feab8/server/src/com/cloud/vm/snapshot/VmWorkDeleteVMSnapshot.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/snapshot/VmWorkDeleteVMSnapshot.java b/server/src/com/cloud/vm/snapshot/VmWorkDeleteVMSnapshot.java
new file mode 100644
index 0000000..1a80e39
--- /dev/null
+++ b/server/src/com/cloud/vm/snapshot/VmWorkDeleteVMSnapshot.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm.snapshot;
+
+import com.cloud.vm.VmWork;
+
+/**
+ * VM work item for a "delete VM snapshot" request. On top of the common {@link VmWork}
+ * fields it carries the id of the VM snapshot.
+ */
+public class VmWorkDeleteVMSnapshot extends VmWork {
+    private static final long serialVersionUID = 7168101866614517508L;
+
+    // id of the VM snapshot this work item refers to
+    private Long vmSnapshotId;
+
+    /**
+     * @param userId passed through to {@link VmWork}
+     * @param accountId passed through to {@link VmWork}
+     * @param vmId passed through to {@link VmWork}
+     * @param handlerName passed through to {@link VmWork}
+     * @param vmSnapshotId id of the VM snapshot this work item refers to
+     */
+    public VmWorkDeleteVMSnapshot(long userId, long accountId, long vmId, String handlerName, Long vmSnapshotId) {
+        super(userId, accountId, vmId, handlerName);
+        this.vmSnapshotId = vmSnapshotId;
+    }
+
+    /** @return the VM snapshot id given at construction time */
+    public Long getVmSnapshotId() {
+        return vmSnapshotId;
+    }
+}