You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ro...@apache.org on 2021/02/12 07:12:08 UTC

[cloudstack] branch master updated: vmware: vm migration improvements (#4385)

This is an automated email from the ASF dual-hosted git repository.

rohit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/master by this push:
     new d6e8b53  vmware: vm migration improvements (#4385)
d6e8b53 is described below

commit d6e8b53736d92363d0f8689997259769356122de
Author: Abhishek Kumar <ab...@shapeblue.com>
AuthorDate: Fri Feb 12 12:41:41 2021 +0530

    vmware: vm migration improvements (#4385)
    
    - Fixes inter-cluster migration of VMs
    - Allows migration of stopped VM with disks attached to different and suitable pools
    - Improves inter-cluster detached volume migration
    - Allows inter-cluster migration (clusters of same Pod) for system VMs, VRs on VMware
    - Allows storage migration for stopped system VMs, VRs on VMware within the same Pod if the StoragePool has cluster scope type
    
    Linked Primate PR: https://github.com/apache/cloudstack-primate/pull/789 [Changes merged in this PR after new UI merge]
    Documentation PR: https://github.com/apache/cloudstack-documentation/pull/170
    
    Signed-off-by: Abhishek Kumar <ab...@gmail.com>
---
 .../java/com/cloud/hypervisor/HypervisorGuru.java  |   3 +-
 api/src/main/java/com/cloud/vm/UserVmService.java  |   2 +
 .../command/admin/systemvm/MigrateSystemVMCmd.java |  47 +-
 .../vm/MigrateVirtualMachineWithVolumeCmd.java     |  33 +-
 .../cloud/agent/api/MigrateVmToPoolCommand.java    |  33 +-
 .../agent/api/storage/MigrateVolumeCommand.java    |  12 +-
 .../cloudstack/storage/to/VolumeObjectTO.java      |  11 +-
 .../java/com/cloud/vm/VirtualMachineManager.java   |   3 +-
 .../service/VolumeOrchestrationService.java        |   2 +-
 .../com/cloud/vm/VirtualMachineManagerImpl.java    | 277 +++++---
 .../java/com/cloud/vm/VmWorkStorageMigration.java  |  12 +-
 .../engine/orchestration/VolumeOrchestrator.java   |  29 +-
 .../cloud/vm/VirtualMachineManagerImplTest.java    |  69 +-
 .../hypervisor/dao/HypervisorCapabilitiesDao.java  |   2 +
 .../dao/HypervisorCapabilitiesDaoImpl.java         |  17 +
 .../java/com/cloud/hypervisor/guru/VMwareGuru.java |  88 ++-
 .../hypervisor/vmware/resource/VmwareResource.java | 695 +++++++++++----------
 .../motion/VmwareStorageMotionStrategy.java        |  68 +-
 server/src/main/java/com/cloud/api/ApiDBUtils.java |  13 +-
 .../main/java/com/cloud/api/ApiResponseHelper.java |   9 +-
 .../com/cloud/hypervisor/HypervisorGuruBase.java   |   3 +-
 .../com/cloud/server/ManagementServerImpl.java     |  31 +-
 .../main/java/com/cloud/vm/UserVmManagerImpl.java  | 195 +++---
 .../component/test_interpod_migration.py           | 464 ++++++++++++++
 ui/public/locales/en.json                          |   8 +
 ui/src/config/section/compute.js                   |  12 +-
 ui/src/config/section/infra/ilbvms.js              |  19 +-
 ui/src/config/section/infra/routers.js             |  23 +-
 ui/src/config/section/infra/systemVms.js           |  23 +-
 ui/src/views/compute/MigrateVMStorage.vue          | 228 +++++++
 ui/src/views/compute/MigrateWizard.vue             |  44 +-
 .../cloud/hypervisor/vmware/mo/DatastoreMO.java    |  15 +-
 .../hypervisor/vmware/mo/HypervisorHostHelper.java |  96 ++-
 .../cloud/hypervisor/vmware/util/VmwareHelper.java |  14 +
 34 files changed, 1896 insertions(+), 704 deletions(-)

diff --git a/api/src/main/java/com/cloud/hypervisor/HypervisorGuru.java b/api/src/main/java/com/cloud/hypervisor/HypervisorGuru.java
index 8a10964..96518ac 100644
--- a/api/src/main/java/com/cloud/hypervisor/HypervisorGuru.java
+++ b/api/src/main/java/com/cloud/hypervisor/HypervisorGuru.java
@@ -27,6 +27,7 @@ import com.cloud.agent.api.to.NicTO;
 import com.cloud.agent.api.to.VirtualMachineTO;
 import com.cloud.hypervisor.Hypervisor.HypervisorType;
 import com.cloud.storage.StoragePool;
+import com.cloud.storage.Volume;
 import com.cloud.utils.Pair;
 import com.cloud.utils.component.Adapter;
 import com.cloud.vm.NicProfile;
@@ -99,5 +100,5 @@ public interface HypervisorGuru extends Adapter {
      * @param destination the primary storage pool to migrate to
      * @return a list of commands to perform for a successful migration
      */
-    List<Command> finalizeMigrate(VirtualMachine vm, StoragePool destination);
+    List<Command> finalizeMigrate(VirtualMachine vm, Map<Volume, StoragePool> volumeToPool);
 }
diff --git a/api/src/main/java/com/cloud/vm/UserVmService.java b/api/src/main/java/com/cloud/vm/UserVmService.java
index 56a6dfd..eab9c73 100644
--- a/api/src/main/java/com/cloud/vm/UserVmService.java
+++ b/api/src/main/java/com/cloud/vm/UserVmService.java
@@ -488,6 +488,8 @@ public interface UserVmService {
 
     VirtualMachine vmStorageMigration(Long vmId, StoragePool destPool);
 
+    VirtualMachine vmStorageMigration(Long vmId, Map<String, String> volumeToPool);
+
     UserVm restoreVM(RestoreVMCmd cmd) throws InsufficientCapacityException, ResourceUnavailableException;
 
     UserVm restoreVirtualMachine(Account caller, long vmId, Long newTemplateId) throws InsufficientCapacityException, ResourceUnavailableException;
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/systemvm/MigrateSystemVMCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/systemvm/MigrateSystemVMCmd.java
index ab0018b..50129a5 100644
--- a/api/src/main/java/org/apache/cloudstack/api/command/admin/systemvm/MigrateSystemVMCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/systemvm/MigrateSystemVMCmd.java
@@ -16,7 +16,7 @@
 // under the License.
 package org.apache.cloudstack.api.command.admin.systemvm;
 
-import org.apache.log4j.Logger;
+import java.util.HashMap;
 
 import org.apache.cloudstack.acl.SecurityChecker.AccessType;
 import org.apache.cloudstack.api.ACL;
@@ -27,8 +27,10 @@ import org.apache.cloudstack.api.BaseAsyncCmd;
 import org.apache.cloudstack.api.Parameter;
 import org.apache.cloudstack.api.ServerApiException;
 import org.apache.cloudstack.api.response.HostResponse;
+import org.apache.cloudstack.api.response.StoragePoolResponse;
 import org.apache.cloudstack.api.response.SystemVmResponse;
 import org.apache.cloudstack.context.CallContext;
+import org.apache.log4j.Logger;
 
 import com.cloud.event.EventTypes;
 import com.cloud.exception.ConcurrentOperationException;
@@ -37,6 +39,7 @@ import com.cloud.exception.ManagementServerException;
 import com.cloud.exception.ResourceUnavailableException;
 import com.cloud.exception.VirtualMachineMigrationException;
 import com.cloud.host.Host;
+import com.cloud.storage.StoragePool;
 import com.cloud.user.Account;
 import com.cloud.vm.VirtualMachine;
 
@@ -54,7 +57,6 @@ public class MigrateSystemVMCmd extends BaseAsyncCmd {
     @Parameter(name = ApiConstants.HOST_ID,
                type = CommandType.UUID,
                entityType = HostResponse.class,
-               required = true,
                description = "destination Host ID to migrate VM to")
     private Long hostId;
 
@@ -66,6 +68,13 @@ public class MigrateSystemVMCmd extends BaseAsyncCmd {
                description = "the ID of the virtual machine")
     private Long virtualMachineId;
 
+    @Parameter(name = ApiConstants.STORAGE_ID,
+            since = "4.16.0",
+            type = CommandType.UUID,
+            entityType = StoragePoolResponse.class,
+            description = "Destination storage pool ID to migrate VM volumes to. Required for migrating the root disk volume")
+    private Long storageId;
+
     /////////////////////////////////////////////////////
     /////////////////// Accessors ///////////////////////
     /////////////////////////////////////////////////////
@@ -78,6 +87,10 @@ public class MigrateSystemVMCmd extends BaseAsyncCmd {
         return virtualMachineId;
     }
 
+    public Long getStorageId() {
+        return storageId;
+    }
+
     /////////////////////////////////////////////////////
     /////////////// API Implementation///////////////////
     /////////////////////////////////////////////////////
@@ -109,15 +122,35 @@ public class MigrateSystemVMCmd extends BaseAsyncCmd {
 
     @Override
     public void execute() {
+        if (getHostId() == null && getStorageId() == null) {
+            throw new InvalidParameterValueException("Either hostId or storageId must be specified");
+        }
 
-        Host destinationHost = _resourceService.getHost(getHostId());
-        if (destinationHost == null) {
-            throw new InvalidParameterValueException("Unable to find the host to migrate the VM, host id=" + getHostId());
+        if (getHostId() != null && getStorageId() != null) {
+            throw new InvalidParameterValueException("Only one of hostId and storageId can be specified");
         }
         try {
-            CallContext.current().setEventDetails("VM Id: " + this._uuidMgr.getUuid(VirtualMachine.class, getVirtualMachineId()) + " to host Id: " + this._uuidMgr.getUuid(Host.class, getHostId()));
             //FIXME : Should not be calling UserVmService to migrate all types of VMs - need a generic VM layer
-            VirtualMachine migratedVm = _userVmService.migrateVirtualMachine(getVirtualMachineId(), destinationHost);
+            VirtualMachine migratedVm = null;
+            if (getHostId() != null) {
+                Host destinationHost = _resourceService.getHost(getHostId());
+                if (destinationHost == null) {
+                    throw new InvalidParameterValueException("Unable to find the host to migrate the VM, host id=" + getHostId());
+                }
+                if (destinationHost.getType() != Host.Type.Routing) {
+                    throw new InvalidParameterValueException("The specified host(" + destinationHost.getName() + ") is not suitable to migrate the VM, please specify another one");
+                }
+                CallContext.current().setEventDetails("VM Id: " + getVirtualMachineId() + " to host Id: " + getHostId());
+                migratedVm = _userVmService.migrateVirtualMachineWithVolume(getVirtualMachineId(), destinationHost, new HashMap<String, String>());
+            } else if (getStorageId() != null) {
+                // OfflineMigration performed when this parameter is specified
+                StoragePool destStoragePool = _storageService.getStoragePool(getStorageId());
+                if (destStoragePool == null) {
+                    throw new InvalidParameterValueException("Unable to find the storage pool to migrate the VM");
+                }
+                CallContext.current().setEventDetails("VM Id: " + getVirtualMachineId() + " to storage pool Id: " + getStorageId());
+                migratedVm = _userVmService.vmStorageMigration(getVirtualMachineId(), destStoragePool);
+            }
             if (migratedVm != null) {
                 // return the generic system VM instance response
                 SystemVmResponse response = _responseGenerator.createSystemVmResponse(migratedVm);
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/vm/MigrateVirtualMachineWithVolumeCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/vm/MigrateVirtualMachineWithVolumeCmd.java
index 65d71cc..e4fa4f1 100644
--- a/api/src/main/java/org/apache/cloudstack/api/command/admin/vm/MigrateVirtualMachineWithVolumeCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/vm/MigrateVirtualMachineWithVolumeCmd.java
@@ -21,8 +21,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.log4j.Logger;
-
 import org.apache.cloudstack.api.APICommand;
 import org.apache.cloudstack.api.ApiConstants;
 import org.apache.cloudstack.api.ApiErrorCode;
@@ -32,6 +30,8 @@ import org.apache.cloudstack.api.ResponseObject.ResponseView;
 import org.apache.cloudstack.api.ServerApiException;
 import org.apache.cloudstack.api.response.HostResponse;
 import org.apache.cloudstack.api.response.UserVmResponse;
+import org.apache.commons.collections.MapUtils;
+import org.apache.log4j.Logger;
 
 import com.cloud.event.EventTypes;
 import com.cloud.exception.ConcurrentOperationException;
@@ -61,7 +61,6 @@ public class MigrateVirtualMachineWithVolumeCmd extends BaseAsyncCmd {
     @Parameter(name = ApiConstants.HOST_ID,
                type = CommandType.UUID,
                entityType = HostResponse.class,
-               required = true,
                description = "Destination Host ID to migrate VM to.")
     private Long hostId;
 
@@ -97,7 +96,7 @@ public class MigrateVirtualMachineWithVolumeCmd extends BaseAsyncCmd {
 
     public Map<String, String> getVolumeToPool() {
         Map<String, String> volumeToPoolMap = new HashMap<String, String>();
-        if (migrateVolumeTo != null && !migrateVolumeTo.isEmpty()) {
+        if (MapUtils.isNotEmpty(migrateVolumeTo)) {
             Collection<?> allValues = migrateVolumeTo.values();
             Iterator<?> iter = allValues.iterator();
             while (iter.hasNext()) {
@@ -141,19 +140,35 @@ public class MigrateVirtualMachineWithVolumeCmd extends BaseAsyncCmd {
 
     @Override
     public void execute() {
+        if (hostId == null && MapUtils.isEmpty(migrateVolumeTo)) {
+            throw new InvalidParameterValueException(String.format("Either %s or %s  must be passed for migrating the VM", ApiConstants.HOST_ID, ApiConstants.MIGRATE_TO));
+        }
+
         UserVm userVm = _userVmService.getUserVm(getVirtualMachineId());
         if (userVm == null) {
             throw new InvalidParameterValueException("Unable to find the VM by id=" + getVirtualMachineId());
         }
 
-        Host destinationHost = _resourceService.getHost(getHostId());
-        // OfflineVmwareMigration: destination host would have to not be a required parameter for stopped VMs
-        if (destinationHost == null) {
-            throw new InvalidParameterValueException("Unable to find the host to migrate the VM, host id =" + getHostId());
+        if (!VirtualMachine.State.Running.equals(userVm.getState()) && hostId != null) {
+            throw new InvalidParameterValueException(String.format("VM ID: %s is not in Running state to migrate it to new host", userVm.getUuid()));
+        }
+
+        if (!VirtualMachine.State.Stopped.equals(userVm.getState()) && hostId == null) {
+            throw new InvalidParameterValueException(String.format("VM ID: %s is not in Stopped state to migrate, use %s parameter to migrate it to a new host", userVm.getUuid(), ApiConstants.HOST_ID));
         }
 
         try {
-            VirtualMachine migratedVm = _userVmService.migrateVirtualMachineWithVolume(getVirtualMachineId(), destinationHost, getVolumeToPool());
+            VirtualMachine migratedVm = null;
+            if (hostId != null) {
+                Host destinationHost = _resourceService.getHost(getHostId());
+                // OfflineVmwareMigration: destination host would have to not be a required parameter for stopped VMs
+                if (destinationHost == null) {
+                    throw new InvalidParameterValueException("Unable to find the host to migrate the VM, host id =" + getHostId());
+                }
+                migratedVm = _userVmService.migrateVirtualMachineWithVolume(getVirtualMachineId(), destinationHost, getVolumeToPool());
+            } else if (MapUtils.isNotEmpty(migrateVolumeTo)) {
+                migratedVm = _userVmService.vmStorageMigration(getVirtualMachineId(), getVolumeToPool());
+            }
             if (migratedVm != null) {
                 UserVmResponse response = _responseGenerator.createUserVmResponse(ResponseView.Full, "virtualmachine", (UserVm)migratedVm).get(0);
                 response.setResponseName(getCommandName());
diff --git a/core/src/main/java/com/cloud/agent/api/MigrateVmToPoolCommand.java b/core/src/main/java/com/cloud/agent/api/MigrateVmToPoolCommand.java
index 91a911d..16e2533 100644
--- a/core/src/main/java/com/cloud/agent/api/MigrateVmToPoolCommand.java
+++ b/core/src/main/java/com/cloud/agent/api/MigrateVmToPoolCommand.java
@@ -18,9 +18,11 @@
 //
 package com.cloud.agent.api;
 
-import com.cloud.agent.api.to.VolumeTO;
+import java.util.List;
 
-import java.util.Collection;
+import com.cloud.agent.api.to.StorageFilerTO;
+import com.cloud.agent.api.to.VolumeTO;
+import com.cloud.utils.Pair;
 
 /**
  * used to tell the agent to migrate a vm to a different primary storage pool.
@@ -28,10 +30,10 @@ import java.util.Collection;
  *
  */
 public class MigrateVmToPoolCommand extends Command {
-    private Collection<VolumeTO> volumes;
     private String vmName;
-    private String destinationPool;
     private boolean executeInSequence = false;
+    private List<Pair<VolumeTO, StorageFilerTO>> volumeToFilerAsList;
+    private String hostGuidInTargetCluster;
 
     protected MigrateVmToPoolCommand() {
     }
@@ -39,27 +41,28 @@ public class MigrateVmToPoolCommand extends Command {
     /**
      *
      * @param vmName the name of the VM to migrate
-     * @param volumes used to supply feedback on vmware generated names
-     * @param destinationPool the primary storage pool to migrate the VM to
+     * @param volumeToFilerTo the volume to primary storage pool map to migrate the VM to
+     * @param hostGuidInTargetCluster GUID of host in target cluster when migrating across clusters
      * @param executeInSequence
      */
-    public MigrateVmToPoolCommand(String vmName, Collection<VolumeTO> volumes, String destinationPool, boolean executeInSequence) {
+    public MigrateVmToPoolCommand(String vmName, List<Pair<VolumeTO, StorageFilerTO>> volumeToFilerTo,
+                                  String hostGuidInTargetCluster, boolean executeInSequence) {
         this.vmName = vmName;
-        this.volumes = volumes;
-        this.destinationPool = destinationPool;
+        this.hostGuidInTargetCluster = hostGuidInTargetCluster;
+        this.volumeToFilerAsList = volumeToFilerTo;
         this.executeInSequence = executeInSequence;
     }
 
-    public Collection<VolumeTO> getVolumes() {
-        return volumes;
+    public String getVmName() {
+        return vmName;
     }
 
-    public String getDestinationPool() {
-        return destinationPool;
+    public List<Pair<VolumeTO, StorageFilerTO>> getVolumeToFilerAsList() {
+        return volumeToFilerAsList;
     }
 
-    public String getVmName() {
-        return vmName;
+    public String getHostGuidInTargetCluster() {
+        return hostGuidInTargetCluster;
     }
 
     @Override
diff --git a/core/src/main/java/com/cloud/agent/api/storage/MigrateVolumeCommand.java b/core/src/main/java/com/cloud/agent/api/storage/MigrateVolumeCommand.java
index 9902a86..f3ca63b 100644
--- a/core/src/main/java/com/cloud/agent/api/storage/MigrateVolumeCommand.java
+++ b/core/src/main/java/com/cloud/agent/api/storage/MigrateVolumeCommand.java
@@ -34,6 +34,7 @@ public class MigrateVolumeCommand extends Command {
     StorageFilerTO sourcePool;
     String attachedVmName;
     Volume.Type volumeType;
+    String hostGuidInTargetCluster;
 
     private DataTO srcData;
     private DataTO destData;
@@ -68,6 +69,11 @@ public class MigrateVolumeCommand extends Command {
         setWait(timeout);
     }
 
+    public MigrateVolumeCommand(long volumeId, String volumePath, StoragePool sourcePool, StoragePool targetPool, String targetClusterHost) {
+        this(volumeId, volumePath, sourcePool, targetPool);
+        this.hostGuidInTargetCluster = targetClusterHost;
+    }
+
     @Override
     public boolean executeInSequence() {
         return true;
@@ -125,7 +131,11 @@ public class MigrateVolumeCommand extends Command {
         return destDetails;
     }
 
+    public String getHostGuidInTargetCluster() {
+        return hostGuidInTargetCluster;
+    }
+
     public int getWaitInMillSeconds() {
         return getWait() * 1000;
     }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/java/org/apache/cloudstack/storage/to/VolumeObjectTO.java b/core/src/main/java/org/apache/cloudstack/storage/to/VolumeObjectTO.java
index 6cd27b1..a076b80 100644
--- a/core/src/main/java/org/apache/cloudstack/storage/to/VolumeObjectTO.java
+++ b/core/src/main/java/org/apache/cloudstack/storage/to/VolumeObjectTO.java
@@ -19,7 +19,6 @@
 
 package org.apache.cloudstack.storage.to;
 
-import com.cloud.storage.MigrationOptions;
 import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
 
 import com.cloud.agent.api.to.DataObjectType;
@@ -27,6 +26,7 @@ import com.cloud.agent.api.to.DataStoreTO;
 import com.cloud.agent.api.to.DataTO;
 import com.cloud.hypervisor.Hypervisor;
 import com.cloud.offering.DiskOffering.DiskCacheMode;
+import com.cloud.storage.MigrationOptions;
 import com.cloud.storage.Storage;
 import com.cloud.storage.Volume;
 
@@ -62,6 +62,7 @@ public class VolumeObjectTO implements DataTO {
     private Hypervisor.HypervisorType hypervisorType;
     private MigrationOptions migrationOptions;
     private boolean directDownload;
+    private String dataStoreUuid;
     private boolean deployAsIs;
     private String updatedDataStoreUUID;
     private String vSphereStoragePolicyId;
@@ -319,6 +320,14 @@ public class VolumeObjectTO implements DataTO {
         return directDownload;
     }
 
+    public String getDataStoreUuid() {
+        return dataStoreUuid;
+    }
+
+    public void setDataStoreUuid(String dataStoreUuid) {
+        this.dataStoreUuid = dataStoreUuid;
+    }
+
     public boolean isDeployAsIs() {
         return deployAsIs;
     }
diff --git a/engine/api/src/main/java/com/cloud/vm/VirtualMachineManager.java b/engine/api/src/main/java/com/cloud/vm/VirtualMachineManager.java
index 49b13e1..463d3a7 100644
--- a/engine/api/src/main/java/com/cloud/vm/VirtualMachineManager.java
+++ b/engine/api/src/main/java/com/cloud/vm/VirtualMachineManager.java
@@ -40,7 +40,6 @@ import com.cloud.network.Network;
 import com.cloud.offering.DiskOffering;
 import com.cloud.offering.DiskOfferingInfo;
 import com.cloud.offering.ServiceOffering;
-import com.cloud.storage.StoragePool;
 import com.cloud.template.VirtualMachineTemplate;
 import com.cloud.user.Account;
 import com.cloud.uservm.UserVm;
@@ -167,7 +166,7 @@ public interface VirtualMachineManager extends Manager {
 
     VirtualMachine findById(long vmId);
 
-    void storageMigration(String vmUuid, StoragePool storagePoolId);
+    void storageMigration(String vmUuid, Map<Long, Long> volumeToPool);
 
     /**
      * @param vmInstance
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/orchestration/service/VolumeOrchestrationService.java b/engine/api/src/main/java/org/apache/cloudstack/engine/orchestration/service/VolumeOrchestrationService.java
index 44e993f..ee264ac 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/orchestration/service/VolumeOrchestrationService.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/orchestration/service/VolumeOrchestrationService.java
@@ -112,7 +112,7 @@ public interface VolumeOrchestrationService {
 
     void migrateVolumes(VirtualMachine vm, VirtualMachineTO vmTo, Host srcHost, Host destHost, Map<Volume, StoragePool> volumeToPool);
 
-    boolean storageMigration(VirtualMachineProfile vm, StoragePool destPool) throws StorageUnavailableException;
+    boolean storageMigration(VirtualMachineProfile vm, Map<Volume, StoragePool> volumeToPool) throws StorageUnavailableException;
 
     void prepareForMigration(VirtualMachineProfile vm, DeployDestination dest);
 
diff --git a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
index f1ab9cd..de1ef20 100755
--- a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -23,14 +23,17 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.Executors;
@@ -40,7 +43,6 @@ import java.util.concurrent.TimeUnit;
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
-import com.cloud.deployasis.dao.UserVmDeployAsIsDetailsDao;
 import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
 import org.apache.cloudstack.api.ApiConstants;
 import org.apache.cloudstack.api.command.admin.vm.MigrateVMCmd;
@@ -51,7 +53,6 @@ import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
 import org.apache.cloudstack.engine.orchestration.service.VolumeOrchestrationService;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
-import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreInfo;
 import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator;
 import org.apache.cloudstack.framework.ca.Certificate;
 import org.apache.cloudstack.framework.config.ConfigKey;
@@ -142,6 +143,7 @@ import com.cloud.deploy.DeploymentPlan;
 import com.cloud.deploy.DeploymentPlanner;
 import com.cloud.deploy.DeploymentPlanner.ExcludeList;
 import com.cloud.deploy.DeploymentPlanningManager;
+import com.cloud.deployasis.dao.UserVmDeployAsIsDetailsDao;
 import com.cloud.event.EventTypes;
 import com.cloud.event.UsageEventUtils;
 import com.cloud.event.UsageEventVO;
@@ -933,7 +935,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
             } catch (final InterruptedException e) {
                 throw new RuntimeException("Operation is interrupted", e);
             } catch (final java.util.concurrent.ExecutionException e) {
-                throw new RuntimeException("Execution excetion", e);
+                throw new RuntimeException("Execution exception", e);
             }
 
             final Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
@@ -2143,7 +2145,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     }
 
     @Override
-    public void storageMigration(final String vmUuid, final StoragePool destPool) {
+    public void storageMigration(final String vmUuid, final Map<Long, Long> volumeToPool) {
         final AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
         if (jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
             // avoid re-entrance
@@ -2151,14 +2153,14 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
             final VirtualMachine vm = _vmDao.findByUuid(vmUuid);
             placeHolder = createPlaceHolderWork(vm.getId());
             try {
-                orchestrateStorageMigration(vmUuid, destPool);
+                orchestrateStorageMigration(vmUuid, volumeToPool);
             } finally {
                 if (placeHolder != null) {
                     _workJobDao.expunge(placeHolder.getId());
                 }
             }
         } else {
-            final Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool);
+            final Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, volumeToPool);
 
             try {
                 final VirtualMachine vm = outcome.get();
@@ -2179,10 +2181,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         }
     }
 
-    private void orchestrateStorageMigration(final String vmUuid, final StoragePool destPool) {
+    private void orchestrateStorageMigration(final String vmUuid, final Map<Long, Long> volumeToPool) {
         final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
 
-        preStorageMigrationStateCheck(destPool, vm);
+        Map<Volume, StoragePool> volumeToPoolMap = prepareVmStorageMigration(vm, volumeToPool);
 
         try {
             if(s_logger.isDebugEnabled()) {
@@ -2191,7 +2193,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
                                 vm.getInstanceName()));
             }
 
-            migrateThroughHypervisorOrStorage(destPool, vm);
+            migrateThroughHypervisorOrStorage(vm, volumeToPoolMap);
 
         } catch (ConcurrentOperationException
                 | InsufficientCapacityException // possibly InsufficientVirtualNetworkCapacityException or InsufficientAddressCapacityException
@@ -2210,24 +2212,18 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         }
     }
 
-    private Answer[] attemptHypervisorMigration(StoragePool destPool, VMInstanceVO vm) {
+    private Answer[] attemptHypervisorMigration(VMInstanceVO vm, Map<Volume, StoragePool> volumeToPool, Long hostId) {
+        if (hostId == null) {
+            return null;
+        }
         final HypervisorGuru hvGuru = _hvGuruMgr.getGuru(vm.getHypervisorType());
         // OfflineVmwareMigration: in case of vmware call vcenter to do it for us.
         // OfflineVmwareMigration: should we check the proximity of source and destination
         // OfflineVmwareMigration: if we are in the same cluster/datacentre/pool or whatever?
         // OfflineVmwareMigration: we are checking on success to optionally delete an old vm if we are not
-        List<Command> commandsToSend = hvGuru.finalizeMigrate(vm, destPool);
+        List<Command> commandsToSend = hvGuru.finalizeMigrate(vm, volumeToPool);
 
-        Long hostId = vm.getHostId();
-        // OfflineVmwareMigration: probably this is null when vm is stopped
-        if(hostId == null) {
-            hostId = vm.getLastHostId();
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug(String.format("host id is null, using last host id %d", hostId) );
-            }
-        }
-
-        if(CollectionUtils.isNotEmpty(commandsToSend)) {
+        if (CollectionUtils.isNotEmpty(commandsToSend)) {
             Commands commandsContainer = new Commands(Command.OnError.Stop);
             commandsContainer.addCommands(commandsToSend);
             try {
@@ -2241,86 +2237,164 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         return null;
     }
 
-    private void afterHypervisorMigrationCleanup(StoragePool destPool, VMInstanceVO vm, HostVO srcHost, Long srcClusterId, Answer[] hypervisorMigrationResults) throws InsufficientCapacityException {
+    private void afterHypervisorMigrationCleanup(VMInstanceVO vm, Map<Volume, StoragePool> volumeToPool, Long sourceClusterId, Answer[] hypervisorMigrationResults) throws InsufficientCapacityException {
         boolean isDebugEnabled = s_logger.isDebugEnabled();
         if(isDebugEnabled) {
-            String msg = String.format("cleaning up after hypervisor pool migration volumes for VM %s(%s) to pool %s(%s)", vm.getInstanceName(), vm.getUuid(), destPool.getName(), destPool.getUuid());
+            String msg = String.format("Cleaning up after hypervisor pool migration volumes for VM %s(%s)", vm.getInstanceName(), vm.getUuid());
             s_logger.debug(msg);
         }
-        setDestinationPoolAndReallocateNetwork(destPool, vm);
-        // OfflineVmwareMigration: don't set this to null or have another way to address the command; twice migrating will lead to an NPE
-        Long destPodId = destPool.getPodId();
-        Long vmPodId = vm.getPodIdToDeployIn();
-        if (destPodId == null || ! destPodId.equals(vmPodId)) {
+        StoragePool rootVolumePool = null;
+        if (MapUtils.isNotEmpty(volumeToPool)) {
+            for (Map.Entry<Volume, StoragePool> entry : volumeToPool.entrySet()) {
+                if (Type.ROOT.equals(entry.getKey().getVolumeType())) {
+                    rootVolumePool = entry.getValue();
+                    break;
+                }
+            }
+        }
+        setDestinationPoolAndReallocateNetwork(rootVolumePool, vm);
+        Long destClusterId = rootVolumePool != null ? rootVolumePool.getClusterId() : null;
+        if (destClusterId != null && !destClusterId.equals(sourceClusterId)) {
             if(isDebugEnabled) {
-                String msg = String.format("resetting lasHost for VM %s(%s) as pod (%s) is no good.", vm.getInstanceName(), vm.getUuid(), destPodId);
+                String msg = String.format("Resetting lastHost for VM %s(%s)", vm.getInstanceName(), vm.getUuid());
                 s_logger.debug(msg);
             }
 
             vm.setLastHostId(null);
-            vm.setPodIdToDeployIn(destPodId);
+            vm.setPodIdToDeployIn(rootVolumePool.getPodId());
             // OfflineVmwareMigration: a consecutive migration will fail probably (no host not pod)
         }// else keep last host set for this vm
-        markVolumesInPool(vm,destPool, hypervisorMigrationResults);
+        markVolumesInPool(vm, hypervisorMigrationResults);
         // OfflineVmwareMigration: deal with answers, if (hypervisorMigrationResults.length > 0)
         // OfflineVmwareMigration: iterate over the volumes for data updates
     }
 
-    private void markVolumesInPool(VMInstanceVO vm, StoragePool destPool, Answer[] hypervisorMigrationResults) {
+    private void markVolumesInPool(VMInstanceVO vm, Answer[] hypervisorMigrationResults) {
         MigrateVmToPoolAnswer relevantAnswer = null;
+        if (hypervisorMigrationResults.length == 1 && !hypervisorMigrationResults[0].getResult()) {
+            throw new CloudRuntimeException(String.format("VM ID: %s migration failed. %s", vm.getUuid(), hypervisorMigrationResults[0].getDetails()));
+        }
         for (Answer answer : hypervisorMigrationResults) {
             if (s_logger.isTraceEnabled()) {
-                s_logger.trace(String.format("received an %s: %s", answer.getClass().getSimpleName(), answer));
+                s_logger.trace(String.format("Received an %s: %s", answer.getClass().getSimpleName(), answer));
             }
             if (answer instanceof MigrateVmToPoolAnswer) {
                 relevantAnswer = (MigrateVmToPoolAnswer) answer;
             }
         }
         if (relevantAnswer == null) {
-            throw new CloudRuntimeException("no relevant migration results found");
+            throw new CloudRuntimeException("No relevant migration results found");
+        }
+        List<VolumeObjectTO> results = relevantAnswer.getVolumeTos();
+        if (results == null) {
+            results = new ArrayList<>();
         }
         List<VolumeVO> volumes = _volsDao.findUsableVolumesForInstance(vm.getId());
         if(s_logger.isDebugEnabled()) {
-            String msg = String.format("found %d volumes for VM %s(uuid:%s, id:%d)", volumes.size(), vm.getInstanceName(), vm.getUuid(), vm.getId());
+            String msg = String.format("Found %d volumes for VM %s(uuid:%s, id:%d)", results.size(), vm.getInstanceName(), vm.getUuid(), vm.getId());
             s_logger.debug(msg);
         }
-        for (VolumeObjectTO result : relevantAnswer.getVolumeTos() ) {
+        for (VolumeObjectTO result : results ) {
             if(s_logger.isDebugEnabled()) {
-                s_logger.debug(String.format("updating volume (%d) with path '%s' on pool '%d'", result.getId(), result.getPath(), destPool.getId()));
+                s_logger.debug(String.format("Updating volume (%d) with path '%s' on pool '%s'", result.getId(), result.getPath(), result.getDataStoreUuid()));
             }
             VolumeVO volume = _volsDao.findById(result.getId());
+            StoragePool pool = _storagePoolDao.findPoolByUUID(result.getDataStoreUuid());
+            if (volume == null || pool == null) {
+                continue;
+            }
             volume.setPath(result.getPath());
-            volume.setPoolId(destPool.getId());
+            volume.setPoolId(pool.getId());
             _volsDao.update(volume.getId(), volume);
         }
     }
 
-    private void migrateThroughHypervisorOrStorage(StoragePool destPool, VMInstanceVO vm) throws StorageUnavailableException, InsufficientCapacityException {
+    private Pair<Long, Long> findClusterAndHostIdForVm(VMInstanceVO vm) {
+        Long hostId = vm.getHostId();
+        Long clusterId = null;
+        // OfflineVmwareMigration: hostId is probably null when the vm is stopped
+        if(hostId == null) {
+            hostId = vm.getLastHostId();
+            if (s_logger.isDebugEnabled()) {
+                s_logger.debug(String.format("host id is null, using last host id %d", hostId) );
+            }
+        }
+        if (hostId == null) {
+            List<VolumeVO> volumes = _volsDao.findByInstanceAndType(vm.getId(), Type.ROOT);
+            if (CollectionUtils.isNotEmpty(volumes)) {
+                VolumeVO rootVolume = volumes.get(0);
+                if (rootVolume.getPoolId() != null) {
+                    StoragePoolVO pool = _storagePoolDao.findById(rootVolume.getPoolId());
+                    if (pool != null && pool.getClusterId() != null) {
+                        clusterId = pool.getClusterId();
+                        List<HostVO> hosts = _hostDao.findHypervisorHostInCluster(pool.getClusterId());
+                        if (CollectionUtils.isNotEmpty(hosts)) {
+                            hostId = hosts.get(0).getId();
+                        }
+                    }
+                }
+            }
+        }
+        if (clusterId == null && hostId != null) {
+            HostVO host = _hostDao.findById(hostId);
+            if (host != null) {
+                clusterId = host.getClusterId();
+            }
+        }
+        return new Pair<>(clusterId, hostId);
+    }
+
+    private void migrateThroughHypervisorOrStorage(VMInstanceVO vm, Map<Volume, StoragePool> volumeToPool) throws StorageUnavailableException, InsufficientCapacityException {
         final VirtualMachineProfile profile = new VirtualMachineProfileImpl(vm);
-        final Long srchostId = vm.getHostId() != null ? vm.getHostId() : vm.getLastHostId();
-        final HostVO srcHost = _hostDao.findById(srchostId);
-        final Long srcClusterId = srcHost.getClusterId();
-        Answer[] hypervisorMigrationResults = attemptHypervisorMigration(destPool, vm);
+        Pair<Long, Long> vmClusterAndHost = findClusterAndHostIdForVm(vm);
+        final Long sourceClusterId = vmClusterAndHost.first();
+        final Long sourceHostId = vmClusterAndHost.second();
+        Answer[] hypervisorMigrationResults = attemptHypervisorMigration(vm, volumeToPool, sourceHostId);
         boolean migrationResult = false;
         if (hypervisorMigrationResults == null) {
             // OfflineVmwareMigration: if the HypervisorGuru can't do it, let the volume manager take care of it.
-            migrationResult = volumeMgr.storageMigration(profile, destPool);
+            migrationResult = volumeMgr.storageMigration(profile, volumeToPool);
             if (migrationResult) {
-                afterStorageMigrationCleanup(destPool, vm, srcHost, srcClusterId);
+                postStorageMigrationCleanup(vm, volumeToPool, _hostDao.findById(sourceHostId), sourceClusterId);
             } else {
                 s_logger.debug("Storage migration failed");
             }
         } else {
-            afterHypervisorMigrationCleanup(destPool, vm, srcHost, srcClusterId, hypervisorMigrationResults);
+            afterHypervisorMigrationCleanup(vm, volumeToPool, sourceClusterId, hypervisorMigrationResults);
         }
     }
 
-    private void preStorageMigrationStateCheck(StoragePool destPool, VMInstanceVO vm) {
-        if (destPool == null) {
-            throw new CloudRuntimeException("Unable to migrate vm: missing destination storage pool");
+    private Map<Volume, StoragePool> prepareVmStorageMigration(VMInstanceVO vm, Map<Long, Long> volumeToPool) {
+        Map<Volume, StoragePool> volumeToPoolMap = new HashMap<>();
+        if (MapUtils.isEmpty(volumeToPool)) {
+            throw new CloudRuntimeException("Unable to migrate vm: missing volume to pool mapping");
         }
-
-        checkDestinationForTags(destPool, vm);
+        Cluster cluster = null;
+        Long dataCenterId = null;
+        for (Map.Entry<Long, Long> entry: volumeToPool.entrySet()) {
+            StoragePool pool = _storagePoolDao.findById(entry.getValue());
+            if (pool.getClusterId() != null) {
+                cluster = _clusterDao.findById(pool.getClusterId());
+                break;
+            }
+            dataCenterId = pool.getDataCenterId();
+        }
+        Long podId = null;
+        Long clusterId = null;
+        if (cluster != null) {
+            dataCenterId = cluster.getDataCenterId();
+            podId = cluster.getPodId();
+            clusterId = cluster.getId();
+        }
+        if (dataCenterId == null) {
+            String msg = "Unable to migrate vm: failed to create deployment destination with given volume to pool map";
+            s_logger.debug(msg);
+            throw new CloudRuntimeException(msg);
+        }
+        final DataCenterDeployment destination = new DataCenterDeployment(dataCenterId, podId, clusterId, null, null, null);
+        // Create a map of which volume should go in which storage pool.
+        final VirtualMachineProfile profile = new VirtualMachineProfileImpl(vm);
+        volumeToPoolMap = createMappingVolumeAndStoragePool(profile, destination, volumeToPool);
         try {
             stateTransitTo(vm, Event.StorageMigrationRequested, null);
         } catch (final NoTransitionException e) {
@@ -2328,6 +2402,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
             s_logger.debug(msg);
             throw new CloudRuntimeException(msg, e);
         }
+        return volumeToPoolMap;
     }
 
     private void checkDestinationForTags(StoragePool destPool, VMInstanceVO vm) {
@@ -2366,24 +2441,35 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     }
 
 
-    private void afterStorageMigrationCleanup(StoragePool destPool, VMInstanceVO vm, HostVO srcHost, Long srcClusterId) throws InsufficientCapacityException {
-        setDestinationPoolAndReallocateNetwork(destPool, vm);
+    private void postStorageMigrationCleanup(VMInstanceVO vm, Map<Volume, StoragePool> volumeToPool, HostVO srcHost, Long srcClusterId) throws InsufficientCapacityException {
+        StoragePool rootVolumePool = null;
+        if (MapUtils.isNotEmpty(volumeToPool)) {
+            for (Map.Entry<Volume, StoragePool> entry : volumeToPool.entrySet()) {
+                if (Type.ROOT.equals(entry.getKey().getVolumeType())) {
+                    rootVolumePool = entry.getValue();
+                    break;
+                }
+            }
+        }
+        setDestinationPoolAndReallocateNetwork(rootVolumePool, vm);
 
         //when start the vm next time, don;'t look at last_host_id, only choose the host based on volume/storage pool
         vm.setLastHostId(null);
-        vm.setPodIdToDeployIn(destPool.getPodId());
+        if (rootVolumePool != null) {
+            vm.setPodIdToDeployIn(rootVolumePool.getPodId());
+        }
 
         // If VM was cold migrated between clusters belonging to two different VMware DCs,
         // unregister the VM from the source host and cleanup the associated VM files.
         if (vm.getHypervisorType().equals(HypervisorType.VMware)) {
-            afterStorageMigrationVmwareVMcleanup(destPool, vm, srcHost, srcClusterId);
+            afterStorageMigrationVmwareVMcleanup(rootVolumePool, vm, srcHost, srcClusterId);
         }
     }
 
     private void setDestinationPoolAndReallocateNetwork(StoragePool destPool, VMInstanceVO vm) throws InsufficientCapacityException {
         //if the vm is migrated to different pod in basic mode, need to reallocate ip
 
-        if (destPool.getPodId() != null && !destPool.getPodId().equals(vm.getPodIdToDeployIn())) {
+        if (destPool != null && destPool.getPodId() != null && !destPool.getPodId().equals(vm.getPodIdToDeployIn())) {
             if (s_logger.isDebugEnabled()) {
                 String msg = String.format("as the pod for vm %s has changed we are reallocating its network", vm.getInstanceName());
                 s_logger.debug(msg);
@@ -2397,7 +2483,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     private void afterStorageMigrationVmwareVMcleanup(StoragePool destPool, VMInstanceVO vm, HostVO srcHost, Long srcClusterId) {
         // OfflineVmwareMigration: this should only happen on storage migration, else the guru would already have issued the command
         final Long destClusterId = destPool.getClusterId();
-        if (srcClusterId != null && destClusterId != null && ! srcClusterId.equals(destClusterId)) {
+        if (srcClusterId != null && destClusterId != null && ! srcClusterId.equals(destClusterId) && srcHost != null) {
             final String srcDcName = _clusterDetailsDao.getVmwareDcName(srcClusterId);
             final String destDcName = _clusterDetailsDao.getVmwareDcName(destClusterId);
             if (srcDcName != null && destDcName != null && !srcDcName.equals(destDcName)) {
@@ -2661,13 +2747,23 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
     /**
      * We create the mapping of volumes and storage pool to migrate the VMs according to the information sent by the user.
-     * If the user did not enter a complete mapping, the volumes that were left behind will be auto mapped using {@link #createStoragePoolMappingsForVolumes(VirtualMachineProfile, Host, Map, List)}
+     * If the user did not enter a complete mapping, the volumes that were left behind will be auto mapped using {@link #createStoragePoolMappingsForVolumes(VirtualMachineProfile, DataCenterDeployment, Map, List)}
      */
     protected Map<Volume, StoragePool> createMappingVolumeAndStoragePool(VirtualMachineProfile profile, Host targetHost, Map<Long, Long> userDefinedMapOfVolumesAndStoragePools) {
+        return createMappingVolumeAndStoragePool(profile,
+                new DataCenterDeployment(targetHost.getDataCenterId(), targetHost.getPodId(), targetHost.getClusterId(), targetHost.getId(), null, null),
+                userDefinedMapOfVolumesAndStoragePools);
+    }
+
+    private Map<Volume, StoragePool> createMappingVolumeAndStoragePool(final VirtualMachineProfile profile, final DataCenterDeployment plan, final Map<Long, Long> userDefinedMapOfVolumesAndStoragePools) {
+        Host targetHost = null;
+        if (plan.getHostId() != null) {
+            targetHost = _hostDao.findById(plan.getHostId());
+        }
         Map<Volume, StoragePool> volumeToPoolObjectMap = buildMapUsingUserInformation(profile, targetHost, userDefinedMapOfVolumesAndStoragePools);
 
         List<Volume> volumesNotMapped = findVolumesThatWereNotMappedByTheUser(profile, volumeToPoolObjectMap);
-        createStoragePoolMappingsForVolumes(profile, targetHost, volumeToPoolObjectMap, volumesNotMapped);
+        createStoragePoolMappingsForVolumes(profile, plan, volumeToPoolObjectMap, volumesNotMapped);
         return volumeToPoolObjectMap;
     }
 
@@ -2702,7 +2798,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
             StoragePoolVO currentPool = _storagePoolDao.findById(volume.getPoolId());
 
             executeManagedStorageChecksWhenTargetStoragePoolProvided(currentPool, volume, targetPool);
-            if (_poolHostDao.findByPoolHost(targetPool.getId(), targetHost.getId()) == null) {
+            if (targetHost != null && _poolHostDao.findByPoolHost(targetPool.getId(), targetHost.getId()) == null) {
                 throw new CloudRuntimeException(
                         String.format("Cannot migrate the volume [%s] to the storage pool [%s] while migrating VM [%s] to target host [%s]. The host does not have access to the storage pool entered.",
                                 volume.getUuid(), targetPool.getUuid(), profile.getUuid(), targetHost.getUuid()));
@@ -2737,13 +2833,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
      * For each one of the volumes we will map it to a storage pool that is available via the target host.
      * An exception is thrown if we cannot find a storage pool that is accessible in the target host to migrate the volume to.
      */
-    protected void createStoragePoolMappingsForVolumes(VirtualMachineProfile profile, Host targetHost, Map<Volume, StoragePool> volumeToPoolObjectMap, List<Volume> allVolumes) {
+    protected void createStoragePoolMappingsForVolumes(VirtualMachineProfile profile, DataCenterDeployment plan, Map<Volume, StoragePool> volumeToPoolObjectMap, List<Volume> allVolumes) {
         for (Volume volume : allVolumes) {
             StoragePoolVO currentPool = _storagePoolDao.findById(volume.getPoolId());
 
+            Host targetHost = null;
+            if (plan.getHostId() != null) {
+                targetHost = _hostDao.findById(plan.getHostId());
+            }
             executeManagedStorageChecksWhenTargetStoragePoolNotProvided(targetHost, currentPool, volume);
-            if (ScopeType.HOST.equals(currentPool.getScope()) || isStorageCrossClusterMigration(targetHost, currentPool)) {
-                createVolumeToStoragePoolMappingIfPossible(profile, targetHost, volumeToPoolObjectMap, volume, currentPool);
+            if (ScopeType.HOST.equals(currentPool.getScope()) || isStorageCrossClusterMigration(plan.getClusterId(), currentPool)) {
+                createVolumeToStoragePoolMappingIfPossible(profile, plan, volumeToPoolObjectMap, volume, currentPool);
             } else {
                 volumeToPoolObjectMap.put(volume, currentPool);
             }
@@ -2761,17 +2861,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         if (!currentPool.isManaged()) {
             return;
         }
-        if (_poolHostDao.findByPoolHost(currentPool.getId(), targetHost.getId()) == null) {
+        if (targetHost != null && _poolHostDao.findByPoolHost(currentPool.getId(), targetHost.getId()) == null) {
             throw new CloudRuntimeException(String.format("The target host does not have access to the volume's managed storage pool. [volumeId=%s, storageId=%s, targetHostId=%s].", volume.getUuid(),
                     currentPool.getUuid(), targetHost.getUuid()));
         }
     }
 
     /**
-     *  Return true if the VM migration is a cross cluster migration. To execute that, we check if the volume current storage pool cluster is different from the target host cluster.
+     *  Return true if the VM migration is a cross cluster migration. To determine that, we check if the volume's current storage pool cluster is different from the target cluster.
      */
-    protected boolean isStorageCrossClusterMigration(Host targetHost, StoragePoolVO currentPool) {
-        return ScopeType.CLUSTER.equals(currentPool.getScope()) && currentPool.getClusterId() != targetHost.getClusterId();
+    protected boolean isStorageCrossClusterMigration(Long clusterId, StoragePoolVO currentPool) {
+        return clusterId != null && ScopeType.CLUSTER.equals(currentPool.getScope()) && !currentPool.getClusterId().equals(clusterId);
     }
 
     /**
@@ -2783,37 +2883,44 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
      *
      * Side note: this method should only be called if the volume is on local storage or if we are executing a cross cluster migration.
      */
-    protected void createVolumeToStoragePoolMappingIfPossible(VirtualMachineProfile profile, Host targetHost, Map<Volume, StoragePool> volumeToPoolObjectMap, Volume volume,
+    protected void createVolumeToStoragePoolMappingIfPossible(VirtualMachineProfile profile, DataCenterDeployment plan, Map<Volume, StoragePool> volumeToPoolObjectMap, Volume volume,
             StoragePoolVO currentPool) {
-        List<StoragePool> storagePoolList = getCandidateStoragePoolsToMigrateLocalVolume(profile, targetHost, volume);
+        List<StoragePool> storagePoolList = getCandidateStoragePoolsToMigrateLocalVolume(profile, plan, volume);
 
         if (CollectionUtils.isEmpty(storagePoolList)) {
-            throw new CloudRuntimeException(String.format("There is not storage pools available at the target host [%s] to migrate volume [%s]", targetHost.getUuid(), volume.getUuid()));
+            String msg;
+            if (plan.getHostId() != null) {
+                Host targetHost = _hostDao.findById(plan.getHostId());
+                msg = String.format("There are no storage pools available at the target host [%s] to migrate volume [%s]", targetHost.getUuid(), volume.getUuid());
+            } else {
+                Cluster targetCluster = _clusterDao.findById(plan.getClusterId());
+                msg = String.format("There are no storage pools available in the target cluster [%s] to migrate volume [%s]", targetCluster.getUuid(), volume.getUuid());
+            }
+            throw new CloudRuntimeException(msg);
         }
 
         Collections.shuffle(storagePoolList);
-        boolean canTargetHostAccessVolumeCurrentStoragePool = false;
+        boolean candidatePoolsListContainsVolumeCurrentStoragePool = false;
         for (StoragePool storagePool : storagePoolList) {
             if (storagePool.getId() == currentPool.getId()) {
-                canTargetHostAccessVolumeCurrentStoragePool = true;
+                candidatePoolsListContainsVolumeCurrentStoragePool = true;
                 break;
             }
 
         }
-        if (!canTargetHostAccessVolumeCurrentStoragePool) {
+        if (!candidatePoolsListContainsVolumeCurrentStoragePool) {
             volumeToPoolObjectMap.put(volume, _storagePoolDao.findByUuid(storagePoolList.get(0).getUuid()));
         }
     }
 
     /**
-     * We use {@link StoragePoolAllocator} objects to find storage pools connected to the targetHost where we would be able to allocate the given volume.
+     * We use {@link StoragePoolAllocator} objects to find storage pools for the given DataCenterDeployment where we would be able to allocate the given volume.
      */
-    protected List<StoragePool> getCandidateStoragePoolsToMigrateLocalVolume(VirtualMachineProfile profile, Host targetHost, Volume volume) {
+    protected List<StoragePool> getCandidateStoragePoolsToMigrateLocalVolume(VirtualMachineProfile profile, DataCenterDeployment plan, Volume volume) {
         List<StoragePool> poolList = new ArrayList<>();
 
         DiskOfferingVO diskOffering = _diskOfferingDao.findById(volume.getDiskOfferingId());
         DiskProfile diskProfile = new DiskProfile(volume, diskOffering, profile.getHypervisorType());
-        DataCenterDeployment plan = new DataCenterDeployment(targetHost.getDataCenterId(), targetHost.getPodId(), targetHost.getClusterId(), targetHost.getId(), null, null);
         ExcludeList avoid = new ExcludeList();
 
         StoragePoolVO volumeStoragePool = _storagePoolDao.findById(volume.getPoolId());
@@ -2826,7 +2933,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
                 continue;
             }
             for (StoragePool pool : poolListFromAllocator) {
-                if (pool.isLocal() || isStorageCrossClusterMigration(targetHost, volumeStoragePool)) {
+                if (pool.isLocal() || isStorageCrossClusterMigration(plan.getClusterId(), volumeStoragePool)) {
                     poolList.add(pool);
                 }
             }
@@ -2964,7 +3071,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
             Nic defaultNic = _networkModel.getDefaultNic(vm.getId());
 
-            if (defaultNic != null) {
+            if (defaultNic != null && VirtualMachine.Type.User.equals(vm.getType())) {
                 UserVmVO userVm = _userVmDao.findById(vm.getId());
                 Map<String, String> details = userVmDetailsDao.listDetailsKeyPairs(vm.getId());
                 userVm.setDetails(details);
@@ -5243,12 +5350,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     }
 
     public Outcome<VirtualMachine> migrateVmStorageThroughJobQueue(
-            final String vmUuid, final StoragePool destPool) {
+            final String vmUuid, final Map<Long, Long> volumeToPool) {
 
         final CallContext context = CallContext.current();
         final User user = context.getCallingUser();
         final Account account = context.getCallingAccount();
 
+        Collection<Long> poolIds = volumeToPool.values();
+        Set<Long> uniquePoolIds = new HashSet<>(poolIds);
+        for (Long poolId : uniquePoolIds) {
+            StoragePoolVO pool = _storagePoolDao.findById(poolId);
+            checkConcurrentJobsPerDatastoreThreshhold(pool);
+        }
+
         final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
 
         final List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
@@ -5274,7 +5388,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
             // save work context info (there are some duplications)
             final VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(),
-                    VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool.getId());
+                    VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, volumeToPool);
             workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
 
             _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
@@ -5620,8 +5734,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
             s_logger.info("Unable to find vm " + work.getVmId());
         }
         assert vm != null;
-        final StoragePool pool = (PrimaryDataStoreInfo)dataStoreMgr.getPrimaryDataStore(work.getDestStoragePoolId());
-        orchestrateStorageMigration(vm.getUuid(), pool);
+        orchestrateStorageMigration(vm.getUuid(), work.getVolumeToPool());
 
         return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
     }
diff --git a/engine/orchestration/src/main/java/com/cloud/vm/VmWorkStorageMigration.java b/engine/orchestration/src/main/java/com/cloud/vm/VmWorkStorageMigration.java
index 1d7d55e..07e8549 100644
--- a/engine/orchestration/src/main/java/com/cloud/vm/VmWorkStorageMigration.java
+++ b/engine/orchestration/src/main/java/com/cloud/vm/VmWorkStorageMigration.java
@@ -16,18 +16,20 @@
 // under the License.
 package com.cloud.vm;
 
+import java.util.Map;
+
 public class VmWorkStorageMigration extends VmWork {
     private static final long serialVersionUID = -8677979691741157474L;
 
-    Long destPoolId;
+    Map<Long, Long> volumeToPool;
 
-    public VmWorkStorageMigration(long userId, long accountId, long vmId, String handlerName, Long destPoolId) {
+    public VmWorkStorageMigration(long userId, long accountId, long vmId, String handlerName, Map <Long, Long> volumeToPool) {
         super(userId, accountId, vmId, handlerName);
 
-        this.destPoolId = destPoolId;
+        this.volumeToPool = volumeToPool;
     }
 
-    public Long getDestStoragePoolId() {
-        return destPoolId;
+    public Map<Long, Long> getVolumeToPool() {
+        return volumeToPool;
     }
 }
diff --git a/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/VolumeOrchestrator.java b/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/VolumeOrchestrator.java
index e4cb893..8c97b47 100644
--- a/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/VolumeOrchestrator.java
+++ b/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/VolumeOrchestrator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cloudstack.engine.orchestration;
 
+import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
+
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -80,6 +82,7 @@ import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.log4j.Logger;
 
@@ -152,7 +155,6 @@ import com.cloud.vm.VmWorkSerializer;
 import com.cloud.vm.VmWorkTakeVolumeSnapshot;
 import com.cloud.vm.dao.UserVmCloneSettingDao;
 import com.cloud.vm.dao.UserVmDao;
-import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
 
 import static com.cloud.storage.resource.StorageProcessor.REQUEST_TEMPLATE_RELOAD;
 
@@ -1193,35 +1195,32 @@ public class VolumeOrchestrator extends ManagerBase implements VolumeOrchestrati
     }
 
     @Override
-    public boolean storageMigration(VirtualMachineProfile vm, StoragePool destPool) throws StorageUnavailableException {
-        List<VolumeVO> vols = _volsDao.findUsableVolumesForInstance(vm.getId());
-        List<Volume> volumesNeedToMigrate = new ArrayList<Volume>();
-
-        for (VolumeVO volume : vols) {
+    public boolean storageMigration(VirtualMachineProfile vm, Map<Volume, StoragePool> volumeToPool) throws StorageUnavailableException {
+        Map<Volume, StoragePool> volumeStoragePoolMap = new HashMap<>();
+        for (Map.Entry<Volume, StoragePool> entry : volumeToPool.entrySet()) {
+            Volume volume = entry.getKey();
+            StoragePool pool = entry.getValue();
             if (volume.getState() != Volume.State.Ready) {
                 s_logger.debug("volume: " + volume.getId() + " is in " + volume.getState() + " state");
                 throw new CloudRuntimeException("volume: " + volume.getId() + " is in " + volume.getState() + " state");
             }
 
-            if (volume.getPoolId() == destPool.getId()) {
-                s_logger.debug("volume: " + volume.getId() + " is on the same storage pool: " + destPool.getId());
+            if (volume.getPoolId() == pool.getId()) {
+                s_logger.debug("volume: " + volume.getId() + " is on the same storage pool: " + pool.getId());
                 continue;
             }
-
-            volumesNeedToMigrate.add(volume);
+            volumeStoragePoolMap.put(volume, pool);
         }
 
-        if (volumesNeedToMigrate.isEmpty()) {
+        if (MapUtils.isEmpty(volumeStoragePoolMap)) {
             s_logger.debug("No volume need to be migrated");
             return true;
         }
-
-        // OfflineVmwareMigration: in case we can (vmware?) don't itterate over volumes but tell the hypervisor to do the thing
         if (s_logger.isDebugEnabled()) {
             s_logger.debug("Offline vm migration was not done up the stack in VirtualMachineManager so trying here.");
         }
-        for (Volume vol : volumesNeedToMigrate) {
-            Volume result = migrateVolume(vol, destPool);
+        for (Map.Entry<Volume, StoragePool> entry : volumeStoragePoolMap.entrySet()) {
+            Volume result = migrateVolume(entry.getKey(), entry.getValue());
             if (result == null) {
                 return false;
             }
diff --git a/engine/orchestration/src/test/java/com/cloud/vm/VirtualMachineManagerImplTest.java b/engine/orchestration/src/test/java/com/cloud/vm/VirtualMachineManagerImplTest.java
index 1725a41..d2d5fc8 100644
--- a/engine/orchestration/src/test/java/com/cloud/vm/VirtualMachineManagerImplTest.java
+++ b/engine/orchestration/src/test/java/com/cloud/vm/VirtualMachineManagerImplTest.java
@@ -49,6 +49,7 @@ import com.cloud.agent.AgentManager;
 import com.cloud.agent.api.Command;
 import com.cloud.agent.api.StopAnswer;
 import com.cloud.agent.api.StopCommand;
+import com.cloud.deploy.DataCenterDeployment;
 import com.cloud.deploy.DeploymentPlan;
 import com.cloud.deploy.DeploymentPlanner;
 import com.cloud.deploy.DeploymentPlanner.ExcludeList;
@@ -96,6 +97,8 @@ public class VirtualMachineManagerImplTest {
     private long hostMockId = 1L;
     @Mock
     private HostVO hostMock;
+    @Mock
+    private DataCenterDeployment dataCenterDeploymentMock;
 
     @Mock
     private VirtualMachineProfile virtualMachineProfileMock;
@@ -127,6 +130,7 @@ public class VirtualMachineManagerImplTest {
         when(vmInstanceMock.getHostId()).thenReturn(2L);
         when(vmInstanceMock.getType()).thenReturn(VirtualMachine.Type.User);
         when(hostMock.getId()).thenReturn(hostMockId);
+        when(dataCenterDeploymentMock.getHostId()).thenReturn(hostMockId);
 
         Mockito.doReturn(vmInstanceVoMockId).when(virtualMachineProfileMock).getId();
 
@@ -227,33 +231,30 @@ public class VirtualMachineManagerImplTest {
 
     @Test
     public void isStorageCrossClusterMigrationTestStorageTypeEqualsCluster() {
-        Mockito.doReturn(1L).when(hostMock).getClusterId();
         Mockito.doReturn(2L).when(storagePoolVoMock).getClusterId();
         Mockito.doReturn(ScopeType.CLUSTER).when(storagePoolVoMock).getScope();
 
-        boolean returnedValue = virtualMachineManagerImpl.isStorageCrossClusterMigration(hostMock, storagePoolVoMock);
+        boolean returnedValue = virtualMachineManagerImpl.isStorageCrossClusterMigration(1L, storagePoolVoMock);
 
         Assert.assertTrue(returnedValue);
     }
 
     @Test
     public void isStorageCrossClusterMigrationTestStorageSameCluster() {
-        Mockito.doReturn(1L).when(hostMock).getClusterId();
         Mockito.doReturn(1L).when(storagePoolVoMock).getClusterId();
         Mockito.doReturn(ScopeType.CLUSTER).when(storagePoolVoMock).getScope();
 
-        boolean returnedValue = virtualMachineManagerImpl.isStorageCrossClusterMigration(hostMock, storagePoolVoMock);
+        boolean returnedValue = virtualMachineManagerImpl.isStorageCrossClusterMigration(1L, storagePoolVoMock);
 
         assertFalse(returnedValue);
     }
 
     @Test
     public void isStorageCrossClusterMigrationTestStorageTypeEqualsZone() {
-        Mockito.doReturn(1L).when(hostMock).getClusterId();
         Mockito.doReturn(2L).when(storagePoolVoMock).getClusterId();
         Mockito.doReturn(ScopeType.ZONE).when(storagePoolVoMock).getScope();
 
-        boolean returnedValue = virtualMachineManagerImpl.isStorageCrossClusterMigration(hostMock, storagePoolVoMock);
+        boolean returnedValue = virtualMachineManagerImpl.isStorageCrossClusterMigration(1L, storagePoolVoMock);
 
         assertFalse(returnedValue);
     }
@@ -384,7 +385,7 @@ public class VirtualMachineManagerImplTest {
         Mockito.doReturn(poolListMock).when(storagePoolAllocatorMock).allocateToPool(Mockito.any(DiskProfile.class), Mockito.any(VirtualMachineProfile.class), Mockito.any(DeploymentPlan.class),
                 Mockito.any(ExcludeList.class), Mockito.eq(StoragePoolAllocator.RETURN_UPTO_ALL));
 
-        List<StoragePool> poolList = virtualMachineManagerImpl.getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, hostMock, volumeVoMock);
+        List<StoragePool> poolList = virtualMachineManagerImpl.getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, dataCenterDeploymentMock, volumeVoMock);
 
         Assert.assertEquals(1, poolList.size());
         Assert.assertEquals(storagePoolVoMock, poolList.get(0));
@@ -402,8 +403,8 @@ public class VirtualMachineManagerImplTest {
         Mockito.doReturn(poolListMock).when(storagePoolAllocatorMock).allocateToPool(Mockito.any(DiskProfile.class), Mockito.any(VirtualMachineProfile.class), Mockito.any(DeploymentPlan.class),
                 Mockito.any(ExcludeList.class), Mockito.eq(StoragePoolAllocator.RETURN_UPTO_ALL));
 
-        Mockito.doReturn(true).when(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMock, storagePoolVoMock);
-        List<StoragePool> poolList = virtualMachineManagerImpl.getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, hostMock, volumeVoMock);
+        Mockito.doReturn(true).when(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMockId, storagePoolVoMock);
+        List<StoragePool> poolList = virtualMachineManagerImpl.getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, dataCenterDeploymentMock, volumeVoMock);
 
         Assert.assertEquals(1, poolList.size());
         Assert.assertEquals(storagePoolVoMock, poolList.get(0));
@@ -421,8 +422,8 @@ public class VirtualMachineManagerImplTest {
         Mockito.doReturn(poolListMock).when(storagePoolAllocatorMock).allocateToPool(Mockito.any(DiskProfile.class), Mockito.any(VirtualMachineProfile.class), Mockito.any(DeploymentPlan.class),
                 Mockito.any(ExcludeList.class), Mockito.eq(StoragePoolAllocator.RETURN_UPTO_ALL));
 
-        Mockito.doReturn(false).when(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMock, storagePoolVoMock);
-        List<StoragePool> poolList = virtualMachineManagerImpl.getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, hostMock, volumeVoMock);
+        Mockito.doReturn(false).when(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMockId, storagePoolVoMock);
+        List<StoragePool> poolList = virtualMachineManagerImpl.getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, dataCenterDeploymentMock, volumeVoMock);
 
         Assert.assertTrue(poolList.isEmpty());
     }
@@ -455,8 +456,8 @@ public class VirtualMachineManagerImplTest {
         Mockito.doReturn(new ArrayList<>()).when(storagePoolAllocatorMock3).allocateToPool(Mockito.any(DiskProfile.class), Mockito.any(VirtualMachineProfile.class), Mockito.any(DeploymentPlan.class),
                 Mockito.any(ExcludeList.class), Mockito.eq(StoragePoolAllocator.RETURN_UPTO_ALL));
 
-        Mockito.doReturn(false).when(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMock, storagePoolVoMock);
-        List<StoragePool> poolList = virtualMachineManagerImpl.getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, hostMock, volumeVoMock);
+        Mockito.doReturn(false).when(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMockId, storagePoolVoMock);
+        List<StoragePool> poolList = virtualMachineManagerImpl.getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, dataCenterDeploymentMock, volumeVoMock);
 
         Assert.assertTrue(poolList.isEmpty());
 
@@ -470,9 +471,9 @@ public class VirtualMachineManagerImplTest {
 
     @Test(expected = CloudRuntimeException.class)
     public void createVolumeToStoragePoolMappingIfPossibleTestNotStoragePoolsAvailable() {
-        Mockito.doReturn(null).when(virtualMachineManagerImpl).getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, hostMock, volumeVoMock);
+        Mockito.doReturn(null).when(virtualMachineManagerImpl).getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, dataCenterDeploymentMock, volumeVoMock);
 
-        virtualMachineManagerImpl.createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, hostMock, new HashMap<>(), volumeVoMock, storagePoolVoMock);
+        virtualMachineManagerImpl.createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, dataCenterDeploymentMock, new HashMap<>(), volumeVoMock, storagePoolVoMock);
     }
 
     @Test
@@ -480,10 +481,10 @@ public class VirtualMachineManagerImplTest {
         List<StoragePool> storagePoolList = new ArrayList<>();
         storagePoolList.add(storagePoolVoMock);
 
-        Mockito.doReturn(storagePoolList).when(virtualMachineManagerImpl).getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, hostMock, volumeVoMock);
+        Mockito.doReturn(storagePoolList).when(virtualMachineManagerImpl).getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, dataCenterDeploymentMock, volumeVoMock);
 
         HashMap<Volume, StoragePool> volumeToPoolObjectMap = new HashMap<>();
-        virtualMachineManagerImpl.createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
+        virtualMachineManagerImpl.createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
 
         Assert.assertTrue(volumeToPoolObjectMap.isEmpty());
     }
@@ -498,10 +499,10 @@ public class VirtualMachineManagerImplTest {
         List<StoragePool> storagePoolList = new ArrayList<>();
         storagePoolList.add(storagePoolMockOther);
 
-        Mockito.doReturn(storagePoolList).when(virtualMachineManagerImpl).getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, hostMock, volumeVoMock);
+        Mockito.doReturn(storagePoolList).when(virtualMachineManagerImpl).getCandidateStoragePoolsToMigrateLocalVolume(virtualMachineProfileMock, dataCenterDeploymentMock, volumeVoMock);
 
         HashMap<Volume, StoragePool> volumeToPoolObjectMap = new HashMap<>();
-        virtualMachineManagerImpl.createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
+        virtualMachineManagerImpl.createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
 
         assertFalse(volumeToPoolObjectMap.isEmpty());
         Assert.assertEquals(storagePoolMockOther, volumeToPoolObjectMap.get(volumeVoMock));
@@ -516,14 +517,14 @@ public class VirtualMachineManagerImplTest {
 
         Mockito.doReturn(ScopeType.HOST).when(storagePoolVoMock).getScope();
         Mockito.doNothing().when(virtualMachineManagerImpl).executeManagedStorageChecksWhenTargetStoragePoolNotProvided(hostMock, storagePoolVoMock, volumeVoMock);
-        Mockito.doNothing().when(virtualMachineManagerImpl).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, volumeVoMock,
+        Mockito.doNothing().when(virtualMachineManagerImpl).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, volumeVoMock,
                 storagePoolVoMock);
 
-        virtualMachineManagerImpl.createStoragePoolMappingsForVolumes(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, allVolumes);
+        virtualMachineManagerImpl.createStoragePoolMappingsForVolumes(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, allVolumes);
 
         Assert.assertTrue(volumeToPoolObjectMap.isEmpty());
         Mockito.verify(virtualMachineManagerImpl).executeManagedStorageChecksWhenTargetStoragePoolNotProvided(hostMock, storagePoolVoMock, volumeVoMock);
-        Mockito.verify(virtualMachineManagerImpl).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
+        Mockito.verify(virtualMachineManagerImpl).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
     }
 
     @Test
@@ -535,15 +536,15 @@ public class VirtualMachineManagerImplTest {
 
         Mockito.doReturn(ScopeType.CLUSTER).when(storagePoolVoMock).getScope();
         Mockito.doNothing().when(virtualMachineManagerImpl).executeManagedStorageChecksWhenTargetStoragePoolNotProvided(hostMock, storagePoolVoMock, volumeVoMock);
-        Mockito.doNothing().when(virtualMachineManagerImpl).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
-        Mockito.doReturn(true).when(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMock, storagePoolVoMock);
+        Mockito.doNothing().when(virtualMachineManagerImpl).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
+        Mockito.doReturn(true).when(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMockId, storagePoolVoMock);
 
-        virtualMachineManagerImpl.createStoragePoolMappingsForVolumes(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, allVolumes);
+        virtualMachineManagerImpl.createStoragePoolMappingsForVolumes(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, allVolumes);
 
         Assert.assertTrue(volumeToPoolObjectMap.isEmpty());
         Mockito.verify(virtualMachineManagerImpl).executeManagedStorageChecksWhenTargetStoragePoolNotProvided(hostMock, storagePoolVoMock, volumeVoMock);
-        Mockito.verify(virtualMachineManagerImpl).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
-        Mockito.verify(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMock, storagePoolVoMock);
+        Mockito.verify(virtualMachineManagerImpl).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
+        Mockito.verify(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMockId, storagePoolVoMock);
     }
 
     @Test
@@ -555,17 +556,17 @@ public class VirtualMachineManagerImplTest {
 
         Mockito.doReturn(ScopeType.CLUSTER).when(storagePoolVoMock).getScope();
         Mockito.doNothing().when(virtualMachineManagerImpl).executeManagedStorageChecksWhenTargetStoragePoolNotProvided(hostMock, storagePoolVoMock, volumeVoMock);
-        Mockito.doNothing().when(virtualMachineManagerImpl).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
-        Mockito.doReturn(false).when(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMock, storagePoolVoMock);
+        Mockito.doNothing().when(virtualMachineManagerImpl).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
+        Mockito.doReturn(false).when(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMockId, storagePoolVoMock);
 
-        virtualMachineManagerImpl.createStoragePoolMappingsForVolumes(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, allVolumes);
+        virtualMachineManagerImpl.createStoragePoolMappingsForVolumes(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, allVolumes);
 
         assertFalse(volumeToPoolObjectMap.isEmpty());
         Assert.assertEquals(storagePoolVoMock, volumeToPoolObjectMap.get(volumeVoMock));
 
         Mockito.verify(virtualMachineManagerImpl).executeManagedStorageChecksWhenTargetStoragePoolNotProvided(hostMock, storagePoolVoMock, volumeVoMock);
-        Mockito.verify(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMock, storagePoolVoMock);
-        Mockito.verify(virtualMachineManagerImpl, Mockito.times(0)).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, volumeVoMock,
+        Mockito.verify(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMockId, storagePoolVoMock);
+        Mockito.verify(virtualMachineManagerImpl, Mockito.times(0)).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, volumeVoMock,
                 storagePoolVoMock);
     }
 
@@ -578,7 +579,7 @@ public class VirtualMachineManagerImplTest {
                 Mockito.anyMapOf(Long.class, Long.class));
 
         Mockito.doReturn(volumesNotMapped).when(virtualMachineManagerImpl).findVolumesThatWereNotMappedByTheUser(virtualMachineProfileMock, volumeToPoolObjectMap);
-        Mockito.doNothing().when(virtualMachineManagerImpl).createStoragePoolMappingsForVolumes(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, volumesNotMapped);
+        Mockito.doNothing().when(virtualMachineManagerImpl).createStoragePoolMappingsForVolumes(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, volumesNotMapped);
 
         Map<Volume, StoragePool> mappingVolumeAndStoragePool = virtualMachineManagerImpl.createMappingVolumeAndStoragePool(virtualMachineProfileMock, hostMock, new HashMap<>());
 
@@ -587,7 +588,7 @@ public class VirtualMachineManagerImplTest {
         InOrder inOrder = Mockito.inOrder(virtualMachineManagerImpl);
         inOrder.verify(virtualMachineManagerImpl).buildMapUsingUserInformation(Mockito.eq(virtualMachineProfileMock), Mockito.eq(hostMock), Mockito.anyMapOf(Long.class, Long.class));
         inOrder.verify(virtualMachineManagerImpl).findVolumesThatWereNotMappedByTheUser(virtualMachineProfileMock, volumeToPoolObjectMap);
-        inOrder.verify(virtualMachineManagerImpl).createStoragePoolMappingsForVolumes(virtualMachineProfileMock, hostMock, volumeToPoolObjectMap, volumesNotMapped);
+        inOrder.verify(virtualMachineManagerImpl).createStoragePoolMappingsForVolumes(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, volumesNotMapped);
     }
 
     @Test
diff --git a/engine/schema/src/main/java/com/cloud/hypervisor/dao/HypervisorCapabilitiesDao.java b/engine/schema/src/main/java/com/cloud/hypervisor/dao/HypervisorCapabilitiesDao.java
index 83c32b1..45c8806 100644
--- a/engine/schema/src/main/java/com/cloud/hypervisor/dao/HypervisorCapabilitiesDao.java
+++ b/engine/schema/src/main/java/com/cloud/hypervisor/dao/HypervisorCapabilitiesDao.java
@@ -37,4 +37,6 @@ public interface HypervisorCapabilitiesDao extends GenericDao<HypervisorCapabili
     Boolean isVmSnapshotEnabled(HypervisorType hypervisorType, String hypervisorVersion);
 
     List<HypervisorType> getHypervisorsWithDefaultEntries();
+
+    Boolean isStorageMotionSupported(HypervisorType hypervisorType, String hypervisorVersion);
 }
diff --git a/engine/schema/src/main/java/com/cloud/hypervisor/dao/HypervisorCapabilitiesDaoImpl.java b/engine/schema/src/main/java/com/cloud/hypervisor/dao/HypervisorCapabilitiesDaoImpl.java
index 5cecff2..09b3974 100644
--- a/engine/schema/src/main/java/com/cloud/hypervisor/dao/HypervisorCapabilitiesDaoImpl.java
+++ b/engine/schema/src/main/java/com/cloud/hypervisor/dao/HypervisorCapabilitiesDaoImpl.java
@@ -119,4 +119,21 @@ public class HypervisorCapabilitiesDaoImpl extends GenericDaoBase<HypervisorCapa
         }
         return hvs;
     }
+
+    @Override
+    public Boolean isStorageMotionSupported(HypervisorType hypervisorType, String hypervisorVersion) {
+        HypervisorCapabilitiesVO hostCapabilities = findByHypervisorTypeAndVersion(hypervisorType, hypervisorVersion);
+        if (hostCapabilities == null && HypervisorType.KVM.equals(hypervisorType)) {
+            List<HypervisorCapabilitiesVO> hypervisorCapabilitiesList = listAllByHypervisorType(HypervisorType.KVM);
+            if (hypervisorCapabilitiesList != null) {
+                for (HypervisorCapabilitiesVO hypervisorCapabilities : hypervisorCapabilitiesList) {
+                    if (hypervisorCapabilities.isStorageMotionSupported()) {
+                        hostCapabilities = hypervisorCapabilities;
+                        break;
+                    }
+                }
+            }
+        }
+        return hostCapabilities != null && hostCapabilities.isStorageMotionSupported();
+    }
 }
diff --git a/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/guru/VMwareGuru.java b/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/guru/VMwareGuru.java
index d48a5d9..a592126 100644
--- a/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/guru/VMwareGuru.java
+++ b/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/guru/VMwareGuru.java
@@ -16,6 +16,8 @@
 // under the License.
 package com.cloud.hypervisor.guru;
 
+import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
+
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -62,6 +64,7 @@ import com.cloud.agent.api.to.DataObjectType;
 import com.cloud.agent.api.to.DataStoreTO;
 import com.cloud.agent.api.to.DataTO;
 import com.cloud.agent.api.to.DiskTO;
+import com.cloud.agent.api.to.StorageFilerTO;
 import com.cloud.agent.api.to.VirtualMachineTO;
 import com.cloud.agent.api.to.VolumeTO;
 import com.cloud.cluster.ClusterManager;
@@ -149,8 +152,6 @@ import com.vmware.vim25.VirtualEthernetCardNetworkBackingInfo;
 import com.vmware.vim25.VirtualMachineConfigSummary;
 import com.vmware.vim25.VirtualMachineRuntimeInfo;
 
-import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
-
 public class VMwareGuru extends HypervisorGuruBase implements HypervisorGuru, Configurable {
     private static final Logger s_logger = Logger.getLogger(VMwareGuru.class);
 
@@ -209,16 +210,35 @@ public class VMwareGuru extends HypervisorGuruBase implements HypervisorGuru, Co
         return vmwareVmImplementer.implement(vm, toVirtualMachineTO(vm), getClusterId(vm.getId()));
     }
 
-    long getClusterId(long vmId) {
-        long clusterId;
-        Long hostId;
-
-        hostId = _vmDao.findById(vmId).getHostId();
-        if (hostId == null) {
+    Long getClusterId(long vmId) {
+        Long clusterId = null;
+        Long hostId = null;
+        VMInstanceVO vm = _vmDao.findById(vmId);
+        if (vm != null) {
+            hostId = vm.getHostId();
+        }
+        if (vm != null && hostId == null) {
             // If VM is in stopped state then hostId would be undefined. Hence read last host's Id instead.
-            hostId = _vmDao.findById(vmId).getLastHostId();
+            hostId = vm.getLastHostId();
         }
-        clusterId = _hostDao.findById(hostId).getClusterId();
+        HostVO host = null;
+        if (hostId != null) {
+            host = _hostDao.findById(hostId);
+        }
+        if (host != null) {
+            clusterId = host.getClusterId();
+        } else {
+            List<VolumeVO> volumes = _volumeDao.findByInstanceAndType(vmId, Volume.Type.ROOT);
+            if (CollectionUtils.isNotEmpty(volumes)) {
+                VolumeVO rootVolume = volumes.get(0);
+                if (rootVolume.getPoolId() != null) {
+                    StoragePoolVO pool = _storagePoolDao.findById(rootVolume.getPoolId());
+                    if (pool != null && pool.getClusterId() != null) {
+                        clusterId = pool.getClusterId();
+                    }
+                }
+            }
+        }
 
         return clusterId;
     }
@@ -418,9 +438,11 @@ public class VMwareGuru extends HypervisorGuruBase implements HypervisorGuru, Co
 
     @Override public Map<String, String> getClusterSettings(long vmId) {
         Map<String, String> details = new HashMap<String, String>();
-        long clusterId = getClusterId(vmId);
-        details.put(VmwareReserveCpu.key(), VmwareReserveCpu.valueIn(clusterId).toString());
-        details.put(VmwareReserveMemory.key(), VmwareReserveMemory.valueIn(clusterId).toString());
+        Long clusterId = getClusterId(vmId);
+        if (clusterId != null) {
+            details.put(VmwareReserveCpu.key(), VmwareReserveCpu.valueIn(clusterId).toString());
+            details.put(VmwareReserveMemory.key(), VmwareReserveMemory.valueIn(clusterId).toString());
+        }
         return details;
     }
 
@@ -1056,24 +1078,46 @@ public class VMwareGuru extends HypervisorGuruBase implements HypervisorGuru, Co
         return null;
     }
 
-    @Override public List<Command> finalizeMigrate(VirtualMachine vm, StoragePool destination) {
+    @Override public List<Command> finalizeMigrate(VirtualMachine vm, Map<Volume, StoragePool> volumeToPool) {
         List<Command> commands = new ArrayList<Command>();
 
         // OfflineVmwareMigration: specialised migration command
-        List<VolumeVO> volumes = _volumeDao.findByInstance(vm.getId());
         List<VolumeTO> vols = new ArrayList<>();
-        for (Volume volume : volumes) {
-            VolumeTO vol = new VolumeTO(volume, destination);
-            vols.add(vol);
+        List<Pair<VolumeTO, StorageFilerTO>> volumeToFilerTo = new ArrayList<Pair<VolumeTO, StorageFilerTO>>();
+        Long poolClusterId = null;
+        Host hostInTargetCluster = null;
+        for (Map.Entry<Volume, StoragePool> entry : volumeToPool.entrySet()) {
+            Volume volume = entry.getKey();
+            StoragePool pool = entry.getValue();
+            VolumeTO volumeTo = new VolumeTO(volume, _storagePoolDao.findById(pool.getId()));
+            StorageFilerTO filerTo = new StorageFilerTO(pool);
+            if (pool.getClusterId() != null) {
+                poolClusterId = pool.getClusterId();
+            }
+            volumeToFilerTo.add(new Pair<VolumeTO, StorageFilerTO>(volumeTo, filerTo));
+            vols.add(volumeTo);
         }
-        MigrateVmToPoolCommand migrateVmToPoolCommand = new MigrateVmToPoolCommand(vm.getInstanceName(), vols, destination.getUuid(), true);
+        final Long destClusterId = poolClusterId;
+        final Long srcClusterId = getClusterId(vm.getId());
+        final boolean isInterClusterMigration = srcClusterId != null && destClusterId != null && ! srcClusterId.equals(destClusterId);
+        if (isInterClusterMigration) {
+            // Without host vMotion might fail between non-shared storages with error similar to,
+            // https://kb.vmware.com/s/article/1003795
+            // As this is offline migration VM won't be started on this host
+            List<HostVO> hosts = _hostDao.findHypervisorHostInCluster(destClusterId);
+            if (CollectionUtils.isNotEmpty(hosts)) {
+                hostInTargetCluster = hosts.get(0);
+            }
+            if (hostInTargetCluster == null) {
+                throw new CloudRuntimeException("Migration failed, unable to find suitable target host for VM placement while migrating between storage pools of different clusters without shared storages");
+            }
+        }
+        MigrateVmToPoolCommand migrateVmToPoolCommand = new MigrateVmToPoolCommand(vm.getInstanceName(),
+                volumeToFilerTo, hostInTargetCluster == null ? null : hostInTargetCluster.getGuid(), true);
         commands.add(migrateVmToPoolCommand);
 
         // OfflineVmwareMigration: cleanup if needed
-        final Long destClusterId = destination.getClusterId();
-        final Long srcClusterId = getClusterId(vm.getId());
-
-        if (srcClusterId != null && destClusterId != null && !srcClusterId.equals(destClusterId)) {
+        if (isInterClusterMigration) {
             final String srcDcName = _clusterDetailsDao.getVmwareDcName(srcClusterId);
             final String destDcName = _clusterDetailsDao.getVmwareDcName(destClusterId);
             if (srcDcName != null && destDcName != null && !srcDcName.equals(destDcName)) {
diff --git a/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/vmware/resource/VmwareResource.java b/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/vmware/resource/VmwareResource.java
index 56d08a4..44add8e 100644
--- a/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/vmware/resource/VmwareResource.java
+++ b/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/vmware/resource/VmwareResource.java
@@ -16,6 +16,9 @@
 // under the License.
 package com.cloud.hypervisor.vmware.resource;
 
+import static com.cloud.utils.HumanReadableJson.getHumanReadableBytesJson;
+import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -45,16 +48,18 @@ import java.util.stream.Collectors;
 import javax.naming.ConfigurationException;
 import javax.xml.datatype.XMLGregorianCalendar;
 
-import com.cloud.agent.api.to.DataTO;
-import com.cloud.agent.api.to.DeployAsIsInfoTO;
-import com.cloud.agent.api.ValidateVcenterDetailsCommand;
 import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.storage.command.CopyCommand;
+import org.apache.cloudstack.storage.command.StorageSubSystemCommand;
 import org.apache.cloudstack.storage.configdrive.ConfigDrive;
+import org.apache.cloudstack.storage.resource.NfsSecondaryStorageResource;
+import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
 import org.apache.cloudstack.storage.to.TemplateObjectTO;
 import org.apache.cloudstack.storage.to.VolumeObjectTO;
 import org.apache.cloudstack.utils.volume.VirtualMachineDiskInfo;
 import org.apache.cloudstack.vm.UnmanagedInstanceTO;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.math.NumberUtils;
@@ -162,6 +167,7 @@ import com.cloud.agent.api.UnregisterVMCommand;
 import com.cloud.agent.api.UpgradeSnapshotCommand;
 import com.cloud.agent.api.ValidateSnapshotAnswer;
 import com.cloud.agent.api.ValidateSnapshotCommand;
+import com.cloud.agent.api.ValidateVcenterDetailsCommand;
 import com.cloud.agent.api.VmDiskStatsEntry;
 import com.cloud.agent.api.VmStatsEntry;
 import com.cloud.agent.api.VolumeStatsEntry;
@@ -178,12 +184,13 @@ import com.cloud.agent.api.storage.CreatePrivateTemplateAnswer;
 import com.cloud.agent.api.storage.DestroyCommand;
 import com.cloud.agent.api.storage.MigrateVolumeAnswer;
 import com.cloud.agent.api.storage.MigrateVolumeCommand;
-import com.cloud.agent.api.to.deployasis.OVFPropertyTO;
 import com.cloud.agent.api.storage.PrimaryStorageDownloadAnswer;
 import com.cloud.agent.api.storage.PrimaryStorageDownloadCommand;
 import com.cloud.agent.api.storage.ResizeVolumeAnswer;
 import com.cloud.agent.api.storage.ResizeVolumeCommand;
 import com.cloud.agent.api.to.DataStoreTO;
+import com.cloud.agent.api.to.DataTO;
+import com.cloud.agent.api.to.DeployAsIsInfoTO;
 import com.cloud.agent.api.to.DiskTO;
 import com.cloud.agent.api.to.IpAddressTO;
 import com.cloud.agent.api.to.NfsTO;
@@ -191,6 +198,7 @@ import com.cloud.agent.api.to.NicTO;
 import com.cloud.agent.api.to.StorageFilerTO;
 import com.cloud.agent.api.to.VirtualMachineTO;
 import com.cloud.agent.api.to.VolumeTO;
+import com.cloud.agent.api.to.deployasis.OVFPropertyTO;
 import com.cloud.agent.resource.virtualnetwork.VRScripts;
 import com.cloud.agent.resource.virtualnetwork.VirtualRouterDeployer;
 import com.cloud.agent.resource.virtualnetwork.VirtualRoutingResource;
@@ -219,8 +227,8 @@ import com.cloud.hypervisor.vmware.mo.HostStorageSystemMO;
 import com.cloud.hypervisor.vmware.mo.HypervisorHostHelper;
 import com.cloud.hypervisor.vmware.mo.NetworkDetails;
 import com.cloud.hypervisor.vmware.mo.PbmProfileManagerMO;
-import com.cloud.hypervisor.vmware.mo.TaskMO;
 import com.cloud.hypervisor.vmware.mo.StoragepodMO;
+import com.cloud.hypervisor.vmware.mo.TaskMO;
 import com.cloud.hypervisor.vmware.mo.VirtualEthernetCardType;
 import com.cloud.hypervisor.vmware.mo.VirtualMachineDiskInfoBuilder;
 import com.cloud.hypervisor.vmware.mo.VirtualMachineMO;
@@ -289,7 +297,6 @@ import com.vmware.vim25.HostInternetScsiHba;
 import com.vmware.vim25.HostPortGroupSpec;
 import com.vmware.vim25.ManagedObjectReference;
 import com.vmware.vim25.NasDatastoreInfo;
-import com.vmware.vim25.VirtualMachineDefinedProfileSpec;
 import com.vmware.vim25.ObjectContent;
 import com.vmware.vim25.OptionValue;
 import com.vmware.vim25.PerfCounterInfo;
@@ -324,6 +331,7 @@ import com.vmware.vim25.VirtualEthernetCardOpaqueNetworkBackingInfo;
 import com.vmware.vim25.VirtualIDEController;
 import com.vmware.vim25.VirtualMachineBootOptions;
 import com.vmware.vim25.VirtualMachineConfigSpec;
+import com.vmware.vim25.VirtualMachineDefinedProfileSpec;
 import com.vmware.vim25.VirtualMachineFileInfo;
 import com.vmware.vim25.VirtualMachineFileLayoutEx;
 import com.vmware.vim25.VirtualMachineFileLayoutExFileInfo;
@@ -343,13 +351,6 @@ import com.vmware.vim25.VmConfigInfo;
 import com.vmware.vim25.VmConfigSpec;
 import com.vmware.vim25.VmwareDistributedVirtualSwitchPvlanSpec;
 import com.vmware.vim25.VmwareDistributedVirtualSwitchVlanIdSpec;
-import org.apache.cloudstack.storage.command.CopyCommand;
-import org.apache.cloudstack.storage.command.StorageSubSystemCommand;
-import org.apache.cloudstack.storage.resource.NfsSecondaryStorageResource;
-import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
-
-import static com.cloud.utils.HumanReadableJson.getHumanReadableBytesJson;
-import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
 
 public class VmwareResource implements StoragePoolResource, ServerResource, VmwareHostService, VirtualRouterDeployer {
     private static final Logger s_logger = Logger.getLogger(VmwareResource.class);
@@ -4376,7 +4377,7 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
 
     protected Answer execute(MigrateVmToPoolCommand cmd) {
         if (s_logger.isInfoEnabled()) {
-            s_logger.info(String.format("excuting MigrateVmToPoolCommand %s -> %s", cmd.getVmName(), cmd.getDestinationPool()));
+            s_logger.info(String.format("Executing MigrateVmToPoolCommand %s", cmd.getVmName()));
             if (s_logger.isDebugEnabled()) {
                 s_logger.debug("MigrateVmToPoolCommand: " + _gson.toJson(cmd));
             }
@@ -4388,13 +4389,17 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
         try {
             VirtualMachineMO vmMo = getVirtualMachineMO(vmName, hyperHost);
             if (vmMo == null) {
-                String msg = "VM " + vmName + " does not exist in VMware datacenter";
-                s_logger.error(msg);
-                throw new CloudRuntimeException(msg);
+                s_logger.info("VM " + vmName + " was not found in the cluster of host " + hyperHost.getHyperHostName() + ". Looking for the VM in datacenter.");
+                ManagedObjectReference dcMor = hyperHost.getHyperHostDatacenter();
+                DatacenterMO dcMo = new DatacenterMO(hyperHost.getContext(), dcMor);
+                vmMo = dcMo.findVm(vmName);
+                if (vmMo == null) {
+                    String msg = "VM " + vmName + " does not exist in VMware datacenter";
+                    s_logger.error(msg);
+                    throw new CloudRuntimeException(msg);
+                }
             }
-
-            String poolUuid = cmd.getDestinationPool();
-            return migrateAndAnswer(vmMo, poolUuid, hyperHost, cmd);
+            return migrateAndAnswer(vmMo, null, hyperHost, cmd);
         } catch (Throwable e) { // hopefully only CloudRuntimeException :/
             if (e instanceof Exception) {
                 return new Answer(cmd, (Exception) e);
@@ -4408,37 +4413,40 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
     }
 
     private Answer migrateAndAnswer(VirtualMachineMO vmMo, String poolUuid, VmwareHypervisorHost hyperHost, Command cmd) throws Exception {
-        ManagedObjectReference morDs = getTargetDatastoreMOReference(poolUuid, hyperHost);
+        String hostNameInTargetCluster = null;
+        List<Pair<VolumeTO, StorageFilerTO>> volToFiler = new ArrayList<>();
+        if (cmd instanceof MigrateVmToPoolCommand) {
+            MigrateVmToPoolCommand mcmd = (MigrateVmToPoolCommand)cmd;
+            hostNameInTargetCluster = mcmd.getHostGuidInTargetCluster();
+            volToFiler = mcmd.getVolumeToFilerAsList();
+        } else if (cmd instanceof MigrateVolumeCommand) {
+            hostNameInTargetCluster = ((MigrateVolumeCommand)cmd).getHostGuidInTargetCluster();
+        }
+        VmwareHypervisorHost hostInTargetCluster = VmwareHelper.getHostMOFromHostName(getServiceContext(),
+                hostNameInTargetCluster);
 
         try {
             // OfflineVmwareMigration: getVolumesFromCommand(cmd);
-            Map<Integer, Long> volumeDeviceKey = getVolumesFromCommand(vmMo, cmd);
-            if (s_logger.isTraceEnabled()) {
-                for (Integer diskId : volumeDeviceKey.keySet()) {
-                    s_logger.trace(String.format("disk to migrate has disk id %d and volumeId %d", diskId, volumeDeviceKey.get(diskId)));
-                }
-            }
-            if (vmMo.changeDatastore(morDs)) {
-                // OfflineVmwareMigration: create target specification to include in answer
-                // Consolidate VM disks after successful VM migration
-                // In case of a linked clone VM, if VM's disks are not consolidated, further VM operations such as volume snapshot, VM snapshot etc. will result in DB inconsistencies.
-                if (!vmMo.consolidateVmDisks()) {
-                    s_logger.warn("VM disk consolidation failed after storage migration. Yet proceeding with VM migration.");
-                } else {
-                    s_logger.debug("Successfully consolidated disks of VM " + vmMo.getVmName() + ".");
+            Map<Integer, Long> volumeDeviceKey = new HashMap<>();
+            if (cmd instanceof MigrateVolumeCommand) { // Else device keys will be found in relocateVirtualMachine
+                MigrateVolumeCommand mcmd = (MigrateVolumeCommand) cmd;
+                addVolumeDiskmapping(vmMo, volumeDeviceKey, mcmd.getVolumePath(), mcmd.getVolumeId());
+                if (s_logger.isTraceEnabled()) {
+                    for (Integer diskId: volumeDeviceKey.keySet()) {
+                        s_logger.trace(String.format("Disk to migrate has disk id %d and volumeId %d", diskId, volumeDeviceKey.get(diskId)));
+                    }
                 }
-                return createAnswerForCmd(vmMo, poolUuid, cmd, volumeDeviceKey);
-            } else {
-                return new Answer(cmd, false, "failed to changes data store for VM" + vmMo.getVmName());
             }
+            List<VolumeObjectTO> volumeToList = relocateVirtualMachine(hyperHost, vmMo.getName(), null, null, hostInTargetCluster, poolUuid, volToFiler);
+            return createAnswerForCmd(vmMo, volumeToList, cmd, volumeDeviceKey);
         } catch (Exception e) {
-            String msg = "change data store for VM " + vmMo.getVmName() + " failed";
+            String msg = "Change data store for VM " + vmMo.getVmName() + " failed";
             s_logger.error(msg + ": " + e.getLocalizedMessage());
             throw new CloudRuntimeException(msg, e);
         }
     }
 
-    Answer createAnswerForCmd(VirtualMachineMO vmMo, String poolUuid, Command cmd, Map<Integer, Long> volumeDeviceKey) throws Exception {
+    Answer createAnswerForCmd(VirtualMachineMO vmMo, List<VolumeObjectTO> volumeObjectToList, Command cmd, Map<Integer, Long> volumeDeviceKey) throws Exception {
         List<VolumeObjectTO> volumeToList = new ArrayList<>();
         VirtualMachineDiskInfoBuilder diskInfoBuilder = vmMo.getDiskInfoBuilder();
         VirtualDisk[] disks = vmMo.getAllDiskDevice();
@@ -4453,34 +4461,12 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
             }
             throw new CloudRuntimeException("not expecting more then  one disk after migrate volume command");
         } else if (cmd instanceof MigrateVmToPoolCommand) {
-            for (VirtualDisk disk : disks) {
-                VolumeObjectTO newVol = new VolumeObjectTO();
-                String newPath = vmMo.getVmdkFileBaseName(disk);
-                VirtualMachineDiskInfo diskInfo = diskInfoBuilder.getDiskInfoByBackingFileBaseName(newPath, poolUuid);
-                newVol.setId(volumeDeviceKey.get(disk.getKey()));
-                newVol.setPath(newPath);
-                newVol.setChainInfo(_gson.toJson(diskInfo));
-                volumeToList.add(newVol);
-            }
-            return new MigrateVmToPoolAnswer((MigrateVmToPoolCommand) cmd, volumeToList);
+            volumeToList = volumeObjectToList;
+            return new MigrateVmToPoolAnswer((MigrateVmToPoolCommand)cmd, volumeToList);
         }
         return new Answer(cmd, false, null);
     }
 
-    private Map<Integer, Long> getVolumesFromCommand(VirtualMachineMO vmMo, Command cmd) throws Exception {
-        Map<Integer, Long> volumeDeviceKey = new HashMap<Integer, Long>();
-        if (cmd instanceof MigrateVmToPoolCommand) {
-            MigrateVmToPoolCommand mcmd = (MigrateVmToPoolCommand) cmd;
-            for (VolumeTO volume : mcmd.getVolumes()) {
-                addVolumeDiskmapping(vmMo, volumeDeviceKey, volume.getPath(), volume.getId());
-            }
-        } else if (cmd instanceof MigrateVolumeCommand) {
-            MigrateVolumeCommand mcmd = (MigrateVolumeCommand) cmd;
-            addVolumeDiskmapping(vmMo, volumeDeviceKey, mcmd.getVolumePath(), mcmd.getVolumeId());
-        }
-        return volumeDeviceKey;
-    }
-
     private void addVolumeDiskmapping(VirtualMachineMO vmMo, Map<Integer, Long> volumeDeviceKey, String volumePath, long volumeId) throws Exception {
         if (s_logger.isDebugEnabled()) {
             s_logger.debug(String.format("locating disk for volume (%d) using path %s", volumeId, volumePath));
@@ -4577,254 +4563,21 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
     }
 
     protected Answer execute(MigrateWithStorageCommand cmd) {
-
         if (s_logger.isInfoEnabled()) {
             s_logger.info("Executing resource MigrateWithStorageCommand: " + getHumanReadableBytesJson(_gson.toJson(cmd)));
         }
 
-        VirtualMachineTO vmTo = cmd.getVirtualMachine();
-        String vmName = vmTo.getName();
-
-        VmwareHypervisorHost srcHyperHost = null;
-        VmwareHypervisorHost tgtHyperHost = null;
-        VirtualMachineMO vmMo = null;
-
-        ManagedObjectReference morDsAtTarget = null;
-        ManagedObjectReference morDsAtSource = null;
-        ManagedObjectReference morDc = null;
-        ManagedObjectReference morDcOfTargetHost = null;
-        ManagedObjectReference morTgtHost = new ManagedObjectReference();
-        ManagedObjectReference morTgtDatastore = new ManagedObjectReference();
-        VirtualMachineRelocateSpec relocateSpec = new VirtualMachineRelocateSpec();
-        List<VirtualMachineRelocateSpecDiskLocator> diskLocators = new ArrayList<VirtualMachineRelocateSpecDiskLocator>();
-        VirtualMachineRelocateSpecDiskLocator diskLocator = null;
-
-        String tgtDsName = "";
-        String tgtDsHost;
-        String tgtDsPath;
-        int tgtDsPort;
-        VolumeTO volume;
-        StorageFilerTO filerTo;
-        Set<String> mountedDatastoresAtSource = new HashSet<String>();
-        List<VolumeObjectTO> volumeToList = new ArrayList<VolumeObjectTO>();
-        Map<Long, Integer> volumeDeviceKey = new HashMap<Long, Integer>();
-
-        List<Pair<VolumeTO, StorageFilerTO>> volToFiler = cmd.getVolumeToFilerAsList();
-        String tgtHost = cmd.getTargetHost();
-        String tgtHostMorInfo = tgtHost.split("@")[0];
-        morTgtHost.setType(tgtHostMorInfo.split(":")[0]);
-        morTgtHost.setValue(tgtHostMorInfo.split(":")[1]);
+        final VirtualMachineTO vmTo = cmd.getVirtualMachine();
+        final List<Pair<VolumeTO, StorageFilerTO>> volToFiler = cmd.getVolumeToFilerAsList();
+        final String targetHost = cmd.getTargetHost();
 
         try {
-            srcHyperHost = getHyperHost(getServiceContext());
-            tgtHyperHost = new HostMO(getServiceContext(), morTgtHost);
-            morDc = srcHyperHost.getHyperHostDatacenter();
-            morDcOfTargetHost = tgtHyperHost.getHyperHostDatacenter();
-            if (!morDc.getValue().equalsIgnoreCase(morDcOfTargetHost.getValue())) {
-                String msg = "Source host & target host are in different datacentesr";
-                throw new CloudRuntimeException(msg);
-            }
-            VmwareManager mgr = tgtHyperHost.getContext().getStockObject(VmwareManager.CONTEXT_STOCK_NAME);
-            String srcHostApiVersion = ((HostMO) srcHyperHost).getHostAboutInfo().getApiVersion();
-
-            // find VM through datacenter (VM is not at the target host yet)
-            vmMo = srcHyperHost.findVmOnPeerHyperHost(vmName);
-            if (vmMo == null) {
-                String msg = "VM " + vmName + " does not exist in VMware datacenter " + morDc.getValue();
-                s_logger.error(msg);
-                throw new Exception(msg);
-            }
-            vmName = vmMo.getName();
-
-            // Specify destination datastore location for each volume
-            for (Pair<VolumeTO, StorageFilerTO> entry : volToFiler) {
-                volume = entry.first();
-                filerTo = entry.second();
-
-                s_logger.debug("Preparing spec for volume : " + volume.getName());
-                morDsAtTarget = HypervisorHostHelper.findDatastoreWithBackwardsCompatibility(tgtHyperHost, filerTo.getUuid());
-                morDsAtSource = HypervisorHostHelper.findDatastoreWithBackwardsCompatibility(srcHyperHost, volume.getPoolUuid());
-
-                if (morDsAtTarget == null) {
-                    String msg = "Unable to find the target datastore: " + filerTo.getUuid() + " on target host: " + tgtHyperHost.getHyperHostName()
-                            + " to execute MigrateWithStorageCommand";
-                    s_logger.error(msg);
-                    throw new Exception(msg);
-                }
-                morTgtDatastore = morDsAtTarget;
-
-                // If host version is below 5.1 then simultaneous change of VM's datastore and host is not supported.
-                // So since only the datastore will be changed first, ensure the target datastore is mounted on source host.
-                if (srcHostApiVersion.compareTo("5.1") < 0) {
-                    tgtDsName = filerTo.getUuid().replace("-", "");
-                    tgtDsHost = filerTo.getHost();
-                    tgtDsPath = filerTo.getPath();
-                    tgtDsPort = filerTo.getPort();
-
-                    // If datastore is NFS and target datastore is not already mounted on source host then mount the datastore.
-                    if (filerTo.getType().equals(StoragePoolType.NetworkFilesystem)) {
-                        if (morDsAtSource == null) {
-                            morDsAtSource = srcHyperHost.mountDatastore(false, tgtDsHost, tgtDsPort, tgtDsPath, tgtDsName, true);
-                            if (morDsAtSource == null) {
-                                throw new Exception("Unable to mount NFS datastore " + tgtDsHost + ":/" + tgtDsPath + " on " + _hostName);
-                            }
-                            mountedDatastoresAtSource.add(tgtDsName);
-                            s_logger.debug("Mounted datastore " + tgtDsHost + ":/" + tgtDsPath + " on " + _hostName);
-                        }
-                    }
-                    // If datastore is VMFS and target datastore is not mounted or accessible to source host then fail migration.
-                    if (filerTo.getType().equals(StoragePoolType.VMFS) || filerTo.getType().equals(StoragePoolType.PreSetup)) {
-                        if (morDsAtSource == null) {
-                            s_logger.warn(
-                                    "If host version is below 5.1, then target VMFS datastore(s) need to manually mounted on source host for a successful live storage migration.");
-                            throw new Exception("Target VMFS datastore: " + tgtDsPath + " is not mounted on source host: " + _hostName);
-                        }
-                        DatastoreMO dsAtSourceMo = new DatastoreMO(getServiceContext(), morDsAtSource);
-                        String srcHostValue = srcHyperHost.getMor().getValue();
-                        if (!dsAtSourceMo.isAccessibleToHost(srcHostValue)) {
-                            s_logger.warn("If host version is below 5.1, then target VMFS datastore(s) need to accessible to source host for a successful live storage migration.");
-                            throw new Exception("Target VMFS datastore: " + tgtDsPath + " is not accessible on source host: " + _hostName);
-                        }
-                    }
-                    morTgtDatastore = morDsAtSource;
-                }
-
-                if (volume.getType() == Volume.Type.ROOT) {
-                    relocateSpec.setDatastore(morTgtDatastore);
-                }
-
-                diskLocator = new VirtualMachineRelocateSpecDiskLocator();
-                diskLocator.setDatastore(morTgtDatastore);
-                Pair<VirtualDisk, String> diskInfo = getVirtualDiskInfo(vmMo, appendFileType(volume.getPath(), VMDK_EXTENSION));
-                String vmdkAbsFile = getAbsoluteVmdkFile(diskInfo.first());
-                if (vmdkAbsFile != null && !vmdkAbsFile.isEmpty()) {
-                    vmMo.updateAdapterTypeIfRequired(vmdkAbsFile);
-                }
-                int diskId = diskInfo.first().getKey();
-                diskLocator.setDiskId(diskId);
-
-                diskLocators.add(diskLocator);
-                volumeDeviceKey.put(volume.getId(), diskId);
-            }
-            // If a target datastore is provided for the VM, then by default all volumes associated with the VM will be migrated to that target datastore.
-            // Hence set the existing datastore as target datastore for volumes that are not to be migrated.
-            List<Pair<Integer, ManagedObjectReference>> diskDatastores = vmMo.getAllDiskDatastores();
-            for (Pair<Integer, ManagedObjectReference> diskDatastore : diskDatastores) {
-                if (!volumeDeviceKey.containsValue(diskDatastore.first().intValue())) {
-                    diskLocator = new VirtualMachineRelocateSpecDiskLocator();
-                    diskLocator.setDiskId(diskDatastore.first().intValue());
-                    diskLocator.setDatastore(diskDatastore.second());
-                    diskLocators.add(diskLocator);
-                }
-            }
-            relocateSpec.getDisk().addAll(diskLocators);
-
-            // Prepare network at target before migration
-            NicTO[] nics = vmTo.getNics();
-            for (NicTO nic : nics) {
-                // prepare network on the host
-                prepareNetworkFromNicInfo(new HostMO(getServiceContext(), morTgtHost), nic, false, vmTo.getType());
-            }
-
-            // Ensure all secondary storage mounted on target host
-            List<Pair<String, Long>> secStoreUrlAndIdList = mgr.getSecondaryStorageStoresUrlAndIdList(Long.parseLong(_dcId));
-            for (Pair<String, Long> secStoreUrlAndId : secStoreUrlAndIdList) {
-                String secStoreUrl = secStoreUrlAndId.first();
-                Long secStoreId = secStoreUrlAndId.second();
-                if (secStoreUrl == null) {
-                    String msg = String.format("Secondary storage for dc %s is not ready yet?", _dcId);
-                    throw new Exception(msg);
-                }
-
-                if (vmTo.getType() != VirtualMachine.Type.User) {
-                    mgr.prepareSecondaryStorageStore(secStoreUrl, secStoreId);
-                }
-
-                ManagedObjectReference morSecDs = prepareSecondaryDatastoreOnSpecificHost(secStoreUrl, tgtHyperHost);
-                if (morSecDs == null) {
-                    String msg = "Failed to prepare secondary storage on host, secondary store url: " + secStoreUrl;
-                    throw new Exception(msg);
-                }
-            }
-
-            if (srcHostApiVersion.compareTo("5.1") < 0) {
-                // Migrate VM's volumes to target datastore(s).
-                if (!vmMo.changeDatastore(relocateSpec)) {
-                    throw new Exception("Change datastore operation failed during storage migration");
-                } else {
-                    s_logger.debug("Successfully migrated storage of VM " + vmName + " to target datastore(s)");
-                }
-
-                // Migrate VM to target host.
-                ManagedObjectReference morPool = tgtHyperHost.getHyperHostOwnerResourcePool();
-                if (!vmMo.migrate(morPool, tgtHyperHost.getMor())) {
-                    throw new Exception("VM migration to target host failed during storage migration");
-                } else {
-                    s_logger.debug("Successfully migrated VM " + vmName + " from " + _hostName + " to " + tgtHyperHost.getHyperHostName());
-                }
-            } else {
-                // Simultaneously migrate VM's volumes to target datastore and VM to target host.
-                relocateSpec.setHost(tgtHyperHost.getMor());
-                relocateSpec.setPool(tgtHyperHost.getHyperHostOwnerResourcePool());
-                if (!vmMo.changeDatastore(relocateSpec)) {
-                    throw new Exception("Change datastore operation failed during storage migration");
-                } else {
-                    s_logger.debug(
-                            "Successfully migrated VM " + vmName + " from " + _hostName + " to " + tgtHyperHost.getHyperHostName() + " and its storage to target datastore(s)");
-                }
-            }
-
-            // Consolidate VM disks.
-            // In case of a linked clone VM, if VM's disks are not consolidated, further VM operations such as volume snapshot, VM snapshot etc. will result in DB inconsistencies.
-            if (!vmMo.consolidateVmDisks()) {
-                s_logger.warn("VM disk consolidation failed after storage migration. Yet proceeding with VM migration.");
-            } else {
-                s_logger.debug("Successfully consolidated disks of VM " + vmName + ".");
-            }
-
-            // Update and return volume path and chain info for every disk because that could have changed after migration
-            VirtualMachineDiskInfoBuilder diskInfoBuilder = vmMo.getDiskInfoBuilder();
-            for (Pair<VolumeTO, StorageFilerTO> entry : volToFiler) {
-                volume = entry.first();
-                long volumeId = volume.getId();
-                VirtualDisk[] disks = vmMo.getAllDiskDevice();
-                for (VirtualDisk disk : disks) {
-                    if (volumeDeviceKey.get(volumeId) == disk.getKey()) {
-                        VolumeObjectTO newVol = new VolumeObjectTO();
-                        String newPath = vmMo.getVmdkFileBaseName(disk);
-                        String poolName = entry.second().getUuid().replace("-", "");
-                        VirtualMachineDiskInfo diskInfo = diskInfoBuilder.getDiskInfoByBackingFileBaseName(newPath, poolName);
-                        newVol.setId(volumeId);
-                        newVol.setPath(newPath);
-                        newVol.setChainInfo(_gson.toJson(diskInfo));
-                        volumeToList.add(newVol);
-                        break;
-                    }
-                }
-            }
-
+            List<VolumeObjectTO> volumeToList =  relocateVirtualMachine(null, null, vmTo, targetHost, null, null, volToFiler);
             return new MigrateWithStorageAnswer(cmd, volumeToList);
         } catch (Throwable e) {
-            if (e instanceof RemoteException) {
-                s_logger.warn("Encountered remote exception at vCenter, invalidating VMware session context");
-                invalidateServiceContext();
-            }
-
-            String msg = "MigrationCommand failed due to " + VmwareHelper.getExceptionMessage(e);
+            String msg = "MigrateWithStorageCommand failed due to " + VmwareHelper.getExceptionMessage(e);
             s_logger.warn(msg, e);
-            return new MigrateWithStorageAnswer(cmd, (Exception) e);
-        } finally {
-            // Cleanup datastores mounted on source host
-            for (String mountedDatastore : mountedDatastoresAtSource) {
-                s_logger.debug("Attempting to unmount datastore " + mountedDatastore + " at " + _hostName);
-                try {
-                    srcHyperHost.unmountDatastore(mountedDatastore);
-                } catch (Exception unmountEx) {
-                    s_logger.debug("Failed to unmount datastore " + mountedDatastore + " at " + _hostName + ". Seems the datastore is still being used by " + _hostName
-                            + ". Please unmount manually to cleanup.");
-                }
-                s_logger.debug("Successfully unmounted datastore " + mountedDatastore + " at " + _hostName);
-            }
+            return new MigrateWithStorageAnswer(cmd, (Exception)e);
         }
     }
 
@@ -4834,10 +4587,10 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
 
         VmwareHypervisorHost hyperHost = getHyperHost(getServiceContext());
         VirtualMachineMO vmMo = null;
-        DatastoreMO dsMo = null;
+        DatastoreMO sourceDsMo = null;
         DatastoreMO destinationDsMo = null;
         ManagedObjectReference morSourceDS = null;
-        ManagedObjectReference morDestintionDS = null;
+        ManagedObjectReference morDestinationDS = null;
         String vmdkDataStorePath = null;
         boolean isvVolsInvolved = false;
 
@@ -4847,24 +4600,41 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
             // OfflineVmwareMigration: this method is 100 lines and needs refactorring anyway
             // we need to spawn a worker VM to attach the volume to and move it
             morSourceDS = HypervisorHostHelper.findDatastoreWithBackwardsCompatibility(hyperHost, cmd.getSourcePool().getUuid());
-            dsMo = new DatastoreMO(hyperHost.getContext(), morSourceDS);
-            morDestintionDS = HypervisorHostHelper.findDatastoreWithBackwardsCompatibility(hyperHost, cmd.getTargetPool().getUuid());
-            destinationDsMo = new DatastoreMO(hyperHost.getContext(), morDestintionDS);
+            sourceDsMo = new DatastoreMO(hyperHost.getContext(), morSourceDS);
+            VmwareHypervisorHost hostInTargetCluster = VmwareHelper.getHostMOFromHostName(getServiceContext(),
+                    cmd.getHostGuidInTargetCluster());
+            VmwareHypervisorHost dsHost = hostInTargetCluster == null ? hyperHost : hostInTargetCluster;
+            String targetDsName = cmd.getTargetPool().getUuid();
+            morDestinationDS = HypervisorHostHelper.findDatastoreWithBackwardsCompatibility(dsHost, targetDsName);
+            if(morDestinationDS == null) {
+                String msg = "Unable to find the target datastore: " + targetDsName + " on host: " + dsHost.getHyperHostName();
+                s_logger.error(msg);
+                throw new CloudRuntimeException(msg);
+            }
+            destinationDsMo = new DatastoreMO(hyperHost.getContext(), morDestinationDS);
 
-            vmName = getWorkerName(getServiceContext(), cmd, 0, dsMo);
+            vmName = getWorkerName(getServiceContext(), cmd, 0, sourceDsMo);
             if (destinationDsMo.getDatastoreType().equalsIgnoreCase("VVOL")) {
                 isvVolsInvolved = true;
                 vmName = getWorkerName(getServiceContext(), cmd, 0, destinationDsMo);
             }
+            String hardwareVersion = null;
+            if (hostInTargetCluster != null) {
+                Integer sourceHardwareVersion = HypervisorHostHelper.getHostHardwareVersion(hyperHost);
+                Integer destinationHardwareVersion = HypervisorHostHelper.getHostHardwareVersion(dsHost);
+                if (sourceHardwareVersion != null && destinationHardwareVersion != null && !sourceHardwareVersion.equals(destinationHardwareVersion)) {
+                    hardwareVersion = String.valueOf(Math.min(sourceHardwareVersion, destinationHardwareVersion));
+                }
+            }
 
             // OfflineVmwareMigration: refactor for re-use
             // OfflineVmwareMigration: 1. find data(store)
             // OfflineVmwareMigration: more robust would be to find the store given the volume as it might have been moved out of band or due to error
-// example:            DatastoreMO existingVmDsMo = new DatastoreMO(dcMo.getContext(), dcMo.findDatastore(fileInDatastore.getDatastoreName()));
+            // example: DatastoreMO existingVmDsMo = new DatastoreMO(dcMo.getContext(), dcMo.findDatastore(fileInDatastore.getDatastoreName()));
 
             s_logger.info("Create worker VM " + vmName);
             // OfflineVmwareMigration: 2. create the worker with access to the data(store)
-            vmMo = HypervisorHostHelper.createWorkerVM(hyperHost, dsMo, vmName, null);
+            vmMo = HypervisorHostHelper.createWorkerVM(hyperHost, sourceDsMo, vmName, hardwareVersion);
             if (vmMo == null) {
                 // OfflineVmwareMigration: don't throw a general Exception but think of a specific one
                 throw new CloudRuntimeException("Unable to create a worker VM for volume operation");
@@ -4873,21 +4643,21 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
             synchronized (this) {
                 // OfflineVmwareMigration: 3. attach the disk to the worker
                 String vmdkFileName = path + VMDK_EXTENSION;
-                vmdkDataStorePath = VmwareStorageLayoutHelper.getLegacyDatastorePathFromVmdkFileName(dsMo, vmdkFileName);
-                if (!dsMo.fileExists(vmdkDataStorePath)) {
+                vmdkDataStorePath = VmwareStorageLayoutHelper.getLegacyDatastorePathFromVmdkFileName(sourceDsMo, vmdkFileName);
+                if (!sourceDsMo.fileExists(vmdkDataStorePath)) {
                     if (s_logger.isDebugEnabled()) {
                         s_logger.debug(String.format("path not found (%s), trying under '%s'", vmdkFileName, path));
                     }
-                    vmdkDataStorePath = VmwareStorageLayoutHelper.getVmwareDatastorePathFromVmdkFileName(dsMo, path, vmdkFileName);
+                    vmdkDataStorePath = VmwareStorageLayoutHelper.getVmwareDatastorePathFromVmdkFileName(sourceDsMo, path, vmdkFileName);
                 }
-                if (!dsMo.folderExists(String.format("[%s]", dsMo.getName()), path) || !dsMo.fileExists(vmdkDataStorePath)) {
+                if (!sourceDsMo.folderExists(String.format("[%s]", sourceDsMo.getName()), path) || !sourceDsMo.fileExists(vmdkDataStorePath)) {
                     if (s_logger.isDebugEnabled()) {
                         s_logger.debug(String.format("path not found (%s), trying under '%s'", vmdkFileName, vmName));
                     }
-                    vmdkDataStorePath = VmwareStorageLayoutHelper.getVmwareDatastorePathFromVmdkFileName(dsMo, vmName, vmdkFileName);
+                    vmdkDataStorePath = VmwareStorageLayoutHelper.getVmwareDatastorePathFromVmdkFileName(sourceDsMo, vmName, vmdkFileName);
                 }
-                if (!dsMo.folderExists(String.format("[%s]", dsMo.getName()), vmName) || !dsMo.fileExists(vmdkDataStorePath)) {
-                    vmdkDataStorePath = dsMo.searchFileInSubFolders(vmdkFileName, true, null);
+                if (!sourceDsMo.folderExists(String.format("[%s]", sourceDsMo.getName()), vmName) || !sourceDsMo.fileExists(vmdkDataStorePath)) {
+                    vmdkDataStorePath = sourceDsMo.searchFileInSubFolders(vmdkFileName, true, null);
                 }
 
                 if (s_logger.isDebugEnabled()) {
@@ -4937,8 +4707,6 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
             try {
                 // OfflineVmwareMigration: worker *may* have been renamed
                 vmName = vmMo.getVmName();
-                morSourceDS = HypervisorHostHelper.findDatastoreWithBackwardsCompatibility(hyperHost, cmd.getTargetPool().getUuid());
-                dsMo = new DatastoreMO(hyperHost.getContext(), morSourceDS);
                 s_logger.info("Dettaching disks before destroying worker VM '" + vmName + "' after volume migration");
                 VirtualDisk[] disks = vmMo.getAllDiskDevice();
                 String format = "disk %d was migrated to %s";
@@ -4946,7 +4714,7 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
                     if (s_logger.isTraceEnabled()) {
                         s_logger.trace(String.format(format, disk.getKey(), vmMo.getVmdkFileBaseName(disk)));
                     }
-                    vmdkDataStorePath = VmwareStorageLayoutHelper.getLegacyDatastorePathFromVmdkFileName(dsMo, vmMo.getVmdkFileBaseName(disk) + VMDK_EXTENSION);
+                    vmdkDataStorePath = VmwareStorageLayoutHelper.getLegacyDatastorePathFromVmdkFileName(destinationDsMo, vmMo.getVmdkFileBaseName(disk) + VMDK_EXTENSION);
                     vmMo.detachDisk(vmdkDataStorePath, false);
                 }
                 s_logger.info("Destroy worker VM '" + vmName + "' after volume migration");
@@ -4960,10 +4728,10 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
                 String newPath = ((MigrateVolumeAnswer) answer).getVolumePath();
                 String vmdkFileName = newPath + VMDK_EXTENSION;
                 try {
-                    VmwareStorageLayoutHelper.syncVolumeToRootFolder(dsMo.getOwnerDatacenter().first(), dsMo, newPath, vmName);
-                    vmdkDataStorePath = VmwareStorageLayoutHelper.getLegacyDatastorePathFromVmdkFileName(dsMo, vmdkFileName);
+                    VmwareStorageLayoutHelper.syncVolumeToRootFolder(destinationDsMo.getOwnerDatacenter().first(), destinationDsMo, newPath, vmName);
+                    vmdkDataStorePath = VmwareStorageLayoutHelper.getLegacyDatastorePathFromVmdkFileName(destinationDsMo, vmdkFileName);
 
-                    if (!dsMo.fileExists(vmdkDataStorePath)) {
+                    if (!destinationDsMo.fileExists(vmdkDataStorePath)) {
                         String msg = String.format("Migration of volume '%s' failed; file (%s) not found as path '%s'", cmd.getVolumePath(), vmdkFileName, vmdkDataStorePath);
                         s_logger.error(msg);
                         answer = new Answer(cmd, false, msg);
@@ -7058,10 +6826,10 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
 
     @Override
     @DB
-    public String getWorkerName(VmwareContext context, Command cmd, int workerSequence, DatastoreMO dsMo) throws Exception {
+    public String getWorkerName(VmwareContext context, Command cmd, int workerSequence, DatastoreMO sourceDsMo) throws Exception {
         VmwareManager mgr = context.getStockObject(VmwareManager.CONTEXT_STOCK_NAME);
         String vmName = mgr.composeWorkerName();
-        if (dsMo!= null && dsMo.getDatastoreType().equalsIgnoreCase("VVOL")) {
+        if (sourceDsMo!= null && sourceDsMo.getDatastoreType().equalsIgnoreCase("VVOL")) {
             vmName = CustomFieldConstants.CLOUD_UUID + "-" + vmName;
         }
 
@@ -7515,6 +7283,281 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
         return new PrepareUnmanageVMInstanceAnswer(cmd, true, "OK");
     }
 
+    /**
+     * Relocates a virtual machine: migrates the VM and its volumes to the given host and datastores.
+     * It is used for MigrateVolumeCommand (detached volume case), MigrateVmToPoolCommand and MigrateVmWithStorageCommand.
+     * Returns the new path and chain info of the volumes migrated through volToFiler; the list is empty when a poolUuid is given or volToFiler is empty.
+     */
+    private List<VolumeObjectTO> relocateVirtualMachine(final VmwareHypervisorHost hypervisorHost,
+                                                        final String name, final VirtualMachineTO vmTo,
+                                                        final String targetHost, final VmwareHypervisorHost hostInTargetCluster,
+                                                        final String poolUuid, final List<Pair<VolumeTO, StorageFilerTO>> volToFiler) throws Exception {
+        String vmName = name;
+        if (vmName == null && vmTo != null) {
+            vmName = vmTo.getName();
+        }
+        VmwareHypervisorHost sourceHyperHost = hypervisorHost;
+        VmwareHypervisorHost targetHyperHost = hostInTargetCluster;
+        VirtualMachineMO vmMo = null;
+        ManagedObjectReference morSourceHostDc = null;
+        VirtualMachineRelocateSpec relocateSpec = new VirtualMachineRelocateSpec();
+        List<VirtualMachineRelocateSpecDiskLocator> diskLocators = new ArrayList<VirtualMachineRelocateSpecDiskLocator>();
+        Set<String> mountedDatastoresAtSource = new HashSet<String>();
+        List<VolumeObjectTO> volumeToList =  new ArrayList<>();
+        Map<Long, Integer> volumeDeviceKey = new HashMap<Long, Integer>();
+
+        try {
+            if (sourceHyperHost == null) {
+                sourceHyperHost = getHyperHost(getServiceContext());
+            }
+            if (targetHyperHost == null && StringUtils.isNotBlank(targetHost)) {
+                targetHyperHost = VmwareHelper.getHostMOFromHostName(getServiceContext(), targetHost);
+            }
+            morSourceHostDc = sourceHyperHost.getHyperHostDatacenter();
+            DatacenterMO dcMo = new DatacenterMO(sourceHyperHost.getContext(), morSourceHostDc);
+            if (targetHyperHost != null) {
+                ManagedObjectReference morTargetHostDc = targetHyperHost.getHyperHostDatacenter();
+                if (!morSourceHostDc.getValue().equalsIgnoreCase(morTargetHostDc.getValue())) {
+                    throw new CloudRuntimeException("VM " + vmName + " cannot be migrated between different datacenter");
+                }
+            }
+
+            // find VM through source host (VM is not at the target host yet)
+            vmMo = sourceHyperHost.findVmOnHyperHost(vmName);
+            if (vmMo == null) {
+                String msg = "VM " + vmName + " does not exist on host: " + sourceHyperHost.getHyperHostName();
+                s_logger.warn(msg);
+                // VM is not on the expected source host, fall back to a lookup across the whole datacenter
+                vmMo = dcMo.findVm(vmName);
+                if (vmMo == null) {
+                    msg = "VM " + vmName + " does not exist on datacenter: " + dcMo.getName();
+                    s_logger.error(msg);
+                    throw new Exception(msg);
+                }
+                // VM host has changed
+                sourceHyperHost = vmMo.getRunningHost();
+            }
+
+            vmName = vmMo.getName();
+            String srcHostApiVersion = ((HostMO)sourceHyperHost).getHostAboutInfo().getApiVersion();
+
+            if (StringUtils.isNotBlank(poolUuid)) {
+                VmwareHypervisorHost dsHost = targetHyperHost == null ? sourceHyperHost : targetHyperHost;
+                ManagedObjectReference morDatastore = null;
+                String msg;
+                morDatastore = getTargetDatastoreMOReference(poolUuid, dsHost);
+                if (morDatastore == null) {
+                    msg = "Unable to find the target datastore: " + poolUuid + " on host: " + dsHost.getHyperHostName() +
+                            " to execute migration";
+                    s_logger.error(msg);
+                    throw new CloudRuntimeException(msg);
+                }
+                relocateSpec.setDatastore(morDatastore);
+            } else if (CollectionUtils.isNotEmpty(volToFiler)) {
+                // Specify destination datastore location for each volume
+                VmwareHypervisorHost dsHost = targetHyperHost == null ? sourceHyperHost : targetHyperHost;
+                for (Pair<VolumeTO, StorageFilerTO> entry : volToFiler) {
+                    VolumeTO volume = entry.first();
+                    StorageFilerTO filerTo = entry.second();
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug(String.format("Preparing spec for volume: %s to migrate it to datastore: %s", volume.getName(), filerTo.getUuid()));
+                    }
+                    ManagedObjectReference morVolumeDatastore = getTargetDatastoreMOReference(filerTo.getUuid(), dsHost);
+                    if (morVolumeDatastore == null) {
+                        String msg = "Unable to find the target datastore: " + filerTo.getUuid() + " in datacenter: " + dcMo.getName() + " to execute migration";
+                        s_logger.error(msg);
+                        throw new CloudRuntimeException(msg);
+                    }
+
+                    String mountedDs = getMountedDatastoreName(sourceHyperHost, srcHostApiVersion, filerTo);
+                    if (mountedDs != null) {
+                        mountedDatastoresAtSource.add(mountedDs);
+                    }
+
+                    if (volume.getType() == Volume.Type.ROOT) {
+                        relocateSpec.setDatastore(morVolumeDatastore);
+                    }
+                    VirtualMachineRelocateSpecDiskLocator diskLocator = new VirtualMachineRelocateSpecDiskLocator();
+                    diskLocator.setDatastore(morVolumeDatastore);
+                    Pair<VirtualDisk, String> diskInfo = getVirtualDiskInfo(vmMo, volume.getPath() + VMDK_EXTENSION);
+                    String vmdkAbsFile = getAbsoluteVmdkFile(diskInfo.first());
+                    if (vmdkAbsFile != null && !vmdkAbsFile.isEmpty()) {
+                        vmMo.updateAdapterTypeIfRequired(vmdkAbsFile);
+                    }
+                    int diskId = diskInfo.first().getKey();
+                    diskLocator.setDiskId(diskId);
+
+                    diskLocators.add(diskLocator);
+                    volumeDeviceKey.put(volume.getId(), diskId);
+                }
+                // If a target datastore is provided for the VM, then by default all volumes associated with the VM will be migrated to that target datastore.
+                // Hence set the existing datastore as target datastore for volumes that are not to be migrated.
+                List<Pair<Integer, ManagedObjectReference>> diskDatastores = vmMo.getAllDiskDatastores();
+                for (Pair<Integer, ManagedObjectReference> diskDatastore : diskDatastores) {
+                    if (!volumeDeviceKey.containsValue(diskDatastore.first().intValue())) {
+                        VirtualMachineRelocateSpecDiskLocator diskLocator = new VirtualMachineRelocateSpecDiskLocator();
+                        diskLocator.setDiskId(diskDatastore.first().intValue());
+                        diskLocator.setDatastore(diskDatastore.second());
+                        diskLocators.add(diskLocator);
+                    }
+                }
+
+                relocateSpec.getDisk().addAll(diskLocators);
+            }
+
+            // Specific section for MigrateVmWithStorageCommand; it prepares the target host, so a target host is mandatory
+            if (vmTo != null) {
+                if (targetHyperHost == null) {
+                    throw new CloudRuntimeException("Unable to prepare network and secondary storage for VM " + vmName + " as no target host could be resolved");
+                }
+                // Prepare network at target before migration
+                NicTO[] nics = vmTo.getNics();
+                for (NicTO nic : nics) {
+                    // prepare network on the host
+                    prepareNetworkFromNicInfo((HostMO)targetHyperHost, nic, false, vmTo.getType());
+                }
+                // Ensure secondary storage mounted on target host
+                VmwareManager mgr = targetHyperHost.getContext().getStockObject(VmwareManager.CONTEXT_STOCK_NAME);
+                Pair<String, Long> secStoreUrlAndId = mgr.getSecondaryStorageStoreUrlAndId(Long.parseLong(_dcId));
+                String secStoreUrl = secStoreUrlAndId.first();
+                Long secStoreId = secStoreUrlAndId.second();
+                if (secStoreUrl == null) {
+                    throw new Exception("secondary storage for dc " + _dcId + " is not ready yet?");
+                }
+                mgr.prepareSecondaryStorageStore(secStoreUrl, secStoreId);
+                ManagedObjectReference morSecDs = prepareSecondaryDatastoreOnSpecificHost(secStoreUrl, targetHyperHost);
+                if (morSecDs == null) {
+                    throw new Exception("Failed to prepare secondary storage on host, secondary store url: " + secStoreUrl);
+                }
+            }
+
+            if (srcHostApiVersion.compareTo("5.1") < 0) {
+                // Migrate VM's volumes to target datastore(s).
+                if (!vmMo.changeDatastore(relocateSpec)) {
+                    throw new Exception("Change datastore operation failed during storage migration");
+                } else {
+                    s_logger.debug("Successfully migrated storage of VM " + vmName + " to target datastore(s)");
+                }
+                // Migrate VM to target host.
+                if (targetHyperHost != null) {
+                    ManagedObjectReference morPool = targetHyperHost.getHyperHostOwnerResourcePool();
+                    if (!vmMo.migrate(morPool, targetHyperHost.getMor())) {
+                        throw new Exception("VM migration to target host failed during storage migration");
+                    } else {
+                        s_logger.debug("Successfully migrated VM " + vmName + " from " + sourceHyperHost.getHyperHostName() + " to " + targetHyperHost.getHyperHostName());
+                    }
+                }
+            } else {
+                // Add target host to relocate spec
+                if (targetHyperHost != null) {
+                    relocateSpec.setHost(targetHyperHost.getMor());
+                    relocateSpec.setPool(targetHyperHost.getHyperHostOwnerResourcePool());
+                }
+                if (!vmMo.changeDatastore(relocateSpec)) {
+                    throw new Exception("Change datastore operation failed during storage migration");
+                } else {
+                    s_logger.debug("Successfully migrated VM " + vmName +
+                            (targetHyperHost != null ? " from " + sourceHyperHost.getHyperHostName() + " to " + targetHyperHost.getHyperHostName() + " and " : " with ") +
+                            "its storage to target datastore(s)");
+                }
+            }
+
+            // Consolidate VM disks.
+            // In case of a linked clone VM, if VM's disks are not consolidated, further VM operations such as volume snapshot, VM snapshot etc. will result in DB inconsistencies.
+            if (!vmMo.consolidateVmDisks()) {
+                s_logger.warn("VM disk consolidation failed after storage migration. Yet proceeding with VM migration.");
+            } else {
+                s_logger.debug("Successfully consolidated disks of VM " + vmName + ".");
+            }
+
+            if (MapUtils.isNotEmpty(volumeDeviceKey)) {
+                // Update and return volume path and chain info for every disk because that could have changed after migration
+                VirtualMachineDiskInfoBuilder diskInfoBuilder = vmMo.getDiskInfoBuilder();
+                for (Pair<VolumeTO, StorageFilerTO> entry : volToFiler) {
+                    final VolumeTO volume = entry.first();
+                    final long volumeId = volume.getId();
+                    VirtualDisk[] disks = vmMo.getAllDiskDevice();
+                    for (VirtualDisk disk : disks) {
+                        if (volumeDeviceKey.get(volumeId) == disk.getKey()) {
+                            VolumeObjectTO newVol = new VolumeObjectTO();
+                            newVol.setDataStoreUuid(entry.second().getUuid());
+                            String newPath = vmMo.getVmdkFileBaseName(disk);
+                            String poolName = entry.second().getUuid().replace("-", "");
+                            VirtualMachineDiskInfo diskInfo = diskInfoBuilder.getDiskInfoByBackingFileBaseName(newPath, poolName);
+                            newVol.setId(volumeId);
+                            newVol.setPath(newPath);
+                            newVol.setChainInfo(_gson.toJson(diskInfo));
+                            volumeToList.add(newVol);
+                            break;
+                        }
+                    }
+                }
+            }
+        } catch (Throwable e) {
+            if (e instanceof RemoteException) {
+                s_logger.warn("Encountered remote exception at vCenter, invalidating VMware session context");
+                invalidateServiceContext();
+            }
+            throw e;
+        } finally {
+            // Cleanup datastores mounted on source host; success is logged only when the unmount call did not throw
+            for (String mountedDatastore : mountedDatastoresAtSource) {
+                s_logger.debug("Attempting to unmount datastore " + mountedDatastore + " at " + sourceHyperHost.getHyperHostName());
+                try {
+                    sourceHyperHost.unmountDatastore(mountedDatastore);
+                    s_logger.debug("Successfully unmounted datastore " + mountedDatastore + " at " + sourceHyperHost.getHyperHostName());
+                } catch (Exception unmountEx) {
+                    s_logger.warn("Failed to unmount datastore " + mountedDatastore + " at " + sourceHyperHost.getHyperHostName() + ". Seems the datastore is still being used by " + sourceHyperHost.getHyperHostName() +
+                            ". Please unmount manually to cleanup.");
+                }
+            }
+        }
+
+        // Only when volToFiler is not empty a filled list of VolumeObjectTO is returned else it will be empty
+        return volumeToList;
+    }
+
+    /**
+     * Makes sure the target datastore described by filerTo is usable from the source host when the host API version is below 5.1.
+     * Returns the name of the NFS datastore this call mounted on the source host, or null when nothing was mounted.
+     */
+    private String getMountedDatastoreName(VmwareHypervisorHost sourceHyperHost, String sourceHostApiVersion, StorageFilerTO filerTo) throws Exception {
+        // From version 5.1 onwards nothing is checked or mounted here.
+        if (sourceHostApiVersion.compareTo("5.1") >= 0) {
+            return null;
+        }
+        // If host version is below 5.1 then simultaneous change of VM's datastore and host is not supported, so the datastore is changed first.
+        s_logger.debug(String.format("Host: %s version is %s, vMotion without shared storage cannot be done. Check source host has target datastore mounted or can be mounted", sourceHyperHost.getHyperHostName(), sourceHostApiVersion));
+        ManagedObjectReference morDsAtSource = HypervisorHostHelper.findDatastoreWithBackwardsCompatibility(sourceHyperHost, filerTo.getUuid());
+        final String dsName = filerTo.getUuid().replace("-", "");
+        final String dsHost = filerTo.getHost();
+        final String dsPath = filerTo.getPath();
+        final int dsPort = filerTo.getPort();
+        String mountedDatastoreName = null;
+        // NFS: mount the target datastore on the source host unless it is already found there.
+        if (filerTo.getType().equals(StoragePoolType.NetworkFilesystem) && morDsAtSource == null) {
+            morDsAtSource = sourceHyperHost.mountDatastore(false, dsHost, dsPort, dsPath, dsName, false);
+            if (morDsAtSource == null) {
+                throw new Exception("Unable to mount NFS datastore " + dsHost + ":/" + dsPath + " on host: " + sourceHyperHost.getHyperHostName());
+            }
+            mountedDatastoreName = dsName;
+            s_logger.debug("Mounted NFS datastore " + dsHost + ":/" + dsPath + " on host: " + sourceHyperHost.getHyperHostName());
+        }
+        // VMFS: nothing is mounted here, so fail unless the datastore is already found on and accessible to the source host.
+        if (filerTo.getType().equals(StoragePoolType.VMFS)) {
+            if (morDsAtSource == null) {
+                s_logger.warn("Host: " + sourceHyperHost.getHyperHostName() + " version is below 5.1, target VMFS datastore(s) need to be manually mounted on host for successful storage migration.");
+                throw new Exception("Target VMFS datastore: " + dsPath + " is not mounted on host: " + sourceHyperHost.getHyperHostName());
+            }
+            DatastoreMO dsAtSourceMo = new DatastoreMO(getServiceContext(), morDsAtSource);
+            if (!dsAtSourceMo.isAccessibleToHost(sourceHyperHost.getMor().getValue())) {
+                s_logger.warn("Host " + sourceHyperHost.getHyperHostName() + " version is below 5.1, target VMFS datastore(s) need to be accessible to host for a successful storage migration.");
+                throw new Exception("Target VMFS datastore: " + dsPath + " is not accessible on host: " + sourceHyperHost.getHyperHostName());
+            }
+        }
+        return mountedDatastoreName;
+    }
+
     private Answer execute(ValidateVcenterDetailsCommand cmd) {
         if (s_logger.isInfoEnabled()) {
             s_logger.info("Executing resource ValidateVcenterDetailsCommand " + _gson.toJson(cmd));
diff --git a/plugins/hypervisors/vmware/src/main/java/org/apache/cloudstack/storage/motion/VmwareStorageMotionStrategy.java b/plugins/hypervisors/vmware/src/main/java/org/apache/cloudstack/storage/motion/VmwareStorageMotionStrategy.java
index 5a7b4c4..8d08c38 100644
--- a/plugins/hypervisors/vmware/src/main/java/org/apache/cloudstack/storage/motion/VmwareStorageMotionStrategy.java
+++ b/plugins/hypervisors/vmware/src/main/java/org/apache/cloudstack/storage/motion/VmwareStorageMotionStrategy.java
@@ -26,6 +26,20 @@ import java.util.Map;
 
 import javax.inject.Inject;
 
+import org.apache.cloudstack.engine.subsystem.api.storage.CopyCommandResult;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataMotionStrategy;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.StrategyPriority;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
+import org.apache.cloudstack.framework.async.AsyncCompletionCallback;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.to.VolumeObjectTO;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
+
 import com.cloud.agent.AgentManager;
 import com.cloud.agent.api.Answer;
 import com.cloud.agent.api.MigrateWithStorageAnswer;
@@ -53,18 +67,6 @@ import com.cloud.utils.Pair;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.dao.VMInstanceDao;
-import org.apache.cloudstack.engine.subsystem.api.storage.CopyCommandResult;
-import org.apache.cloudstack.engine.subsystem.api.storage.DataMotionStrategy;
-import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
-import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
-import org.apache.cloudstack.engine.subsystem.api.storage.StrategyPriority;
-import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
-import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
-import org.apache.cloudstack.framework.async.AsyncCompletionCallback;
-import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
-import org.apache.cloudstack.storage.to.VolumeObjectTO;
-import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
 
 @Component
 public class VmwareStorageMotionStrategy implements DataMotionStrategy {
@@ -88,9 +90,7 @@ public class VmwareStorageMotionStrategy implements DataMotionStrategy {
         if (isOnVmware(srcData, destData)
                 && isOnPrimary(srcData, destData)
                 && isVolumesOnly(srcData, destData)
-                && isDettached(srcData)
-                && isIntraCluster(srcData, destData)
-                && isStoreScopeEqual(srcData, destData)) {
+                && isDettached(srcData)) {
             if (s_logger.isDebugEnabled()) {
                 String msg = String.format("%s can handle the request because %d(%s) and %d(%s) share the VMware cluster %s (== %s)"
                         , this.getClass()
@@ -188,20 +188,42 @@ public class VmwareStorageMotionStrategy implements DataMotionStrategy {
             throw new UnsupportedOperationException();
         }
         StoragePool sourcePool = (StoragePool) srcData.getDataStore();
+        ScopeType sourceScopeType = srcData.getDataStore().getScope().getScopeType();
         StoragePool targetPool = (StoragePool) destData.getDataStore();
+        ScopeType targetScopeType = destData.getDataStore().getScope().getScopeType();
+        Long hostId = null;
+        String hostGuidInTargetCluster = null;
+        if (ScopeType.CLUSTER.equals(sourceScopeType)) {
+            // Find Volume source cluster and select any Vmware hypervisor host to attach worker VM
+            hostId = findSuitableHostIdForWorkerVmPlacement(sourcePool.getClusterId());
+            if (hostId == null) {
+                throw new CloudRuntimeException("Offline Migration failed, unable to find suitable host for worker VM placement in the cluster of storage pool: " + sourcePool.getName());
+            }
+            if (ScopeType.CLUSTER.equals(targetScopeType) && !sourcePool.getClusterId().equals(targetPool.getClusterId())) {
+                // Without host vMotion might fail between non-shared storages with error similar to,
+                // https://kb.vmware.com/s/article/1003795
+                List<HostVO> hosts = hostDao.findHypervisorHostInCluster(targetPool.getClusterId());
+                if (CollectionUtils.isNotEmpty(hosts)) {
+                    hostGuidInTargetCluster = hosts.get(0).getGuid();
+                }
+                if (hostGuidInTargetCluster == null) {
+                    throw new CloudRuntimeException("Offline Migration failed, unable to find suitable target host for worker VM placement while migrating between storage pools of different cluster without shared storages");
+                }
+            }
+        } else if (ScopeType.CLUSTER.equals(targetScopeType)) {
+            hostId = findSuitableHostIdForWorkerVmPlacement(targetPool.getClusterId());
+            if (hostId == null) {
+                throw new CloudRuntimeException("Offline Migration failed, unable to find suitable host for worker VM placement in the cluster of storage pool: " + targetPool.getName());
+            }
+        }
         MigrateVolumeCommand cmd = new MigrateVolumeCommand(srcData.getId()
                 , srcData.getTO().getPath()
                 , sourcePool
-                , targetPool);
+                , targetPool
+                , hostGuidInTargetCluster);
         // OfflineVmwareMigration: should be ((StoragePool)srcData.getDataStore()).getHypervisor() but that is NULL, so hardcoding
         Answer answer;
-        ScopeType scopeType = srcData.getDataStore().getScope().getScopeType();
-        if (ScopeType.CLUSTER == scopeType) {
-            // Find Volume source cluster and select any Vmware hypervisor host to attach worker VM
-            Long hostId = findSuitableHostIdForWorkerVmPlacement(sourcePool.getClusterId());
-            if (hostId == null) {
-                throw new CloudRuntimeException("Offline Migration failed, unable to find suitable host for worker VM placement in cluster: " + sourcePool.getName());
-            }
+        if (hostId != null) {
             answer = agentMgr.easySend(hostId, cmd);
         } else {
             answer = agentMgr.sendTo(sourcePool.getDataCenterId(), HypervisorType.VMware, cmd);
diff --git a/server/src/main/java/com/cloud/api/ApiDBUtils.java b/server/src/main/java/com/cloud/api/ApiDBUtils.java
index 87b9f36..9fade85 100644
--- a/server/src/main/java/com/cloud/api/ApiDBUtils.java
+++ b/server/src/main/java/com/cloud/api/ApiDBUtils.java
@@ -311,6 +311,7 @@ import com.cloud.user.dao.UserStatisticsDao;
 import com.cloud.uservm.UserVm;
 import com.cloud.utils.EnumUtils;
 import com.cloud.utils.Pair;
+import com.cloud.utils.StringUtils;
 import com.cloud.vm.ConsoleProxyVO;
 import com.cloud.vm.DomainRouterVO;
 import com.cloud.vm.InstanceGroup;
@@ -1736,7 +1737,17 @@ public class ApiDBUtils {
     ///////////////////////////////////////////////////////////////////////
 
     public static DomainRouterResponse newDomainRouterResponse(DomainRouterJoinVO vr, Account caller) {
-        return s_domainRouterJoinDao.newDomainRouterResponse(vr, caller);
+        DomainRouterResponse response = s_domainRouterJoinDao.newDomainRouterResponse(vr, caller);
+        if (StringUtils.isBlank(response.getHypervisor())) {
+            // Response has no hypervisor set: fall back to the hypervisor type of the VM's last host.
+            // Defensive: skip the fallback when the instance lookup finds nothing instead of dereferencing null.
+            VMInstanceVO vm = ApiDBUtils.findVMInstanceById(vr.getId());
+            HostVO lastHost = (vm != null && vm.getLastHostId() != null) ? ApiDBUtils.findHostById(vm.getLastHostId()) : null;
+            if (lastHost != null) {
+                response.setHypervisor(lastHost.getHypervisorType().toString());
+            }
+        }
+        return response;
     }
 
     public static DomainRouterResponse fillRouterDetails(DomainRouterResponse vrData, DomainRouterJoinVO vr) {
diff --git a/server/src/main/java/com/cloud/api/ApiResponseHelper.java b/server/src/main/java/com/cloud/api/ApiResponseHelper.java
index b03fa7d..730130f 100644
--- a/server/src/main/java/com/cloud/api/ApiResponseHelper.java
+++ b/server/src/main/java/com/cloud/api/ApiResponseHelper.java
@@ -16,6 +16,8 @@
 // under the License.
 package com.cloud.api;
 
+import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
+
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
@@ -350,8 +352,6 @@ import com.cloud.vm.snapshot.VMSnapshot;
 import com.cloud.vm.snapshot.VMSnapshotVO;
 import com.cloud.vm.snapshot.dao.VMSnapshotDao;
 
-import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
-
 public class ApiResponseHelper implements ResponseGenerator {
 
     private static final Logger s_logger = Logger.getLogger(ApiResponseHelper.class);
@@ -1399,6 +1399,11 @@ public class ApiResponseHelper implements ResponseGenerator {
                     vmResponse.setHostName(host.getName());
                     vmResponse.setHypervisor(host.getHypervisorType().toString());
                 }
+            } else if (vm.getLastHostId() != null) {
+                Host lastHost = ApiDBUtils.findHostById(vm.getLastHostId());
+                if (lastHost != null) {
+                    vmResponse.setHypervisor(lastHost.getHypervisorType().toString());
+                }
             }
 
             if (vm.getType() == Type.SecondaryStorageVm || vm.getType() == Type.ConsoleProxy) {
diff --git a/server/src/main/java/com/cloud/hypervisor/HypervisorGuruBase.java b/server/src/main/java/com/cloud/hypervisor/HypervisorGuruBase.java
index 2ae35fc..c320a7a 100644
--- a/server/src/main/java/com/cloud/hypervisor/HypervisorGuruBase.java
+++ b/server/src/main/java/com/cloud/hypervisor/HypervisorGuruBase.java
@@ -47,6 +47,7 @@ import com.cloud.service.ServiceOfferingDetailsVO;
 import com.cloud.service.dao.ServiceOfferingDao;
 import com.cloud.service.dao.ServiceOfferingDetailsDao;
 import com.cloud.storage.StoragePool;
+import com.cloud.storage.Volume;
 import com.cloud.utils.Pair;
 import com.cloud.utils.StringUtils;
 import com.cloud.utils.component.AdapterBase;
@@ -297,7 +298,7 @@ public abstract class HypervisorGuruBase extends AdapterBase implements Hypervis
         return false;
     }
 
-    public List<Command> finalizeMigrate(VirtualMachine vm, StoragePool destination) {
+    public List<Command> finalizeMigrate(VirtualMachine vm, Map<Volume, StoragePool> volumeToPool) {
         return null;
     }
 }
diff --git a/server/src/main/java/com/cloud/server/ManagementServerImpl.java b/server/src/main/java/com/cloud/server/ManagementServerImpl.java
index 8df118d..99d9221 100644
--- a/server/src/main/java/com/cloud/server/ManagementServerImpl.java
+++ b/server/src/main/java/com/cloud/server/ManagementServerImpl.java
@@ -40,7 +40,6 @@ import javax.crypto.spec.SecretKeySpec;
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
-import com.cloud.storage.Storage;
 import org.apache.cloudstack.acl.ControlledEntity;
 import org.apache.cloudstack.affinity.AffinityGroupProcessor;
 import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
@@ -657,6 +656,7 @@ import com.cloud.storage.GuestOSHypervisorVO;
 import com.cloud.storage.GuestOSVO;
 import com.cloud.storage.GuestOsCategory;
 import com.cloud.storage.ScopeType;
+import com.cloud.storage.Storage;
 import com.cloud.storage.StorageManager;
 import com.cloud.storage.StoragePool;
 import com.cloud.storage.Volume;
@@ -1257,15 +1257,16 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
             ex.addProxyObject(vm.getUuid(), "vmId");
             throw ex;
         }
+        String srcHostVersion = srcHost.getHypervisorVersion();
+        if (HypervisorType.KVM.equals(srcHost.getHypervisorType()) && srcHostVersion == null) {
+            srcHostVersion = "";
+        }
 
         // Check if the vm can be migrated with storage.
         boolean canMigrateWithStorage = false;
 
-        if (vm.getType() == VirtualMachine.Type.User) {
-            final HypervisorCapabilitiesVO capabilities = _hypervisorCapabilitiesDao.findByHypervisorTypeAndVersion(srcHost.getHypervisorType(), srcHost.getHypervisorVersion());
-            if (capabilities != null) {
-                canMigrateWithStorage = capabilities.isStorageMotionSupported();
-            }
+        if (VirtualMachine.Type.User.equals(vm.getType()) || HypervisorType.VMware.equals(vm.getHypervisorType())) {
+            canMigrateWithStorage = Boolean.TRUE.equals(_hypervisorCapabilitiesDao.isStorageMotionSupported(srcHost.getHypervisorType(), srcHostVersion));
         }
 
         // Check if the vm is using any disks on local storage.
@@ -1292,8 +1293,9 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
         final Map<Host, Boolean> requiresStorageMotion = new HashMap<Host, Boolean>();
         DataCenterDeployment plan = null;
         if (canMigrateWithStorage) {
-            allHostsPair = searchForServers(startIndex, pageSize, null, hostType, null, srcHost.getDataCenterId(), null, null, null, keyword,
-                null, null, srcHost.getHypervisorType(), srcHost.getHypervisorVersion(), srcHost.getId());
+            Long podId = !VirtualMachine.Type.User.equals(vm.getType()) ? srcHost.getPodId() : null;
+            allHostsPair = searchForServers(startIndex, pageSize, null, hostType, null, srcHost.getDataCenterId(), podId, null, null, keyword,
+                null, null, srcHost.getHypervisorType(), null, srcHost.getId());
             allHosts = allHostsPair.first();
             hostsForMigrationWithStorage = new ArrayList<>(allHosts);
 
@@ -1303,6 +1305,10 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
 
                 for (Iterator<HostVO> iterator = hostsForMigrationWithStorage.iterator(); iterator.hasNext();) {
                     final Host host = iterator.next();
+                    String hostVersion = host.getHypervisorVersion();
+                    if (HypervisorType.KVM.equals(host.getHypervisorType()) && hostVersion == null) {
+                        hostVersion = "";
+                    }
 
                     if (volClusterId != null) {
                         if (storagePool.isLocal() || !host.getClusterId().equals(volClusterId) || usesLocal) {
@@ -1314,7 +1320,12 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
                                 // source volume.
                                 iterator.remove();
                             } else {
-                                if (hasSuitablePoolsForVolume(volume, host, vmProfile)) {
+                                boolean hostSupportsStorageMigration = false;
+                                if ((srcHostVersion != null && srcHostVersion.equals(hostVersion)) ||
+                                        Boolean.TRUE.equals(_hypervisorCapabilitiesDao.isStorageMotionSupported(host.getHypervisorType(), hostVersion))) {
+                                    hostSupportsStorageMigration = true;
+                                }
+                                if (hostSupportsStorageMigration && hasSuitablePoolsForVolume(volume, host, vmProfile)) {
                                     requiresStorageMotion.put(host, true);
                                 } else {
                                     iterator.remove();
@@ -1334,7 +1345,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
                 }
             }
 
-            plan = new DataCenterDeployment(srcHost.getDataCenterId(), null, null, null, null, null);
+            plan = new DataCenterDeployment(srcHost.getDataCenterId(), podId, null, null, null, null);
         } else {
             final Long cluster = srcHost.getClusterId();
             if (s_logger.isDebugEnabled()) {
diff --git a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
index 181d537..d4d2cd2 100644
--- a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
+++ b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
@@ -16,6 +16,8 @@
 // under the License.
 package com.cloud.vm;
 
+import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
+
 import java.io.IOException;
 import java.io.StringReader;
 import java.io.UnsupportedEncodingException;
@@ -48,12 +50,6 @@ import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
-import com.cloud.agent.api.to.deployasis.OVFPropertyTO;
-import com.cloud.deployasis.UserVmDeployAsIsDetailVO;
-import com.cloud.deployasis.dao.UserVmDeployAsIsDetailsDao;
-import com.cloud.exception.UnsupportedServiceException;
-import com.cloud.hypervisor.Hypervisor;
-import com.cloud.deployasis.dao.TemplateDeployAsIsDetailsDao;
 import org.apache.cloudstack.acl.ControlledEntity.ACLType;
 import org.apache.cloudstack.acl.SecurityChecker.AccessType;
 import org.apache.cloudstack.affinity.AffinityGroupService;
@@ -84,7 +80,6 @@ import org.apache.cloudstack.api.command.user.vm.UpgradeVMCmd;
 import org.apache.cloudstack.api.command.user.vmgroup.CreateVMGroupCmd;
 import org.apache.cloudstack.api.command.user.vmgroup.DeleteVMGroupCmd;
 import org.apache.cloudstack.api.command.user.volume.ResizeVolumeCmd;
-import com.cloud.agent.api.to.deployasis.OVFNetworkTO;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.engine.cloud.entity.api.VirtualMachineEntity;
 import org.apache.cloudstack.engine.cloud.entity.api.db.dao.VMNetworkMapDao;
@@ -145,6 +140,8 @@ import com.cloud.agent.api.VolumeStatsEntry;
 import com.cloud.agent.api.to.DiskTO;
 import com.cloud.agent.api.to.NicTO;
 import com.cloud.agent.api.to.VirtualMachineTO;
+import com.cloud.agent.api.to.deployasis.OVFNetworkTO;
+import com.cloud.agent.api.to.deployasis.OVFPropertyTO;
 import com.cloud.agent.manager.Commands;
 import com.cloud.alert.AlertManager;
 import com.cloud.api.ApiDBUtils;
@@ -174,6 +171,9 @@ import com.cloud.deploy.DeploymentPlanner.ExcludeList;
 import com.cloud.deploy.DeploymentPlanningManager;
 import com.cloud.deploy.PlannerHostReservationVO;
 import com.cloud.deploy.dao.PlannerHostReservationDao;
+import com.cloud.deployasis.UserVmDeployAsIsDetailVO;
+import com.cloud.deployasis.dao.TemplateDeployAsIsDetailsDao;
+import com.cloud.deployasis.dao.UserVmDeployAsIsDetailsDao;
 import com.cloud.domain.Domain;
 import com.cloud.domain.DomainVO;
 import com.cloud.domain.dao.DomainDao;
@@ -195,6 +195,7 @@ import com.cloud.exception.PermissionDeniedException;
 import com.cloud.exception.ResourceAllocationException;
 import com.cloud.exception.ResourceUnavailableException;
 import com.cloud.exception.StorageUnavailableException;
+import com.cloud.exception.UnsupportedServiceException;
 import com.cloud.exception.VirtualMachineMigrationException;
 import com.cloud.gpu.GPU;
 import com.cloud.ha.HighAvailabilityManager;
@@ -202,8 +203,8 @@ import com.cloud.host.Host;
 import com.cloud.host.HostVO;
 import com.cloud.host.Status;
 import com.cloud.host.dao.HostDao;
+import com.cloud.hypervisor.Hypervisor;
 import com.cloud.hypervisor.Hypervisor.HypervisorType;
-import com.cloud.hypervisor.HypervisorCapabilitiesVO;
 import com.cloud.hypervisor.dao.HypervisorCapabilitiesDao;
 import com.cloud.hypervisor.kvm.dpdk.DpdkHelper;
 import com.cloud.network.IpAddressManager;
@@ -334,8 +335,6 @@ import com.cloud.vm.snapshot.VMSnapshotManager;
 import com.cloud.vm.snapshot.VMSnapshotVO;
 import com.cloud.vm.snapshot.dao.VMSnapshotDao;
 
-import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
-
 public class UserVmManagerImpl extends ManagerBase implements UserVmManager, VirtualMachineGuru, UserVmService, Configurable {
     private static final Logger s_logger = Logger.getLogger(UserVmManagerImpl.class);
 
@@ -5665,8 +5664,7 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
         return _vmDao.findById(vmId);
     }
 
-    @Override
-    public VirtualMachine vmStorageMigration(Long vmId, StoragePool destPool) {
+    private VMInstanceVO preVmStorageMigrationCheck(Long vmId) {
         // access check - only root admin can migrate VM
         Account caller = CallContext.current().getCallingAccount();
         if (!_accountMgr.isRootAdmin(caller.getId())) {
@@ -5687,9 +5685,8 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
             throw ex;
         }
 
-        if (vm.getType() != VirtualMachine.Type.User) {
-            // OffLineVmwareMigration: *WHY* ?
-            throw new InvalidParameterValueException("can only do storage migration on user vm");
+        if (vm.getType() != VirtualMachine.Type.User && !HypervisorType.VMware.equals(vm.getHypervisorType())) {
+            throw new InvalidParameterValueException("cannot do storage migration on non-user vm for hypervisor: " + vm.getHypervisorType().toString() + ", only supported for VMware");
         }
 
         List<VolumeVO> vols = _volsDao.findByInstance(vm.getId());
@@ -5707,17 +5704,62 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
             throw new InvalidParameterValueException("VM's disk cannot be migrated, please remove all the VM Snapshots for this VM");
         }
 
-        checkDestinationHypervisorType(destPool, vm);
-        if (destPool.getPoolType() == Storage.StoragePoolType.DatastoreCluster) {
-            DataCenter dc = _entityMgr.findById(DataCenter.class, vm.getDataCenterId());
-            Pod destPoolPod = _entityMgr.findById(Pod.class, destPool.getPodId());
+        return vm;
+    }
 
-            destPool = volumeMgr.findChildDataStoreInDataStoreCluster(dc, destPoolPod, destPool.getClusterId(), null, null, destPool.getId());
+    private VirtualMachine findMigratedVm(long vmId, VirtualMachine.Type vmType) {
+        // Looks the VM up again by id once a migration call has returned.
+        // User VMs are read through _vmDao, every other VM type through _vmInstanceDao.
+        final boolean userVm = VirtualMachine.Type.User.equals(vmType);
+        return userVm ? _vmDao.findById(vmId) : _vmInstanceDao.findById(vmId);
+    }
 
-        _itMgr.storageMigration(vm.getUuid(), destPool);
-        return _vmDao.findById(vm.getId());
+    @Override
+    public VirtualMachine vmStorageMigration(Long vmId, StoragePool destPool) {
+        final VMInstanceVO vm = preVmStorageMigrationCheck(vmId);
+        checkDestinationHypervisorType(destPool, vm);
+        final List<VolumeVO> vmVolumes = _volsDao.findByInstance(vm.getId());
+        final StoragePoolVO targetPool = _storagePoolDao.findById(destPool.getId());
+        // Only a cluster- or host-scoped destination pool contributes a pod id; otherwise it stays null.
+        final boolean targetPodBound = ScopeType.CLUSTER.equals(targetPool.getScope()) || ScopeType.HOST.equals(targetPool.getScope());
+        final Long targetPodId = targetPodBound ? targetPool.getPodId() : null;
+        final boolean nonUserVm = !VirtualMachine.Type.User.equals(vm.getType());
+        final Map<Long, Long> poolIdByVolumeId = new HashMap<>();
+        for (VolumeVO vmVolume : vmVolumes) {
+            // For non-user VMs, a cluster/host-scoped current pool must be in the destination pool's pod.
+            StoragePoolVO currentPool = nonUserVm ? _storagePoolDao.findById(vmVolume.getPoolId()) : null;
+            if (nonUserVm && targetPodId != null
+                    && (ScopeType.CLUSTER.equals(currentPool.getScope()) || ScopeType.HOST.equals(currentPool.getScope()))
+                    && !targetPodId.equals(currentPool.getPodId())) {
+                throw new InvalidParameterValueException("Storage migration of non-user VMs cannot be done between storage pools of different pods");
+            }
+            poolIdByVolumeId.put(vmVolume.getId(), destPool.getId());
+        }
+        _itMgr.storageMigration(vm.getUuid(), poolIdByVolumeId);
+        return findMigratedVm(vm.getId(), vm.getType());
+    }
 
+    @Override
+    public VirtualMachine vmStorageMigration(Long vmId, Map<String, String> volumeToPool) {
+        VMInstanceVO vm = preVmStorageMigrationCheck(vmId);
+        Map<Long, Long> volumeToPoolIds = new HashMap<>();
+        Long poolClusterId = null; // cluster of the most recent destination pool that reported one
+        for (Map.Entry<String, String> entry : volumeToPool.entrySet()) {
+            Volume volume = _volsDao.findByUuid(entry.getKey());
+            StoragePoolVO pool = _storagePoolDao.findPoolByUUID(entry.getValue());
+            if (volume == null || pool == null) { // unresolved UUID: report it instead of hitting a NullPointerException below
+                throw new InvalidParameterValueException("VM's disk cannot be migrated, unable to find volume: " + entry.getKey() + " or storage pool: " + entry.getValue());
+            }
+            boolean clusterOrHostScoped = ScopeType.CLUSTER.equals(pool.getScope()) || ScopeType.HOST.equals(pool.getScope());
+            if (poolClusterId != null && clusterOrHostScoped && !poolClusterId.equals(pool.getClusterId())) {
+                throw new InvalidParameterValueException("VM's disk cannot be migrated, input destination storage pools belong to different clusters");
+            }
+            poolClusterId = pool.getClusterId() != null ? pool.getClusterId() : poolClusterId;
+            checkDestinationHypervisorType(pool, vm);
+            volumeToPoolIds.put(volume.getId(), pool.getId());
+        }
+        _itMgr.storageMigration(vm.getUuid(), volumeToPoolIds);
+        return findMigratedVm(vm.getId(), vm.getType());
     }
 
     private void checkDestinationHypervisorType(StoragePool destPool, VMInstanceVO vm) {
@@ -5805,6 +5847,12 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
 
         // check if migrating to same host
         long srcHostId = vm.getHostId();
+        Host srcHost = _resourceMgr.getHost(srcHostId);
+        if (srcHost == null) {
+            throw new InvalidParameterValueException("Cannot migrate VM, host with id: " + srcHostId + " for VM not found");
+        }
+
+
         if (destinationHost.getId() == srcHostId) {
             throw new InvalidParameterValueException("Cannot migrate VM, VM is already present on this host, please specify valid destination host to migrate the VM");
         }
@@ -5816,13 +5864,9 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
         }
 
         if (vm.getType() != VirtualMachine.Type.User) {
-            // for System VMs check that the destination host is within the same
-            // cluster
-            HostVO srcHost = _hostDao.findById(srcHostId);
-            if (srcHost != null && srcHost.getClusterId() != null && destinationHost.getClusterId() != null) {
-                if (srcHost.getClusterId().longValue() != destinationHost.getClusterId().longValue()) {
-                    throw new InvalidParameterValueException("Cannot migrate the VM, destination host is not in the same cluster as current host of the VM");
-                }
+            // for System VMs check that the destination host is within the same pod
+            if (srcHost.getPodId() != null && !srcHost.getPodId().equals(destinationHost.getPodId())) {
+                throw new InvalidParameterValueException("Cannot migrate the VM, destination host is not in the same pod as current host of the VM");
             }
         }
 
@@ -5861,12 +5905,7 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
             collectVmNetworkStatistics(uservm);
         }
         _itMgr.migrate(vm.getUuid(), srcHostId, dest);
-        VMInstanceVO vmInstance = _vmInstanceDao.findById(vmId);
-        if (vmInstance.getType().equals(VirtualMachine.Type.User)) {
-            return _vmDao.findById(vmId);
-        } else {
-            return vmInstance;
-        }
+        return findMigratedVm(vm.getId(), vm.getType());
     }
 
     private boolean isOnSupportedHypevisorForMigration(VMInstanceVO vm) {
@@ -6104,6 +6143,23 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
         return implicitPlannerUsed;
     }
 
+    private boolean isVmVolumesOnZoneWideStore(VMInstanceVO vm) {
+        // True only when the VM has at least one created volume and every one of them is on a ZONE-scoped pool.
+        final List<VolumeVO> createdVolumes = _volsDao.findCreatedByInstance(vm.getId());
+        if (CollectionUtils.isEmpty(createdVolumes)) {
+            return false;
+        }
+        // A null volume, a volume without a pool id, or an unknown pool all count as "not zone-wide".
+        for (Volume vol : createdVolumes) {
+            final Long poolId = vol == null ? null : vol.getPoolId();
+            final StoragePoolVO store = poolId == null ? null : _storagePoolDao.findById(poolId);
+            if (store == null || !ScopeType.ZONE.equals(store.getScope())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     @Override
     @ActionEvent(eventType = EventTypes.EVENT_VM_MIGRATE, eventDescription = "migrating VM", async = true)
     public VirtualMachine migrateVirtualMachineWithVolume(Long vmId, Host destinationHost, Map<String, String> volumeToPool) throws ResourceUnavailableException,
@@ -6147,61 +6203,52 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
         long srcHostId = vm.getHostId();
         Host srcHost = _resourceMgr.getHost(srcHostId);
 
-        if(srcHost == null ){
-            throw new InvalidParameterValueException("Cannot migrate VM, there is not Host with id: " + srcHostId);
+        if (srcHost == null) {
+            throw new InvalidParameterValueException("Cannot migrate VM, host with id: " + srcHostId + " for VM not found");
         }
 
-        // Check if src and destination hosts are valid and migrating to same host
+        // Check if source and destination hosts are valid and migrating to same host
         if (destinationHost.getId() == srcHostId) {
             throw new InvalidParameterValueException("Cannot migrate VM, VM is already present on this host, please" + " specify valid destination host to migrate the VM");
         }
 
+        String srcHostVersion = srcHost.getHypervisorVersion();
+        String destHostVersion = destinationHost.getHypervisorVersion();
+
         // Check if the source and destination hosts are of the same type and support storage motion.
         if (!srcHost.getHypervisorType().equals(destinationHost.getHypervisorType())) {
             throw new CloudRuntimeException("The source and destination hosts are not of the same type and version. Source hypervisor type and version: " +
-                    srcHost.getHypervisorType().toString() + " " + srcHost.getHypervisorVersion() + ", Destination hypervisor type and version: " +
-                    destinationHost.getHypervisorType().toString() + " " + destinationHost.getHypervisorVersion());
+                    srcHost.getHypervisorType().toString() + " " + srcHostVersion + ", Destination hypervisor type and version: " +
+                    destinationHost.getHypervisorType().toString() + " " + destHostVersion);
         }
 
-        String srcHostVersion = srcHost.getHypervisorVersion();
-        String destinationHostVersion = destinationHost.getHypervisorVersion();
+        if (!VirtualMachine.Type.User.equals(vm.getType())) {
+            // for System VMs check that the destination host is within the same pod
+            if (srcHost.getPodId() != null && !srcHost.getPodId().equals(destinationHost.getPodId())) {
+                throw new InvalidParameterValueException("Cannot migrate the VM, destination host is not in the same pod as current host of the VM");
+            }
+        }
 
         if (HypervisorType.KVM.equals(srcHost.getHypervisorType())) {
             if (srcHostVersion == null) {
                 srcHostVersion = "";
             }
 
-            if (destinationHostVersion == null) {
-                destinationHostVersion = "";
+            if (destHostVersion == null) {
+                destHostVersion = "";
             }
         }
 
-        if (!srcHostVersion.equals(destinationHostVersion)) {
-            throw new CloudRuntimeException("The source and destination hosts are not of the same type and version. Source hypervisor type and version: " +
-                    srcHost.getHypervisorType().toString() + " " + srcHost.getHypervisorVersion() + ", Destination hypervisor type and version: " +
-                    destinationHost.getHypervisorType().toString() + " " + destinationHost.getHypervisorVersion());
+        if (!Boolean.TRUE.equals(_hypervisorCapabilitiesDao.isStorageMotionSupported(srcHost.getHypervisorType(), srcHostVersion))) {
+            throw new CloudRuntimeException("Migration with storage isn't supported for source host ID: " + srcHost.getUuid() + " on hypervisor " + srcHost.getHypervisorType() + " of version " + srcHost.getHypervisorVersion());
         }
 
-        HypervisorCapabilitiesVO capabilities = _hypervisorCapabilitiesDao.findByHypervisorTypeAndVersion(srcHost.getHypervisorType(), srcHost.getHypervisorVersion());
-
-        if (capabilities == null && HypervisorType.KVM.equals(srcHost.getHypervisorType())) {
-            List<HypervisorCapabilitiesVO> lstHypervisorCapabilities = _hypervisorCapabilitiesDao.listAllByHypervisorType(HypervisorType.KVM);
-
-            if (lstHypervisorCapabilities != null) {
-                for (HypervisorCapabilitiesVO hypervisorCapabilities : lstHypervisorCapabilities) {
-                    if (hypervisorCapabilities.isStorageMotionSupported()) {
-                        capabilities = hypervisorCapabilities;
-
-                        break;
-                    }
-                }
+        // The destination's storage-motion capability is looked up only when its version differs from the source's (or the source version is unknown).
+        if ((srcHostVersion == null || !srcHostVersion.equals(destHostVersion))
+                && !Boolean.TRUE.equals(_hypervisorCapabilitiesDao.isStorageMotionSupported(destinationHost.getHypervisorType(), destHostVersion))) {
+            throw new CloudRuntimeException("Migration with storage isn't supported for target host ID: " + destinationHost.getUuid() + " on hypervisor " + destinationHost.getHypervisorType() + " of version " + destinationHost.getHypervisorVersion());
+        }
 
-        if (!capabilities.isStorageMotionSupported()) {
-            throw new CloudRuntimeException("Migration with storage isn't supported on hypervisor " + srcHost.getHypervisorType() + " of version " + srcHost.getHypervisorVersion());
-        }
-
         // Check if destination host is up.
         if (destinationHost.getState() != com.cloud.host.Status.Up || destinationHost.getResourceState() != ResourceState.Enabled) {
             throw new CloudRuntimeException("Cannot migrate VM, destination host is not in correct state, has " + "status: " + destinationHost.getState() + ", state: "
@@ -6215,16 +6262,18 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
 
         List<VolumeVO> vmVolumes = _volsDao.findUsableVolumesForInstance(vm.getId());
         Map<Long, Long> volToPoolObjectMap = new HashMap<Long, Long>();
-        if (!isVMUsingLocalStorage(vm) && destinationHost.getClusterId().equals(srcHost.getClusterId())) {
-            if (volumeToPool.isEmpty()) {
-                // If the destination host is in the same cluster and volumes do not have to be migrated across pools
-                // then fail the call. migrateVirtualMachine api should have been used.
-                throw new InvalidParameterValueException("Migration of the vm " + vm + "from host " + srcHost + " to destination host " + destinationHost
-                        + " doesn't involve migrating the volumes.");
+        if (!isVMUsingLocalStorage(vm) && MapUtils.isEmpty(volumeToPool)
+            && (destinationHost.getClusterId().equals(srcHost.getClusterId()) || isVmVolumesOnZoneWideStore(vm))){
+            // If volumes do not have to be migrated
+            // call migrateVirtualMachine for non-user VMs else throw exception
+            if (!VirtualMachine.Type.User.equals(vm.getType())) {
+                return migrateVirtualMachine(vmId, destinationHost);
             }
+            throw new InvalidParameterValueException("Migration of the vm " + vm + "from host " + srcHost + " to destination host " + destinationHost
+                    + " doesn't involve migrating the volumes.");
         }
 
-        if (!volumeToPool.isEmpty()) {
+        if (MapUtils.isNotEmpty(volumeToPool)) {
             // Check if all the volumes and pools passed as parameters are valid.
             for (Map.Entry<String, String> entry : volumeToPool.entrySet()) {
                 VolumeVO volume = _volsDao.findByUuid(entry.getKey());
@@ -6274,7 +6323,7 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
         checkHostsDedication(vm, srcHostId, destinationHost.getId());
 
         _itMgr.migrateWithStorage(vm.getUuid(), srcHostId, destinationHost.getId(), volToPoolObjectMap);
-        return _vmDao.findById(vm.getId());
+        return findMigratedVm(vm.getId(), vm.getType());
     }
 
     @DB
diff --git a/test/integration/component/test_interpod_migration.py b/test/integration/component/test_interpod_migration.py
new file mode 100644
index 0000000..b8b2e97
--- /dev/null
+++ b/test/integration/component/test_interpod_migration.py
@@ -0,0 +1,464 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+""" Component tests for inter-pod migration of virtual machines and their volumes
+"""
+# Import Local Modules
+from marvin.cloudstackTestCase import cloudstackTestCase
+from marvin.cloudstackAPI import (attachVolume,
+                                  detachVolume,
+                                  deleteVolume,
+                                  attachIso,
+                                  detachIso,
+                                  deleteIso,
+                                  startVirtualMachine,
+                                  stopVirtualMachine,
+                                  migrateVirtualMachineWithVolume)
+from marvin.lib.utils import (cleanup_resources)
+from marvin.lib.base import (Account,
+                             Host,
+                             Pod,
+                             StoragePool,
+                             ServiceOffering,
+                             DiskOffering,
+                             VirtualMachine,
+                             Iso,
+                             Volume)
+from marvin.lib.common import (get_domain,
+                               get_zone,
+                               get_template)
+from marvin.lib.decoratorGenerators import skipTestIf
+from marvin.codes import FAILED, PASS
+from nose.plugins.attrib import attr
+# Import System modules
+import time
+
+_multiprocess_shared_ = True
+
+
+class TestVMMigration(cloudstackTestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        testClient = super(TestVMMigration, cls).getClsTestClient()
+        cls.apiclient = testClient.getApiClient()
+        cls.services = testClient.getParsedTestDataConfig()
+
+        # Get Zone, Domain and templates
+        cls.domain = get_domain(cls.apiclient)
+        cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
+        cls.services['mode'] = cls.zone.networktype
+
+        cls.cleanup = []
+        cls.hypervisorNotSupported = False
+        cls.hypervisor = cls.testClient.getHypervisorInfo()
+        if cls.hypervisor.lower() not in ['vmware']:
+            cls.hypervisorNotSupported = True
+
+        if cls.hypervisorNotSupported == False:
+            cls.pods = Pod.list(cls.apiclient, zoneid=cls.zone.id, listall=True)
+            if len(cls.pods) < 2:
+                assert False, "Not enough pods found: %d" % len(cls.pods)
+            cls.computeOfferingStorageTags = None
+            cls.diskOfferingStorageTags = None
+
+            for pod in cls.pods:
+                podStoragePools = StoragePool.list(
+                    cls.apiclient,
+                    scope='CLUSTER',
+                    podid=pod.id)
+                if len(podStoragePools) < 1:
+                    assert False, "Not enough CLUSTER scope storage pools found for pod: %s" % pod.id
+                taggedPool = []
+                for pool in podStoragePools:
+                    if pool.tags != None and len(pool.tags) > 0:
+                        taggedPool.append(pool)
+                if len(taggedPool) < 2:
+                    assert False, "No CLUSTER scope, tagged storage pools found for pod: %s" % pod.id
+                if cls.computeOfferingStorageTags == None:
+                    cls.computeOfferingStorageTags = taggedPool[0].tags
+                if cls.diskOfferingStorageTags == None:
+                    cls.diskOfferingStorageTags = taggedPool[1].tags
+
+            template = get_template(
+                cls.apiclient,
+                cls.zone.id,
+                cls.services["ostype"])
+            if template == FAILED:
+                assert False, "get_template() failed to return template with description %s" % cls.services["ostype"]
+
+            # Set Zones and disk offerings
+            cls.services["small"]["zoneid"] = cls.zone.id
+            cls.services["small"]["template"] = template.id
+
+            cls.services["iso"]["zoneid"] = cls.zone.id
+
+            cls.account = Account.create(
+                cls.apiclient,
+                cls.services["account"],
+                domainid=cls.domain.id)
+            cls.debug(cls.account.id)
+
+            compute_offering_service = cls.services["service_offerings"]["tiny"].copy()
+            compute_offering_service["tags"] = cls.computeOfferingStorageTags
+            cls.service_offering = ServiceOffering.create(
+                cls.apiclient,
+                compute_offering_service)
+            disk_offering_service = cls.services["disk_offering"].copy()
+            disk_offering_service["disksize"] = 1
+            cls.untagged_disk_offering = DiskOffering.create(
+                cls.apiclient,
+                disk_offering_service)
+            disk_offering_service["tags"] = cls.diskOfferingStorageTags
+            cls.tagged_disk_offering = DiskOffering.create(
+                cls.apiclient,
+                disk_offering_service)
+            cls.hostId = None
+            host = cls.getOldestHost(cls.pods[0].id, cls.pods[1].id)
+            if host != None:
+                cls.hostId = host.id
+
+            cls.cleanup = [
+                cls.service_offering,
+                cls.untagged_disk_offering,
+                cls.tagged_disk_offering,
+                cls.account
+            ]
+
+    @classmethod
+    def tearDownClass(cls):
+        try:
+            cleanup_resources(cls.apiclient, cls.cleanup)
+        except Exception as e:
+            raise Exception("Warning: Exception during cleanup : %s" % e)
+
+    def setUp(self):
+        self.apiclient = self.testClient.getApiClient()
+        self.dbclient = self.testClient.getDbConnection()
+        self.virtual_machine = None
+        if self.hypervisorNotSupported == False:
+            self.virtual_machine = VirtualMachine.create(
+                self.apiclient,
+                self.services["small"],
+                accountid=self.account.name,
+                domainid=self.account.domainid,
+                serviceofferingid=self.service_offering.id,
+                mode=self.services['mode'],
+                hostid=self.hostId
+            )
+
+        self.cleanup = []
+
+    def tearDown(self):
+        try:
+            if self.virtual_machine != None:
+                self.virtual_machine.delete(self.apiclient, expunge=True)
+            # Clean up, terminate the created accounts, domains etc
+            cleanup_resources(self.apiclient, self.cleanup)
+        except Exception as e:
+            raise Exception("Warning: Exception during cleanup : %s" % e)
+
+        return
+
+    @classmethod
+    def getOldestHost(cls, pod1_id, pod2_id):
+        selectedHost = None
+        hosts = Host.list(cls.apiclient, type='Routing', podid=pod1_id)
+        morehosts = Host.list(cls.apiclient, type='Routing', podid=pod2_id)
+        if isinstance(morehosts, list) and len(morehosts)>0:
+            if isinstance(hosts, list) and len(hosts)>0:
+                hosts.extend(morehosts)
+        if isinstance(hosts, list) and len(hosts)>0:
+            selectedHost = hosts[0]
+            # Very basic way to get lowest version host
+            for host in hosts:
+                if int(host.hypervisorversion.replace(".",  "")) < int(selectedHost.hypervisorversion.replace(".",  "")):
+                    selectedHost = host
+        return selectedHost
+
+    @skipTestIf("hypervisorNotSupported")
+    @attr(tags=["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"], required_hardware="false")
+    def test_01_migrate_running_vm(self):
+        """Test Running Virtual Machine Migration Without DATA disk or ISO
+        """
+        # Validate the following:
+        # 1. Start VM if not running
+        # 2. Migrate VM to a different pod multiple times
+        vmResponse = self.getVmVerifiedResponse(self.virtual_machine.id)
+        if vmResponse.state != 'Running':
+            self.startVm(vmResponse.id)
+        migrationCount = 1
+        while migrationCount > 0:
+            vmResponse = self.getVmVerifiedResponse(self.virtual_machine.id, 'Running')
+            hostId = self.getDifferentPodHost(vmResponse.id, vmResponse.hostid).id
+            self.debug("#%d migration, current host ID: %s, new host ID: %s" % ((2-migrationCount), vmResponse.hostid, hostId))
+            self.migrateVmWithVolume(vmResponse.id, hostId)
+            migrationCount = migrationCount - 1
+            if migrationCount > 0:
+                time.sleep(self.services["sleep"])
+        return
+
+    @skipTestIf("hypervisorNotSupported")
+    @attr(tags=["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"], required_hardware="false")
+    def test_02_migrate_running_vm_with_disk_and_iso(self):
+        """Test Running Virtual Machine Migration With DATA disks or ISO
+        """
+        # Validate the following:
+        # 1. Start VM if not running
+        # 2. Add disks and ISO to the VM
+        # 3. Migrate VM to a different pod multiple times
+        # 4. Remove disks and ISO from the VM
+        vmResponse = self.getVmVerifiedResponse(self.virtual_machine.id)
+        if vmResponse.state != 'Running':
+            self.startVm(vmResponse.id)
+        vol1 = self.addVolumeToVm(vmResponse.id, self.tagged_disk_offering)
+        vol2 = self.addVolumeToVm(vmResponse.id, self.untagged_disk_offering)
+        # self.addIsoToVm(vmResponse.id)
+        migrationCount = 1
+        while migrationCount > 0:
+            vmResponse = self.getVmVerifiedResponse(self.virtual_machine.id, 'Running')
+            hostId = self.getDifferentPodHost(vmResponse.id, vmResponse.hostid).id
+            self.debug("#%d migration, current host ID: %s, new host ID: %s" % ((2-migrationCount), vmResponse.hostid, hostId))
+            self.migrateVmWithVolume(vmResponse.id, hostId)
+            migrationCount = migrationCount - 1
+            if migrationCount > 0:
+                time.sleep(self.services["sleep"])
+        self.removeVolumeFromVm(vol1.id)
+        self.removeVolumeFromVm(vol2.id)
+        # self.removeIsoFromVm(vmResponse.id, vmResponse.isoid)
+        return
+
+    @skipTestIf("hypervisorNotSupported")
+    @attr(tags=["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"], required_hardware="false")
+    def test_03_migrate_stopped_vm(self):
+        """Test Stopped Virtual Machine Migration Without DATA disk or ISO
+        """
+        # Validate the following:
+        # 1. Stop VM if not already stopped
+        # 2. Migrate VM to a different pod multiple times with volume to pool mapping
+        vmResponse = self.getVmVerifiedResponse(self.virtual_machine.id)
+        if vmResponse.state != 'Stopped':
+            self.stopVm(vmResponse.id)
+        migrationCount = 3
+        while migrationCount > 0:
+            vmResponse = self.getVmVerifiedResponse(self.virtual_machine.id, 'Stopped')
+            migrateTo = self.getDifferentPodVolumeStoragePoolMapping(vmResponse.id)
+            self.debug("#%d migration, mapping: %s" % ((4-migrationCount), migrateTo))
+            self.migrateVmWithVolume(vmResponse.id, None, migrateTo)
+            migrationCount = migrationCount - 1
+            if migrationCount > 0:
+                time.sleep(self.services["sleep"])
+        return
+
+    @skipTestIf("hypervisorNotSupported")
+    @attr(tags=["devcloud", "advanced", "advancedns", "smoke", "basic", "sg"], required_hardware="false")
+    def test_04_migrate_stopped_vm_with_disk_and_iso(self):
+        """Test Stopped Virtual Machine Migration With DATA disk or ISO
+        """
+        # Validate the following:
+        # 1. Start VM if not running
+        # 2. Add disks and ISO to the VM
+        # 3. Stop the VM
+        # 4. Migrate VM to a different pod multiple times with volume to pool mapping
+        # 5. Start VM and remove disks and ISO from the VM
+        vmResponse = self.getVmVerifiedResponse(self.virtual_machine.id)
+        if vmResponse.state != 'Running':
+            self.startVm(vmResponse.id)
+        vol1 = self.addVolumeToVm(vmResponse.id, self.tagged_disk_offering)
+        vol2 = self.addVolumeToVm(vmResponse.id, self.untagged_disk_offering)
+        # self.addIsoToVm(vmResponse.id)
+        self.stopVm(vmResponse.id)
+        migrationCount = 3
+        while migrationCount > 0:
+            vmResponse = self.getVmVerifiedResponse(self.virtual_machine.id, 'Stopped')
+            migrateTo = self.getDifferentPodVolumeStoragePoolMapping(vmResponse.id)
+            self.debug("#%d migration, mapping: %s" % ((4-migrationCount), migrateTo))
+            self.migrateVmWithVolume(vmResponse.id, None, migrateTo)
+            migrationCount = migrationCount - 1
+            if migrationCount > 0:
+                time.sleep(self.services["sleep"])
+        self.removeVolumeFromVm(vol1.id)
+        self.removeVolumeFromVm(vol2.id)
+        # self.removeIsoFromVm(vmResponse.id, vmResponse.isoid)
+        return
+
+    def startVm(self, vm_id):
+        startVirtualMachineCmd = startVirtualMachine.startVirtualMachineCmd()
+        startVirtualMachineCmd.id = vm_id
+        self.apiclient.startVirtualMachine(startVirtualMachineCmd)
+
+    def stopVm(self, vm_id):
+        stopVirtualMachineCmd = stopVirtualMachine.stopVirtualMachineCmd()
+        stopVirtualMachineCmd.id = vm_id
+        self.apiclient.stopVirtualMachine(stopVirtualMachineCmd)
+
+    def addVolumeToVm(self, vm_id, disk_offering):
+        volume = Volume.create(
+            self.apiclient,
+            self.services["volume"],
+            zoneid=self.zone.id,
+            diskofferingid=disk_offering.id,
+            account=self.account.name,
+            domainid=self.account.domainid)
+        cmd = attachVolume.attachVolumeCmd()
+        cmd.id = volume.id
+        cmd.virtualmachineid = vm_id
+        attachedVolume = self.apiclient.attachVolume(cmd)
+        return attachedVolume
+
+    def removeVolumeFromVm(self, volume_id):
+        cmd = detachVolume.detachVolumeCmd()
+        cmd.id = volume_id
+        detachedVolume = self.apiclient.detachVolume(cmd)
+        cmd = deleteVolume.deleteVolumeCmd()
+        cmd.id = volume_id
+        self.apiclient.deleteVolume(cmd)
+        return
+
+    def addIsoToVm(self, vm_id):
+        iso = Iso.create(
+           self.apiclient,
+           self.services["iso"],
+           account=self.account.name,
+           domainid=self.account.domainid)
+        cmd = attachIso.attachIsoCmd()
+        cmd.id = iso.id
+        cmd.virtualmachineid = vm_id
+        attachedIso = self.apiclient.attachIso(cmd)
+        return
+
+    def removeIsoFromVm(self, vm_id, iso_id):
+        cmd = detachIso.detachIsoCmd()
+        cmd.virtualmachineid = vm_id
+        self.apiclient.detachIso(cmd)
+        cmd = deleteIso.deleteIsoCmd()
+        cmd.id = iso_id
+        self.apiclient.deleteIso(cmd)
+        return
+
+    def getVmVerifiedResponse(self, vm_id, state=None):
+        list_vm_response = VirtualMachine.list(
+            self.apiclient,
+            id=self.virtual_machine.id)
+        self.debug(
+            "Verify listVirtualMachines response for virtual machine: %s" \
+            % self.virtual_machine.id)
+        self.assertEqual(
+            isinstance(list_vm_response, list),
+            True,
+            "Check list response returns a valid list")
+        self.assertNotEqual(
+            len(list_vm_response),
+            0,
+            "Check VM available in List Virtual Machines")
+        vmResponse = list_vm_response[0]
+        if state != None:
+            self.assertEqual(
+                vmResponse.state,
+                state,
+                "VM not in state: %s" % state)
+        return vmResponse
+
+    def getDifferentPodHost(self, vm_id, host_id):
+        host = None
+        currentHost = Host.list(self.apiclient, id=host_id)
+        self.assertEqual(
+            isinstance(currentHost, list),
+            True,
+            "Check host list response returns a valid list")
+        self.assertNotEqual(
+            len(currentHost),
+            0,
+            "Check current host for VM  ID: %s available in List Hosts" % vm_id)
+        currentHost = currentHost[0]
+        hosts = Host.listForMigration(self.apiclient, virtualmachineid=vm_id)
+        self.assertEqual(
+            isinstance(hosts, list),
+            True,
+            "Check host list response returns a valid list")
+        self.assertNotEqual(
+            len(hosts),
+            0,
+            "Hosts suitable for migration for VM ID: %s not found" % vm_id)
+        for hostForMigration in hosts:
+            if hostForMigration.podid != currentHost.podid:
+                host = hostForMigration
+                break
+        self.assertNotEqual(
+            host,
+            None,
+            "Host suitable for migration for VM ID: %s in a different pod not found" % vm_id)
+        return host
+
+    def getPodStoragePoolWithTags(self, pod_id, tags=None):
+        pool = None
+        storage_pools = StoragePool.list(
+            self.apiclient,
+            podid=pod_id,
+            listall=True)
+        if isinstance(storage_pools, list) and len(storage_pools) > 0:
+            if tags != None:
+                for storage_pool in storage_pools:
+                    if storage_pool.tags == tags:
+                        pool = storage_pool
+                        break
+            else:
+                pool = storage_pool[0]
+        return pool
+
+    def getDifferentPodVolumeStoragePoolMapping(self, vm_id):
+        rootVolume = Volume.list(self.apiclient, virtualmachineid=vm_id, listall=True, type='ROOT')
+        self.assertEqual(
+            isinstance(rootVolume, list),
+            True,
+            "Check VM volumes list response returns a valid list")
+        self.assertNotEqual(
+            len(rootVolume),
+            0,
+            "Check VM ROOT volume available in List Volumes")
+        rootVolume = rootVolume[0]
+        volumeStoragePool = StoragePool.list(
+            self.apiclient,
+            id=rootVolume.storageid)
+        self.assertEqual(
+            isinstance(volumeStoragePool, list),
+            True,
+            "Check VM ROOT Volume storage list response returns a valid list")
+        self.assertNotEqual(
+            len(volumeStoragePool),
+            0,
+            "Check VM ROOT Volume storage available in List Storage Pools")
+        volumeStoragePool = volumeStoragePool[0]
+        podId = self.pods[0].id
+        if volumeStoragePool.podid == podId:
+            podId = self.pods[1].id
+        pool = self.getPodStoragePoolWithTags(podId, self.computeOfferingStorageTags)
+        self.assertNotEqual(
+            pool,
+            None,
+            "Target storage pool mapping for VM ID: %s failed" % vm_id)
+        migrateTo = { "volume": rootVolume.id, "pool": pool.id}
+        return [migrateTo]
+
+    def migrateVmWithVolume(self, vm_id, host_id, migrate_to=None):
+        migrateVirtualMachineWithVolumeCmd = migrateVirtualMachineWithVolume.migrateVirtualMachineWithVolumeCmd()
+        migrateVirtualMachineWithVolumeCmd.virtualmachineid = vm_id
+        if host_id != None:
+            migrateVirtualMachineWithVolumeCmd.hostid = host_id
+        if migrate_to != None:
+            migrateVirtualMachineWithVolumeCmd.migrateto = migrate_to
+        response = self.apiclient.migrateVirtualMachineWithVolume(migrateVirtualMachineWithVolumeCmd)
+        return response
diff --git a/ui/public/locales/en.json b/ui/public/locales/en.json
index 5b1cc6a..f14878c 100644
--- a/ui/public/locales/en.json
+++ b/ui/public/locales/en.json
@@ -213,8 +213,10 @@
 "label.action.migrate.instance.processing": "Migrating Instance....",
 "label.action.migrate.router": "Migrate Router",
 "label.action.migrate.router.processing": "Migrating Router....",
+"label.action.migrate.router.to.ps": "Migrate router to another primary storage",
 "label.action.migrate.systemvm": "Migrate System VM",
 "label.action.migrate.systemvm.processing": "Migrating System VM....",
+"label.action.migrate.systemvm.to.ps": "Migrate system VM to another primary storage",
 "label.action.project.add.account": "Add Account to Project",
 "label.action.project.add.user": "Add User to Project",
 "label.action.reboot.instance": "Reboot Instance",
@@ -1358,6 +1360,7 @@
 "label.migrate.instance.to.host": "Migrate instance to another host",
 "label.migrate.instance.to.ps": "Migrate instance to another primary storage",
 "label.migrate.lb.vm": "Migrate LB VM",
+"label.migrate.lb.vm.to.ps": "Migrate LB VM to another primary storage",
 "label.migrate.router.to": "Migrate Router to",
 "label.migrate.systemvm.to": "Migrate System VM to",
 "label.migrate.to.host": "Migrate to host",
@@ -2030,6 +2033,7 @@
 "label.storage.tags": "Storage Tags",
 "label.storage.traffic": "Storage Traffic",
 "label.storageid": "Primary Storage",
+"label.storage.migration.required": "Storage Migration Required",
 "label.storagemotionenabled": "Storage Motion Enabled",
 "label.storagepolicy": "Storage policy",
 "label.storagepool": "Storage Pool",
@@ -2931,12 +2935,16 @@
 "message.migrate.instance.to.ps": "Please confirm that you want to migrate instance to another primary storage.",
 "message.migrate.router.confirm": "Please confirm the host you wish to migrate the router to:",
 "message.migrate.systemvm.confirm": "Please confirm the host you wish to migrate the system VM to:",
+"message.migrate.lb.vm.to.ps": "Please confirm that you want to migrate LB VM to another primary storage.",
+"message.migrate.router.to.ps": "Please confirm that you want to migrate router to another primary storage.",
+"message.migrate.system.vm.to.ps": "Please confirm that you want to migrate system VM to another primary storage.",
 "message.migrate.volume": "Please confirm that you want to migrate volume to another primary storage.",
 "message.migrate.volume.failed": "Migrating volume failed",
 "message.migrate.volume.processing": "Migrating volume...",
 "message.migrating.failed": "Migration failed",
 "message.migrating.processing": "Migration in progress for",
 "message.migrating.vm.to.host.failed": "Failed to migrate VM to host",
+"message.migrating.vm.to.storage.failed": "Failed to migrate VM to storage",
 "message.move.acl.order": "Move ACL rule order",
 "message.move.acl.order.failed": "Failed to move ACL rule",
 "message.move.acl.order.processing": "Moving ACL rule...",
diff --git a/ui/src/config/section/compute.js b/ui/src/config/section/compute.js
index 8950140..e55e516 100644
--- a/ui/src/config/section/compute.js
+++ b/ui/src/config/section/compute.js
@@ -299,16 +299,8 @@ export default {
           docHelp: 'adminguide/virtual_machines.html#moving-vms-between-hosts-manual-live-migration',
           dataView: true,
           show: (record, store) => { return ['Stopped'].includes(record.state) && ['Admin'].includes(store.userInfo.roletype) },
-          args: ['storageid', 'virtualmachineid'],
-          mapping: {
-            storageid: {
-              api: 'listStoragePools',
-              params: (record) => { return { zoneid: record.zoneid } }
-            },
-            virtualmachineid: {
-              value: (record) => { return record.id }
-            }
-          }
+          component: () => import('@/views/compute/MigrateVMStorage'),
+          popup: true
         },
         {
           api: 'resetPasswordForVirtualMachine',
diff --git a/ui/src/config/section/infra/ilbvms.js b/ui/src/config/section/infra/ilbvms.js
index 8b2434e..393a769 100644
--- a/ui/src/config/section/infra/ilbvms.js
+++ b/ui/src/config/section/infra/ilbvms.js
@@ -45,13 +45,18 @@ export default {
       icon: 'drag',
       label: 'label.action.migrate.router',
       dataView: true,
-      show: (record) => { return record.state === 'Running' },
-      args: ['virtualmachineid', 'hostid'],
-      mapping: {
-        virtualmachineid: {
-          value: (record) => { return record.id }
-        }
-      }
+      show: (record, store) => { return record.state === 'Running' && ['Admin'].includes(store.userInfo.roletype) },
+      component: () => import('@/views/compute/MigrateWizard'),
+      popup: true
+    },
+    {
+      api: 'migrateSystemVm',
+      icon: 'drag',
+      label: 'label.action.migrate.systemvm.to.ps',
+      dataView: true,
+      show: (record, store) => { return ['Stopped'].includes(record.state) && ['VMware'].includes(record.hypervisor) },
+      component: () => import('@/views/compute/MigrateVMStorage'),
+      popup: true
     }
   ]
 }
diff --git a/ui/src/config/section/infra/routers.js b/ui/src/config/section/infra/routers.js
index b109914..6ffa468 100644
--- a/ui/src/config/section/infra/routers.js
+++ b/ui/src/config/section/infra/routers.js
@@ -104,17 +104,18 @@ export default {
       icon: 'drag',
       label: 'label.action.migrate.router',
       dataView: true,
-      show: (record, store) => { return ['Running'].includes(record.state) && ['Admin'].includes(store.userInfo.roletype) },
-      args: ['virtualmachineid', 'hostid'],
-      mapping: {
-        virtualmachineid: {
-          value: (record) => { return record.id }
-        },
-        hostid: {
-          api: 'findHostsForMigration',
-          params: (record) => { return { virtualmachineid: record.id } }
-        }
-      }
+      show: (record, store) => { return record.state === 'Running' && ['Admin'].includes(store.userInfo.roletype) },
+      component: () => import('@/views/compute/MigrateWizard'),
+      popup: true
+    },
+    {
+      api: 'migrateSystemVm',
+      icon: 'drag',
+      label: 'label.action.migrate.systemvm.to.ps',
+      dataView: true,
+      show: (record, store) => { return ['Stopped'].includes(record.state) && ['VMware'].includes(record.hypervisor) },
+      component: () => import('@/views/compute/MigrateVMStorage'),
+      popup: true
     },
     {
       api: 'runDiagnostics',
diff --git a/ui/src/config/section/infra/systemVms.js b/ui/src/config/section/infra/systemVms.js
index 8b3c66a..bc20b90 100644
--- a/ui/src/config/section/infra/systemVms.js
+++ b/ui/src/config/section/infra/systemVms.js
@@ -69,17 +69,18 @@ export default {
       icon: 'drag',
       label: 'label.action.migrate.systemvm',
       dataView: true,
-      show: (record) => { return record.state === 'Running' },
-      args: ['virtualmachineid', 'hostid'],
-      mapping: {
-        virtualmachineid: {
-          value: (record) => { return record.id }
-        },
-        hostid: {
-          api: 'findHostsForMigration',
-          params: (record) => { return { virtualmachineid: record.id } }
-        }
-      }
+      show: (record, store) => { return record.state === 'Running' && ['Admin'].includes(store.userInfo.roletype) },
+      component: () => import('@/views/compute/MigrateWizard'),
+      popup: true
+    },
+    {
+      api: 'migrateSystemVm',
+      icon: 'drag',
+      label: 'label.action.migrate.systemvm.to.ps',
+      dataView: true,
+      show: (record, store) => { return ['Stopped'].includes(record.state) && ['VMware'].includes(record.hypervisor) },
+      component: () => import('@/views/compute/MigrateVMStorage'),
+      popup: true
     },
     {
       api: 'runDiagnostics',
diff --git a/ui/src/views/compute/MigrateVMStorage.vue b/ui/src/views/compute/MigrateVMStorage.vue
new file mode 100644
index 0000000..7744c57
--- /dev/null
+++ b/ui/src/views/compute/MigrateVMStorage.vue
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+<template>
+  <div class="form-layout">
+    <a-spin :spinning="loading">
+      <a-form
+        :form="form"
+        @submit="handleSubmit"
+        layout="vertical">
+        <a-form-item>
+          <span slot="label">
+            {{ $t('label.storageid') }}
+            <a-tooltip :title="apiParams.storageid.description" v-if="!(apiParams.hostid && apiParams.hostid.required === false)">
+              <a-icon type="info-circle" style="color: rgba(0,0,0,.45)" />
+            </a-tooltip>
+          </span>
+          <a-select
+            :loading="loading"
+            v-decorator="['storageid', {
+              rules: [{ required: true, message: `${this.$t('message.error.required.input')}` }]
+            }]">
+            <a-select-option v-for="storagePool in storagePools" :key="storagePool.id">
+              {{ storagePool.name || storagePool.id }}
+            </a-select-option>
+          </a-select>
+        </a-form-item>
+
+        <div :span="24" class="action-button">
+          <a-button @click="closeAction">{{ this.$t('label.cancel') }}</a-button>
+          <a-button :loading="loading" type="primary" @click="handleSubmit">{{ this.$t('label.ok') }}</a-button>
+        </div>
+      </a-form>
+    </a-spin>
+  </div>
+</template>
+
+<script>
+import { api } from '@/api'
+
+export default {
+  name: 'MigrateVMStorage',
+  props: {
+    resource: {
+      type: Object,
+      required: true
+    }
+  },
+  data () {
+    return {
+      loading: false,
+      storagePools: []
+    }
+  },
+  beforeCreate () {
+    this.form = this.$form.createForm(this)
+    this.apiParams = {}
+    if (this.$route.meta.name === 'vm') {
+      this.apiConfig = this.$store.getters.apis.migrateVirtualMachineWithVolume || {}
+      this.apiConfig.params.forEach(param => {
+        this.apiParams[param.name] = param
+      })
+      this.apiConfig = this.$store.getters.apis.migrateVirtualMachine || {}
+      this.apiConfig.params.forEach(param => {
+        if (!(param.name in this.apiParams)) {
+          this.apiParams[param.name] = param
+        }
+      })
+    } else {
+      this.apiConfig = this.$store.getters.apis.migrateSystemVm || {}
+      this.apiConfig.params.forEach(param => {
+        if (!(param.name in this.apiParams)) {
+          this.apiParams[param.name] = param
+        }
+      })
+    }
+  },
+  created () {
+  },
+  mounted () {
+    this.fetchData()
+  },
+  methods: {
+    fetchData () {
+      this.loading = true
+      api('listStoragePools', {
+        zoneid: this.resource.zoneid
+      }).then(response => {
+        if (this.arrayHasItems(response.liststoragepoolsresponse.storagepool)) {
+          this.storagePools = response.liststoragepoolsresponse.storagepool
+        }
+      }).finally(() => {
+        this.loading = false
+      })
+    },
+    isValidValueForKey (obj, key) {
+      return key in obj && obj[key] != null
+    },
+    arrayHasItems (array) {
+      return array !== null && array !== undefined && Array.isArray(array) && array.length > 0
+    },
+    isObjectEmpty (obj) {
+      return !(obj !== null && obj !== undefined && Object.keys(obj).length > 0 && obj.constructor === Object)
+    },
+    handleSubmit (e) {
+      e.preventDefault()
+      this.form.validateFields((err, values) => {
+        if (err) {
+          return
+        }
+        this.loading = true
+        var isUserVm = true
+        if (this.$route.meta.name !== 'vm') {
+          isUserVm = false
+        }
+        var migrateApi = isUserVm ? 'migrateVirtualMachine' : 'migrateSystemVm'
+        if (isUserVm && this.apiParams.hostid && this.apiParams.hostid.required === false) {
+          migrateApi = 'migrateVirtualMachineWithVolume'
+          var rootVolume = null
+          api('listVolumes', {
+            listAll: true,
+            virtualmachineid: this.resource.id
+          }).then(response => {
+            var volumes = response.listvolumesresponse.volume
+            if (volumes && volumes.length > 0) {
+              volumes = volumes.filter(item => item.type === 'ROOT')
+              if (volumes && volumes.length > 0) {
+                rootVolume = volumes[0]
+              }
+              if (rootVolume == null) {
+                this.$message.error('Failed to find ROOT volume for the VM ' + this.resource.id)
+                this.closeAction()
+              }
+              this.migrateVm(migrateApi, values.storageid, rootVolume.id)
+            }
+          })
+          return
+        }
+        this.migrateVm(migrateApi, values.storageid, null)
+      })
+    },
+    migrateVm (migrateApi, storageId, rootVolumeId) {
+      var params = {
+        virtualmachineid: this.resource.id,
+        storageid: storageId
+      }
+      if (rootVolumeId !== null) {
+        params = {
+          virtualmachineid: this.resource.id,
+          'migrateto[0].volume': rootVolumeId,
+          'migrateto[0].pool': storageId
+        }
+      }
+      api(migrateApi, params).then(response => {
+        var jobId = ''
+        if (migrateApi === 'migrateVirtualMachineWithVolume') {
+          jobId = response.migratevirtualmachinewithvolumeresponse.jobid
+        } else if (migrateApi === 'migrateSystemVm') {
+          jobId = response.migratesystemvmresponse.jobid
+        } else {
+          jobId = response.migratevirtualmachine.jobid
+        }
+        this.$store.dispatch('AddAsyncJob', {
+          title: `${this.$t('label.migrating')} ${this.resource.name}`,
+          jobid: jobId,
+          description: this.resource.name,
+          status: 'progress'
+        })
+        this.$pollJob({
+          jobId: jobId,
+          successMessage: `${this.$t('message.success.migrating')} ${this.resource.name}`,
+          successMethod: () => {
+            this.$parent.$parent.close()
+          },
+          errorMessage: this.$t('message.migrating.failed'),
+          errorMethod: () => {
+            this.$parent.$parent.close()
+          },
+          loadingMessage: `${this.$t('message.migrating.processing')} ${this.resource.name}`,
+          catchMessage: this.$t('error.fetching.async.job.result'),
+          catchMethod: () => {
+            this.$parent.$parent.close()
+          }
+        })
+        this.$parent.$parent.close()
+      }).catch(error => {
+        console.error(error)
+        this.$message.error(`${this.$t('message.migrating.vm.to.storage.failed')} ${storageId}`)
+      })
+    },
+    closeAction () {
+      this.$emit('close-action')
+    }
+  }
+}
+</script>
+
+<style scoped lang="less">
+  .form-layout {
+    width: 60vw;
+
+    @media (min-width: 500px) {
+      width: 450px;
+    }
+  }
+
+  .action-button {
+    text-align: right;
+
+    button {
+      margin-right: 5px;
+    }
+  }
+</style>
diff --git a/ui/src/views/compute/MigrateWizard.vue b/ui/src/views/compute/MigrateWizard.vue
index 46346e9..f5082e9 100644
--- a/ui/src/views/compute/MigrateWizard.vue
+++ b/ui/src/views/compute/MigrateWizard.vue
@@ -47,6 +47,15 @@
       <div slot="memused" slot-scope="record">
         {{ record.memoryused | byteToGigabyte }} GB
       </div>
+      <div slot="cluster" slot-scope="record">
+        {{ record.clustername }}
+      </div>
+      <div slot="pod" slot-scope="record">
+        {{ record.podname }}
+      </div>
+      <div slot="requiresstoragemigration" slot-scope="record">
+        {{ record.requiresStorageMotion ? $t('label.yes') : $t('label.no') }}
+      </div>
       <template slot="select" slot-scope="record">
         <a-radio
           class="host-item__radio"
@@ -118,6 +127,18 @@ export default {
           scopedSlots: { customRender: 'memused' }
         },
         {
+          title: this.$t('label.cluster'),
+          scopedSlots: { customRender: 'cluster' }
+        },
+        {
+          title: this.$t('label.pod'),
+          scopedSlots: { customRender: 'pod' }
+        },
+        {
+          title: this.$t('label.storage.migration.required'),
+          scopedSlots: { customRender: 'requiresstoragemigration' }
+        },
+        {
           title: this.$t('label.select'),
           scopedSlots: { customRender: 'select' }
         }
@@ -149,19 +170,28 @@ export default {
     },
     submitForm () {
       this.loading = true
-      api(this.selectedHost.requiresStorageMotion ? 'migrateVirtualMachineWithVolume' : 'migrateVirtualMachine', {
+      var isUserVm = true
+      if (this.$route.meta.name !== 'vm') {
+        isUserVm = false
+      }
+      var migrateApi = isUserVm
+        ? this.selectedHost.requiresStorageMotion ? 'migrateVirtualMachineWithVolume' : 'migrateVirtualMachine'
+        : 'migrateSystemVm'
+      api(migrateApi, {
         hostid: this.selectedHost.id,
         virtualmachineid: this.resource.id
       }).then(response => {
-        const jobid = this.selectedHost.requiresStorageMotion ? response.migratevirtualmachinewithvolumeresponse.jobid : response.migratevirtualmachineresponse.jobid
+        var migrateResponse = isUserVm
+          ? this.selectedHost.requiresStorageMotion ? response.migratevirtualmachinewithvolumeresponse : response.migratevirtualmachineresponse
+          : response.migratesystemvmresponse
         this.$store.dispatch('AddAsyncJob', {
           title: `${this.$t('label.migrating')} ${this.resource.name}`,
-          jobid: jobid,
+          jobid: migrateResponse.jobid,
           description: this.resource.name,
           status: 'progress'
         })
         this.$pollJob({
-          jobId: jobid,
+          jobId: migrateResponse.jobid,
           successMessage: `${this.$t('message.success.migrating')} ${this.resource.name}`,
           successMethod: () => {
             this.$emit('close-action')
@@ -209,9 +239,9 @@ export default {
 <style scoped lang="scss">
 
   .form {
-    width: 85vw;
-    @media (min-width: 800px) {
-      width: 750px;
+    width: 95vw;
+    @media (min-width: 900px) {
+      width: 850px;
     }
   }
 
diff --git a/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/DatastoreMO.java b/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/DatastoreMO.java
index 804af62..432a1de 100644
--- a/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/DatastoreMO.java
+++ b/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/DatastoreMO.java
@@ -16,6 +16,13 @@
 // under the License.
 package com.cloud.hypervisor.vmware.mo;
 
+import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
 import com.cloud.exception.CloudException;
 import com.cloud.hypervisor.vmware.util.VmwareContext;
 import com.cloud.utils.Pair;
@@ -34,12 +41,6 @@ import com.vmware.vim25.PropertyFilterSpec;
 import com.vmware.vim25.PropertySpec;
 import com.vmware.vim25.SelectionSpec;
 import com.vmware.vim25.TraversalSpec;
-import org.apache.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
 
 public class DatastoreMO extends BaseMO {
     private static final Logger s_logger = Logger.getLogger(DatastoreMO.class);
@@ -459,6 +460,6 @@ public class DatastoreMO extends BaseMO {
 
     public String getDatastoreType() throws Exception {
         DatastoreSummary summary = _context.getVimClient().getDynamicProperty(getMor(), "summary");
-        return summary.getType();
+        return summary.getType() == null ? "" : summary.getType();
     }
 }
diff --git a/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/HypervisorHostHelper.java b/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/HypervisorHostHelper.java
index 1b94ca8..7774645 100644
--- a/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/HypervisorHostHelper.java
+++ b/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/HypervisorHostHelper.java
@@ -18,6 +18,7 @@ package com.cloud.hypervisor.vmware.mo;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.net.URI;
@@ -28,6 +29,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
@@ -37,17 +39,6 @@ import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.dom.DOMSource;
 import javax.xml.transform.stream.StreamResult;
 
-import com.vmware.vim25.ConcurrentAccessFaultMsg;
-import com.vmware.vim25.DuplicateNameFaultMsg;
-import com.vmware.vim25.FileFaultFaultMsg;
-import com.vmware.vim25.InsufficientResourcesFaultFaultMsg;
-import com.vmware.vim25.InvalidDatastoreFaultMsg;
-import com.vmware.vim25.InvalidNameFaultMsg;
-import com.vmware.vim25.InvalidStateFaultMsg;
-import com.vmware.vim25.OutOfBoundsFaultMsg;
-import com.vmware.vim25.RuntimeFaultFaultMsg;
-import com.vmware.vim25.TaskInProgressFaultMsg;
-import com.vmware.vim25.VmConfigFaultFaultMsg;
 import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
@@ -80,19 +71,20 @@ import com.cloud.utils.db.GlobalLock;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.net.NetUtils;
 import com.cloud.utils.nicira.nvp.plugin.NiciraNvpApiVersion;
-import com.vmware.vim25.OvfCreateDescriptorParams;
-import com.vmware.vim25.OvfCreateDescriptorResult;
 import com.vmware.vim25.AlreadyExistsFaultMsg;
 import com.vmware.vim25.BoolPolicy;
-import com.vmware.vim25.CustomFieldStringValue;
 import com.vmware.vim25.ClusterConfigInfoEx;
-import com.vmware.vim25.DatacenterConfigInfo;
+import com.vmware.vim25.ConcurrentAccessFaultMsg;
+import com.vmware.vim25.CustomFieldStringValue;
 import com.vmware.vim25.DVPortSetting;
 import com.vmware.vim25.DVPortgroupConfigInfo;
 import com.vmware.vim25.DVPortgroupConfigSpec;
 import com.vmware.vim25.DVSSecurityPolicy;
 import com.vmware.vim25.DVSTrafficShapingPolicy;
+import com.vmware.vim25.DatacenterConfigInfo;
+import com.vmware.vim25.DuplicateNameFaultMsg;
 import com.vmware.vim25.DynamicProperty;
+import com.vmware.vim25.FileFaultFaultMsg;
 import com.vmware.vim25.HostNetworkSecurityPolicy;
 import com.vmware.vim25.HostNetworkTrafficShapingPolicy;
 import com.vmware.vim25.HostPortGroup;
@@ -101,6 +93,10 @@ import com.vmware.vim25.HostVirtualSwitch;
 import com.vmware.vim25.HttpNfcLeaseDeviceUrl;
 import com.vmware.vim25.HttpNfcLeaseInfo;
 import com.vmware.vim25.HttpNfcLeaseState;
+import com.vmware.vim25.InsufficientResourcesFaultFaultMsg;
+import com.vmware.vim25.InvalidDatastoreFaultMsg;
+import com.vmware.vim25.InvalidNameFaultMsg;
+import com.vmware.vim25.InvalidStateFaultMsg;
 import com.vmware.vim25.LocalizedMethodFault;
 import com.vmware.vim25.LongPolicy;
 import com.vmware.vim25.ManagedObjectReference;
@@ -108,11 +104,16 @@ import com.vmware.vim25.MethodFault;
 import com.vmware.vim25.NumericRange;
 import com.vmware.vim25.ObjectContent;
 import com.vmware.vim25.OptionValue;
+import com.vmware.vim25.OutOfBoundsFaultMsg;
+import com.vmware.vim25.OvfCreateDescriptorParams;
+import com.vmware.vim25.OvfCreateDescriptorResult;
 import com.vmware.vim25.OvfCreateImportSpecParams;
 import com.vmware.vim25.OvfCreateImportSpecResult;
-import com.vmware.vim25.OvfFileItem;
 import com.vmware.vim25.OvfFile;
+import com.vmware.vim25.OvfFileItem;
 import com.vmware.vim25.ParaVirtualSCSIController;
+import com.vmware.vim25.RuntimeFaultFaultMsg;
+import com.vmware.vim25.TaskInProgressFaultMsg;
 import com.vmware.vim25.VMwareDVSConfigSpec;
 import com.vmware.vim25.VMwareDVSPortSetting;
 import com.vmware.vim25.VMwareDVSPortgroupPolicy;
@@ -121,25 +122,24 @@ import com.vmware.vim25.VMwareDVSPvlanMapEntry;
 import com.vmware.vim25.VirtualBusLogicController;
 import com.vmware.vim25.VirtualController;
 import com.vmware.vim25.VirtualDevice;
-import com.vmware.vim25.VirtualDisk;
 import com.vmware.vim25.VirtualDeviceConfigSpec;
 import com.vmware.vim25.VirtualDeviceConfigSpecOperation;
+import com.vmware.vim25.VirtualDisk;
 import com.vmware.vim25.VirtualIDEController;
 import com.vmware.vim25.VirtualLsiLogicController;
 import com.vmware.vim25.VirtualLsiLogicSASController;
 import com.vmware.vim25.VirtualMachineConfigSpec;
 import com.vmware.vim25.VirtualMachineFileInfo;
 import com.vmware.vim25.VirtualMachineGuestOsIdentifier;
+import com.vmware.vim25.VirtualMachineImportSpec;
 import com.vmware.vim25.VirtualMachineVideoCard;
 import com.vmware.vim25.VirtualSCSIController;
 import com.vmware.vim25.VirtualSCSISharing;
-import com.vmware.vim25.VirtualMachineImportSpec;
+import com.vmware.vim25.VmConfigFaultFaultMsg;
 import com.vmware.vim25.VmwareDistributedVirtualSwitchPvlanSpec;
 import com.vmware.vim25.VmwareDistributedVirtualSwitchTrunkVlanSpec;
 import com.vmware.vim25.VmwareDistributedVirtualSwitchVlanIdSpec;
 import com.vmware.vim25.VmwareDistributedVirtualSwitchVlanSpec;
-import java.io.FileWriter;
-import java.util.UUID;
 
 public class HypervisorHostHelper {
     private static final Logger s_logger = Logger.getLogger(HypervisorHostHelper.class);
@@ -153,6 +153,48 @@ public class HypervisorHostHelper {
     public static final String VSPHERE_DATASTORE_BASE_FOLDER = "fcd";
     public static final String VSPHERE_DATASTORE_HIDDEN_FOLDER = ".hidden";
 
+    protected final static Map<String, Integer> apiVersionHardwareVersionMap;
+
+    static {
+        apiVersionHardwareVersionMap = new HashMap<String, Integer>();
+        apiVersionHardwareVersionMap.put("3.5", 4);
+        apiVersionHardwareVersionMap.put("3.6", 4);
+        apiVersionHardwareVersionMap.put("3.7", 4);
+        apiVersionHardwareVersionMap.put("3.8", 4);
+        apiVersionHardwareVersionMap.put("3.9", 4);
+        apiVersionHardwareVersionMap.put("4.0", 7);
+        apiVersionHardwareVersionMap.put("4.1", 7);
+        apiVersionHardwareVersionMap.put("4.2", 7);
+        apiVersionHardwareVersionMap.put("4.3", 7);
+        apiVersionHardwareVersionMap.put("4.4", 7);
+        apiVersionHardwareVersionMap.put("4.5", 7);
+        apiVersionHardwareVersionMap.put("4.6", 7);
+        apiVersionHardwareVersionMap.put("4.7", 7);
+        apiVersionHardwareVersionMap.put("4.8", 7);
+        apiVersionHardwareVersionMap.put("4.9", 7);
+        apiVersionHardwareVersionMap.put("5.0", 8);
+        apiVersionHardwareVersionMap.put("5.1", 9);
+        apiVersionHardwareVersionMap.put("5.2", 9);
+        apiVersionHardwareVersionMap.put("5.3", 9);
+        apiVersionHardwareVersionMap.put("5.4", 9);
+        apiVersionHardwareVersionMap.put("5.5", 10);
+        apiVersionHardwareVersionMap.put("5.6", 10);
+        apiVersionHardwareVersionMap.put("5.7", 10);
+        apiVersionHardwareVersionMap.put("5.8", 10);
+        apiVersionHardwareVersionMap.put("5.9", 10);
+        apiVersionHardwareVersionMap.put("6.0", 11);
+        apiVersionHardwareVersionMap.put("6.1", 11);
+        apiVersionHardwareVersionMap.put("6.2", 11);
+        apiVersionHardwareVersionMap.put("6.3", 11);
+        apiVersionHardwareVersionMap.put("6.4", 11);
+        apiVersionHardwareVersionMap.put("6.5", 13);
+        apiVersionHardwareVersionMap.put("6.6", 13);
+        apiVersionHardwareVersionMap.put("6.7", 14);
+        apiVersionHardwareVersionMap.put("6.8", 14);
+        apiVersionHardwareVersionMap.put("6.9", 14);
+        apiVersionHardwareVersionMap.put("7.0", 17);
+    }
+
     public static VirtualMachineMO findVmFromObjectContent(VmwareContext context, ObjectContent[] ocs, String name, String instanceNameCustomField) {
 
         if (ocs != null && ocs.length > 0) {
@@ -2211,4 +2253,18 @@ public class HypervisorHostHelper {
             dsMo.makeDirectory(hiddenFolderPath, hyperHost.getHyperHostDatacenter());
         }
     }
+
+    /**
+     * Looks up the hardware version mapped to the API version reported by the given host.
+     *
+     * @param host host whose context and managed object reference are used to query the API version
+     * @return the mapped hardware version, or {@code null} when the API version could not be
+     *         retrieved or has no entry in {@code apiVersionHardwareVersionMap}
+     */
+    public static Integer getHostHardwareVersion(VmwareHypervisorHost host) {
+        HostMO hostMo = new HostMO(host.getContext(), host.getMor());
+        String hostApiVersion = null;
+        try {
+            hostApiVersion = hostMo.getHostAboutInfo().getApiVersion();
+        } catch (Exception e) {
+            // Still best effort (the caller receives null), but the failure is logged
+            // instead of being silently discarded.
+            s_logger.warn("Unable to retrieve the API version of the host to determine its hardware version", e);
+        }
+        if (hostApiVersion == null) {
+            return null;
+        }
+        return apiVersionHardwareVersionMap.get(hostApiVersion);
+    }
 }
diff --git a/vmware-base/src/main/java/com/cloud/hypervisor/vmware/util/VmwareHelper.java b/vmware-base/src/main/java/com/cloud/hypervisor/vmware/util/VmwareHelper.java
index 424027b..ff0120c 100644
--- a/vmware-base/src/main/java/com/cloud/hypervisor/vmware/util/VmwareHelper.java
+++ b/vmware-base/src/main/java/com/cloud/hypervisor/vmware/util/VmwareHelper.java
@@ -744,4 +744,18 @@ public class VmwareHelper {
         return DatatypeFactory.newInstance().newXMLGregorianCalendar(gregorianCalendar);
     }
 
+    /**
+     * Builds a {@link HostMO} from a host name of the form {@code <type>:<value>@<rest>}, where
+     * {@code <type>} and {@code <value>} become the type and value of the host's managed object
+     * reference.
+     *
+     * @param context  VMware context the returned {@link HostMO} is bound to
+     * @param hostName host name to parse; may be {@code null} or blank
+     * @return the host object, or {@code null} when {@code hostName} is blank, contains no
+     *         {@code '@'}, or the part before the first {@code '@'} does not yield both a type
+     *         and a value
+     */
+    public static HostMO getHostMOFromHostName(final VmwareContext context, final String hostName) {
+        if (!com.cloud.utils.StringUtils.isNotBlank(hostName) || !hostName.contains("@")) {
+            return null;
+        }
+        // String.split drops trailing empty strings, so "@" yields an empty array.
+        final String[] nameParts = hostName.split("@");
+        if (nameParts.length == 0) {
+            return null;
+        }
+        // Likewise "Type:" yields a single element; require both type and value
+        // instead of failing with ArrayIndexOutOfBoundsException.
+        final String[] morParts = nameParts[0].split(":");
+        if (morParts.length < 2) {
+            return null;
+        }
+        ManagedObjectReference morHost = new ManagedObjectReference();
+        morHost.setType(morParts[0]);
+        morHost.setValue(morParts[1]);
+        return new HostMO(context, morHost);
+    }
+
 }