You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2013/11/21 03:33:48 UTC

[2/6] git commit: updated refs/heads/master to 278ef81

side-by-side VM sync management at manager level


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/278ef81a
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/278ef81a
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/278ef81a

Branch: refs/heads/master
Commit: 278ef81a8339e8dfc6ecd9dd04701d6713b9c062
Parents: 2d42b2d
Author: Kelven Yang <ke...@gmail.com>
Authored: Wed Nov 20 17:46:45 2013 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Wed Nov 20 17:59:38 2013 -0800

----------------------------------------------------------------------
 api/src/com/cloud/vm/VirtualMachine.java        |    7 +-
 .../apache/cloudstack/api/InternalIdentity.java |    4 +-
 .../apache/cloudstack/context/CallContext.java  |   12 +
 .../src/com/cloud/vm/VirtualMachineManager.java |   30 +-
 .../src/com/cloud/alert/AlertManager.java       |    2 +
 .../components-api/src/com/cloud/vm/VmWork.java |   45 +
 ...spring-engine-orchestration-core-context.xml |    1 +
 .../com/cloud/vm/VirtualMachineManagerImpl.java | 1348 +++++++++++++++++-
 .../cloud/vm/VirtualMachinePowerStateSync.java  |    3 +-
 .../vm/VirtualMachinePowerStateSyncImpl.java    |   24 +-
 .../src/com/cloud/vm/VmWorkAddVmToNetwork.java  |   42 +
 .../src/com/cloud/vm/VmWorkJobDispatcher.java   |  152 ++
 .../src/com/cloud/vm/VmWorkMigrate.java         |   86 ++
 .../src/com/cloud/vm/VmWorkMigrateForScale.java |   48 +
 .../com/cloud/vm/VmWorkMigrateWithStorage.java  |   52 +
 .../src/com/cloud/vm/VmWorkReboot.java          |   68 +
 .../src/com/cloud/vm/VmWorkReconfigure.java     |   43 +
 .../src/com/cloud/vm/VmWorkRemoveNicFromVm.java |   33 +
 .../com/cloud/vm/VmWorkRemoveVmFromNetwork.java |   43 +
 .../src/com/cloud/vm/VmWorkSerializer.java      |   75 +
 .../src/com/cloud/vm/VmWorkStart.java           |  125 ++
 .../src/com/cloud/vm/VmWorkStop.java            |   32 +
 .../com/cloud/vm/VmWorkStorageMigration.java    |   35 +
 .../dao/ManagementServerHostDaoImpl.java        |    7 +-
 .../core/spring-framework-jobs-core-context.xml |    4 +-
 .../jobs/AsyncJobExecutionContext.java          |   10 +-
 26 files changed, 2274 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/api/src/com/cloud/vm/VirtualMachine.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/vm/VirtualMachine.java b/api/src/com/cloud/vm/VirtualMachine.java
index 9a8d883..3400898 100755
--- a/api/src/com/cloud/vm/VirtualMachine.java
+++ b/api/src/com/cloud/vm/VirtualMachine.java
@@ -213,7 +213,12 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, Identity, I
          * UserBareMetal is only used for selecting VirtualMachineGuru, there is no
          * VM with this type. UserBareMetal should treat exactly as User.
          */
-        UserBareMetal(false);
+        UserBareMetal(false),
+        
+        /*
+         * General VM type for queuing VM orchestration work
+         */
+        Instance(false);
 
         boolean _isUsedBySystem;
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/api/src/org/apache/cloudstack/api/InternalIdentity.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/api/InternalIdentity.java b/api/src/org/apache/cloudstack/api/InternalIdentity.java
index 1dfeb8c..4149dd1 100644
--- a/api/src/org/apache/cloudstack/api/InternalIdentity.java
+++ b/api/src/org/apache/cloudstack/api/InternalIdentity.java
@@ -16,11 +16,13 @@
 // under the License.
 package org.apache.cloudstack.api;
 
+import java.io.Serializable;
+
 // This interface is a contract that getId() will give the internal
 // ID of an entity which extends this interface
 // Any class having an internal ID in db table/schema should extend this
 // For example, all ControlledEntity, OwnedBy would have an internal ID
 
-public interface InternalIdentity {
+public interface InternalIdentity extends Serializable {
     long getId();
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/api/src/org/apache/cloudstack/context/CallContext.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/context/CallContext.java b/api/src/org/apache/cloudstack/context/CallContext.java
index 5439aee..3cdccc5 100644
--- a/api/src/org/apache/cloudstack/context/CallContext.java
+++ b/api/src/org/apache/cloudstack/context/CallContext.java
@@ -197,6 +197,18 @@ public class CallContext {
         }
         return register(user, account);
     }
+    
+    public static CallContext register(long callingUserId, long callingAccountId, String contextId) throws CloudAuthenticationException {
+        Account account = s_entityMgr.findById(Account.class, callingAccountId);
+        if (account == null) {
+            throw new CloudAuthenticationException("The account is no longer current.").add(Account.class, Long.toString(callingAccountId));
+        }
+        User user = s_entityMgr.findById(User.class, callingUserId);
+        if (user == null) {
+            throw new CloudAuthenticationException("The user is no longer current.").add(User.class, Long.toString(callingUserId));
+        }
+        return register(user, account, contextId);
+    }
 
     public static void unregisterAll() {
         while ( unregister() != null ) {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/engine/api/src/com/cloud/vm/VirtualMachineManager.java
----------------------------------------------------------------------
diff --git a/engine/api/src/com/cloud/vm/VirtualMachineManager.java b/engine/api/src/com/cloud/vm/VirtualMachineManager.java
index 35804af..9d19cf5 100644
--- a/engine/api/src/com/cloud/vm/VirtualMachineManager.java
+++ b/engine/api/src/com/cloud/vm/VirtualMachineManager.java
@@ -108,8 +108,13 @@ public interface VirtualMachineManager extends Manager {
     void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy) throws InsufficientCapacityException,
             ResourceUnavailableException, ConcurrentOperationException, OperationTimedoutException;
 
+    void orchestrateStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy) throws InsufficientCapacityException,
+    	ResourceUnavailableException, ConcurrentOperationException, OperationTimedoutException;
+    
     void advanceStop(String vmUuid, boolean cleanupEvenIfUnableToStop) throws ResourceUnavailableException, OperationTimedoutException, ConcurrentOperationException;
 
+    void orchestrateStop(String vmUuid, boolean cleanupEvenIfUnableToStop) throws ResourceUnavailableException, OperationTimedoutException, ConcurrentOperationException;
+
     void advanceExpunge(String vmUuid) throws ResourceUnavailableException, OperationTimedoutException, ConcurrentOperationException;
 
     void destroy(String vmUuid) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException;
@@ -117,11 +122,17 @@ public interface VirtualMachineManager extends Manager {
     void migrateAway(String vmUuid, long hostId) throws InsufficientServerCapacityException;
 
     void migrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException;
+    
+    void orchestrateMigrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException;
 
     void migrateWithStorage(String vmUuid, long srcId, long destId, Map<Volume, StoragePool> volumeToPool) throws ResourceUnavailableException, ConcurrentOperationException;
-
+    
+    void orchestrateMigrateWithStorage(String vmUuid, long srcId, long destId, Map<Volume, StoragePool> volumeToPool) throws ResourceUnavailableException, ConcurrentOperationException;
+    
     void reboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ResourceUnavailableException;
 
+    void orchestrateReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ResourceUnavailableException;
+
     void advanceReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ResourceUnavailableException,
             ConcurrentOperationException, OperationTimedoutException;
 
@@ -137,6 +148,8 @@ public interface VirtualMachineManager extends Manager {
     VirtualMachine findById(long vmId);
 
     void storageMigration(String vmUuid, StoragePool storagePoolId);
+    
+    void orchestrateStorageMigration(String vmUuid, StoragePool storagePoolId);
 
     /**
      * @param vmInstance
@@ -161,7 +174,11 @@ public interface VirtualMachineManager extends Manager {
      * @throws InsufficientCapacityException
      */
     NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException,
-                ResourceUnavailableException, InsufficientCapacityException;
+        ResourceUnavailableException, InsufficientCapacityException;
+    
+    NicProfile orchestrateAddVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException,
+    	ResourceUnavailableException, InsufficientCapacityException;
+    
 
     /**
      * @param vm
@@ -172,6 +189,8 @@ public interface VirtualMachineManager extends Manager {
      */
     boolean removeNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException;
 
+    boolean orchestrateRemoveNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException;
+
     /**
      * @param vm
      * @param network
@@ -181,6 +200,8 @@ public interface VirtualMachineManager extends Manager {
      * @throws ConcurrentOperationException
      */
     boolean removeVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException;
+    
+    boolean orchestrateRemoveVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException;
 
     /**
      * @param nic
@@ -196,12 +217,15 @@ public interface VirtualMachineManager extends Manager {
      */
     VirtualMachineTO toVmTO(VirtualMachineProfile profile);
 
-
     VirtualMachine reConfigureVm(String vmUuid, ServiceOffering newServiceOffering, boolean sameHost) throws ResourceUnavailableException, ConcurrentOperationException;
+    
+    VirtualMachine orchestrateReConfigureVm(String vmUuid, ServiceOffering newServiceOffering, boolean sameHost) throws ResourceUnavailableException, ConcurrentOperationException;
 
     void findHostAndMigrate(String vmUuid, Long newSvcOfferingId, DeploymentPlanner.ExcludeList excludeHostList) throws InsufficientCapacityException,
             ConcurrentOperationException, ResourceUnavailableException;
 
     void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long newSvcOfferingId) throws ResourceUnavailableException, ConcurrentOperationException;
+    
+    void orchestrateMigrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long newSvcOfferingId) throws ResourceUnavailableException, ConcurrentOperationException;
 
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/engine/components-api/src/com/cloud/alert/AlertManager.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/alert/AlertManager.java b/engine/components-api/src/com/cloud/alert/AlertManager.java
index 1ae6b1b..eb5ac0c 100755
--- a/engine/components-api/src/com/cloud/alert/AlertManager.java
+++ b/engine/components-api/src/com/cloud/alert/AlertManager.java
@@ -50,6 +50,8 @@ public interface AlertManager extends Manager {
     public static final short ALERT_TYPE_DIRECT_ATTACHED_PUBLIC_IP = 24;
     public static final short ALERT_TYPE_LOCAL_STORAGE = 25;
     public static final short ALERT_TYPE_RESOURCE_LIMIT_EXCEEDED = 26; // Generated when the resource limit exceeds the limit. Currently used for recurring snapshots only
+
+    public static final short ALERT_TYPE_SYNC = 27;
     
     static final ConfigKey<Double> StorageCapacityThreshold = new ConfigKey<Double>(Double.class, "cluster.storage.capacity.notificationthreshold", "Alert", "0.75",
         "Percentage (as a value between 0 and 1) of storage utilization above which alerts will be sent about low storage available.", true, ConfigKey.Scope.Cluster, null);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/engine/components-api/src/com/cloud/vm/VmWork.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/vm/VmWork.java b/engine/components-api/src/com/cloud/vm/VmWork.java
new file mode 100644
index 0000000..3f9e71d
--- /dev/null
+++ b/engine/components-api/src/com/cloud/vm/VmWork.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm;
+
+import java.io.Serializable;
+
+public class VmWork implements Serializable {
+	private static final long serialVersionUID = -6946320465729853589L;
+
+	long userId;
+	long accountId;
+	long vmId;
+
+    public VmWork(long userId, long accountId, long vmId) {
+        this.userId = userId;
+        this.accountId = accountId;
+        this.vmId = vmId;
+	}
+
+	public long getUserId() {
+		return userId;
+	}
+
+	public long getAccountId() {
+		return accountId;
+	}
+
+	public long getVmId() {
+		return vmId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
----------------------------------------------------------------------
diff --git a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
index b5c4254..880002c 100644
--- a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
+++ b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
@@ -67,5 +67,6 @@
     
     <bean id="virtualMachineEntityImpl" class="org.apache.cloudstack.engine.cloud.entity.api.VirtualMachineEntityImpl" />
     
+    <bean id="virtualMachinePowerStateSyncImpl" class="com.cloud.vm.VirtualMachinePowerStateSyncImpl" />
     
 </beans>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 75a95c5..189c2ba 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -18,6 +18,10 @@
 package com.cloud.vm;
 
 import java.net.URI;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -26,6 +30,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -47,6 +52,18 @@ import org.apache.cloudstack.framework.config.ConfigDepot;
 import org.apache.cloudstack.framework.config.ConfigKey;
 import org.apache.cloudstack.framework.config.Configurable;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.framework.jobs.Outcome;
+import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
+import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
+import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageHandler;
+import org.apache.cloudstack.jobs.JobInfo;
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
@@ -153,8 +170,10 @@ import com.cloud.storage.dao.VolumeDao;
 import com.cloud.template.VirtualMachineTemplate;
 import com.cloud.user.Account;
 import com.cloud.user.User;
+import com.cloud.utils.DateUtil;
 import com.cloud.utils.Journal;
 import com.cloud.utils.Pair;
+import com.cloud.utils.Predicate;
 import com.cloud.utils.StringUtils;
 import com.cloud.utils.Ternary;
 import com.cloud.utils.component.ManagerBase;
@@ -163,8 +182,10 @@ import com.cloud.utils.db.DB;
 import com.cloud.utils.db.EntityManager;
 import com.cloud.utils.db.GlobalLock;
 import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
 import com.cloud.utils.db.TransactionCallbackWithException;
 import com.cloud.utils.db.TransactionCallbackWithExceptionNoReturn;
+import com.cloud.utils.db.TransactionLegacy;
 import com.cloud.utils.db.TransactionStatus;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.exception.ExecutionException;
@@ -172,6 +193,7 @@ import com.cloud.utils.fsm.NoTransitionException;
 import com.cloud.utils.fsm.StateMachine2;
 import com.cloud.vm.ItWorkVO.Step;
 import com.cloud.vm.VirtualMachine.Event;
+import com.cloud.vm.VirtualMachine.PowerState;
 import com.cloud.vm.VirtualMachine.State;
 import com.cloud.vm.dao.NicDao;
 import com.cloud.vm.dao.UserVmDao;
@@ -186,6 +208,8 @@ import com.cloud.vm.snapshot.dao.VMSnapshotDao;
 public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMachineManager, Listener, Configurable {
     private static final Logger s_logger = Logger.getLogger(VirtualMachineManagerImpl.class);
 
+    private static final String VM_SYNC_ALERT_SUBJECT = "VM state sync alert";
+    
     @Inject
     DataStoreManager dataStoreMgr;
     @Inject
@@ -279,6 +303,11 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     @Inject
     DeploymentPlanningManager _dpMgr;
 
+    @Inject protected MessageBus _messageBus;
+    @Inject protected VirtualMachinePowerStateSync _syncMgr;
+    @Inject protected VmWorkJobDao _workJobDao;
+    @Inject protected AsyncJobManager _jobMgr;
+    
     Map<VirtualMachine.Type, VirtualMachineGuru> _vmGurus = new HashMap<VirtualMachine.Type, VirtualMachineGuru>();
     protected StateMachine2<State, VirtualMachine.Event, VirtualMachine> _stateMachine;
 
@@ -298,6 +327,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
             "On destroy, force-stop takes this value ", true);
     static final ConfigKey<Integer> ClusterDeltaSyncInterval = new ConfigKey<Integer>("Advanced", Integer.class, "sync.interval", "60", "Cluster Delta sync interval in seconds",
             false);
+    
+    static final ConfigKey<Boolean> VmJobEnabled = new ConfigKey<Boolean>("Advanced",
+            Boolean.class, "vm.job.enabled", "false", 
+            "True to enable new VM sync model. false to use the old way", false);
+    static final ConfigKey<Long> VmJobCheckInterval = new ConfigKey<Long>("Advanced",
+            Long.class, "vm.job.check.interval", "3000", 
+            "Interval in milliseconds to check if the job is complete", false);
+    static final ConfigKey<Long> VmJobTimeout = new ConfigKey<Long>("Advanced",
+            Long.class, "vm.job.timeout", "600000", 
+            "Time in milliseconds to wait before attempting to cancel a job", false);
+    static final ConfigKey<Integer> VmJobStateReportInterval = new ConfigKey<Integer>("Advanced",
+    		Integer.class, "vm.job.report.interval", "60",
+            "Interval to send application level pings to make sure the connection is still working", false);
 
 
 
@@ -651,15 +693,46 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     }
 
     @Override
-    public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ConcurrentOperationException,
-    ResourceUnavailableException {
+    public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) 
+    	throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
+    	
         advanceStart(vmUuid, params, null);
     }
 
     @Override
     public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy) throws InsufficientCapacityException,
-    ConcurrentOperationException, ResourceUnavailableException {
-        CallContext cctxt = CallContext.current();
+		ConcurrentOperationException, ResourceUnavailableException {
+    	
+    	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+    	if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+    		// avoid re-entrance
+    		orchestrateStart(vmUuid, params, planToDeploy);
+    	} else {
+    	    Outcome<VirtualMachine> outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy);
+    	    
+	    	try {
+				VirtualMachine vm = outcome.get();
+			} catch (InterruptedException e) {
+				throw new RuntimeException("Operation is interrupted", e);
+			} catch (java.util.concurrent.ExecutionException e) {
+				throw new RuntimeException("Execution excetion", e);
+			}
+	    	
+	    	Throwable jobException = retriveExecutionException(outcome.getJob());
+	    	if(jobException != null) {
+		    	if(jobException instanceof ConcurrentOperationException)
+		    		throw (ConcurrentOperationException)jobException;
+		    	else if(jobException instanceof ResourceUnavailableException)
+		    		throw (ResourceUnavailableException)jobException;
+	    	}
+    	}
+    }
+    
+    @Override
+    public void orchestrateStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy) throws InsufficientCapacityException,
+    	ConcurrentOperationException, ResourceUnavailableException {
+        
+    	CallContext cctxt = CallContext.current();
         Account account = cctxt.getCallingAccount();
         User caller = cctxt.getCallingUser();
 
@@ -1145,7 +1218,38 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     }
 
     @Override
-    public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
+    public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) 
+    	throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
+    	
+    	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+    	if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+    		// avoid re-entrance
+    		orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop);
+    	} else {
+    	    Outcome<VirtualMachine> outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop);
+    	    
+	    	try {
+				VirtualMachine vm = outcome.get();
+			} catch (InterruptedException e) {
+				throw new RuntimeException("Operation is interrupted", e);
+			} catch (java.util.concurrent.ExecutionException e) {
+				throw new RuntimeException("Execution excetion", e);
+			}
+	    	
+	    	Throwable jobException = retriveExecutionException(outcome.getJob());
+	    	if(jobException != null) {
+	    		if(jobException instanceof AgentUnavailableException)
+	    			throw (AgentUnavailableException)jobException;
+	    		else if(jobException instanceof ConcurrentOperationException)
+		    		throw (ConcurrentOperationException)jobException;
+		    	else if(jobException instanceof OperationTimedoutException)
+		    		throw (OperationTimedoutException)jobException;
+	    	}
+    	}
+    }
+
+    @Override
+    public void orchestrateStop(String vmUuid, boolean cleanUpEvenIfUnableToStop) throws AgentUnavailableException, OperationTimedoutException, ConcurrentOperationException {
         VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
 
         advanceStop(vm, cleanUpEvenIfUnableToStop);
@@ -1415,9 +1519,33 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
         return true;
     }
+    
+    public void storageMigration(String vmUuid, StoragePool destPool) {
+    	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+    	if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+    		// avoid re-entrance
+    		orchestrateStorageMigration(vmUuid, destPool);
+    	} else {
+    	    Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool);
+    	    
+	    	try {
+				VirtualMachine vm = outcome.get();
+			} catch (InterruptedException e) {
+				throw new RuntimeException("Operation is interrupted", e);
+			} catch (java.util.concurrent.ExecutionException e) {
+				throw new RuntimeException("Execution excetion", e);
+			}
+	    	
+	    	Throwable jobException = retriveExecutionException(outcome.getJob());
+	    	if(jobException != null) {
+		    	if(jobException instanceof RuntimeException)
+		    		throw (RuntimeException)jobException;
+	    	}
+    	}
+    }
 
     @Override
-    public void storageMigration(String vmUuid, StoragePool destPool) {
+    public void orchestrateStorageMigration(String vmUuid, StoragePool destPool) {
         VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
 
         try {
@@ -1473,7 +1601,38 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     }
 
     @Override
-    public void migrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException {
+    public void migrate(String vmUuid, long srcHostId, DeployDestination dest) 
+    	throws ResourceUnavailableException, ConcurrentOperationException {
+    	
+    	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+    	if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+    		// avoid re-entrance
+    		orchestrateMigrate(vmUuid, srcHostId, dest);
+    	} else {
+    	    Outcome<VirtualMachine> outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest);
+    	    
+	    	try {
+				VirtualMachine vm = outcome.get();
+			} catch (InterruptedException e) {
+				throw new RuntimeException("Operation is interrupted", e);
+			} catch (java.util.concurrent.ExecutionException e) {
+				throw new RuntimeException("Execution excetion", e);
+			}
+	    	
+	    	Throwable jobException = retriveExecutionException(outcome.getJob());
+	    	if(jobException != null) {
+		    	if(jobException instanceof ResourceUnavailableException)
+	    			throw (ResourceUnavailableException)jobException;
+	    		else if(jobException instanceof ConcurrentOperationException)
+		    		throw (ConcurrentOperationException)jobException;
+	    		else if(jobException instanceof RuntimeException)
+		    		throw (RuntimeException)jobException;
+	    	}
+    	}
+    }
+    
+    @Override
+    public void orchestrateMigrate(String vmUuid, long srcHostId, DeployDestination dest) throws ResourceUnavailableException, ConcurrentOperationException {
         VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
         if (vm == null) {
             if (s_logger.isDebugEnabled()) {
@@ -1713,9 +1872,38 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         }
     }
 
+    public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map<Volume, StoragePool> volumeToPool) 
+    	throws ResourceUnavailableException, ConcurrentOperationException {
+    	
+    	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+    	if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+    		// avoid re-entrance
+    		orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool);
+    	} else {
+            Outcome<VirtualMachine> outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool);
+    	    
+	    	try {
+                VirtualMachine vm = outcome.get();
+			} catch (InterruptedException e) {
+                throw new RuntimeException("Operation is interrupted", e);
+			} catch (java.util.concurrent.ExecutionException e) {
+                throw new RuntimeException("Execution excetion", e);
+			}
+
+	    	Throwable jobException = retriveExecutionException(outcome.getJob());
+	    	if(jobException != null) {
+	    	    if(jobException instanceof ResourceUnavailableException)
+                    throw (ResourceUnavailableException)jobException;
+	    		else if(jobException instanceof ConcurrentOperationException)
+		    	    throw (ConcurrentOperationException)jobException;
+           }
+    	}
+    }
+    
     @Override
-    public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map<Volume, StoragePool> volumeToPool) throws ResourceUnavailableException,
-    ConcurrentOperationException {
+    public void orchestrateMigrateWithStorage(String vmUuid, long srcHostId, long destHostId, Map<Volume, StoragePool> volumeToPool) throws ResourceUnavailableException,
+    	ConcurrentOperationException {
+    	
         VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
 
         HostVO srcHost = _hostDao.findById(srcHostId);
@@ -1954,9 +2142,40 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
             throw new CloudRuntimeException("Unable to reboot a VM due to concurrent operation", e);
         }
     }
-
+    
     @Override
-    public void advanceReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ConcurrentOperationException,
+    public void advanceReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) 
+    	throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
+    	
+    	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+    	if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+    		// avoid re-entrance
+    		orchestrateReboot(vmUuid, params);
+    	} else {
+    	    Outcome<VirtualMachine> outcome = rebootVmThroughJobQueue(vmUuid, params);
+    	    
+	    	try {
+				VirtualMachine vm = outcome.get();
+			} catch (InterruptedException e) {
+				throw new RuntimeException("Operation is interrupted", e);
+			} catch (java.util.concurrent.ExecutionException e) {
+				throw new RuntimeException("Execution excetion", e);
+			}
+	    	
+	    	Throwable jobException = retriveExecutionException(outcome.getJob());
+	    	if(jobException != null) {
+	    		if(jobException instanceof ResourceUnavailableException)
+	    			throw (ResourceUnavailableException)jobException;
+	    		else if(jobException instanceof ConcurrentOperationException)
+		    		throw (ConcurrentOperationException)jobException;
+	    		else if(jobException instanceof InsufficientCapacityException)
+	    			throw (InsufficientCapacityException)jobException;
+	    	}
+    	}
+    }
+    
+    @Override
+    public void orchestrateReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> params) throws InsufficientCapacityException, ConcurrentOperationException,
     ResourceUnavailableException {
         VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
 
@@ -2081,8 +2300,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         return commands;
     }
 
-
-
     public void deltaSync(Map<String, Ternary<String, State, String>> newStates) {
         Map<Long, AgentVmInfo> states = convertToInfos(newStates);
 
@@ -2615,6 +2832,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
                         }
                     }
                 }
+                
+                if(VmJobEnabled.value()) {
+	                if(ping.getHostVmStateReport() != null && ping.getHostVmStateReport().size() > 0) {                	
+	            		_syncMgr.processHostVmStatePingReport(agentId, ping.getHostVmStateReport());
+	                }
+                }
+                
+                // take the chance to scan VMs that are stuck in transitional states 
+                // and are missing from the report
+                scanStalledVMInTransitionStateOnUpHost(agentId);
                 processed = true;
             }
         }
@@ -2636,7 +2863,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         if (!(cmd instanceof StartupRoutingCommand)) {
             return;
         }
+        
+        if(s_logger.isDebugEnabled())
+        	s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId());
 
+        if(VmJobEnabled.value()) {
+        	_syncMgr.resetHostSyncState(agent.getId());
+        }
+        
         if (forRebalance) {
             s_logger.debug("Not processing listener " + this + " as connect happens on rebalance process");
             return;
@@ -2842,9 +3076,50 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         vmForUpdate.setServiceOfferingId(newSvcOff.getId());
         return _vmDao.update(vmId, vmForUpdate);
     }
-
+    
+    @Override
+    public NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) 
+    	throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException {
+    	
+    	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+    	if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+    		// avoid re-entrance
+    		return orchestrateAddVmToNetwork(vm, network,requested);
+    	} else {
+            Outcome<VirtualMachine> outcome = addVmToNetworkThroughJobQueue(vm, network, requested);
+    	    
+	    	try {
+                outcome.get();
+			} catch (InterruptedException e) {
+                throw new RuntimeException("Operation is interrupted", e);
+			} catch (java.util.concurrent.ExecutionException e) {
+                throw new RuntimeException("Execution excetion", e);
+			}
+	    	
+	    	AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, outcome.getJob().getId());
+	    	if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) {
+	    		
+	    		NicProfile nic = (NicProfile)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult());
+	    		return nic;
+	    	} else {
+		    	Throwable jobException = retriveExecutionException(outcome.getJob());
+		    	if(jobException != null) {
+		    	    if(jobException instanceof ResourceUnavailableException)
+	                    throw (ResourceUnavailableException)jobException;
+		    		else if(jobException instanceof ConcurrentOperationException)
+			    	    throw (ConcurrentOperationException)jobException;
+		    		else if(jobException instanceof InsufficientCapacityException)
+			    	    throw (InsufficientCapacityException)jobException;
+		    		else if(jobException instanceof RuntimeException)
+			    	    throw (RuntimeException)jobException;
+	            }
+	    	    throw new RuntimeException("Job failed with unhandled exception");
+	    	}
+    	}
+    }
+    
     @Override
-    public NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException, ResourceUnavailableException,
+    public NicProfile orchestrateAddVmToNetwork(VirtualMachine vm, Network network, NicProfile requested) throws ConcurrentOperationException, ResourceUnavailableException,
     InsufficientCapacityException {
         CallContext cctx = CallContext.current();
 
@@ -2909,9 +3184,48 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         NicTO nicTO = hvGuru.toNicTO(nic);
         return nicTO;
     }
+    
+    public boolean removeNicFromVm(VirtualMachine vm, Nic nic) 
+    	throws ConcurrentOperationException, ResourceUnavailableException {
+    	
+    	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+    	if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+    		// avoid re-entrance
+    		return orchestrateRemoveNicFromVm(vm, nic);
+    	} else {
+            Outcome<VirtualMachine> outcome = removeNicFromVmThroughJobQueue(vm, nic);
+    	    
+	    	try {
+                outcome.get();
+			} catch (InterruptedException e) {
+                throw new RuntimeException("Operation is interrupted", e);
+			} catch (java.util.concurrent.ExecutionException e) {
+                throw new RuntimeException("Execution excetion", e);
+			}
+
+	    	AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, outcome.getJob().getId());
+	    	
+	    	if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) {
+	    		Boolean result = (Boolean)JobSerializerHelper.fromObjectSerializedString(jobVo.getResult());
+	    		return result;
+	    	} else {
+		    	Throwable jobException = retriveExecutionException(outcome.getJob());
+		    	if(jobException != null) {
+		    	    if(jobException instanceof ResourceUnavailableException)
+	                    throw (ResourceUnavailableException)jobException;
+		    		else if(jobException instanceof ConcurrentOperationException)
+			    	    throw (ConcurrentOperationException)jobException;
+		    		else if(jobException instanceof RuntimeException)
+		    			throw (RuntimeException)jobException;
+	            }
+		    	
+		    	throw new RuntimeException("Job failed with un-handled exception");
+	    	}
+    	}
+    }
 
     @Override
-    public boolean removeNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException {
+    public boolean orchestrateRemoveNicFromVm(VirtualMachine vm, Nic nic) throws ConcurrentOperationException, ResourceUnavailableException {
         CallContext cctx = CallContext.current();
         VMInstanceVO vmVO = _vmDao.findById(vm.getId());
         NetworkVO network = _networkDao.findById(nic.getNetworkId());
@@ -2971,6 +3285,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     @Override
     @DB
     public boolean removeVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException {
+    	// TODO will serialize on the VM object later to resolve operation conflicts
+    	return orchestrateRemoveVmFromNetwork(vm, network, broadcastUri);
+    }
+    
+    @Override
+    @DB
+    public boolean orchestrateRemoveVmFromNetwork(VirtualMachine vm, Network network, URI broadcastUri) throws ConcurrentOperationException, ResourceUnavailableException {
         CallContext cctx = CallContext.current();
         VMInstanceVO vmVO = _vmDao.findById(vm.getId());
         ReservationContext context = new ReservationContextImpl(null, null, cctx.getCallingUser(), cctx.getCallingAccount());
@@ -3106,10 +3427,40 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
             throw e;
         }
     }
+    
+    @Override
+    public void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) 
+    	throws ResourceUnavailableException, ConcurrentOperationException {
+    	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+    	if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+    		// avoid re-entrance
+    		orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId);
+    	} else {
+    	    Outcome<VirtualMachine> outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId);
+    	    
+	    	try {
+				VirtualMachine vm = outcome.get();
+			} catch (InterruptedException e) {
+				throw new RuntimeException("Operation is interrupted", e);
+			} catch (java.util.concurrent.ExecutionException e) {
+				throw new RuntimeException("Execution excetion", e);
+			}
+	    	
+	    	Throwable jobException = retriveExecutionException(outcome.getJob());
+	    	if(jobException != null) {
+	    		if(jobException instanceof ResourceUnavailableException)
+	    			throw (ResourceUnavailableException)jobException;
+	    		else if(jobException instanceof ConcurrentOperationException)
+		    		throw (ConcurrentOperationException)jobException;
+	    	}
+    	}
+    }
 
     @Override
-    public void migrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) throws ResourceUnavailableException, ConcurrentOperationException {
-        VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+    public void orchestrateMigrateForScale(String vmUuid, long srcHostId, DeployDestination dest, Long oldSvcOfferingId) 
+    	throws ResourceUnavailableException, ConcurrentOperationException {
+        
+    	VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
         s_logger.info("Migrating " + vm + " to " + dest);
 
         vm.getServiceOfferingId();
@@ -3293,7 +3644,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     }
 
     public boolean unplugNic(Network network, NicTO nic, VirtualMachineTO vm, ReservationContext context, DeployDestination dest) throws ConcurrentOperationException,
-    ResourceUnavailableException {
+    	ResourceUnavailableException {
 
         boolean result = true;
         VMInstanceVO router = _vmDao.findById(vm.getId());
@@ -3324,9 +3675,46 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
         return result;
     }
-
+    
+    public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, 
+    	boolean reconfiguringOnExistingHost) 
+    	throws ResourceUnavailableException, ConcurrentOperationException {
+
+    	AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+    	if(!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER)) {
+    		// avoid re-entrance
+    		return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
+    	} else {
+    	    Outcome<VirtualMachine> outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
+    	    
+    	    VirtualMachine vm = null;
+	    	try {
+				vm = outcome.get();
+			} catch (InterruptedException e) {
+				throw new RuntimeException("Operation is interrupted", e);
+			} catch (java.util.concurrent.ExecutionException e) {
+				throw new RuntimeException("Execution excetion", e);
+			}
+
+	    	AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, outcome.getJob().getId());
+	    	if(jobVo.getResultCode() == JobInfo.Status.SUCCEEDED.ordinal()) {
+	    		return _entityMgr.findById(VMInstanceVO.class, vm.getId());
+	    	} else {
+		    	Throwable jobException = retriveExecutionException(outcome.getJob());
+		    	if(jobException != null) {
+		    		if(jobException instanceof ResourceUnavailableException)
+		    			throw (ResourceUnavailableException)jobException;
+		    		else if(jobException instanceof ConcurrentOperationException)
+			    		throw (ConcurrentOperationException)jobException;
+		    	}
+		    	
+		    	throw new RuntimeException("Failed with un-handled exception");
+	    	}
+    	}
+    }
+    
     @Override
-    public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, boolean reconfiguringOnExistingHost) throws ResourceUnavailableException,
+    public VMInstanceVO orchestrateReConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, boolean reconfiguringOnExistingHost) throws ResourceUnavailableException,
     ConcurrentOperationException {
         VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
 
@@ -3388,7 +3776,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     @Override
     public ConfigKey<?>[] getConfigKeys() {
         return new ConfigKey<?>[] {ClusterDeltaSyncInterval, StartRetry, VmDestroyForcestop, VmOpCancelInterval, VmOpCleanupInterval, VmOpCleanupWait, VmOpLockStateRetry,
-                VmOpWaitInterval, ExecuteInSequence};
+                VmOpWaitInterval, ExecuteInSequence, VmJobCheckInterval, VmJobTimeout, VmJobStateReportInterval};
     }
 
     public List<StoragePoolAllocator> getStoragePoolAllocators() {
@@ -3400,4 +3788,920 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         _storagePoolAllocators = storagePoolAllocators;
     }
 
+    
+    //
+    // PowerState report handling for out-of-band changes and handling of left-over transitional VM states
+    //
+    
+    @MessageHandler(topic = Topics.VM_POWER_STATE)
+    private void HandlePownerStateReport(Object target, String subject, String senderAddress, Object args) {
+    	assert(args != null);
+    	Long vmId = (Long)args;
+    	
+    	List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+    		VirtualMachine.Type.Instance, vmId);
+    	if(pendingWorkJobs.size() == 0) {
+    		// there is no pending operation job
+            VMInstanceVO vm = _vmDao.findById(vmId);
+    		if(vm != null) {
+    			switch(vm.getPowerState()) {
+    			case PowerOn :
+    				handlePowerOnReportWithNoPendingJobsOnVM(vm);
+    				break;
+    				
+    			case PowerOff :
+    				handlePowerOffReportWithNoPendingJobsOnVM(vm);
+    				break;
+
+    			// PowerUnknown shouldn't be reported, it is a derived
+    			// VM power state from host state (host un-reachable
+    			case PowerUnknown :
+    			default :
+    				assert(false);
+    				break;
+    			}
+    		} else {
+    			s_logger.warn("VM " + vmId + " no longer exists when processing VM state report");
+    		}
+    	} else {
+    		// TODO, do job wake-up signalling, since currently async job wake-up is not in use
+    		// we will skip it for nows
+    	}
+    }
+    
+    private void handlePowerOnReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
+    	//
+    	// 	1) handle left-over transitional VM states
+    	//	2) handle out of band VM live migration
+    	//	3) handle out of sync stationary states, marking VM from Stopped to Running with
+    	//	   alert messages
+    	//
+    	switch(vm.getState()) {
+    	case Starting :
+    		try {
+    			stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
+    		} catch(NoTransitionException e) {
+    			s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+    		}
+    		
+    		// we need to alert admin or user about this risky state transition
+    		_alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+    			VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset");
+    		break;
+    		
+    	case Running :
+    		try {
+    			if(vm.getHostId() != null && vm.getHostId().longValue() != vm.getPowerHostId().longValue())
+    				s_logger.info("Detected out of band VM migration from host " + vm.getHostId() + " to host " + vm.getPowerHostId());
+    			stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
+    		} catch(NoTransitionException e) {
+    			s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+    		}
+    		break;
+    		
+    	case Stopping :
+    	case Stopped :
+    		try {
+    			stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
+    		} catch(NoTransitionException e) {
+    			s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+    		}
+      		_alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+        			VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Running) from out-of-context transition. VM network environment may need to be reset");
+          	break;
+    		
+    	case Destroyed :
+    	case Expunging :
+    		s_logger.info("Receive power on report when VM is in destroyed or expunging state. vm: "
+        		    + vm.getId() + ", state: " + vm.getState());
+    		break;
+    		
+    	case Migrating :
+    		try {
+    			stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOnReport, vm.getPowerHostId());
+    		} catch(NoTransitionException e) {
+    			s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+    		}
+    		break;
+    		
+    	case Error :
+    	default :
+    		s_logger.info("Receive power on report when VM is in error or unexpected state. vm: "
+    		    + vm.getId() + ", state: " + vm.getState());
+    		break;
+    	}
+    }
+    
+    private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
+
+    	// 	1) handle left-over transitional VM states
+    	//	2) handle out of sync stationary states, schedule force-stop to release resources
+    	//
+    	switch(vm.getState()) {
+    	case Starting :
+    	case Stopping :
+    	case Stopped :
+    	case Migrating :
+    		try {
+    			stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId());
+    		} catch(NoTransitionException e) {
+    			s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+    		}
+      		_alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+        			VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Stopped) from out-of-context transition.");
+      		// TODO: we need to forcely release all resource allocation
+          	break;
+    		
+    	case Running :
+    	case Destroyed :
+    	case Expunging :
+    		break;
+    		
+    	case Error :
+    	default :
+    		break;
+    	}
+    }
+    
+    private void scanStalledVMInTransitionStateOnUpHost(long hostId) {
+    	//
+    	// Check VM that is stuck in Starting, Stopping, Migrating states, we won't check
+    	// VMs in expunging state (this need to be handled specially)
+    	//
+    	// checking condition
+    	//	1) no pending VmWork job
+    	//	2) on hostId host and host is UP
+    	//
+    	// When host is UP, soon or later we will get a report from the host about the VM,
+    	// however, if VM is missing from the host report (it may happen in out of band changes
+    	// or from designed behave of XS/KVM), the VM may not get a chance to run the state-sync logic
+    	//
+    	// Therefor, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP
+    	// and a VM stalls for status update, we will consider them to be powered off
+    	// (which is relatively safe to do so)
+    	
+    	long stallThresholdInMs = VmJobStateReportInterval.value() + (VmJobStateReportInterval.value() >> 1);
+    	Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - stallThresholdInMs);
+    	List<Long> mostlikelyStoppedVMs = listStalledVMInTransitionStateOnUpHost(hostId, cutTime);
+    	for(Long vmId : mostlikelyStoppedVMs) {
+    		VMInstanceVO vm = _vmDao.findById(vmId);
+    		assert(vm != null);
+    		handlePowerOffReportWithNoPendingJobsOnVM(vm);
+    	}
+    	
+    	List<Long> vmsWithRecentReport = listVMInTransitionStateWithRecentReportOnUpHost(hostId, cutTime);
+    	for(Long vmId : vmsWithRecentReport) {
+    		VMInstanceVO vm = _vmDao.findById(vmId);
+    		assert(vm != null);
+    		if(vm.getPowerState() == PowerState.PowerOn)
+    			handlePowerOnReportWithNoPendingJobsOnVM(vm);
+    		else
+    			handlePowerOffReportWithNoPendingJobsOnVM(vm);
+    	}
+    }
+    
+    private void scanStalledVMInTransitionStateOnDisconnectedHosts() {
+    	Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - VmOpWaitInterval.value()*1000);
+    	List<Long> stuckAndUncontrollableVMs = listStalledVMInTransitionStateOnDisconnectedHosts(cutTime);
+    	for(Long vmId : stuckAndUncontrollableVMs) {
+    		VMInstanceVO vm = _vmDao.findById(vmId);
+    		
+    		// We now only alert administrator about this situation
+      		_alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+        		VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState() + " state and its host is unreachable for too long");
+    	}
+    }
+    
+    
+    // VMs that in transitional state without recent power state report
+    private List<Long> listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) {
+    	String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
+                     "AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " +
+    			     "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
+    			     "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
+    	
+    	List<Long> l = new ArrayList<Long>();
+    	TransactionLegacy txn = null;
+    	try {
+    		txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
+    	
+	        PreparedStatement pstmt = null;
+	        try {
+	            pstmt = txn.prepareAutoCloseStatement(sql);
+	            
+	            pstmt.setLong(1, hostId);
+	 	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+	 	        pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
+	            ResultSet rs = pstmt.executeQuery();
+	            while(rs.next()) {
+	            	l.add(rs.getLong(1));
+	            }
+	        } catch (SQLException e) {
+	        } catch (Throwable e) {
+	        }
+        
+    	} finally {
+    		if(txn != null)
+    			txn.close();
+    	}
+        return l;
+    }
+    
+    // VMs that in transitional state and recently have power state update
+    private List<Long> listVMInTransitionStateWithRecentReportOnUpHost(long hostId, Date cutTime) {
+    	String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
+                     "AND h.id = ? AND i.power_state_update_time > ? AND i.host_id = h.id " +
+    			     "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
+    			     "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
+    	
+    	List<Long> l = new ArrayList<Long>();
+    	TransactionLegacy txn = null;
+    	try {
+    		txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
+	        PreparedStatement pstmt = null;
+	        try {
+	            pstmt = txn.prepareAutoCloseStatement(sql);
+	            
+	            pstmt.setLong(1, hostId);
+	 	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+	 	        pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
+	            ResultSet rs = pstmt.executeQuery();
+	            while(rs.next()) {
+	            	l.add(rs.getLong(1));
+	            }
+	        } catch (SQLException e) {
+	        } catch (Throwable e) {
+	        }
+	        return l;
+    	} finally {
+    		if(txn != null)
+    			txn.close();
+    	}
+    }
+    
+    private List<Long> listStalledVMInTransitionStateOnDisconnectedHosts(Date cutTime) {
+    	String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " +
+                 "AND i.power_state_update_time < ? AND i.host_id = h.id " +
+			     "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
+			     "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
+	
+    	List<Long> l = new ArrayList<Long>();
+    	TransactionLegacy txn = null;
+    	try {
+    		txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
+	    	PreparedStatement pstmt = null;
+	    	try {
+		       pstmt = txn.prepareAutoCloseStatement(sql);
+		       
+		       pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+		       pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal());
+		       ResultSet rs = pstmt.executeQuery();
+		       while(rs.next()) {
+		       	l.add(rs.getLong(1));
+		       }
+	    	} catch (SQLException e) {
+	    	} catch (Throwable e) {
+	    	}
+	    	return l;
+    	} finally {
+    		if(txn != null)
+    			txn.close();
+    	}
+    }
+    
+    //
+    // VM operation based on new sync model
+    //
+    
+    public class VmStateSyncOutcome extends OutcomeImpl<VirtualMachine> {
+        private long _vmId;
+
+        public VmStateSyncOutcome(final AsyncJob job, final PowerState desiredPowerState, final long vmId, final Long srcHostIdForMigration) {
+            super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() {
+                @Override
+                public boolean checkCondition() {
+                    VMInstanceVO instance = _vmDao.findById(vmId);
+                    if (instance.getPowerState() == desiredPowerState && (srcHostIdForMigration != null && instance.getPowerHostId() != srcHostIdForMigration))
+                        return true;
+                    return false;
+                }
+            }, Topics.VM_POWER_STATE, AsyncJob.Topics.JOB_STATE);
+            _vmId = vmId;
+        }
+
+        @Override
+        protected VirtualMachine retrieve() {
+            return _vmDao.findById(_vmId);
+        }
+    }
+
+    public class VmJobSyncOutcome extends OutcomeImpl<VirtualMachine> {
+        private long _vmId;
+
+        public VmJobSyncOutcome(final AsyncJob job, final long vmId) {
+            super(VirtualMachine.class, job, VmJobCheckInterval.value(), new Predicate() {
+                @Override
+                public boolean checkCondition() {
+                	AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
+                	assert(jobVo != null);
+                	if(jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS)
+                        return true;
+                	
+                    return false;
+                }
+            }, AsyncJob.Topics.JOB_STATE);
+            _vmId = vmId;
+        }
+
+        @Override
+        protected VirtualMachine retrieve() {
+            return _vmDao.findById(_vmId);
+        }
+    }
+    
+    public Throwable retriveExecutionException(AsyncJob job) {
+    	assert(job != null);
+    	assert(job.getDispatcher().equals(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER));
+    	
+    	AsyncJobVO jobVo = this._entityMgr.findById(AsyncJobVO.class, job.getId());
+    	if(jobVo != null && jobVo.getResult() != null) {
+    		Object obj = JobSerializerHelper.fromSerializedString(job.getResult());
+    		
+    		if(obj != null && obj instanceof Throwable)
+    			return (Throwable)obj;
+    	}
+    	return null;
+    }
+    
+    public Outcome<VirtualMachine> startVmThroughJobQueue(final String vmUuid, 
+    	final Map<VirtualMachineProfile.Param, Object> params, 
+    	final DeploymentPlan planToDeploy) {
+        
+    	final CallContext context = CallContext.current();
+        final User callingUser = context.getCallingUser();
+        final Account callingAccount = context.getCallingAccount();
+
+        final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+    	Transaction.execute(new TransactionCallbackNoReturn () {
+    		public void doInTransactionWithoutResult(TransactionStatus status) {
+    	    	VmWorkJobVO workJob = null;
+    	    	
+    	        _vmDao.lockRow(vm.getId(), true);
+    	        List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(VirtualMachine.Type.Instance, 
+    	        	vm.getId(), VmWorkStart.class.getName());
+    	        
+    	        if (pendingWorkJobs.size() > 0) {
+    	            assert (pendingWorkJobs.size() == 1);
+    	            workJob = pendingWorkJobs.get(0);
+    	        } else {
+    	            workJob = new VmWorkJobVO(context.getContextId());
+
+    	            workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+    	            workJob.setCmd(VmWorkStart.class.getName());
+
+    	            workJob.setAccountId(callingAccount.getId());
+    	            workJob.setUserId(callingUser.getId());
+    	            workJob.setStep(VmWorkJobVO.Step.Starting);
+    	            workJob.setVmType(vm.getType());
+    	            workJob.setVmInstanceId(vm.getId());
+
+    	            // save work context info (there are some duplications)
+    	            VmWorkStart workInfo = new VmWorkStart(callingUser.getId(), callingAccount.getId(), vm.getId());
+    	            workInfo.setPlan(planToDeploy);
+    	            workInfo.setParams(params);
+    	            workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+    	            _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+    	    	}
+    	        
+	            // Transaction syntax sugar has a cost here
+	            context.putContextParameter("workJob", workJob);
+	            context.putContextParameter("jobId", new Long(vm.getId()));
+    		}
+    	});
+    	
+    	final long jobId = (Long)context.getContextParameter("jobId");
+    	AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+    	
+        return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), 
+        	VirtualMachine.PowerState.PowerOn, vm.getId(), null);
+    }
+    
+    public Outcome<VirtualMachine> stopVmThroughJobQueue(final String vmUuid, final boolean cleanup) {
+        final CallContext context = CallContext.current();
+        final Account account = context.getCallingAccount();
+        final User user = context.getCallingUser();
+
+        final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+    	
+    	Transaction.execute(new TransactionCallbackNoReturn () {
+    		public void doInTransactionWithoutResult(TransactionStatus status) {
+		        _vmDao.lockRow(vm.getId(), true);
+		
+		        List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+	        		VirtualMachine.Type.Instance, vm.getId(), 
+	        		VmWorkStop.class.getName());
+		
+		        VmWorkJobVO workJob = null;
+		        if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+		            assert (pendingWorkJobs.size() == 1);
+		            workJob = pendingWorkJobs.get(0);
+		        } else {
+		            workJob = new VmWorkJobVO(context.getContextId());
+		
+		            workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+		            workJob.setCmd(VmWorkStop.class.getName());
+		
+		            workJob.setAccountId(account.getId());
+		            workJob.setUserId(user.getId());
+		            workJob.setStep(VmWorkJobVO.Step.Prepare);
+		            workJob.setVmType(vm.getType());
+		            workJob.setVmInstanceId(vm.getId());
+		
+		            // save work context info (there are some duplications)
+		            VmWorkStop workInfo = new VmWorkStop(user.getId(), account.getId(), vm.getId(), cleanup);
+		            workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+		
+		            _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+		    	}
+		        
+	            context.putContextParameter("workJob", workJob);
+	            context.putContextParameter("jobId", new Long(vm.getId()));
+    		}
+		});
+
+    	final long jobId = (Long)context.getContextParameter("jobId");
+    	AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+    	
+        return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), 
+        	VirtualMachine.PowerState.PowerOff, vm.getId(), null);
+    }
+    
+    public Outcome<VirtualMachine> rebootVmThroughJobQueue(final String vmUuid, 
+    	final Map<VirtualMachineProfile.Param, Object> params) {
+    	
+        final CallContext context = CallContext.current();
+        final Account account = context.getCallingAccount();
+        final User user = context.getCallingUser();
+
+        final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+    	
+    	Transaction.execute(new TransactionCallbackNoReturn () {
+    		public void doInTransactionWithoutResult(TransactionStatus status) {
+		        _vmDao.lockRow(vm.getId(), true);
+		
+		        List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+	        		VirtualMachine.Type.Instance, vm.getId(), 
+	        		VmWorkReboot.class.getName());
+		
+		        VmWorkJobVO workJob = null;
+		        if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+		            assert (pendingWorkJobs.size() == 1);
+		            workJob = pendingWorkJobs.get(0);
+		        } else {
+		            workJob = new VmWorkJobVO(context.getContextId());
+		
+		            workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+		            workJob.setCmd(VmWorkReboot.class.getName());
+		
+		            workJob.setAccountId(account.getId());
+		            workJob.setUserId(user.getId());
+		            workJob.setStep(VmWorkJobVO.Step.Prepare);
+		            workJob.setVmType(vm.getType());
+		            workJob.setVmInstanceId(vm.getId());
+		
+		            // save work context info (there are some duplications)
+		            VmWorkReboot workInfo = new VmWorkReboot(user.getId(), account.getId(), vm.getId(), params);
+		            workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+		
+		            _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+		    	}
+		        
+	            context.putContextParameter("workJob", workJob);
+	            context.putContextParameter("jobId", new Long(vm.getId()));
+    		}
+		});
+
+    	final long jobId = (Long)context.getContextParameter("jobId");
+    	AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+    	
+        return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), 
+        	vm.getId());
+    }
+    
+    public Outcome<VirtualMachine> migrateVmThroughJobQueue(final String vmUuid, final long srcHostId, final DeployDestination dest) {
+        final CallContext context = CallContext.current();
+        final User user = context.getCallingUser();
+        final Account account = context.getCallingAccount();
+
+        final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+    	Transaction.execute(new TransactionCallbackNoReturn () {
+    		public void doInTransactionWithoutResult(TransactionStatus status) {
+	
+		        _vmDao.lockRow(vm.getId(), true);
+		
+		        List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+		        	VirtualMachine.Type.Instance, vm.getId(), 
+		        	VmWorkMigrate.class.getName());
+		
+		        VmWorkJobVO workJob = null;
+		        if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+		            assert (pendingWorkJobs.size() == 1);
+		            workJob = pendingWorkJobs.get(0);
+		        } else {
+		                    
+		            workJob = new VmWorkJobVO(context.getContextId());
+		
+		            workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+		            workJob.setCmd(VmWorkMigrate.class.getName());
+		
+		            workJob.setAccountId(account.getId());
+		            workJob.setUserId(user.getId());
+		            workJob.setVmType(vm.getType());
+		            workJob.setVmInstanceId(vm.getId());
+		
+		            // save work context info (there are some duplications)
+		            VmWorkMigrate workInfo = new VmWorkMigrate(user.getId(), account.getId(), vm.getId(), srcHostId, dest);
+		            workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+		
+		            _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+		        }
+	            context.putContextParameter("workJob", workJob);
+	            context.putContextParameter("jobId", new Long(vm.getId()));
+	    	}
+    	});
+    	
+    	final long jobId = (Long)context.getContextParameter("jobId");
+    	AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+    	
+        return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), 
+        	VirtualMachine.PowerState.PowerOn, vm.getId(), vm.getPowerHostId());
+    }
+    
+    public Outcome<VirtualMachine> migrateVmWithStorageThroughJobQueue(
+    	final String vmUuid, final long srcHostId, final long destHostId,
+    	final Map<Volume, StoragePool> volumeToPool) {
+    	
+        final CallContext context = CallContext.current();
+        final User user = context.getCallingUser();
+        final Account account = context.getCallingAccount();
+
+        final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+    	Transaction.execute(new TransactionCallbackNoReturn () {
+    		public void doInTransactionWithoutResult(TransactionStatus status) {
+	
+		        _vmDao.lockRow(vm.getId(), true);
+		
+		        List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+		        	VirtualMachine.Type.Instance, vm.getId(), 
+		        	VmWorkMigrateWithStorage.class.getName());
+		
+		        VmWorkJobVO workJob = null;
+		        if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+		            assert (pendingWorkJobs.size() == 1);
+		            workJob = pendingWorkJobs.get(0);
+		        } else {
+		                    
+		            workJob = new VmWorkJobVO(context.getContextId());
+		
+		            workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+		            workJob.setCmd(VmWorkMigrate.class.getName());
+		
+		            workJob.setAccountId(account.getId());
+		            workJob.setUserId(user.getId());
+		            workJob.setVmType(vm.getType());
+		            workJob.setVmInstanceId(vm.getId());
+		
+		            // save work context info (there are some duplications)
+		            VmWorkMigrateWithStorage workInfo = new VmWorkMigrateWithStorage(user.getId(), account.getId(), vm.getId(), 
+		            	srcHostId, destHostId, volumeToPool);
+		            workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+		
+		            _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+		        }
+	            context.putContextParameter("workJob", workJob);
+	            context.putContextParameter("jobId", new Long(vm.getId()));
+	    	}
+    	});
+    	
+    	final long jobId = (Long)context.getContextParameter("jobId");
+    	AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+    	
+        return new VmStateSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), 
+        	VirtualMachine.PowerState.PowerOn, vm.getId(), destHostId);
+    }
+    
+    //
+    // TODO build a common pattern to reduce code duplication in following methods
+    // no time for this at current iteration
+    //
+    public Outcome<VirtualMachine> migrateVmForScaleThroughJobQueue(
+    	final String vmUuid, final long srcHostId, final DeployDestination dest, final Long newSvcOfferingId) {
+    	
+        final CallContext context = CallContext.current();
+        final User user = context.getCallingUser();
+        final Account account = context.getCallingAccount();
+
+        final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+    	Transaction.execute(new TransactionCallbackNoReturn () {
+    		public void doInTransactionWithoutResult(TransactionStatus status) {
+	
+		        _vmDao.lockRow(vm.getId(), true);
+		
+		        List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+		        	VirtualMachine.Type.Instance, vm.getId(), 
+		        	VmWorkMigrateForScale.class.getName());
+		
+		        VmWorkJobVO workJob = null;
+		        if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+		            assert (pendingWorkJobs.size() == 1);
+		            workJob = pendingWorkJobs.get(0);
+		        } else {
+		                    
+		            workJob = new VmWorkJobVO(context.getContextId());
+		
+		            workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+		            workJob.setCmd(VmWorkMigrate.class.getName());
+		
+		            workJob.setAccountId(account.getId());
+		            workJob.setUserId(user.getId());
+		            workJob.setVmType(vm.getType());
+		            workJob.setVmInstanceId(vm.getId());
+		
+		            // save work context info (there are some duplications)
+		            VmWorkMigrateForScale workInfo = new VmWorkMigrateForScale(user.getId(), account.getId(), vm.getId(), 
+		            	srcHostId, dest, newSvcOfferingId);
+		            workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+		
+		            _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+		        }
+	            context.putContextParameter("workJob", workJob);
+	            context.putContextParameter("jobId", new Long(vm.getId()));
+	    	}
+    	});
+    	
+    	final long jobId = (Long)context.getContextParameter("jobId");
+    	AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+    	
+        return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
+    }
+    
+    public Outcome<VirtualMachine> migrateVmStorageThroughJobQueue(
+    	final String vmUuid, final StoragePool destPool) {
+    	
+        final CallContext context = CallContext.current();
+        final User user = context.getCallingUser();
+        final Account account = context.getCallingAccount();
+
+        final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+    	Transaction.execute(new TransactionCallbackNoReturn () {
+    		public void doInTransactionWithoutResult(TransactionStatus status) {
+	
+		        _vmDao.lockRow(vm.getId(), true);
+		
+		        List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+		        	VirtualMachine.Type.Instance, vm.getId(), 
+		        	VmWorkStorageMigration.class.getName());
+		
+		        VmWorkJobVO workJob = null;
+		        if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+		            assert (pendingWorkJobs.size() == 1);
+		            workJob = pendingWorkJobs.get(0);
+		        } else {
+		                    
+		            workJob = new VmWorkJobVO(context.getContextId());
+		
+		            workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+		            workJob.setCmd(VmWorkStorageMigration.class.getName());
+		
+		            workJob.setAccountId(account.getId());
+		            workJob.setUserId(user.getId());
+		            workJob.setVmType(vm.getType());
+		            workJob.setVmInstanceId(vm.getId());
+		
+		            // save work context info (there are some duplications)
+		            VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(), 
+		            	destPool);
+		            workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+		
+		            _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+		        }
+	            context.putContextParameter("workJob", workJob);
+	            context.putContextParameter("jobId", new Long(vm.getId()));
+	    	}
+    	});
+    	
+    	final long jobId = (Long)context.getContextParameter("jobId");
+    	AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+    	
+        return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
+    }
+    
+    public Outcome<VirtualMachine> addVmToNetworkThroughJobQueue(
+		final VirtualMachine vm, final Network network, final NicProfile requested) {
+    	
+        final CallContext context = CallContext.current();
+        final User user = context.getCallingUser();
+        final Account account = context.getCallingAccount();
+
+    	Transaction.execute(new TransactionCallbackNoReturn () {
+    		public void doInTransactionWithoutResult(TransactionStatus status) {
+	
+		        _vmDao.lockRow(vm.getId(), true);
+		
+		        List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+		        	VirtualMachine.Type.Instance, vm.getId(), 
+		        	VmWorkAddVmToNetwork.class.getName());
+		
+		        VmWorkJobVO workJob = null;
+		        if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+		            assert (pendingWorkJobs.size() == 1);
+		            workJob = pendingWorkJobs.get(0);
+		        } else {
+		                    
+		            workJob = new VmWorkJobVO(context.getContextId());
+		
+		            workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+		            workJob.setCmd(VmWorkAddVmToNetwork.class.getName());
+		
+		            workJob.setAccountId(account.getId());
+		            workJob.setUserId(user.getId());
+		            workJob.setVmType(vm.getType());
+		            workJob.setVmInstanceId(vm.getId());
+		
+		            // save work context info (there are some duplications)
+		            VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), 
+		            	network, requested);
+		            workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+		
+		            _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+		        }
+	            context.putContextParameter("workJob", workJob);
+	            context.putContextParameter("jobId", new Long(vm.getId()));
+	    	}
+    	});
+    	
+    	final long jobId = (Long)context.getContextParameter("jobId");
+    	AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+    	
+        return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
+    }
+    
+    public Outcome<VirtualMachine> removeNicFromVmThroughJobQueue(
+    	final VirtualMachine vm, final Nic nic) {
+    	
+        final CallContext context = CallContext.current();
+        final User user = context.getCallingUser();
+        final Account account = context.getCallingAccount();
+
+    	Transaction.execute(new TransactionCallbackNoReturn () {
+    		public void doInTransactionWithoutResult(TransactionStatus status) {
+	
+		        _vmDao.lockRow(vm.getId(), true);
+		
+		        List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+		        	VirtualMachine.Type.Instance, vm.getId(), 
+		        	VmWorkRemoveNicFromVm.class.getName());
+		
+		        VmWorkJobVO workJob = null;
+		        if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+		            assert (pendingWorkJobs.size() == 1);
+		            workJob = pendingWorkJobs.get(0);
+		        } else {
+		                    
+		            workJob = new VmWorkJobVO(context.getContextId());
+		
+		            workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+		            workJob.setCmd(VmWorkRemoveNicFromVm.class.getName());
+		
+		            workJob.setAccountId(account.getId());
+		            workJob.setUserId(user.getId());
+		            workJob.setVmType(vm.getType());
+		            workJob.setVmInstanceId(vm.getId());
+		
+		            // save work context info (there are some duplications)
+		            VmWorkRemoveNicFromVm workInfo = new VmWorkRemoveNicFromVm(user.getId(), account.getId(), vm.getId(), 
+		            	nic);
+		            workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+		
+		            _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+		        }
+	            context.putContextParameter("workJob", workJob);
+	            context.putContextParameter("jobId", new Long(vm.getId()));
+	    	}
+    	});
+    	
+    	final long jobId = (Long)context.getContextParameter("jobId");
+    	AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+    	
+        return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
+    }
+
+    public Outcome<VirtualMachine> removeVmFromNetworkThroughJobQueue(
+    	final VirtualMachine vm, final Network network, final URI broadcastUri) {
+    	
+        final CallContext context = CallContext.current();
+        final User user = context.getCallingUser();
+        final Account account = context.getCallingAccount();
+
+    	Transaction.execute(new TransactionCallbackNoReturn () {
+    		public void doInTransactionWithoutResult(TransactionStatus status) {
+	
+		        _vmDao.lockRow(vm.getId(), true);
+		
+		        List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+		        	VirtualMachine.Type.Instance, vm.getId(), 
+		        	VmWorkRemoveVmFromNetwork.class.getName());
+		
+		        VmWorkJobVO workJob = null;
+		        if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+		            assert (pendingWorkJobs.size() == 1);
+		            workJob = pendingWorkJobs.get(0);
+		        } else {
+		                    
+		            workJob = new VmWorkJobVO(context.getContextId());
+		
+		            workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+		            workJob.setCmd(VmWorkRemoveVmFromNetwork.class.getName());
+		
+		            workJob.setAccountId(account.getId());
+		            workJob.setUserId(user.getId());
+		            workJob.setVmType(vm.getType());
+		            workJob.setVmInstanceId(vm.getId());
+		
+		            // save work context info (there are some duplications)
+		            VmWorkRemoveVmFromNetwork workInfo = new VmWorkRemoveVmFromNetwork(user.getId(), account.getId(), vm.getId(), 
+		            	network, broadcastUri);
+		            workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+		
+		            _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+		        }
+	            context.putContextParameter("workJob", workJob);
+	            context.putContextParameter("jobId", new Long(vm.getId()));
+	    	}
+    	});
+    	
+    	final long jobId = (Long)context.getContextParameter("jobId");
+    	AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+    	
+        return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
+    }
+    
+    public Outcome<VirtualMachine> reconfigureVmThroughJobQueue(
+    	final String vmUuid, final ServiceOffering oldServiceOffering, final boolean reconfiguringOnExistingHost) {
+    	
+        final CallContext context = CallContext.current();
+        final User user = context.getCallingUser();
+        final Account account = context.getCallingAccount();
+
+        final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
+
+    	Transaction.execute(new TransactionCallbackNoReturn () {
+    		public void doInTransactionWithoutResult(TransactionStatus status) {
+	
+		        _vmDao.lockRow(vm.getId(), true);
+		
+		        List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
+		        	VirtualMachine.Type.Instance, vm.getId(), 
+		        	VmWorkReconfigure.class.getName());
+		
+		        VmWorkJobVO workJob = null;
+		        if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) {
+		            assert (pendingWorkJobs.size() == 1);
+		            workJob = pendingWorkJobs.get(0);
+		        } else {
+		                    
+		            workJob = new VmWorkJobVO(context.getContextId());
+		
+		            workJob.setDispatcher(VmWorkJobDispatcher.VM_WORK_JOB_DISPATCHER);
+		            workJob.setCmd(VmWorkReconfigure.class.getName());
+		
+		            workJob.setAccountId(account.getId());
+		            workJob.setUserId(user.getId());
+		            workJob.setVmType(vm.getType());
+		            workJob.setVmInstanceId(vm.getId());
+		
+		            // save work context info (there are some duplications)
+		            VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(), 
+		            		oldServiceOffering, reconfiguringOnExistingHost);
+		            workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+		
+		            _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
+		        }
+	            context.putContextParameter("workJob", workJob);
+	            context.putContextParameter("jobId", new Long(vm.getId()));
+	    	}
+    	});
+    	
+    	final long jobId = (Long)context.getContextParameter("jobId");
+    	AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(jobId);
+    	
+        return new VmJobSyncOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
+    }
+   
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/278ef81a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
index 7a23ddd..dacc8d0 100644
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachinePowerStateSync.java
@@ -19,7 +19,6 @@ package com.cloud.vm;
 import java.util.Map;
 
 import com.cloud.agent.api.HostVmStateReportEntry;
-import com.cloud.vm.VirtualMachine.PowerState;
 
 public interface VirtualMachinePowerStateSync {
 	
@@ -28,5 +27,5 @@ public interface VirtualMachinePowerStateSync {
 	void processHostVmStateReport(long hostId, Map<String, HostVmStateReportEntry> report);
 	
 	// to adapt legacy ping report
-	void processHostVmStatePingReport(long hostId, Map<String, PowerState> report);
+	void processHostVmStatePingReport(long hostId, Map<String, HostVmStateReportEntry> report);
 }