You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by nv...@apache.org on 2022/04/12 11:14:33 UTC

[cloudstack] branch main updated: KVM disk-only based snapshot of volumes instead of taking VM's full snapshot and extracting disks (#5297)

This is an automated email from the ASF dual-hosted git repository.

nvazquez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/main by this push:
     new 39fad2d9d75 KVM disk-only based snapshot of volumes instead of taking VM's full snapshot and extracting disks (#5297)
39fad2d9d75 is described below

commit 39fad2d9d7501bdd695aeaf8cb1bf7642efde386
Author: Daniel Augusto Veronezi Salvador <38...@users.noreply.github.com>
AuthorDate: Tue Apr 12 08:14:27 2022 -0300

    KVM disk-only based snapshot of volumes instead of taking VM's full snapshot and extracting disks (#5297)
    
    * Refactor create volume snapshot with running VM
    
    * Refactor create volume snapshot with stopped VM
    
    * Refactor create volume from snapshot
    
    * Refactor create template from snapshot
    
    * Refactor volume migration (migrateVolume/ migrateVirtualMachineWithVolume)
    
    * Refactor snapshot deletion
    
    * Refactor snapshot revertion
    
    * Adjusts and fix cherry-pick conflicts
    
    * Remove diffuse tests
    
    * Add validation to add flag '--delete' on command 'virsh blockcommand' only if libvirt version is equal or higher 6.0.0
    
    * Expunge temporary snapshot only if template creation is from snapshot
    
    * Extract strings to constant
    
    * Remove unused imports
    
    * Fix error on revert backed up snapshot
    
    * Turn method's return to void as it is not used
    
    * Rename method in SnapshotHelper
    
    * Fix folder creation when using SharedMountPoint pool
    
    * Remove static import
    
    * Remove unnused method
    
    * Cover take snapshot in centos 7
    
    * Handle right snapshot flag according to qemu version
    
    Co-authored-by: GutoVeronezi <da...@scclouds.com.br>
---
 .../storage/command/RevertSnapshotCommand.java     |   8 +-
 .../subsystem/api/storage/SnapshotDataFactory.java |   2 +
 .../engine/orchestration/VolumeOrchestrator.java   |  86 +---
 .../cloud/vm/VirtualMachineManagerImplTest.java    |   7 +-
 .../main/java/com/cloud/storage/SnapshotVO.java    |  10 +
 .../java/com/cloud/storage/dao/SnapshotDao.java    |   6 +
 .../com/cloud/storage/dao/SnapshotDaoImpl.java     |  11 +
 .../storage/datastore/db/SnapshotDataStoreDao.java |  11 +
 .../storage/motion/AncientDataMotionStrategy.java  |   3 +
 .../storage/snapshot/DefaultSnapshotStrategy.java  | 221 +++++-----
 .../storage/snapshot/SnapshotDataFactoryImpl.java  |  10 +-
 .../snapshot/DefaultSnapshotStrategyTest.java      | 162 ++++++++
 .../storage/image/db/SnapshotDataStoreDaoImpl.java |  47 ++-
 .../image/db/SnapshotDataStoreDaoImplTest.java     |  77 ++++
 .../LibvirtRevertSnapshotCommandWrapper.java       | 123 +++++-
 .../resource/wrapper/LibvirtUtilitiesHelper.java   |  35 ++
 .../kvm/storage/IscsiAdmStorageAdaptor.java        |  10 +-
 .../kvm/storage/IscsiAdmStoragePool.java           |   8 +
 .../kvm/storage/KVMStoragePoolManager.java         |   5 -
 .../kvm/storage/KVMStorageProcessor.java           | 453 +++++++++++++++++----
 .../kvm/storage/LibvirtStorageAdaptor.java         |  45 +-
 .../hypervisor/kvm/storage/LibvirtStoragePool.java |   9 +-
 .../kvm/storage/LinstorStorageAdaptor.java         |  20 +-
 .../kvm/storage/ManagedNfsStorageAdaptor.java      |  10 +-
 .../kvm/storage/ScaleIOStorageAdaptor.java         |  11 +-
 .../hypervisor/kvm/storage/StorageAdaptor.java     |   4 +-
 .../org/apache/cloudstack/utils/qemu/QemuImg.java  |  45 +-
 .../LibvirtRevertSnapshotCommandWrapperTest.java   | 170 ++++++++
 .../wrapper/LibvirtUtilitiesHelperTest.java        |  71 ++--
 .../kvm/storage/KVMStorageProcessorTest.java       | 265 +++++++++++-
 .../CloudStackPrimaryDataStoreDriverImpl.java      |   3 +-
 scripts/storage/qcow2/managesnapshot.sh            |  28 +-
 .../com/cloud/storage/VolumeApiServiceImpl.java    |  13 +
 .../storage/snapshot/SnapshotManagerImpl.java      |  43 +-
 .../com/cloud/template/TemplateManagerImpl.java    |  37 +-
 .../main/java/com/cloud/vm/UserVmManagerImpl.java  |  14 +
 .../apache/cloudstack/snapshot/SnapshotHelper.java | 255 ++++++++++++
 .../core/spring-server-core-managers-context.xml   |   2 +
 .../storage/snapshot/SnapshotManagerTest.java      |   7 +
 .../cloud/template/TemplateManagerImplTest.java    |  12 +
 .../cloudstack/snapshot/SnapshotHelperTest.java    | 302 ++++++++++++++
 41 files changed, 2167 insertions(+), 494 deletions(-)

diff --git a/core/src/main/java/org/apache/cloudstack/storage/command/RevertSnapshotCommand.java b/core/src/main/java/org/apache/cloudstack/storage/command/RevertSnapshotCommand.java
index 1a4403b3baf..174302252a5 100644
--- a/core/src/main/java/org/apache/cloudstack/storage/command/RevertSnapshotCommand.java
+++ b/core/src/main/java/org/apache/cloudstack/storage/command/RevertSnapshotCommand.java
@@ -22,11 +22,13 @@ import org.apache.cloudstack.storage.to.SnapshotObjectTO;
 
 public final class RevertSnapshotCommand extends StorageSubSystemCommand {
     private SnapshotObjectTO data;
+    private SnapshotObjectTO dataOnPrimaryStorage;
     private boolean _executeInSequence = false;
 
-    public RevertSnapshotCommand(SnapshotObjectTO data) {
+    public RevertSnapshotCommand(SnapshotObjectTO data, SnapshotObjectTO dataOnPrimaryStorage) {
         super();
         this.data = data;
+        this.dataOnPrimaryStorage = dataOnPrimaryStorage;
     }
 
     protected RevertSnapshotCommand() {
@@ -37,6 +39,10 @@ public final class RevertSnapshotCommand extends StorageSubSystemCommand {
         return this.data;
     }
 
+    public SnapshotObjectTO getDataOnPrimaryStorage() {
+        return dataOnPrimaryStorage;
+    }
+
     @Override
     public void setExecuteInSequence(final boolean executeInSequence) {
         _executeInSequence = executeInSequence;
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotDataFactory.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotDataFactory.java
index 6bbeb85d5f9..86f0ab8bcf2 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotDataFactory.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotDataFactory.java
@@ -29,6 +29,8 @@ public interface SnapshotDataFactory {
 
     SnapshotInfo getSnapshot(long snapshotId, DataStoreRole role);
 
+    SnapshotInfo getSnapshot(long snapshotId, DataStoreRole role, boolean retrieveAnySnapshotFromVolume);
+
     List<SnapshotInfo> getSnapshots(long volumeId, DataStoreRole store);
 
     List<SnapshotInfo> listSnapshotOnCache(long snapshotId);
diff --git a/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/VolumeOrchestrator.java b/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/VolumeOrchestrator.java
index 43f3e82be83..3848a2b9779 100644
--- a/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/VolumeOrchestrator.java
+++ b/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/VolumeOrchestrator.java
@@ -44,7 +44,6 @@ import org.apache.cloudstack.engine.orchestration.service.VolumeOrchestrationSer
 import org.apache.cloudstack.engine.subsystem.api.storage.ChapInfo;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
-import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreCapabilities;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreDriver;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
 import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStore;
@@ -52,8 +51,6 @@ import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreDriver
 import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
 import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
 import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotService;
-import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy;
-import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy.SnapshotOperation;
 import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator;
 import org.apache.cloudstack.engine.subsystem.api.storage.StorageStrategyFactory;
 import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
@@ -74,7 +71,6 @@ import org.apache.cloudstack.resourcedetail.dao.DiskOfferingDetailsDao;
 import org.apache.cloudstack.storage.command.CommandResult;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
-import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
@@ -115,7 +111,6 @@ import com.cloud.storage.ScopeType;
 import com.cloud.storage.Snapshot;
 import com.cloud.storage.Storage;
 import com.cloud.storage.Storage.ImageFormat;
-import com.cloud.storage.Storage.StoragePoolType;
 import com.cloud.storage.StorageManager;
 import com.cloud.storage.StoragePool;
 import com.cloud.storage.VMTemplateStorageResourceAssoc;
@@ -165,6 +160,8 @@ import com.cloud.vm.dao.UserVmCloneSettingDao;
 import com.cloud.vm.dao.UserVmDao;
 import com.cloud.vm.dao.UserVmDetailsDao;
 
+import org.apache.cloudstack.snapshot.SnapshotHelper;
+
 public class VolumeOrchestrator extends ManagerBase implements VolumeOrchestrationService, Configurable {
 
     public enum UserVmCloneType {
@@ -234,9 +231,14 @@ public class VolumeOrchestrator extends ManagerBase implements VolumeOrchestrati
     @Inject
     VolumeApiService _volumeApiService;
 
+    @Inject
+    protected SnapshotHelper snapshotHelper;
+
     private final StateMachine2<Volume.State, Volume.Event, Volume> _volStateMachine;
     protected List<StoragePoolAllocator> _storagePoolAllocators;
 
+    protected boolean backupSnapshotAfterTakingSnapshot = SnapshotManager.BackupSnapshotAfterTakingSnapshot.value();
+
     public List<StoragePoolAllocator> getStoragePoolAllocators() {
         return _storagePoolAllocators;
     }
@@ -516,21 +518,20 @@ public class VolumeOrchestrator extends ManagerBase implements VolumeOrchestrati
 
         VolumeInfo vol = volFactory.getVolume(volume.getId());
         DataStore store = dataStoreMgr.getDataStore(pool.getId(), DataStoreRole.Primary);
-        DataStoreRole dataStoreRole = getDataStoreRole(snapshot);
+        DataStoreRole dataStoreRole = snapshotHelper.getDataStoreRole(snapshot);
         SnapshotInfo snapInfo = snapshotFactory.getSnapshot(snapshot.getId(), dataStoreRole);
 
-        if (snapInfo == null && dataStoreRole == DataStoreRole.Image) {
-            // snapshot is not backed up to secondary, let's do that now.
-            snapInfo = snapshotFactory.getSnapshot(snapshot.getId(), DataStoreRole.Primary);
+        boolean kvmSnapshotOnlyInPrimaryStorage = snapshotHelper.isKvmSnapshotOnlyInPrimaryStorage(snapshot, dataStoreRole);
 
-            if (snapInfo == null) {
-                throw new CloudRuntimeException("Cannot find snapshot " + snapshot.getId());
-            }
-
-            snapInfo = backupSnapshotIfNeeded(snapshot, dataStoreRole, snapInfo);
+        try {
+            snapInfo = snapshotHelper.backupSnapshotToSecondaryStorageIfNotExists(snapInfo, dataStoreRole, snapshot, kvmSnapshotOnlyInPrimaryStorage);
+        } catch (CloudRuntimeException e) {
+            snapshotHelper.expungeTemporarySnapshot(kvmSnapshotOnlyInPrimaryStorage, snapInfo);
+            throw e;
         }
+
         // don't try to perform a sync if the DataStoreRole of the snapshot is equal to DataStoreRole.Primary
-        if (!DataStoreRole.Primary.equals(snapInfo.getDataStore().getRole())) {
+        if (!DataStoreRole.Primary.equals(dataStoreRole) || kvmSnapshotOnlyInPrimaryStorage) {
             try {
                 // sync snapshot to region store if necessary
                 DataStore snapStore = snapInfo.getDataStore();
@@ -552,57 +553,14 @@ public class VolumeOrchestrator extends ManagerBase implements VolumeOrchestrati
                 throw new CloudRuntimeException("Failed to create volume from snapshot:" + result.getResult());
             }
             return result.getVolume();
-        } catch (InterruptedException e) {
-            s_logger.debug("Failed to create volume from snapshot", e);
-            throw new CloudRuntimeException("Failed to create volume from snapshot", e);
-        } catch (ExecutionException e) {
-            s_logger.debug("Failed to create volume from snapshot", e);
-            throw new CloudRuntimeException("Failed to create volume from snapshot", e);
-        }
-
-    }
-
-    private SnapshotInfo backupSnapshotIfNeeded(Snapshot snapshot, DataStoreRole dataStoreRole, SnapshotInfo snapInfo) {
-        boolean backupSnapToSecondary = SnapshotManager.BackupSnapshotAfterTakingSnapshot.value() == null || SnapshotManager.BackupSnapshotAfterTakingSnapshot.value();
-
-        StoragePoolVO srcPool = _storagePoolDao.findById(snapInfo.getBaseVolume().getPoolId());
-        // We need to copy the snapshot onto secondary.
-        //Skipping the backup to secondary storage with NFS/FS could be supported when CLOUDSTACK-5297 is accepted with small enhancement in:
-        //KVMStorageProcessor::createVolumeFromSnapshot and CloudStackPrimaryDataStoreDriverImpl::copyAsync/createAsync
-        if ((!backupSnapToSecondary && (StoragePoolType.NetworkFilesystem.equals(srcPool.getPoolType()) || StoragePoolType.Filesystem.equals(srcPool.getPoolType())))) {
-            SnapshotStrategy snapshotStrategy = _storageStrategyFactory.getSnapshotStrategy(snapshot, SnapshotOperation.BACKUP);
-            snapshotStrategy.backupSnapshot(snapInfo);
-            // Attempt to grab it again.
-            snapInfo = snapshotFactory.getSnapshot(snapshot.getId(), dataStoreRole);
-            if (snapInfo == null) {
-                throw new CloudRuntimeException("Cannot find snapshot " + snapshot.getId() + " on secondary and could not create backup");
-            }
-        }
-        return snapInfo;
-    }
-
-    public DataStoreRole getDataStoreRole(Snapshot snapshot) {
-        SnapshotDataStoreVO snapshotStore = _snapshotDataStoreDao.findBySnapshot(snapshot.getId(), DataStoreRole.Primary);
-
-        if (snapshotStore == null) {
-            return DataStoreRole.Image;
-        }
-
-        long storagePoolId = snapshotStore.getDataStoreId();
-        DataStore dataStore = dataStoreMgr.getDataStore(storagePoolId, DataStoreRole.Primary);
-
-        Map<String, String> mapCapabilities = dataStore.getDriver().getCapabilities();
-
-        if (mapCapabilities != null) {
-            String value = mapCapabilities.get(DataStoreCapabilities.STORAGE_SYSTEM_SNAPSHOT.toString());
-            Boolean supportsStorageSystemSnapshots = new Boolean(value);
-
-            if (supportsStorageSystemSnapshots) {
-                return DataStoreRole.Primary;
-            }
+        } catch (InterruptedException | ExecutionException e) {
+            String message = String.format("Failed to create volume from snapshot [%s] due to [%s].", snapInfo.getTO(), e.getMessage());
+            s_logger.error(message, e);
+            throw new CloudRuntimeException(message, e);
+        } finally {
+            snapshotHelper.expungeTemporarySnapshot(kvmSnapshotOnlyInPrimaryStorage, snapInfo);
         }
 
-        return DataStoreRole.Image;
     }
 
     protected DiskProfile createDiskCharacteristics(VolumeInfo volume, VirtualMachineTemplate template, DataCenter dc, DiskOffering diskOffering) {
diff --git a/engine/orchestration/src/test/java/com/cloud/vm/VirtualMachineManagerImplTest.java b/engine/orchestration/src/test/java/com/cloud/vm/VirtualMachineManagerImplTest.java
index 1655bc4634e..86b0ea4a96d 100644
--- a/engine/orchestration/src/test/java/com/cloud/vm/VirtualMachineManagerImplTest.java
+++ b/engine/orchestration/src/test/java/com/cloud/vm/VirtualMachineManagerImplTest.java
@@ -256,6 +256,7 @@ public class VirtualMachineManagerImplTest {
 
     @Test
     public void isStorageCrossClusterMigrationTestStorageTypeEqualsZone() {
+        Mockito.doReturn(1L).when(hostMock).getClusterId();
         Mockito.doReturn(2L).when(storagePoolVoMock).getClusterId();
         Mockito.doReturn(ScopeType.ZONE).when(storagePoolVoMock).getScope();
 
@@ -560,9 +561,9 @@ public class VirtualMachineManagerImplTest {
         HashMap<Volume, StoragePool> volumeToPoolObjectMap = new HashMap<>();
 
         Mockito.doReturn(ScopeType.CLUSTER).when(storagePoolVoMock).getScope();
-        Mockito.doNothing().when(virtualMachineManagerImpl).executeManagedStorageChecksWhenTargetStoragePoolNotProvided(hostMock, storagePoolVoMock, volumeVoMock);
-        Mockito.doNothing().when(virtualMachineManagerImpl).createVolumeToStoragePoolMappingIfPossible(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, volumeVoMock, storagePoolVoMock);
-        Mockito.doReturn(false).when(virtualMachineManagerImpl).isStorageCrossClusterMigration(hostMockId, storagePoolVoMock);
+        Mockito.doNothing().when(virtualMachineManagerImpl).executeManagedStorageChecksWhenTargetStoragePoolNotProvided(Mockito.any(), Mockito.any(), Mockito.any());
+        Mockito.doNothing().when(virtualMachineManagerImpl).createVolumeToStoragePoolMappingIfPossible(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+        Mockito.doReturn(false).when(virtualMachineManagerImpl).isStorageCrossClusterMigration(Mockito.anyLong(), Mockito.any());
 
         virtualMachineManagerImpl.createStoragePoolMappingsForVolumes(virtualMachineProfileMock, dataCenterDeploymentMock, volumeToPoolObjectMap, allVolumes);
 
diff --git a/engine/schema/src/main/java/com/cloud/storage/SnapshotVO.java b/engine/schema/src/main/java/com/cloud/storage/SnapshotVO.java
index e70bdc87597..ebfad6633ed 100644
--- a/engine/schema/src/main/java/com/cloud/storage/SnapshotVO.java
+++ b/engine/schema/src/main/java/com/cloud/storage/SnapshotVO.java
@@ -28,6 +28,10 @@ import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.Table;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
 import java.util.Date;
 import java.util.UUID;
 
@@ -274,4 +278,10 @@ public class SnapshotVO implements Snapshot {
     public Class<?> getEntityType() {
         return Snapshot.class;
     }
+
+    @Override
+    public String toString() {
+        return String.format("Snapshot %s", new ToStringBuilder(this, ToStringStyle.JSON_STYLE).append("uuid", getUuid()).append("name", getName())
+                .append("volumeId", getVolumeId()).toString());
+    }
 }
diff --git a/engine/schema/src/main/java/com/cloud/storage/dao/SnapshotDao.java b/engine/schema/src/main/java/com/cloud/storage/dao/SnapshotDao.java
index 93ba720d0bf..998d0bbd724 100755
--- a/engine/schema/src/main/java/com/cloud/storage/dao/SnapshotDao.java
+++ b/engine/schema/src/main/java/com/cloud/storage/dao/SnapshotDao.java
@@ -51,4 +51,10 @@ public interface SnapshotDao extends GenericDao<SnapshotVO, Long>, StateDao<Snap
 
     List<SnapshotVO> listByStatusNotIn(long volumeId, Snapshot.State... status);
 
+    /**
+     * Retrieves a list of snapshots filtered by ids.
+     * @param ids Snapshot ids.
+     * @return A list of snapshots filtered by ids.
+     */
+    List<SnapshotVO> listByIds(Object... ids);
 }
diff --git a/engine/schema/src/main/java/com/cloud/storage/dao/SnapshotDaoImpl.java b/engine/schema/src/main/java/com/cloud/storage/dao/SnapshotDaoImpl.java
index 4c5bb3c74fb..f5ebf4bcf3d 100755
--- a/engine/schema/src/main/java/com/cloud/storage/dao/SnapshotDaoImpl.java
+++ b/engine/schema/src/main/java/com/cloud/storage/dao/SnapshotDaoImpl.java
@@ -56,6 +56,7 @@ public class SnapshotDaoImpl extends GenericDaoBase<SnapshotVO, Long> implements
     private static final String GET_LAST_SNAPSHOT =
         "SELECT snapshots.id FROM snapshot_store_ref, snapshots where snapshots.id = snapshot_store_ref.snapshot_id AND snapshosts.volume_id = ? AND snapshot_store_ref.role = ? ORDER BY created DESC";
 
+    private SearchBuilder<SnapshotVO> snapshotIdsSearch;
     private SearchBuilder<SnapshotVO> VolumeIdSearch;
     private SearchBuilder<SnapshotVO> VolumeIdTypeSearch;
     private SearchBuilder<SnapshotVO> VolumeIdTypeNotDestroyedSearch;
@@ -168,6 +169,9 @@ public class SnapshotDaoImpl extends GenericDaoBase<SnapshotVO, Long> implements
         InstanceIdSearch = createSearchBuilder();
         InstanceIdSearch.and("status", InstanceIdSearch.entity().getState(), SearchCriteria.Op.IN);
 
+        snapshotIdsSearch = createSearchBuilder();
+        snapshotIdsSearch.and("id", snapshotIdsSearch.entity().getId(), SearchCriteria.Op.IN);
+
         SearchBuilder<VMInstanceVO> instanceSearch = _instanceDao.createSearchBuilder();
         instanceSearch.and("instanceId", instanceSearch.entity().getId(), SearchCriteria.Op.EQ);
 
@@ -248,6 +252,13 @@ public class SnapshotDaoImpl extends GenericDaoBase<SnapshotVO, Long> implements
         return listBy(sc, null);
     }
 
+    @Override
+    public List<SnapshotVO> listByIds(Object... ids) {
+        SearchCriteria<SnapshotVO> sc = snapshotIdsSearch.create();
+        sc.setParameters("id", ids);
+        return listBy(sc, null);
+    }
+
     @Override
     public boolean updateState(State currentState, Event event, State nextState, SnapshotVO snapshot, Object data) {
         TransactionLegacy txn = TransactionLegacy.currentTxn();
diff --git a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/SnapshotDataStoreDao.java b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/SnapshotDataStoreDao.java
index 5c6222b8dc9..2ce15894228 100644
--- a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/SnapshotDataStoreDao.java
+++ b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/SnapshotDataStoreDao.java
@@ -80,4 +80,15 @@ StateDao<ObjectInDataStoreStateMachine.State, ObjectInDataStoreStateMachine.Even
     List<SnapshotDataStoreVO> findSnapshots(Long storeId, Date start, Date end);
 
     SnapshotDataStoreVO findDestroyedReferenceBySnapshot(long snapshotId, DataStoreRole role);
+
+    /**
+     * Removes the snapshot reference from the database according to its id and data store role.
+     * @return true if success, otherwise, false.
+     */
+    boolean expungeReferenceBySnapshotIdAndDataStoreRole(long snapshotId, DataStoreRole dataStorerole);
+
+    /**
+     * List all snapshots in 'snapshot_store_ref' with state 'Ready' by volume ID.
+     */
+    List<SnapshotDataStoreVO> listReadyByVolumeId(long volumeId);
 }
diff --git a/engine/storage/datamotion/src/main/java/org/apache/cloudstack/storage/motion/AncientDataMotionStrategy.java b/engine/storage/datamotion/src/main/java/org/apache/cloudstack/storage/motion/AncientDataMotionStrategy.java
index 74de7652a1f..a49612138a0 100644
--- a/engine/storage/datamotion/src/main/java/org/apache/cloudstack/storage/motion/AncientDataMotionStrategy.java
+++ b/engine/storage/datamotion/src/main/java/org/apache/cloudstack/storage/motion/AncientDataMotionStrategy.java
@@ -72,6 +72,7 @@ import com.cloud.storage.Storage.StoragePoolType;
 import com.cloud.storage.StoragePool;
 import com.cloud.storage.VolumeVO;
 import com.cloud.storage.dao.VolumeDao;
+import static com.cloud.storage.snapshot.SnapshotManager.BackupSnapshotAfterTakingSnapshot;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.db.DB;
 import com.cloud.utils.exception.CloudRuntimeException;
@@ -291,6 +292,7 @@ public class AncientDataMotionStrategy implements DataMotionStrategy {
             }
 
             CopyCommand cmd = new CopyCommand(srcData.getTO(), addFullCloneAndDiskprovisiongStrictnessFlagOnVMwareDest(volObj.getTO()), _createVolumeFromSnapshotWait, VirtualMachineManager.ExecuteInSequence.value());
+
             Answer answer = null;
             if (ep == null) {
                 String errMsg = "No remote endpoint to send command, check if host or ssvm is down?";
@@ -582,6 +584,7 @@ public class AncientDataMotionStrategy implements DataMotionStrategy {
         }
         Map<String, String> options = new HashMap<String, String>();
         options.put("fullSnapshot", fullSnapshot.toString());
+        options.put(BackupSnapshotAfterTakingSnapshot.key(), String.valueOf(BackupSnapshotAfterTakingSnapshot.value()));
         Answer answer = null;
         try {
             if (needCacheStorage(srcData, destData)) {
diff --git a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/DefaultSnapshotStrategy.java b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/DefaultSnapshotStrategy.java
index c0a2395aa34..68a9a2991ed 100644
--- a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/DefaultSnapshotStrategy.java
+++ b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/DefaultSnapshotStrategy.java
@@ -16,7 +16,10 @@
 // under the License.
 package org.apache.cloudstack.storage.snapshot;
 
+import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import javax.inject.Inject;
 
@@ -40,6 +43,7 @@ import org.apache.cloudstack.storage.datastore.PrimaryDataStoreImpl;
 import org.apache.cloudstack.storage.to.SnapshotObjectTO;
 import org.apache.cloudstack.utils.identity.ManagementServerNode;
 
+import com.cloud.agent.api.to.DataTO;
 import com.cloud.event.EventTypes;
 import com.cloud.event.UsageEventUtils;
 import com.cloud.exception.InvalidParameterValueException;
@@ -53,7 +57,6 @@ import com.cloud.storage.StoragePool;
 import com.cloud.storage.StoragePoolStatus;
 import com.cloud.storage.Volume;
 import com.cloud.storage.VolumeVO;
-import com.cloud.storage.VolumeDetailVO;
 import com.cloud.storage.dao.SnapshotDao;
 import com.cloud.storage.dao.SnapshotDetailsDao;
 import com.cloud.storage.dao.VolumeDao;
@@ -70,6 +73,9 @@ import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.fsm.NoTransitionException;
 
 public class DefaultSnapshotStrategy extends SnapshotStrategyBase {
+    private static final String SECONDARY_STORAGE_SNAPSHOT_ENTRY_IDENTIFIER = "secondary storage";
+    private static final String PRIMARY_STORAGE_SNAPSHOT_ENTRY_IDENTIFIER = "primary storage";
+
     private static final Logger s_logger = Logger.getLogger(DefaultSnapshotStrategy.class);
 
     @Inject
@@ -110,7 +116,7 @@ public class DefaultSnapshotStrategy extends SnapshotStrategyBase {
                 CreateObjectAnswer createSnapshotAnswer = new CreateObjectAnswer(snapTO);
 
                 snapshotOnImageStore.processEvent(Event.OperationSuccessed, createSnapshotAnswer);
-                SnapshotObject snapObj = (SnapshotObject)snapshot;
+                SnapshotObject snapObj = castSnapshotInfoToSnapshotObject(snapshot);
                 try {
                     snapObj.processEvent(Snapshot.Event.OperationNotPerformed);
                 } catch (NoTransitionException e) {
@@ -171,27 +177,32 @@ public class DefaultSnapshotStrategy extends SnapshotStrategyBase {
         return snapshotSvr.backupSnapshot(snapshot);
     }
 
-    protected boolean deleteSnapshotChain(SnapshotInfo snapshot) {
-        s_logger.debug("delete snapshot chain for snapshot: " + snapshot.getId());
+    private final List<Snapshot.State> snapshotStatesAbleToDeleteSnapshot = Arrays.asList(Snapshot.State.Destroying, Snapshot.State.Destroyed, Snapshot.State.Error);
+
+    protected boolean deleteSnapshotChain(SnapshotInfo snapshot, String storage) {
+        DataTO snapshotTo = snapshot.getTO();
+        s_logger.debug(String.format("Deleting %s chain of snapshots.", snapshotTo));
+
         boolean result = false;
-        boolean resultIsSet = false;   //need to track, the snapshot itself is deleted or not.
+        boolean resultIsSet = false;
         try {
-            while (snapshot != null &&
-                (snapshot.getState() == Snapshot.State.Destroying || snapshot.getState() == Snapshot.State.Destroyed || snapshot.getState() == Snapshot.State.Error)) {
+            while (snapshot != null && snapshotStatesAbleToDeleteSnapshot.contains(snapshot.getState())) {
                 SnapshotInfo child = snapshot.getChild();
 
                 if (child != null) {
-                    s_logger.debug("the snapshot has child, can't delete it on the storage");
+                    s_logger.debug(String.format("Snapshot [%s] has child [%s], not deleting it on the storage [%s]", snapshotTo, child.getTO(), storage));
                     break;
                 }
-                s_logger.debug("Snapshot: " + snapshot.getId() + " doesn't have children, so it's ok to delete it and its parents");
+
+                s_logger.debug(String.format("Snapshot [%s] does not have children; therefore, we will delete it and its parents.", snapshotTo));
+
                 SnapshotInfo parent = snapshot.getParent();
                 boolean deleted = false;
                 if (parent != null) {
                     if (parent.getPath() != null && parent.getPath().equalsIgnoreCase(snapshot.getPath())) {
                         //NOTE: if both snapshots share the same path, it's for xenserver's empty delta snapshot. We can't delete the snapshot on the backend, as parent snapshot still reference to it
                         //Instead, mark it as destroyed in the db.
-                        s_logger.debug("for empty delta snapshot, only mark it as destroyed in db");
+                        s_logger.debug(String.format("Snapshot [%s] is an empty delta snapshot; therefore, we will only mark it as destroyed in the database.", snapshotTo));
                         snapshot.processEvent(Event.DestroyRequested);
                         snapshot.processEvent(Event.OperationSuccessed);
                         deleted = true;
@@ -201,30 +212,31 @@ public class DefaultSnapshotStrategy extends SnapshotStrategyBase {
                         }
                     }
                 }
+
                 if (!deleted) {
                     try {
                         boolean r = snapshotSvr.deleteSnapshot(snapshot);
                         if (r) {
-                            // delete snapshot in cache if there is
                             List<SnapshotInfo> cacheSnaps = snapshotDataFactory.listSnapshotOnCache(snapshot.getId());
                             for (SnapshotInfo cacheSnap : cacheSnaps) {
-                                s_logger.debug("Delete snapshot " + snapshot.getId() + " from image cache store: " + cacheSnap.getDataStore().getName());
+                                s_logger.debug(String.format("Deleting snapshot %s from image cache [%s].", snapshotTo, cacheSnap.getDataStore().getName()));
                                 cacheSnap.delete();
                             }
                         }
+
                         if (!resultIsSet) {
                             result = r;
                             resultIsSet = true;
                         }
                     } catch (Exception e) {
-                        // Snapshots which are not successfully deleted will be retried again.
-                        s_logger.debug("Failed to delete snapshot on storage. ", e);
+                        s_logger.error(String.format("Failed to delete snapshot [%s] on storage [%s] due to [%s].", snapshotTo, storage, e.getMessage()), e);
                     }
                 }
+
                 snapshot = parent;
             }
         } catch (Exception e) {
-            s_logger.debug("delete snapshot failed: ", e);
+            s_logger.error(String.format("Failed to delete snapshot [%s] on storage [%s] due to [%s].", snapshotTo, storage, e.getMessage()), e);
         }
         return result;
     }
@@ -262,142 +274,93 @@ public class DefaultSnapshotStrategy extends SnapshotStrategyBase {
             throw new InvalidParameterValueException("Can't delete snapshotshot " + snapshotId + " due to it is in " + snapshotVO.getState() + " Status");
         }
 
-        Boolean deletedOnSecondary = deleteOnSecondaryIfNeeded(snapshotId);
-        boolean deletedOnPrimary = deleteOnPrimaryIfNeeded(snapshotId);
-
-        if (deletedOnPrimary) {
-            s_logger.debug(String.format("Successfully deleted snapshot (id: %d) on primary storage.", snapshotId));
-        } else {
-            s_logger.debug(String.format("The snapshot (id: %d) could not be found/deleted on primary storage.", snapshotId));
-        }
-        if (null != deletedOnSecondary && deletedOnSecondary) {
-            s_logger.debug(String.format("Successfully deleted snapshot (id: %d) on secondary storage.", snapshotId));
-        }
-        return (deletedOnSecondary != null) && deletedOnSecondary || deletedOnPrimary;
+        return destroySnapshotEntriesAndFiles(snapshotVO);
     }
 
-    private boolean deleteOnPrimaryIfNeeded(Long snapshotId) {
-        SnapshotVO snapshotVO;
-        boolean deletedOnPrimary = false;
-        snapshotVO = snapshotDao.findById(snapshotId);
-        SnapshotInfo snapshotOnPrimaryInfo = snapshotDataFactory.getSnapshot(snapshotId, DataStoreRole.Primary);
-        if (snapshotVO != null && snapshotVO.getState() == Snapshot.State.Destroyed) {
-            deletedOnPrimary = deleteSnapshotOnPrimary(snapshotId, snapshotOnPrimaryInfo);
-        } else {
-            // Here we handle snapshots which are to be deleted but are not marked as destroyed yet.
-            // This may occur for instance when they are created only on primary because
-            // snapshot.backup.to.secondary was set to false.
-            if (snapshotOnPrimaryInfo == null) {
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug(String.format("Snapshot (id: %d) not found on primary storage, skipping snapshot deletion on primary and marking it destroyed", snapshotId));
-                }
-                snapshotVO.setState(Snapshot.State.Destroyed);
-                snapshotDao.update(snapshotId, snapshotVO);
-                deletedOnPrimary = true;
-            } else {
-                SnapshotObject obj = (SnapshotObject) snapshotOnPrimaryInfo;
-                try {
-                    obj.processEvent(Snapshot.Event.DestroyRequested);
-                    deletedOnPrimary = deleteSnapshotOnPrimary(snapshotId, snapshotOnPrimaryInfo);
-                    if (!deletedOnPrimary) {
-                        obj.processEvent(Snapshot.Event.OperationFailed);
-                    } else {
-                        obj.processEvent(Snapshot.Event.OperationSucceeded);
-                    }
-                } catch (NoTransitionException e) {
-                    s_logger.debug("Failed to set the state to destroying: ", e);
-                    deletedOnPrimary = false;
-                }
-            }
+    /**
+     * Destroys the snapshot entries and files on both primary and secondary storage (if it exists).
+     * @return true if destroy successfully, else false.
+     */
+    protected boolean destroySnapshotEntriesAndFiles(SnapshotVO snapshotVo) {
+        if (!deleteSnapshotInfos(snapshotVo)) {
+            return false;
         }
-        return deletedOnPrimary;
-    }
 
-    private Boolean deleteOnSecondaryIfNeeded(Long snapshotId) {
-        Boolean deletedOnSecondary = null;
-        SnapshotInfo snapshotOnImage = snapshotDataFactory.getSnapshot(snapshotId, DataStoreRole.Image);
-        if (snapshotOnImage == null) {
-            s_logger.debug(String.format("Can't find snapshot [snapshot id: %d] on secondary storage", snapshotId));
-        } else {
-            SnapshotObject obj = (SnapshotObject)snapshotOnImage;
-            try {
-                deletedOnSecondary = deleteSnapshotOnSecondaryStorage(snapshotId, snapshotOnImage, obj);
-                if (!deletedOnSecondary) {
-                    s_logger.debug(
-                            String.format("Failed to find/delete snapshot (id: %d) on secondary storage. Still necessary to check and delete snapshot on primary storage.",
-                                    snapshotId));
-                } else {
-                    s_logger.debug(String.format("Snapshot (id: %d) has been deleted on secondary storage.", snapshotId));
-                }
-            } catch (NoTransitionException e) {
-                s_logger.debug("Failed to set the state to destroying: ", e);
-//                deletedOnSecondary remain null
-            }
-        }
-        return deletedOnSecondary;
+        updateSnapshotToDestroyed(snapshotVo);
+
+        return true;
     }
 
     /**
-     * Deletes the snapshot on secondary storage.
-     * It can return false when the snapshot was stored on primary storage and not backed up on secondary; therefore, the snapshot should also be deleted on primary storage even when this method returns false.
+     * Updates the snapshot to {@link Snapshot.State#Destroyed}.
      */
-    private boolean deleteSnapshotOnSecondaryStorage(Long snapshotId, SnapshotInfo snapshotOnImage, SnapshotObject obj) throws NoTransitionException {
-        obj.processEvent(Snapshot.Event.DestroyRequested);
-        List<VolumeDetailVO> volumesFromSnapshot;
-        volumesFromSnapshot = _volumeDetailsDaoImpl.findDetails("SNAPSHOT_ID", String.valueOf(snapshotId), null);
+    protected void updateSnapshotToDestroyed(SnapshotVO snapshotVo) {
+        snapshotVo.setState(Snapshot.State.Destroyed);
+        snapshotDao.update(snapshotVo.getId(), snapshotVo);
+    }
 
-        if (volumesFromSnapshot.size() > 0) {
-            try {
-                obj.processEvent(Snapshot.Event.OperationFailed);
-            } catch (NoTransitionException e1) {
-                s_logger.debug("Failed to change snapshot state: " + e1.toString());
+    protected boolean deleteSnapshotInfos(SnapshotVO snapshotVo) {
+        Map<String, SnapshotInfo> snapshotInfos = retrieveSnapshotEntries(snapshotVo.getId());
+
+        for (var infoEntry : snapshotInfos.entrySet()) {
+            if (!deleteSnapshotInfo(infoEntry.getValue(), infoEntry.getKey(), snapshotVo)) {
+                return false;
             }
-            throw new InvalidParameterValueException("Unable to perform delete operation, Snapshot with id: " + snapshotId + " is in use  ");
         }
 
-        boolean result = deleteSnapshotChain(snapshotOnImage);
-        obj.processEvent(Snapshot.Event.OperationSucceeded);
-        return result;
+        return true;
     }
 
     /**
-     * Deletes the snapshot on primary storage. It returns true when the snapshot was not found on primary storage; </br>
-     * In case of failure while deleting the snapshot, it will throw one of the following exceptions: CloudRuntimeException, InterruptedException, or ExecutionException. </br>
+     * Destroys the snapshot entry and file.
+     * @return true if destroy successfully, else false.
      */
-    private boolean deleteSnapshotOnPrimary(Long snapshotId, SnapshotInfo snapshotOnPrimaryInfo) {
-        SnapshotDataStoreVO snapshotOnPrimary = snapshotStoreDao.findBySnapshot(snapshotId, DataStoreRole.Primary);
-        if (isSnapshotOnPrimaryStorage(snapshotId)) {
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug(String.format("Snapshot reference is found on primary storage for snapshot id: %d, performing snapshot deletion on primary", snapshotId));
-            }
-            if (snapshotSvr.deleteSnapshot(snapshotOnPrimaryInfo)) {
-                snapshotOnPrimary.setState(State.Destroyed);
-                snapshotStoreDao.update(snapshotOnPrimary.getId(), snapshotOnPrimary);
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug(String.format("Successfully deleted snapshot id: %d on primary storage", snapshotId));
-                }
+    protected boolean deleteSnapshotInfo(SnapshotInfo snapshotInfo, String storage, SnapshotVO snapshotVo) {
+        if (snapshotInfo == null) {
+            s_logger.debug(String.format("Could not find %s entry on a %s. Skipping deletion on %s.", snapshotVo, storage, storage));
+            return true;
+        }
+
+        DataStore dataStore = snapshotInfo.getDataStore();
+        storage = String.format("%s {uuid: \"%s\", name: \"%s\"}", storage, dataStore.getUuid(), dataStore.getName());
+
+        try {
+            SnapshotObject snapshotObject = castSnapshotInfoToSnapshotObject(snapshotInfo);
+            snapshotObject.processEvent(Snapshot.Event.DestroyRequested);
+
+            if (deleteSnapshotChain(snapshotInfo, storage)) {
+                snapshotObject.processEvent(Snapshot.Event.OperationSucceeded);
+                s_logger.debug(String.format("%s was deleted on %s.", snapshotVo, storage));
                 return true;
             }
-        } else {
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug(String.format("Snapshot reference is not found on primary storage for snapshot id: %d, skipping snapshot deletion on primary", snapshotId));
-            }
-            return true;
+
+            snapshotObject.processEvent(Snapshot.Event.OperationFailed);
+            s_logger.debug(String.format("Failed to delete %s on %s.", snapshotVo, storage));
+        } catch (NoTransitionException ex) {
+            s_logger.warn(String.format("Failed to delete %s on %s due to %s.", snapshotVo, storage, ex.getMessage()), ex);
         }
+
         return false;
     }
 
     /**
-     * Returns true if the snapshot volume is on primary storage.
+     * Cast SnapshotInfo to SnapshotObject.
+     * @return SnapshotInfo cast to SnapshotObject.
      */
-    private boolean isSnapshotOnPrimaryStorage(long snapshotId) {
-        SnapshotDataStoreVO snapshotOnPrimary = snapshotStoreDao.findBySnapshot(snapshotId, DataStoreRole.Primary);
-        if (snapshotOnPrimary != null) {
-            long volumeId = snapshotOnPrimary.getVolumeId();
-            VolumeVO volumeVO = volumeDao.findById(volumeId);
-            return volumeVO != null && volumeVO.getRemoved() == null;
-        }
-        return false;
+    protected SnapshotObject castSnapshotInfoToSnapshotObject(SnapshotInfo snapshotInfo) {
+        return (SnapshotObject) snapshotInfo;
+    }
+
+    /**
+     * Retrieves the snapshot infos on primary and secondary storage.
+     * @param snapshotId The snapshot to retrieve the infos.
+     * @return A map of snapshot infos.
+     */
+    protected Map<String, SnapshotInfo> retrieveSnapshotEntries(long snapshotId) {
+        Map<String, SnapshotInfo> snapshotInfos = new LinkedHashMap<>();
+        snapshotInfos.put(SECONDARY_STORAGE_SNAPSHOT_ENTRY_IDENTIFIER, snapshotDataFactory.getSnapshot(snapshotId, DataStoreRole.Image, false));
+        snapshotInfos.put(PRIMARY_STORAGE_SNAPSHOT_ENTRY_IDENTIFIER, snapshotDataFactory.getSnapshot(snapshotId, DataStoreRole.Primary, false));
+        return snapshotInfos;
     }
 
     @Override
@@ -490,7 +453,7 @@ public class DefaultSnapshotStrategy extends SnapshotStrategyBase {
 
             /*The Management Server ID is stored in snapshot_details table with the snapshot id and (name, value): (MS_ID, <ms_id>), to know which snapshots have not been completed in case of some failure situation like
              *  Mgmt server down etc. and by fetching the entries on restart the cleaning up of failed snapshots is done*/
-            _snapshotDetailsDao.addDetail(((SnapshotObject)snapshotOnPrimary).getId(), AsyncJob.Constants.MS_ID, Long.toString(ManagementServerNode.getManagementServerId()), false);
+            _snapshotDetailsDao.addDetail(castSnapshotInfoToSnapshotObject(snapshotOnPrimary).getId(), AsyncJob.Constants.MS_ID, Long.toString(ManagementServerNode.getManagementServerId()), false);
             return snapshotOnPrimary;
         } finally {
             if (snapshotVO != null) {
@@ -504,7 +467,7 @@ public class DefaultSnapshotStrategy extends SnapshotStrategyBase {
         Transaction.execute(new TransactionCallbackNoReturn() {
             @Override
             public void doInTransactionWithoutResult(TransactionStatus status) {
-                _snapshotDetailsDao.removeDetail(((SnapshotObject)snapshotOnPrimary).getId(), AsyncJob.Constants.MS_ID);
+                _snapshotDetailsDao.removeDetail(castSnapshotInfoToSnapshotObject(snapshotOnPrimary).getId(), AsyncJob.Constants.MS_ID);
                 DataStore primaryStore = snapshotOnPrimary.getDataStore();
                 try {
                     SnapshotInfo parent = snapshotOnPrimary.getParent();
diff --git a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/SnapshotDataFactoryImpl.java b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/SnapshotDataFactoryImpl.java
index 9799ffe2fcb..d894d7953ff 100644
--- a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/SnapshotDataFactoryImpl.java
+++ b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/SnapshotDataFactoryImpl.java
@@ -83,15 +83,23 @@ public class SnapshotDataFactoryImpl implements SnapshotDataFactory {
         return infos;
     }
 
-
     @Override
     public SnapshotInfo getSnapshot(long snapshotId, DataStoreRole role) {
+        return getSnapshot(snapshotId, role, true);
+    }
+
+    @Override
+    public SnapshotInfo getSnapshot(long snapshotId, DataStoreRole role, boolean retrieveAnySnapshotFromVolume) {
         SnapshotVO snapshot = snapshotDao.findById(snapshotId);
         if (snapshot == null) {
             return null;
         }
         SnapshotDataStoreVO snapshotStore = snapshotStoreDao.findBySnapshot(snapshotId, role);
         if (snapshotStore == null) {
+            if (!retrieveAnySnapshotFromVolume) {
+                return null;
+            }
+
             snapshotStore = snapshotStoreDao.findByVolume(snapshotId, snapshot.getVolumeId(), role);
             if (snapshotStore == null) {
                 return null;
diff --git a/engine/storage/snapshot/src/test/java/org/apache/cloudstack/storage/snapshot/DefaultSnapshotStrategyTest.java b/engine/storage/snapshot/src/test/java/org/apache/cloudstack/storage/snapshot/DefaultSnapshotStrategyTest.java
new file mode 100644
index 00000000000..2143311c3bf
--- /dev/null
+++ b/engine/storage/snapshot/src/test/java/org/apache/cloudstack/storage/snapshot/DefaultSnapshotStrategyTest.java
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.snapshot;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.Snapshot;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.utils.fsm.NoTransitionException;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultSnapshotStrategyTest {
+
+    DefaultSnapshotStrategy defaultSnapshotStrategySpy = Mockito.spy(DefaultSnapshotStrategy.class);
+
+    @Mock
+    SnapshotDataFactory snapshotDataFactoryMock;
+
+    @Mock
+    SnapshotInfo snapshotInfo1Mock, snapshotInfo2Mock;
+
+    @Mock
+    SnapshotObject snapshotObjectMock;
+
+    @Mock
+    SnapshotDao snapshotDaoMock;
+
+    @Mock
+    SnapshotVO snapshotVoMock;
+
+    @Mock
+    DataStore dataStoreMock;
+
+    Map<String, SnapshotInfo> mapStringSnapshotInfoInstance = new LinkedHashMap<>();
+
+    @Before
+    public void setup() {
+        defaultSnapshotStrategySpy.snapshotDataFactory = snapshotDataFactoryMock;
+        defaultSnapshotStrategySpy.snapshotDao = snapshotDaoMock;
+
+        mapStringSnapshotInfoInstance.put("secondary storage", snapshotInfo1Mock);
+        mapStringSnapshotInfoInstance.put("priamry storage", snapshotInfo1Mock);
+    }
+
+    @Test
+    public void validateRetrieveSnapshotEntries() {
+        Long snapshotId = 1l;
+        Mockito.doReturn(snapshotInfo1Mock, snapshotInfo2Mock).when(snapshotDataFactoryMock).getSnapshot(Mockito.anyLong(), Mockito.any(DataStoreRole.class), Mockito.anyBoolean());
+        Map<String, SnapshotInfo> result = defaultSnapshotStrategySpy.retrieveSnapshotEntries(snapshotId);
+
+        Mockito.verify(snapshotDataFactoryMock).getSnapshot(snapshotId, DataStoreRole.Image, false);
+        Mockito.verify(snapshotDataFactoryMock).getSnapshot(snapshotId, DataStoreRole.Primary, false);
+
+        Assert.assertTrue(result.containsKey("secondary storage"));
+        Assert.assertTrue(result.containsKey("primary storage"));
+        Assert.assertEquals(snapshotInfo1Mock, result.get("secondary storage"));
+        Assert.assertEquals(snapshotInfo2Mock, result.get("primary storage"));
+    }
+
+    @Test
+    public void validateUpdateSnapshotToDestroyed() {
+        Mockito.doReturn(true).when(snapshotDaoMock).update(Mockito.anyLong(), Mockito.any());
+        defaultSnapshotStrategySpy.updateSnapshotToDestroyed(snapshotVoMock);
+
+        Mockito.verify(snapshotVoMock).setState(Snapshot.State.Destroyed);
+    }
+
+    @Test
+    public void validateDestroySnapshotEntriesAndFilesFailToDeleteReturnsFalse() {
+        Mockito.doReturn(false).when(defaultSnapshotStrategySpy).deleteSnapshotInfos(Mockito.any());
+        Assert.assertFalse(defaultSnapshotStrategySpy.destroySnapshotEntriesAndFiles(snapshotVoMock));
+    }
+
+    @Test
+    public void validateDestroySnapshotEntriesAndFilesDeletesSuccessfullyReturnsTrue() {
+        Mockito.doReturn(true).when(defaultSnapshotStrategySpy).deleteSnapshotInfos(Mockito.any());
+        Assert.assertTrue(defaultSnapshotStrategySpy.destroySnapshotEntriesAndFiles(snapshotVoMock));
+    }
+
+    @Test
+    public void validateDeleteSnapshotInfosFailToDeleteReturnsFalse() {
+        Mockito.doReturn(mapStringSnapshotInfoInstance).when(defaultSnapshotStrategySpy).retrieveSnapshotEntries(Mockito.anyLong());
+        Mockito.doReturn(false).when(defaultSnapshotStrategySpy).deleteSnapshotInfo(Mockito.any(), Mockito.anyString(), Mockito.any());
+        Assert.assertFalse(defaultSnapshotStrategySpy.deleteSnapshotInfos(snapshotVoMock));
+    }
+
+    @Test
+    public void validateDeleteSnapshotInfosDeletesSuccessfullyReturnsTrue() {
+        Mockito.doReturn(mapStringSnapshotInfoInstance).when(defaultSnapshotStrategySpy).retrieveSnapshotEntries(Mockito.anyLong());
+        Mockito.doReturn(true).when(defaultSnapshotStrategySpy).deleteSnapshotInfo(Mockito.any(), Mockito.anyString(), Mockito.any());
+        Assert.assertTrue(defaultSnapshotStrategySpy.deleteSnapshotInfos(snapshotVoMock));
+    }
+
+    @Test
+    public void validateDeleteSnapshotInfoSnapshotInfoIsNullOnSecondaryStorageReturnsTrue() {
+        Assert.assertTrue(defaultSnapshotStrategySpy.deleteSnapshotInfo(null, "secondary storage", snapshotVoMock));
+    }
+
+    @Test
+    public void validateDeleteSnapshotInfoSnapshotInfoIsNullOnPrimaryStorageReturnsFalse() {
+        Assert.assertTrue(defaultSnapshotStrategySpy.deleteSnapshotInfo(null, "primary storage", snapshotVoMock));
+    }
+
+    @Test
+    public void validateDeleteSnapshotInfoSnapshotDeleteSnapshotChainSuccessfullyReturnsTrue() throws NoTransitionException {
+        Mockito.doReturn(dataStoreMock).when(snapshotInfo1Mock).getDataStore();
+        Mockito.doReturn(snapshotObjectMock).when(defaultSnapshotStrategySpy).castSnapshotInfoToSnapshotObject(snapshotInfo1Mock);
+        Mockito.doNothing().when(snapshotObjectMock).processEvent(Mockito.any(Snapshot.Event.class));
+        Mockito.doReturn(true).when(defaultSnapshotStrategySpy).deleteSnapshotChain(Mockito.any(), Mockito.anyString());
+
+        Assert.assertTrue(defaultSnapshotStrategySpy.deleteSnapshotInfo(snapshotInfo1Mock, "secondary storage", snapshotVoMock));
+    }
+
+    @Test
+    public void validateDeleteSnapshotInfoSnapshotDeleteSnapshotChainFails() throws NoTransitionException {
+        Mockito.doReturn(dataStoreMock).when(snapshotInfo1Mock).getDataStore();
+        Mockito.doReturn(snapshotObjectMock).when(defaultSnapshotStrategySpy).castSnapshotInfoToSnapshotObject(snapshotInfo1Mock);
+        Mockito.doNothing().when(snapshotObjectMock).processEvent(Mockito.any(Snapshot.Event.class));
+        Mockito.doReturn(false).when(defaultSnapshotStrategySpy).deleteSnapshotChain(Mockito.any(), Mockito.anyString());
+
+        boolean result = defaultSnapshotStrategySpy.deleteSnapshotInfo(snapshotInfo1Mock, "secondary storage", snapshotVoMock);
+        Assert.assertFalse(result);
+    }
+
+    @Test
+    public void validateDeleteSnapshotInfoSnapshotProcessSnapshotEventThrowsNoTransitionExceptionReturnsFalse() throws NoTransitionException {
+        Mockito.doReturn(dataStoreMock).when(snapshotInfo1Mock).getDataStore();
+        Mockito.doReturn(snapshotObjectMock).when(defaultSnapshotStrategySpy).castSnapshotInfoToSnapshotObject(snapshotInfo1Mock);
+        Mockito.doThrow(NoTransitionException.class).when(snapshotObjectMock).processEvent(Mockito.any(Snapshot.Event.class));
+
+        Assert.assertFalse(defaultSnapshotStrategySpy.deleteSnapshotInfo(snapshotInfo1Mock, "secondary storage", snapshotVoMock));
+    }
+}
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java
index c950eff9e95..fa6c8d1f606 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java
@@ -57,15 +57,16 @@ public class SnapshotDataStoreDaoImpl extends GenericDaoBase<SnapshotDataStoreVO
     private SearchBuilder<SnapshotDataStoreVO> storeStateSearch;
     private SearchBuilder<SnapshotDataStoreVO> destroyedSearch;
     private SearchBuilder<SnapshotDataStoreVO> cacheSearch;
-    private SearchBuilder<SnapshotDataStoreVO> snapshotSearch;
     private SearchBuilder<SnapshotDataStoreVO> storeSnapshotSearch;
     private SearchBuilder<SnapshotDataStoreVO> snapshotIdSearch;
     private SearchBuilder<SnapshotDataStoreVO> volumeIdSearch;
+    private SearchBuilder<SnapshotDataStoreVO> volumeIdAndStateReadySearch;
     private SearchBuilder<SnapshotDataStoreVO> volumeSearch;
     private SearchBuilder<SnapshotDataStoreVO> stateSearch;
     private SearchBuilder<SnapshotDataStoreVO> parentSnapshotSearch;
     private SearchBuilder<SnapshotVO> snapshotVOSearch;
     private SearchBuilder<SnapshotDataStoreVO> snapshotCreatedSearch;
+    protected SearchBuilder<SnapshotDataStoreVO> snapshotSearch;
 
     public static ArrayList<Hypervisor.HypervisorType> hypervisorsSupportingSnapshotsChaining = new ArrayList<Hypervisor.HypervisorType>();
 
@@ -145,6 +146,11 @@ public class SnapshotDataStoreDaoImpl extends GenericDaoBase<SnapshotDataStoreVO
         volumeSearch.and("snapshot_id", volumeSearch.entity().getSnapshotId(), SearchCriteria.Op.EQ);
         volumeSearch.done();
 
+        volumeIdAndStateReadySearch = createSearchBuilder();
+        volumeIdAndStateReadySearch.and("volume_id", volumeIdAndStateReadySearch.entity().getVolumeId(), SearchCriteria.Op.EQ);
+        volumeIdAndStateReadySearch.and("state", volumeIdAndStateReadySearch.entity().getState(), SearchCriteria.Op.EQ);
+        volumeIdAndStateReadySearch.done();
+
         stateSearch = createSearchBuilder();
         stateSearch.and("state", stateSearch.entity().getState(), SearchCriteria.Op.IN);
         stateSearch.done();
@@ -334,18 +340,14 @@ public class SnapshotDataStoreDaoImpl extends GenericDaoBase<SnapshotDataStoreVO
 
     @Override
     public SnapshotDataStoreVO findBySnapshot(long snapshotId, DataStoreRole role) {
-        SearchCriteria<SnapshotDataStoreVO> sc = snapshotSearch.create();
-        sc.setParameters("snapshot_id", snapshotId);
-        sc.setParameters("store_role", role);
+        SearchCriteria<SnapshotDataStoreVO> sc = createSearchCriteriaBySnapshotIdAndStoreRole(snapshotId, role);
         sc.setParameters("state", State.Ready);
         return findOneBy(sc);
     }
 
     @Override
     public SnapshotDataStoreVO findBySourceSnapshot(long snapshotId, DataStoreRole role) {
-        SearchCriteria<SnapshotDataStoreVO> sc = snapshotSearch.create();
-        sc.setParameters("snapshot_id", snapshotId);
-        sc.setParameters("store_role", role);
+        SearchCriteria<SnapshotDataStoreVO> sc = createSearchCriteriaBySnapshotIdAndStoreRole(snapshotId, role);
         sc.setParameters("state", State.Migrating);
         return findOneBy(sc);
     }
@@ -497,11 +499,20 @@ public class SnapshotDataStoreDaoImpl extends GenericDaoBase<SnapshotDataStoreVO
     }
 
     public SnapshotDataStoreVO findDestroyedReferenceBySnapshot(long snapshotId, DataStoreRole role) {
+        SearchCriteria<SnapshotDataStoreVO> sc = createSearchCriteriaBySnapshotIdAndStoreRole(snapshotId, role);
+        sc.setParameters("state", State.Destroyed);
+        return findOneBy(sc);
+    }
+
+    /**
+     * Creates a SearchCriteria with snapshot id and data store role.
+     * @return A SearchCriteria with snapshot id and data store role
+     */
+    protected SearchCriteria<SnapshotDataStoreVO> createSearchCriteriaBySnapshotIdAndStoreRole(long snapshotId, DataStoreRole role) {
         SearchCriteria<SnapshotDataStoreVO> sc = snapshotSearch.create();
         sc.setParameters("snapshot_id", snapshotId);
         sc.setParameters("store_role", role);
-        sc.setParameters("state", State.Destroyed);
-        return findOneBy(sc);
+        return sc;
     }
 
     private boolean isSnapshotChainingRequired(long volumeId) {
@@ -520,4 +531,22 @@ public class SnapshotDataStoreDaoImpl extends GenericDaoBase<SnapshotDataStoreVO
         return false;
     }
 
+    @Override
+    public boolean expungeReferenceBySnapshotIdAndDataStoreRole(long snapshotId, DataStoreRole dataStoreRole) {
+        SnapshotDataStoreVO snapshotDataStoreVo = findOneBy(createSearchCriteriaBySnapshotIdAndStoreRole(snapshotId, dataStoreRole));
+
+        if (snapshotDataStoreVo == null) {
+            return true;
+        }
+
+        return expunge(snapshotDataStoreVo.getId());
+    }
+
+    @Override
+    public List<SnapshotDataStoreVO> listReadyByVolumeId(long volumeId) {
+        SearchCriteria<SnapshotDataStoreVO> sc = volumeIdAndStateReadySearch.create();
+        sc.setParameters("volume_id", volumeId);
+        sc.setParameters("state", State.Ready);
+        return listBy(sc);
+    }
 }
diff --git a/engine/storage/src/test/java/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImplTest.java b/engine/storage/src/test/java/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImplTest.java
new file mode 100644
index 00000000000..b7be8e3e526
--- /dev/null
+++ b/engine/storage/src/test/java/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImplTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.storage.image.db;
+
+import com.cloud.storage.DataStoreRole;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SnapshotDataStoreDaoImplTest {
+
+    SnapshotDataStoreDaoImpl snapshotDataStoreDaoImplSpy = Mockito.spy(SnapshotDataStoreDaoImpl.class);
+
+    @Mock
+    SnapshotDataStoreVO snapshotDataStoreVoMock;
+
+    @Mock
+    SearchCriteria searchCriteriaMock;
+
+    @Mock
+    SearchBuilder searchBuilderMock;
+
+    @Before
+    public void init(){
+        snapshotDataStoreDaoImplSpy.snapshotSearch = searchBuilderMock;
+    }
+
+    @Test
+    public void validateExpungeReferenceBySnapshotIdAndDataStoreRoleNullReference(){
+        Mockito.doReturn(searchCriteriaMock).when(snapshotDataStoreDaoImplSpy).createSearchCriteriaBySnapshotIdAndStoreRole(Mockito.anyLong(), Mockito.any());
+        Mockito.doReturn(null).when(snapshotDataStoreDaoImplSpy).findOneBy(searchCriteriaMock);
+        Assert.assertTrue(snapshotDataStoreDaoImplSpy.expungeReferenceBySnapshotIdAndDataStoreRole(0, DataStoreRole.Image));
+    }
+
+    @Test
+    public void validateExpungeReferenceBySnapshotIdAndDataStoreRole(){
+        Mockito.doReturn(searchCriteriaMock).when(snapshotDataStoreDaoImplSpy).createSearchCriteriaBySnapshotIdAndStoreRole(Mockito.anyLong(), Mockito.any());
+        Mockito.doReturn(snapshotDataStoreVoMock).when(snapshotDataStoreDaoImplSpy).findOneBy(searchCriteriaMock);
+        Mockito.doReturn(true).when(snapshotDataStoreDaoImplSpy).expunge(Mockito.anyLong());
+        Assert.assertTrue(snapshotDataStoreDaoImplSpy.expungeReferenceBySnapshotIdAndDataStoreRole(0, DataStoreRole.Image));
+    }
+
+    @Test
+    public void validateCreateSearchCriteriaBySnapshotIdAndStoreRole() {
+        Mockito.doReturn(searchCriteriaMock).when(searchBuilderMock).create();
+        Mockito.doNothing().when(searchCriteriaMock).setParameters(Mockito.anyString(), Mockito.any());
+        SearchCriteria result = snapshotDataStoreDaoImplSpy.createSearchCriteriaBySnapshotIdAndStoreRole(0, DataStoreRole.Image);
+
+        Mockito.verify(searchCriteriaMock).setParameters("snapshot_id", 0l);
+        Mockito.verify(searchCriteriaMock).setParameters("store_role", DataStoreRole.Image);
+    }
+}
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRevertSnapshotCommandWrapper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRevertSnapshotCommandWrapper.java
index 3807e5ca931..92e737e528e 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRevertSnapshotCommandWrapper.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRevertSnapshotCommandWrapper.java
@@ -20,6 +20,13 @@
 package com.cloud.hypervisor.kvm.resource.wrapper;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
 
 import org.apache.cloudstack.storage.command.RevertSnapshotCommand;
 import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
@@ -42,12 +49,14 @@ import com.cloud.hypervisor.kvm.storage.KVMStoragePool;
 import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
 import com.cloud.resource.CommandWrapper;
 import com.cloud.resource.ResourceWrapper;
+import com.cloud.storage.DataStoreRole;
 import com.cloud.storage.Storage.StoragePoolType;
+import com.cloud.utils.Pair;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.script.Script;
 
 @ResourceWrapper(handles = RevertSnapshotCommand.class)
-public final class LibvirtRevertSnapshotCommandWrapper extends CommandWrapper<RevertSnapshotCommand, Answer, LibvirtComputingResource> {
+public class LibvirtRevertSnapshotCommandWrapper extends CommandWrapper<RevertSnapshotCommand, Answer, LibvirtComputingResource> {
 
     private static final Logger s_logger = Logger.getLogger(LibvirtRevertSnapshotCommandWrapper.class);
     private static final String MON_HOST = "mon_host";
@@ -55,24 +64,26 @@ public final class LibvirtRevertSnapshotCommandWrapper extends CommandWrapper<Re
     private static final String CLIENT_MOUNT_TIMEOUT = "client_mount_timeout";
     private static final String RADOS_CONNECTION_TIMEOUT = "30";
 
+    protected Set<StoragePoolType> storagePoolTypesThatSupportRevertSnapshot = new HashSet<>(Arrays.asList(StoragePoolType.RBD, StoragePoolType.Filesystem,
+            StoragePoolType.NetworkFilesystem, StoragePoolType.SharedMountPoint));
+
     @Override
     public Answer execute(final RevertSnapshotCommand command, final LibvirtComputingResource libvirtComputingResource) {
+        SnapshotObjectTO snapshotOnPrimaryStorage = command.getDataOnPrimaryStorage();
         SnapshotObjectTO snapshot = command.getData();
         VolumeObjectTO volume = snapshot.getVolume();
         PrimaryDataStoreTO primaryStore = (PrimaryDataStoreTO)volume.getDataStore();
         DataStoreTO snapshotImageStore = snapshot.getDataStore();
-        if (!(snapshotImageStore instanceof NfsTO) && primaryStore.getPoolType() != StoragePoolType.RBD) {
+        if (!(snapshotImageStore instanceof NfsTO) && !storagePoolTypesThatSupportRevertSnapshot.contains(primaryStore.getPoolType())) {
             return new Answer(command, false,
                     String.format("Revert snapshot does not support storage pool of type [%s]. Revert snapshot is supported by storage pools of type 'NFS' or 'RBD'",
                             primaryStore.getPoolType()));
         }
 
         String volumePath = volume.getPath();
-        String snapshotPath = null;
-        String snapshotRelPath = null;
-        KVMStoragePool secondaryStoragePool = null;
+        String snapshotRelPath = snapshot.getPath();
+
         try {
-            snapshotRelPath = snapshot.getPath();
             KVMStoragePoolManager storagePoolMgr = libvirtComputingResource.getStoragePoolMgr();
 
             KVMPhysicalDisk snapshotDisk = storagePoolMgr.getPhysicalDisk(primaryStore.getPoolType(), primaryStore.getUuid(), volumePath);
@@ -100,20 +111,23 @@ public final class LibvirtRevertSnapshotCommandWrapper extends CommandWrapper<Re
                 rbd.close(image);
                 rados.ioCtxDestroy(io);
             } else {
-                NfsTO nfsImageStore = (NfsTO)snapshotImageStore;
-                String secondaryStoragePoolUrl = nfsImageStore.getUrl();
-                secondaryStoragePool = storagePoolMgr.getStoragePoolByURI(secondaryStoragePoolUrl);
-                String ssPmountPath = secondaryStoragePool.getLocalPath();
-                snapshotPath = ssPmountPath + File.separator + snapshotRelPath;
-
-                Script cmd = new Script(libvirtComputingResource.manageSnapshotPath(), libvirtComputingResource.getCmdsTimeout(), s_logger);
-                cmd.add("-v", snapshotPath);
-                cmd.add("-n", snapshotDisk.getName());
-                cmd.add("-p", snapshotDisk.getPath());
-                String result = cmd.execute();
-                if (result != null) {
-                    s_logger.debug("Failed to revert snaptshot: " + result);
-                    return new Answer(command, false, result);
+                KVMStoragePool secondaryStoragePool = null;
+                if (snapshotImageStore != null && DataStoreRole.Primary != snapshotImageStore.getRole()) {
+                    secondaryStoragePool = storagePoolMgr.getStoragePoolByURI(snapshotImageStore.getUrl());
+                }
+
+                if (primaryPool.getType() == StoragePoolType.CLVM) {
+                    Script cmd = new Script(libvirtComputingResource.manageSnapshotPath(), libvirtComputingResource.getCmdsTimeout(), s_logger);
+                    cmd.add("-v", getFullPathAccordingToStorage(secondaryStoragePool, snapshotRelPath));
+                    cmd.add("-n", snapshotDisk.getName());
+                    cmd.add("-p", snapshotDisk.getPath());
+                    String result = cmd.execute();
+                    if (result != null) {
+                        s_logger.debug("Failed to revert snaptshot: " + result);
+                        return new Answer(command, false, result);
+                    }
+                } else {
+                    revertVolumeToSnapshot(snapshotOnPrimaryStorage, snapshot, snapshotImageStore, primaryPool, secondaryStoragePool);
                 }
             }
 
@@ -128,4 +142,73 @@ public final class LibvirtRevertSnapshotCommandWrapper extends CommandWrapper<Re
             return new Answer(command, false, e.toString());
         }
     }
+
+    /**
+     * Retrieves the full path according to the storage.
+     * @return The full path according to the storage.
+     */
+    protected String getFullPathAccordingToStorage(KVMStoragePool kvmStoragePool, String path) {
+        return String.format("%s%s%s", kvmStoragePool.getLocalPath(), File.separator, path);
+    }
+
+    /**
+     * Reverts the volume to the snapshot.
+     */
+    protected void revertVolumeToSnapshot(SnapshotObjectTO snapshotOnPrimaryStorage, SnapshotObjectTO snapshotOnSecondaryStorage, DataStoreTO dataStoreTo,
+            KVMStoragePool kvmStoragePoolPrimary, KVMStoragePool kvmStoragePoolSecondary) {
+        VolumeObjectTO volumeObjectTo = snapshotOnSecondaryStorage.getVolume();
+        String volumePath = getFullPathAccordingToStorage(kvmStoragePoolPrimary, volumeObjectTo.getPath());
+
+        Pair<String, SnapshotObjectTO> resultGetSnapshot = getSnapshot(snapshotOnPrimaryStorage, snapshotOnSecondaryStorage, kvmStoragePoolPrimary, kvmStoragePoolSecondary);
+        String snapshotPath = resultGetSnapshot.first();
+        SnapshotObjectTO snapshotToPrint = resultGetSnapshot.second();
+
+        s_logger.debug(String.format("Reverting volume [%s] to snapshot [%s].", volumeObjectTo, snapshotToPrint));
+
+        try {
+            replaceVolumeWithSnapshot(volumePath, snapshotPath);
+            s_logger.debug(String.format("Successfully reverted volume [%s] to snapshot [%s].", volumeObjectTo, snapshotToPrint));
+        } catch (IOException ex) {
+            throw new CloudRuntimeException(String.format("Unable to revert volume [%s] to snapshot [%s] due to [%s].", volumeObjectTo, snapshotToPrint, ex.getMessage()), ex);
+        }
+    }
+
+    /**
+     * If the snapshot is backed up on the secondary storage, it will retrieve the snapshot and snapshot path from the secondary storage, otherwise, it will retrieve the snapshot
+     * and the snapshot path from the primary storage.
+     */
+    protected Pair<String, SnapshotObjectTO> getSnapshot(SnapshotObjectTO snapshotOnPrimaryStorage, SnapshotObjectTO snapshotOnSecondaryStorage,
+            KVMStoragePool kvmStoragePoolPrimary, KVMStoragePool kvmStoragePoolSecondary){
+        String snapshotPath = snapshotOnPrimaryStorage.getPath();
+
+        if (Files.exists(Paths.get(snapshotPath))) {
+            return new Pair<>(snapshotPath, snapshotOnPrimaryStorage);
+        }
+
+        if (kvmStoragePoolSecondary == null) {
+            throw new CloudRuntimeException(String.format("Snapshot [%s] does not exists on secondary storage, unable to revert volume [%s] to it.",
+                    snapshotOnSecondaryStorage, snapshotOnSecondaryStorage.getVolume()));
+        }
+
+        s_logger.trace(String.format("Snapshot [%s] does not exists on primary storage [%s], searching snapshot [%s] on secondary storage [%s].", snapshotOnPrimaryStorage,
+                kvmStoragePoolPrimary, snapshotOnSecondaryStorage, kvmStoragePoolSecondary));
+
+        String snapshotPathOnSecondaryStorage = snapshotOnSecondaryStorage.getPath();
+
+        if (snapshotPathOnSecondaryStorage == null) {
+            throw new CloudRuntimeException(String.format("Snapshot [%s] was not found on secondary storage neither, unable to revert volume [%s] to it.",
+                    snapshotOnSecondaryStorage, snapshotOnSecondaryStorage.getVolume()));
+        }
+
+        snapshotPath = getFullPathAccordingToStorage(kvmStoragePoolSecondary, snapshotPathOnSecondaryStorage);
+        return new Pair<>(snapshotPath, snapshotOnSecondaryStorage);
+    }
+
+    /**
+     * Replaces the current volume with the snapshot.
+     * @throws IOException If can't replace the current volume with the snapshot.
+     */
+    protected void replaceVolumeWithSnapshot(String volumePath, String snapshotPath) throws IOException {
+        Files.copy(Paths.get(snapshotPath), Paths.get(volumePath), StandardCopyOption.REPLACE_EXISTING);
+    }
 }
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUtilitiesHelper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUtilitiesHelper.java
index 2881ed04220..f277f321f01 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUtilitiesHelper.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUtilitiesHelper.java
@@ -22,6 +22,7 @@ import java.util.UUID;
 
 import javax.naming.ConfigurationException;
 
+import org.apache.log4j.Logger;
 import org.libvirt.Connect;
 import org.libvirt.LibvirtException;
 
@@ -32,6 +33,7 @@ import com.cloud.storage.StorageLayer;
 import com.cloud.storage.template.Processor;
 import com.cloud.storage.template.QCOW2Processor;
 import com.cloud.storage.template.TemplateLocation;
+import com.cloud.utils.Pair;
 import com.cloud.utils.script.Script;
 
 /**
@@ -39,9 +41,15 @@ import com.cloud.utils.script.Script;
  * and the methods wrapped here.
  */
 public class LibvirtUtilitiesHelper {
+    private static final Logger s_logger = Logger.getLogger(LibvirtUtilitiesHelper.class);
 
     public static final int TIMEOUT = 10000;
 
+    /**
+     * Although the flag '--delete' for command 'virsh blockcommit' already exists in Libvirt 1.2.9, its functionality was only implemented on 6.0.0.
+     */
+    private static final int LIBVIRT_VERSION_THAT_SUPPORTS_FLAG_DELETE_ON_COMMAND_VIRSH_BLOCKCOMMIT = 6000000;
+
     public Connect getConnectionByVmName(final String vmName) throws LibvirtException {
         return LibvirtConnection.getConnectionByVmName(vmName);
     }
@@ -109,4 +117,31 @@ public class LibvirtUtilitiesHelper {
                 + "</domainsnapshot>";
         return vmSnapshotXML;
     }
+
+    /**
+     * Validates if the libvirt's version is equal or higher than the parameter.
+     * @param conn The libvirt connection to retrieve the version.
+     * @param version The version to validate against.
+     */
+    protected static Pair<String, Boolean> isLibvirtVersionEqualOrHigherThanVersionInParameter(Connect conn, long version) {
+        try {
+            long currentLibvirtVersion = conn.getLibVirVersion();
+            return new Pair<>(String.valueOf(currentLibvirtVersion), currentLibvirtVersion >= version);
+        } catch (LibvirtException ex) {
+            String exceptionMessage = ex.getMessage();
+            s_logger.error(String.format("Unable to validate if the Libvirt's version is equal or higher than [%s] due to [%s]. Returning 'false' as default'.", version,
+                    exceptionMessage), ex);
+            return new Pair<>(String.format("Unknow due to [%s]", exceptionMessage), false);
+        }
+    }
+
+    /**
+     * Validates if Libvirt supports the flag '--delete' on command 'virsh blockcommit'.
+     */
+    public static boolean isLibvirtSupportingFlagDeleteOnCommandVirshBlockcommit(Connect conn) {
+        Pair<String, Boolean> result = isLibvirtVersionEqualOrHigherThanVersionInParameter(conn, LIBVIRT_VERSION_THAT_SUPPORTS_FLAG_DELETE_ON_COMMAND_VIRSH_BLOCKCOMMIT);
+        s_logger.debug(String.format("The current Libvirt's version [%s]%s supports the flag '--delete' on command 'virsh blockcommit'.", result.first(),
+                result.second() ? "" : " does not"));
+        return result.second();
+    }
 }
\ No newline at end of file
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStorageAdaptor.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStorageAdaptor.java
index 437abc6fa49..641cd3a31d9 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStorageAdaptor.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStorageAdaptor.java
@@ -427,11 +427,6 @@ public class IscsiAdmStorageAdaptor implements StorageAdaptor {
         return destPool.getPhysicalDisk(destVolumeUuid);
     }
 
-    @Override
-    public KVMPhysicalDisk createDiskFromSnapshot(KVMPhysicalDisk snapshot, String snapshotName, String name, KVMStoragePool destPool, int timeout) {
-        throw new UnsupportedOperationException("Creating a disk from a snapshot is not supported in this configuration.");
-    }
-
     @Override
     public boolean refresh(KVMStoragePool pool) {
         return true;
@@ -439,6 +434,11 @@ public class IscsiAdmStorageAdaptor implements StorageAdaptor {
 
     @Override
     public boolean createFolder(String uuid, String path) {
+        return createFolder(uuid, path, null);
+    }
+
+    @Override
+    public boolean createFolder(String uuid, String path, String localPath) {
         throw new UnsupportedOperationException("A folder cannot be created in this configuration.");
     }
 
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStoragePool.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStoragePool.java
index 8e4af764cd6..bee71a644fd 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStoragePool.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/IscsiAdmStoragePool.java
@@ -20,6 +20,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.cloudstack.utils.qemu.QemuImg.PhysicalDiskFormat;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 
 import com.cloud.storage.Storage;
 import com.cloud.storage.Storage.StoragePoolType;
@@ -170,4 +172,10 @@ public class IscsiAdmStoragePool implements KVMStoragePool {
     public boolean supportsConfigDriveIso() {
         return false;
     }
+
+    @Override
+    public String toString() {
+        return new ToStringBuilder(this, ToStringStyle.JSON_STYLE).append("uuid", getUuid()).append("path", getLocalPath()).toString();
+    }
+
 }
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStoragePoolManager.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStoragePoolManager.java
index ab5b9307630..279c25b4479 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStoragePoolManager.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStoragePoolManager.java
@@ -419,11 +419,6 @@ public class KVMStoragePoolManager {
         return adaptor.copyPhysicalDisk(disk, name, destPool, timeout);
     }
 
-    public KVMPhysicalDisk createDiskFromSnapshot(KVMPhysicalDisk snapshot, String snapshotName, String name, KVMStoragePool destPool, int timeout) {
-        StorageAdaptor adaptor = getStorageAdaptor(destPool.getType());
-        return adaptor.createDiskFromSnapshot(snapshot, snapshotName, name, destPool, timeout);
-    }
-
     public KVMPhysicalDisk createDiskWithTemplateBacking(KVMPhysicalDisk template, String name, PhysicalDiskFormat format, long size,
                                                          KVMStoragePool destPool, int timeout) {
         StorageAdaptor adaptor = getStorageAdaptor(destPool.getType());
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStorageProcessor.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStorageProcessor.java
index ead98bbfd68..f73307be131 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStorageProcessor.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/KVMStorageProcessor.java
@@ -27,8 +27,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.text.DateFormat;
-import java.text.MessageFormat;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -70,7 +70,7 @@ import org.apache.cloudstack.utils.qemu.QemuImgException;
 import org.apache.cloudstack.utils.qemu.QemuImgFile;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
+
 import org.apache.log4j.Logger;
 import org.libvirt.Connect;
 import org.libvirt.Domain;
@@ -109,15 +109,18 @@ import com.cloud.hypervisor.kvm.resource.LibvirtVMDef.DiskDef;
 import com.cloud.hypervisor.kvm.resource.LibvirtVMDef.DiskDef.DeviceType;
 import com.cloud.hypervisor.kvm.resource.LibvirtVMDef.DiskDef.DiscardType;
 import com.cloud.hypervisor.kvm.resource.LibvirtVMDef.DiskDef.DiskProtocol;
+import com.cloud.hypervisor.kvm.resource.wrapper.LibvirtUtilitiesHelper;
 import com.cloud.storage.JavaStorageLayer;
 import com.cloud.storage.MigrationOptions;
 import com.cloud.storage.Storage.ImageFormat;
 import com.cloud.storage.Storage.StoragePoolType;
 import com.cloud.storage.StorageLayer;
 import com.cloud.storage.resource.StorageProcessor;
+import static com.cloud.storage.snapshot.SnapshotManager.BackupSnapshotAfterTakingSnapshot;
 import com.cloud.storage.template.Processor;
 import com.cloud.storage.template.Processor.FormatInfo;
 import com.cloud.storage.template.QCOW2Processor;
+import com.cloud.storage.template.TemplateConstants;
 import com.cloud.storage.template.TemplateLocation;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.Pair;
@@ -125,6 +128,15 @@ import com.cloud.utils.UriUtils;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.script.Script;
 import com.cloud.utils.storage.S3.S3Utils;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 
 public class KVMStorageProcessor implements StorageProcessor {
     private static final Logger s_logger = Logger.getLogger(KVMStorageProcessor.class);
@@ -990,7 +1002,7 @@ public class KVMStorageProcessor implements StorageProcessor {
                 }
             } else {
                 final Script command = new Script(_manageSnapshotPath, cmd.getWaitInMillSeconds(), s_logger);
-                command.add("-b", snapshotDisk.getPath());
+                command.add("-b", snapshot.getPath());
                 command.add(NAME_OPTION, snapshotName);
                 command.add("-p", snapshotDestPath);
                 if (isCreatedFromVmSnapshot) {
@@ -1021,49 +1033,18 @@ public class KVMStorageProcessor implements StorageProcessor {
         } finally {
             if (isCreatedFromVmSnapshot) {
                 s_logger.debug("Ignoring removal of vm snapshot on primary as this snapshot is created from vm snapshot");
-            } else {
-                try {
-                    /* Delete the snapshot on primary */
-                    DomainInfo.DomainState state = null;
-                    Domain vm = null;
-                    if (vmName != null) {
-                        try {
-                            vm = resource.getDomain(conn, vmName);
-                            state = vm.getInfo().state;
-                        } catch (final LibvirtException e) {
-                            s_logger.trace("Ignoring libvirt error.", e);
-                        }
-                    }
+            } else if (primaryPool.getType() != StoragePoolType.RBD) {
+                String snapshotPath = snapshot.getPath();
+                String backupSnapshotAfterTakingSnapshot = cmd.getOptions() == null ? null : cmd.getOptions().get(BackupSnapshotAfterTakingSnapshot.key());
 
-                    final KVMStoragePool primaryStorage = storagePoolMgr.getStoragePool(primaryStore.getPoolType(),
-                            primaryStore.getUuid());
-                    if (state == DomainInfo.DomainState.VIR_DOMAIN_RUNNING && !primaryStorage.isExternalSnapshot()) {
-                        final DomainSnapshot snap = vm.snapshotLookupByName(snapshotName);
-                        try {
-                            s_logger.info(String.format("Suspending VM '%s' to delete snapshot,", vm.getName()));
-                            vm.suspend();
-                        } catch (final LibvirtException e) {
-                            s_logger.error("Failed to suspend the VM", e);
-                            throw e;
-                        }
-                        snap.delete(0);
-
-                        /*
-                         * libvirt on RHEL6 doesn't handle resume event emitted from
-                         * qemu
-                         */
-                        vm = resource.getDomain(conn, vmName);
-                        state = vm.getInfo().state;
-                        if (state == DomainInfo.DomainState.VIR_DOMAIN_PAUSED) {
-                            vm.resume();
-                        }
-                    } else {
-                        if (primaryPool.getType() != StoragePoolType.RBD) {
-                            deleteSnapshotViaManageSnapshotScript(snapshotName, snapshotDisk);
-                        }
+                if (backupSnapshotAfterTakingSnapshot == null || BooleanUtils.toBoolean(backupSnapshotAfterTakingSnapshot)) {
+                    try {
+                        Files.deleteIfExists(Paths.get(snapshotPath));
+                    } catch (IOException ex) {
+                        s_logger.error(String.format("Failed to delete snapshot [%s] on primary storage [%s].", snapshotPath, primaryPool.getUuid()), ex);
                     }
-                } catch (final Exception ex) {
-                    s_logger.error("Failed to delete snapshots on primary", ex);
+                } else {
+                    s_logger.debug(String.format("This backup is temporary, not deleting snapshot [%s] on primary storage [%s]", snapshotPath, primaryPool.getUuid()));
                 }
             }
 
@@ -1500,8 +1481,56 @@ public class KVMStorageProcessor implements StorageProcessor {
         }
     }
 
-    protected static final MessageFormat SnapshotXML = new MessageFormat("   <domainsnapshot>" + "       <name>{0}</name>" + "          <domain>"
-            + "            <uuid>{1}</uuid>" + "        </domain>" + "    </domainsnapshot>");
+    /**
+     * XML to take disk-only snapshot of the VM.<br><br>
+     * 1st parameter: snapshot's name;<br>
+     * 2nd parameter: disk's label (target.dev tag from VM's XML);<br>
+     * 3rd parameter: absolute path to create the snapshot;<br>
+     * 4th parameter: list of disks to avoid on snapshot {@link #TAG_AVOID_DISK_FROM_SNAPSHOT};
+     */
+    private static final String XML_CREATE_DISK_SNAPSHOT = "<domainsnapshot><name>%s</name><disks><disk name='%s' snapshot='external'><source file='%s'/></disk>%s</disks>"
+      + "</domainsnapshot>";
+
+    /**
+     * XML to take full VM snapshot.<br><br>
+     * 1st parameter: snapshot's name;<br>
+     * 2nd parameter: domain's UUID;<br>
+     */
+    private static final String XML_CREATE_FULL_VM_SNAPSHOT = "<domainsnapshot><name>%s</name><domain><uuid>%s</uuid></domain></domainsnapshot>";
+
+    /**
+     * Tag to avoid disk from snapshot.<br><br>
+     * 1st parameter: disk's label (target.dev tag from VM's XML);
+     */
+    private static final String TAG_AVOID_DISK_FROM_SNAPSHOT = "<disk name='%s' snapshot='no' />";
+
+    /**
+     * Virsh command to merge (blockcommit) snapshot into the base file.<br><br>
+     * 1st parameter: VM's name;<br>
+     * 2nd parameter: disk's label (target.dev tag from VM's XML);<br>
+     * 3rd parameter: the absolute path of the base file;
+     * 4th parameter: the flag '--delete', if Libvirt supports it. Libvirt started to support it on version <b>6.0.0</b>;
+     */
+    private static final String COMMAND_MERGE_SNAPSHOT = "virsh blockcommit %s %s --base %s --active --wait %s --pivot";
+
+    /**
+     * Flag to take disk-only snapshots from VM.<br><br>
+     * Libvirt lib for java does not have the enum virDomainSnapshotCreateFlags.
+     * @see <a href="https://libvirt.org/html/libvirt-libvirt-domain-snapshot.html">Module libvirt-domain-snapshot from libvirt</a>
+     */
+    private static final int VIR_DOMAIN_SNAPSHOT_CREATE_DISK_ONLY = 16;
+
+    /**
+     * Min rate between available pool and disk size to take disk snapshot.<br><br>
+     * As we are copying the base disk to a folder in the same primary storage, we need at least once more disk size of available space in the primary storage, plus 5% as a
+     * security margin.
+     */
+    private static final double MIN_RATE_BETWEEN_AVAILABLE_POOL_AND_DISK_SIZE_TO_TAKE_DISK_SNAPSHOT = 1.05;
+
+    /**
+     * Message that can occurs when using a QEMU binary that does not support live disk snapshot (e.g. CentOS 7 QEMU binaries).
+     */
+    private static final String LIBVIRT_OPERATION_NOT_SUPPORTED_MESSAGE = "Operation not supported";
 
     @Override
     public Answer createSnapshot(final CreateObjectCommand cmd) {
@@ -1510,6 +1539,7 @@ public class KVMStorageProcessor implements StorageProcessor {
         final VolumeObjectTO volume = snapshotTO.getVolume();
         final String snapshotName = UUID.randomUUID().toString();
         final String vmName = volume.getVmName();
+
         try {
             final Connect conn = LibvirtConnection.getConnectionByVmName(vmName);
             DomainInfo.DomainState state = null;
@@ -1526,15 +1556,34 @@ public class KVMStorageProcessor implements StorageProcessor {
             final KVMStoragePool primaryPool = storagePoolMgr.getStoragePool(primaryStore.getPoolType(), primaryStore.getUuid());
 
             final KVMPhysicalDisk disk = storagePoolMgr.getPhysicalDisk(primaryStore.getPoolType(), primaryStore.getUuid(), volume.getPath());
+
+            String diskPath = disk.getPath();
+            String snapshotPath = diskPath + File.separator + snapshotName;
             if (state == DomainInfo.DomainState.VIR_DOMAIN_RUNNING && !primaryPool.isExternalSnapshot()) {
-                final String vmUuid = vm.getUUIDString();
-                final Object[] args = new Object[] {snapshotName, vmUuid};
-                final String snapshot = SnapshotXML.format(args);
 
-                final long start = System.currentTimeMillis();
-                vm.snapshotCreateXML(snapshot);
-                final long total = (System.currentTimeMillis() - start)/1000;
-                s_logger.debug("snapshot takes " + total + " seconds to finish");
+                validateAvailableSizeOnPoolToTakeVolumeSnapshot(primaryPool, disk);
+
+                try {
+                    snapshotPath = getSnapshotPathInPrimaryStorage(primaryPool.getLocalPath(), snapshotName);
+
+                    String diskLabel = takeVolumeSnapshot(resource.getDisks(conn, vmName), snapshotName, diskPath, vm);
+                    String copyResult = copySnapshotToPrimaryStorageDir(primaryPool, diskPath, snapshotPath, volume);
+
+                    mergeSnapshotIntoBaseFile(vm, diskLabel, diskPath, snapshotName, volume, conn);
+
+                    validateCopyResult(copyResult, snapshotPath);
+                } catch (LibvirtException e) {
+                    if (!e.getMessage().contains(LIBVIRT_OPERATION_NOT_SUPPORTED_MESSAGE)) {
+                        throw e;
+                    }
+
+                    s_logger.info(String.format("It was not possible to take live disk snapshot for volume [%s], in VM [%s], due to [%s]. We will take full snapshot of the VM"
+                            + " and extract the disk instead. Consider upgrading your QEMU binary.", volume, vmName, e.getMessage()));
+
+                    takeFullVmSnaphotForBinariesThatDoesNotSupportLiveDiskSnapshot(vm, snapshotName, vmName);
+                    primaryPool.createFolder(TemplateConstants.DEFAULT_SNAPSHOT_ROOT_DIR);
+                    extractDiskFromFullVmSnapshot(disk, volume, snapshotPath, snapshotName, vmName, vm);
+                }
 
                 /*
                  * libvirt on RHEL6 doesn't handle resume event emitted from
@@ -1575,7 +1624,7 @@ public class KVMStorageProcessor implements StorageProcessor {
                     } catch (final Exception e) {
                         s_logger.error("A RBD snapshot operation on " + disk.getName() + " failed. The error was: " + e.getMessage());
                     }
-                } else {
+                } else if (primaryPool.getType() == StoragePoolType.CLVM) {
                     /* VM is not running, create a snapshot by ourself */
                     final Script command = new Script(_manageSnapshotPath, _cmdsTimeout, s_logger);
                     command.add(MANAGE_SNAPSTHOT_CREATE_OPTION, disk.getPath());
@@ -1585,19 +1634,251 @@ public class KVMStorageProcessor implements StorageProcessor {
                         s_logger.debug("Failed to manage snapshot: " + result);
                         return new CreateObjectAnswer("Failed to manage snapshot: " + result);
                     }
+                } else {
+                    snapshotPath = getSnapshotPathInPrimaryStorage(primaryPool.getLocalPath(), snapshotName);
+                    String copyResult = copySnapshotToPrimaryStorageDir(primaryPool, diskPath, snapshotPath, volume);
+                    validateCopyResult(copyResult, snapshotPath);
                 }
             }
 
             final SnapshotObjectTO newSnapshot = new SnapshotObjectTO();
-            // NOTE: sort of hack, we'd better just put snapshtoName
-            newSnapshot.setPath(disk.getPath() + File.separator + snapshotName);
+
+            newSnapshot.setPath(snapshotPath);
             return new CreateObjectAnswer(newSnapshot);
-        } catch (final LibvirtException e) {
-            s_logger.debug("Failed to manage snapshot: ", e);
-            return new CreateObjectAnswer("Failed to manage snapshot: " + e.toString());
+        } catch (CloudRuntimeException | LibvirtException | IOException ex) {
+            String errorMsg = String.format("Failed take snapshot for volume [%s], in VM [%s], due to [%s].", volume, vmName, ex.getMessage());
+            s_logger.error(errorMsg, ex);
+            return new CreateObjectAnswer(errorMsg);
+        }
+    }
+
+    protected void deleteFullVmSnapshotAfterConvertingItToExternalDiskSnapshot(Domain vm, String snapshotName, VolumeObjectTO volume, String vmName) throws LibvirtException {
+        s_logger.debug(String.format("Deleting full VM snapshot [%s] of VM [%s] as we already converted it to an external disk snapshot of the volume [%s].", snapshotName, vmName,
+                volume));
+
+        DomainSnapshot domainSnapshot = vm.snapshotLookupByName(snapshotName);
+        domainSnapshot.delete(0);
+    }
+
+    protected void extractDiskFromFullVmSnapshot(KVMPhysicalDisk disk, VolumeObjectTO volume, String snapshotPath, String snapshotName, String vmName, Domain vm)
+            throws LibvirtException {
+        QemuImg qemuImg = new QemuImg(_cmdsTimeout);
+        QemuImgFile srcFile = new QemuImgFile(disk.getPath(), disk.getFormat());
+        QemuImgFile destFile = new QemuImgFile(snapshotPath, disk.getFormat());
+
+        try {
+            s_logger.debug(String.format("Converting full VM snapshot [%s] of VM [%s] to external disk snapshot of the volume [%s].", snapshotName, vmName, volume));
+            qemuImg.convert(srcFile, destFile, null, snapshotName, true);
+        } catch (QemuImgException qemuException) {
+            String message = String.format("Could not convert full VM snapshot [%s] of VM [%s] to external disk snapshot of volume [%s] due to [%s].", snapshotName, vmName, volume,
+                    qemuException.getMessage());
+
+            s_logger.error(message, qemuException);
+            throw new CloudRuntimeException(message, qemuException);
+        } finally {
+            deleteFullVmSnapshotAfterConvertingItToExternalDiskSnapshot(vm, snapshotName, volume, vmName);
+        }
+    }
+
+    protected void takeFullVmSnaphotForBinariesThatDoesNotSupportLiveDiskSnapshot(Domain vm, String snapshotName, String vmName) throws LibvirtException {
+        String vmUuid = vm.getUUIDString();
+
+        long start = System.currentTimeMillis();
+        vm.snapshotCreateXML(String.format(XML_CREATE_FULL_VM_SNAPSHOT, snapshotName, vmUuid));
+        s_logger.debug(String.format("Full VM Snapshot [%s] of VM [%s] took [%s] seconds to finish.", snapshotName, vmName, (System.currentTimeMillis() - start)/1000));
+    }
+
+    protected void validateCopyResult(String copyResult, String snapshotPath) throws CloudRuntimeException, IOException {
+        if (copyResult == null) {
+            return;
+        }
+
+        Files.deleteIfExists(Paths.get(snapshotPath));
+        throw new CloudRuntimeException(copyResult);
+    }
+
+    /**
+     * Merges the snapshot into base file to keep volume and VM behavior after stopping - starting.
+     * @param vm Domain of the VM;
+     * @param diskLabel Disk label to manage snapshot and base file;
+     * @param baseFilePath Path of the base file;
+     * @param snapshotName Name of the snapshot;
+     * @throws LibvirtException
+     */
+    protected void mergeSnapshotIntoBaseFile(Domain vm, String diskLabel, String baseFilePath, String snapshotName, VolumeObjectTO volume,
+            Connect conn) throws LibvirtException {
+        boolean isLibvirtSupportingFlagDeleteOnCommandVirshBlockcommit = LibvirtUtilitiesHelper.isLibvirtSupportingFlagDeleteOnCommandVirshBlockcommit(conn);
+        String vmName = vm.getName();
+        String mergeCommand = String.format(COMMAND_MERGE_SNAPSHOT, vmName, diskLabel, baseFilePath, isLibvirtSupportingFlagDeleteOnCommandVirshBlockcommit ? "--delete" : "");
+        String mergeResult = Script.runSimpleBashScript(mergeCommand);
+
+        if (mergeResult == null) {
+            s_logger.debug(String.format("Successfully merged snapshot [%s] into VM [%s] %s base file.", snapshotName, vmName, volume));
+            manuallyDeleteUnusedSnapshotFile(isLibvirtSupportingFlagDeleteOnCommandVirshBlockcommit, getSnapshotTemporaryPath(baseFilePath, snapshotName));
+            return;
+        }
+
+        String errorMsg = String.format("Failed to merge snapshot [%s] into VM [%s] %s base file. Command [%s] resulted in [%s]. If the VM is stopped and then started, it"
+          + " will start to write in the base file again. All changes made between the snapshot and the VM stop will be in the snapshot. If the VM is stopped, the snapshot must be"
+          + " merged into the base file manually.", snapshotName, vmName, volume, mergeCommand, mergeResult);
+
+        s_logger.warn(String.format("%s VM XML: [%s].", errorMsg, vm.getXMLDesc(0)));
+        throw new CloudRuntimeException(errorMsg);
+    }
+
+    /**
+     * Manually deletes the unused snapshot file.<br/>
+     * This method is necessary due to Libvirt created the tag '--delete' on command 'virsh blockcommit' on version <b>1.2.9</b>, however it was only implemented on version
+     *  <b>6.0.0</b>.
+     * @param snapshotPath The unused snapshot file to manually delete.
+     */
+    protected void manuallyDeleteUnusedSnapshotFile(boolean isLibvirtSupportingFlagDeleteOnCommandVirshBlockcommit, String snapshotPath) {
+        if (isLibvirtSupportingFlagDeleteOnCommandVirshBlockcommit) {
+            s_logger.debug(String.format("The current Libvirt's version supports the flag '--delete' on command 'virsh blockcommit', we will skip the manually deletion of the"
+                    + " unused snapshot file [%s] as it already was automatically deleted.", snapshotPath));
+            return;
+        }
+
+        s_logger.debug(String.format("The current Libvirt's version does not supports the flag '--delete' on command 'virsh blockcommit', therefore we will manually delete the"
+                + " unused snapshot file [%s].", snapshotPath));
+
+        try {
+            Files.deleteIfExists(Paths.get(snapshotPath));
+            s_logger.debug(String.format("Manually deleted unused snapshot file [%s].", snapshotPath));
+        } catch (IOException ex) {
+            throw new CloudRuntimeException(String.format("Unable to manually delete unused snapshot file [%s] due to [%s].", snapshotPath, ex.getMessage()));
+        }
+    }
+
+    /**
+     * Creates the snapshot directory in the primary storage, if it does not exist; then copies the base file (VM's old writing file) to the snapshot dir..
+     * @param primaryPool Storage to create folder, if not exists;
+     * @param baseFile Base file of VM, which will be copied;
+     * @param snapshotPath Path to copy the base file;
+     * @return null if copies successfully or a error message.
+     */
+    protected String copySnapshotToPrimaryStorageDir(KVMStoragePool primaryPool, String baseFile, String snapshotPath, VolumeObjectTO volume) {
+        try {
+            primaryPool.createFolder(TemplateConstants.DEFAULT_SNAPSHOT_ROOT_DIR);
+            Files.copy(Paths.get(baseFile), Paths.get(snapshotPath));
+            s_logger.debug(String.format("Copied %s snapshot from [%s] to [%s].", volume, baseFile, snapshotPath));
+            return null;
+        } catch (IOException ex) {
+            return String.format("Unable to copy %s snapshot [%s] to [%s] due to [%s].", volume, baseFile, snapshotPath, ex.getMessage());
         }
     }
 
+    /**
+     * Retrieves the path of the snapshot on primary storage snapshot's dir.
+     * @param primaryStoragePath Path of the primary storage;
+     * @param snapshotName Snapshot name;
+     * @return the path of the snapshot in primary storage snapshot's dir.
+     */
+    protected String getSnapshotPathInPrimaryStorage(String primaryStoragePath, String snapshotName) {
+        return String.format("%s%s%s%s%s", primaryStoragePath, File.separator, TemplateConstants.DEFAULT_SNAPSHOT_ROOT_DIR, File.separator, snapshotName);
+    }
+
+    /**
+     * Take a volume snapshot of the specified volume.
+     * @param disks List of VM's disks;
+     * @param snapshotName Name of the snapshot;
+     * @param diskPath Path of the disk to take snapshot;
+     * @param vm VM in which disk stay;
+     * @return the disk label in VM's XML.
+     * @throws LibvirtException
+     */
+    protected String takeVolumeSnapshot(List<DiskDef> disks, String snapshotName, String diskPath, Domain vm) throws LibvirtException{
+        Pair<String, Set<String>> diskToSnapshotAndDisksToAvoid = getDiskToSnapshotAndDisksToAvoid(disks, diskPath, vm);
+        String diskLabelToSnapshot = diskToSnapshotAndDisksToAvoid.first();
+        String disksToAvoidsOnSnapshot = diskToSnapshotAndDisksToAvoid.second().stream().map(diskLabel -> String.format(TAG_AVOID_DISK_FROM_SNAPSHOT, diskLabel))
+          .collect(Collectors.joining());
+        String snapshotTemporaryPath = getSnapshotTemporaryPath(diskPath, snapshotName);
+
+        String createSnapshotXmlFormated = String.format(XML_CREATE_DISK_SNAPSHOT, snapshotName, diskLabelToSnapshot, snapshotTemporaryPath, disksToAvoidsOnSnapshot);
+
+        long start = System.currentTimeMillis();
+        vm.snapshotCreateXML(createSnapshotXmlFormated, VIR_DOMAIN_SNAPSHOT_CREATE_DISK_ONLY);
+        s_logger.debug(String.format("Snapshot [%s] took [%s] seconds to finish.", snapshotName, (System.currentTimeMillis() - start)/1000));
+
+        return diskLabelToSnapshot;
+    }
+
+    /**
+     * Retrieves the disk label to take snapshot and, in case that there is more than one disk attached to VM, the disk labels to avoid the snapshot;
+     * @param disks List of VM's disks;
+     * @param diskPath Path of the disk to take snapshot;
+     * @param vm VM in which disks stay;
+     * @return the label to take snapshot and the labels to avoid it. If the disk path not be found in VM's XML or be found more than once, it will throw a CloudRuntimeException.
+     * @throws org.libvirt.LibvirtException
+     */
+    protected Pair<String, Set<String>> getDiskToSnapshotAndDisksToAvoid(List<DiskDef> disks, String diskPath, Domain vm) throws LibvirtException {
+        String diskLabelToSnapshot = null;
+        Set<String> disksToAvoid = new HashSet<>();
+
+        for (DiskDef disk : disks) {
+            String diskDefPath = disk.getDiskPath();
+
+            if (StringUtils.isEmpty(diskDefPath)) {
+                continue;
+            }
+
+            String diskLabel = disk.getDiskLabel();
+
+            if (!diskPath.equals(diskDefPath)) {
+                disksToAvoid.add(diskLabel);
+                continue;
+            }
+
+            if (diskLabelToSnapshot != null) {
+                throw new CloudRuntimeException(String.format("VM [%s] has more than one disk with path [%s]. VM's XML [%s].", vm.getName(), diskPath, vm.getXMLDesc(0)));
+            }
+
+            diskLabelToSnapshot = diskLabel;
+        }
+
+        if (diskLabelToSnapshot == null) {
+            throw new CloudRuntimeException(String.format("VM [%s] has no disk with path [%s]. VM's XML [%s].", vm.getName(), diskPath, vm.getXMLDesc(0)));
+        }
+
+        return new Pair<>(diskLabelToSnapshot, disksToAvoid);
+    }
+
+    /**
+     * Retrieves the temporary path of the snapshot.
+     * @param diskPath Path of the disk to snapshot;
+     * @param snapshotName Snapshot name;
+     * @return the path of the disk replacing the disk with the snapshot.
+     */
+    protected String getSnapshotTemporaryPath(String diskPath, String snapshotName) {
+        String[] diskPathSplitted = diskPath.split(File.separator);
+        diskPathSplitted[diskPathSplitted.length - 1] = snapshotName;
+        return String.join(File.separator, diskPathSplitted);
+    }
+
+    /**
+     * Validate if the primary storage has enough capacity to take a disk snapshot, as the snapshot will duplicate the disk to backup.
+     * @param primaryPool Primary storage to verify capacity;
+     * @param disk Disk that will be snapshotted.
+     */
+    protected void validateAvailableSizeOnPoolToTakeVolumeSnapshot(KVMStoragePool primaryPool, KVMPhysicalDisk disk) {
+        long availablePoolSize = primaryPool.getAvailable();
+        String poolDescription = new ToStringBuilder(primaryPool, ToStringStyle.JSON_STYLE).append("uuid", primaryPool.getUuid()).append("localPath", primaryPool.getLocalPath())
+                .toString();
+        String diskDescription = new ToStringBuilder(disk, ToStringStyle.JSON_STYLE).append("name", disk.getName()).append("path", disk.getPath()).append("size", disk.getSize())
+                .toString();
+
+        if (isAvailablePoolSizeDividedByDiskSizeLesserThanMinRate(availablePoolSize, disk.getSize())) {
+            throw new CloudRuntimeException(String.format("Pool [%s] available size [%s] must be at least once more of disk [%s] size, plus 5%%. Not taking snapshot.", poolDescription, availablePoolSize,
+                diskDescription));
+        }
+
+        s_logger.debug(String.format("Pool [%s] has enough available size [%s] to take volume [%s] snapshot.", poolDescription, availablePoolSize, diskDescription));
+    }
+
+    protected boolean isAvailablePoolSizeDividedByDiskSizeLesserThanMinRate(long availablePoolSize, long diskSize) {
+        return ((availablePoolSize * 1d) / (diskSize * 1d)) < MIN_RATE_BETWEEN_AVAILABLE_POOL_AND_DISK_SIZE_TO_TAKE_DISK_SNAPSHOT;
+    }
+
     private Rados radosConnect(final KVMStoragePool primaryPool) throws RadosException {
         Rados r = new Rados(primaryPool.getAuthUserName());
         r.confSet(CEPH_MON_HOST, primaryPool.getSourceHost() + ":" + primaryPool.getSourcePort());
@@ -1668,6 +1949,9 @@ public class KVMStorageProcessor implements StorageProcessor {
         }
     }
 
+    private List<StoragePoolType> storagePoolTypesToDeleteSnapshotFile = Arrays.asList(StoragePoolType.Filesystem, StoragePoolType.NetworkFilesystem,
+            StoragePoolType.SharedMountPoint);
+
     private KVMPhysicalDisk createVolumeFromRBDSnapshot(CopyCommand cmd, DataTO destData,
             PrimaryDataStoreTO pool, DataStoreTO imageStore, VolumeObjectTO volume, String snapshotName, KVMPhysicalDisk disk) {
         PrimaryDataStoreTO primaryStore = (PrimaryDataStoreTO) imageStore;
@@ -1790,56 +2074,71 @@ public class KVMStorageProcessor implements StorageProcessor {
 
     @Override
     public Answer deleteSnapshot(final DeleteCommand cmd) {
-        String snap_full_name = "";
+        String snapshotFullName = "";
         try {
             SnapshotObjectTO snapshotTO = (SnapshotObjectTO) cmd.getData();
             PrimaryDataStoreTO primaryStore = (PrimaryDataStoreTO) snapshotTO.getDataStore();
-            VolumeObjectTO volume = snapshotTO.getVolume();
             KVMStoragePool primaryPool = storagePoolMgr.getStoragePool(primaryStore.getPoolType(), primaryStore.getUuid());
-            KVMPhysicalDisk disk = storagePoolMgr.getPhysicalDisk(primaryStore.getPoolType(), primaryStore.getUuid(), volume.getPath());
             String snapshotFullPath = snapshotTO.getPath();
             String snapshotName = snapshotFullPath.substring(snapshotFullPath.lastIndexOf("/") + 1);
-            snap_full_name = disk.getName() + "@" + snapshotName;
+            snapshotFullName = snapshotName;
             if (primaryPool.getType() == StoragePoolType.RBD) {
+                VolumeObjectTO volume = snapshotTO.getVolume();
+                KVMPhysicalDisk disk = storagePoolMgr.getPhysicalDisk(primaryStore.getPoolType(), primaryStore.getUuid(), volume.getPath());
+                snapshotFullName = disk.getName() + "@" + snapshotName;
                 Rados r = radosConnect(primaryPool);
                 IoCTX io = r.ioCtxCreate(primaryPool.getSourceDir());
                 Rbd rbd = new Rbd(io);
                 RbdImage image = rbd.open(disk.getName());
                 try {
-                    s_logger.info("Attempting to remove RBD snapshot " + snap_full_name);
+                    s_logger.info("Attempting to remove RBD snapshot " + snapshotFullName);
                     if (image.snapIsProtected(snapshotName)) {
-                        s_logger.debug("Unprotecting RBD snapshot " + snap_full_name);
+                        s_logger.debug("Unprotecting RBD snapshot " + snapshotFullName);
                         image.snapUnprotect(snapshotName);
                     }
                     image.snapRemove(snapshotName);
-                    s_logger.info("Snapshot " + snap_full_name + " successfully removed from " +
+                    s_logger.info("Snapshot " + snapshotFullName + " successfully removed from " +
                             primaryPool.getType().toString() + "  pool.");
                 } catch (RbdException e) {
-                    s_logger.error("Failed to remove snapshot " + snap_full_name + ", with exception: " + e.toString() +
+                    s_logger.error("Failed to remove snapshot " + snapshotFullName + ", with exception: " + e.toString() +
                         ", RBD error: " + ErrorCode.getErrorMessage(e.getReturnValue()));
                 } finally {
                     rbd.close(image);
                     r.ioCtxDestroy(io);
                 }
-            } else if (primaryPool.getType() == StoragePoolType.NetworkFilesystem || primaryPool.getType() == StoragePoolType.Filesystem) {
-                s_logger.info(String.format("Deleting snapshot (id=%s, name=%s, path=%s, storage type=%s) on primary storage", snapshotTO.getId(), snapshotTO.getName(), snapshotTO.getPath(), primaryPool.getType()));
-                deleteSnapshotViaManageSnapshotScript(snapshotName, disk);
+            } else if (storagePoolTypesToDeleteSnapshotFile.contains(primaryPool.getType())) {
+                s_logger.info(String.format("Deleting snapshot (id=%s, name=%s, path=%s, storage type=%s) on primary storage", snapshotTO.getId(), snapshotTO.getName(),
+                        snapshotTO.getPath(), primaryPool.getType()));
+                deleteSnapshotFile(snapshotTO);
             } else {
                 s_logger.warn("Operation not implemented for storage pool type of " + primaryPool.getType().toString());
                 throw new InternalErrorException("Operation not implemented for storage pool type of " + primaryPool.getType().toString());
             }
-            return new Answer(cmd, true, "Snapshot " + snap_full_name + " removed successfully.");
+            return new Answer(cmd, true, "Snapshot " + snapshotFullName + " removed successfully.");
         } catch (RadosException e) {
-            s_logger.error("Failed to remove snapshot " + snap_full_name + ", with exception: " + e.toString() +
+            s_logger.error("Failed to remove snapshot " + snapshotFullName + ", with exception: " + e.toString() +
                 ", RBD error: " + ErrorCode.getErrorMessage(e.getReturnValue()));
-            return new Answer(cmd, false, "Failed to remove snapshot " + snap_full_name);
+            return new Answer(cmd, false, "Failed to remove snapshot " + snapshotFullName);
         } catch (RbdException e) {
-            s_logger.error("Failed to remove snapshot " + snap_full_name + ", with exception: " + e.toString() +
+            s_logger.error("Failed to remove snapshot " + snapshotFullName + ", with exception: " + e.toString() +
                 ", RBD error: " + ErrorCode.getErrorMessage(e.getReturnValue()));
-            return new Answer(cmd, false, "Failed to remove snapshot " + snap_full_name);
+            return new Answer(cmd, false, "Failed to remove snapshot " + snapshotFullName);
         } catch (Exception e) {
-            s_logger.error("Failed to remove snapshot " + snap_full_name + ", with exception: " + e.toString());
-            return new Answer(cmd, false, "Failed to remove snapshot " + snap_full_name);
+            s_logger.error("Failed to remove snapshot " + snapshotFullName + ", with exception: " + e.toString());
+            return new Answer(cmd, false, "Failed to remove snapshot " + snapshotFullName);
+        }
+    }
+
+    /**
+     * Deletes the snapshot's file.
+     * @throws CloudRuntimeException If can't delete the snapshot file.
+     */
+    protected void deleteSnapshotFile(SnapshotObjectTO snapshotObjectTo) throws CloudRuntimeException {
+        try {
+            Files.deleteIfExists(Paths.get(snapshotObjectTo.getPath()));
+            s_logger.debug(String.format("Deleted snapshot [%s].", snapshotObjectTo));
+        } catch (IOException ex) {
+            throw new CloudRuntimeException(String.format("Unable to delete snapshot [%s] due to [%s].", snapshotObjectTo, ex.getMessage()));
         }
     }
 
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/LibvirtStorageAdaptor.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/LibvirtStorageAdaptor.java
index 6eafe16bb0c..d29f0756816 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/LibvirtStorageAdaptor.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/LibvirtStorageAdaptor.java
@@ -94,7 +94,20 @@ public class LibvirtStorageAdaptor implements StorageAdaptor {
 
     @Override
     public boolean createFolder(String uuid, String path) {
+        return createFolder(uuid, path, null);
+    }
+
+    @Override
+    public boolean createFolder(String uuid, String path, String localPath) {
         String mountPoint = _mountPoint + File.separator + uuid;
+
+        if (localPath != null) {
+            s_logger.debug(String.format("Pool [%s] is of type local or shared mount point; therefore, we will use the local path [%s] to create the folder [%s] (if it does not"
+                    + " exist).", uuid, localPath, path));
+
+            mountPoint = localPath;
+        }
+
         File f = new File(mountPoint + File.separator + path);
         if (!f.exists()) {
             f.mkdirs();
@@ -1401,38 +1414,6 @@ public class LibvirtStorageAdaptor implements StorageAdaptor {
         return newDisk;
     }
 
-    @Override
-    public KVMPhysicalDisk createDiskFromSnapshot(KVMPhysicalDisk snapshot, String snapshotName, String name, KVMStoragePool destPool, int timeout) {
-        s_logger.info("Creating volume " + name + " from snapshot " + snapshotName + " in pool " + destPool.getUuid() +
-                " (" + destPool.getType().toString() + ")");
-
-        PhysicalDiskFormat format = snapshot.getFormat();
-        long size = snapshot.getSize();
-        String destPath = destPool.getLocalPath().endsWith("/") ?
-                destPool.getLocalPath() + name :
-                destPool.getLocalPath() + "/" + name;
-
-        if (destPool.getType() == StoragePoolType.NetworkFilesystem) {
-            try {
-                if (format == PhysicalDiskFormat.QCOW2) {
-                    QemuImg qemu = new QemuImg(timeout);
-                    QemuImgFile destFile = new QemuImgFile(destPath, format);
-                    if (size > snapshot.getVirtualSize()) {
-                        destFile.setSize(size);
-                    } else {
-                        destFile.setSize(snapshot.getVirtualSize());
-                    }
-                    QemuImgFile srcFile = new QemuImgFile(snapshot.getPath(), snapshot.getFormat());
-                    qemu.convert(srcFile, destFile, snapshotName);
-                }
-            } catch (QemuImgException | LibvirtException e) {
-                s_logger.error("Failed to create " + destPath +
-                        " due to a failed executing of qemu-img: " + e.getMessage());
-            }
-        }
-        return destPool.getPhysicalDisk(name);
-    }
-
     @Override
     public boolean refresh(KVMStoragePool pool) {
         LibvirtStoragePool libvirtPool = (LibvirtStoragePool)pool;
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/LibvirtStoragePool.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/LibvirtStoragePool.java
index b2e8decfcb1..f5e9685bf0a 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/LibvirtStoragePool.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/LibvirtStoragePool.java
@@ -24,6 +24,8 @@ import org.apache.log4j.Logger;
 import org.libvirt.StoragePool;
 
 import org.apache.cloudstack.utils.qemu.QemuImg.PhysicalDiskFormat;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 
 import com.cloud.storage.Storage;
 import com.cloud.storage.Storage.StoragePoolType;
@@ -269,7 +271,7 @@ public class LibvirtStoragePool implements KVMStoragePool {
 
     @Override
     public boolean createFolder(String path) {
-        return this._storageAdaptor.createFolder(this.uuid, path);
+        return this._storageAdaptor.createFolder(this.uuid, path, this.type == StoragePoolType.Filesystem ? this.localPath : null);
     }
 
     @Override
@@ -279,4 +281,9 @@ public class LibvirtStoragePool implements KVMStoragePool {
         }
         return false;
     }
+
+    @Override
+    public String toString() {
+        return new ToStringBuilder(this, ToStringStyle.JSON_STYLE).append("uuid", getUuid()).append("path", getLocalPath()).toString();
+    }
 }
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/LinstorStorageAdaptor.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/LinstorStorageAdaptor.java
index dc00601674b..1e6ddb13738 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/LinstorStorageAdaptor.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/LinstorStorageAdaptor.java
@@ -428,18 +428,6 @@ public class LinstorStorageAdaptor implements StorageAdaptor {
         return dstDisk;
     }
 
-    @Override
-    public KVMPhysicalDisk createDiskFromSnapshot(
-        KVMPhysicalDisk snapshot,
-        String snapshotName,
-        String name,
-        KVMStoragePool destPool,
-        int timeout)
-    {
-        s_logger.debug("Linstor: createDiskFromSnapshot");
-        return null;
-    }
-
     @Override
     public boolean refresh(KVMStoragePool pool)
     {
@@ -448,8 +436,12 @@ public class LinstorStorageAdaptor implements StorageAdaptor {
     }
 
     @Override
-    public boolean createFolder(String uuid, String path)
-    {
+    public boolean createFolder(String uuid, String path) {
+        return createFolder(uuid, path, null);
+    }
+
+    @Override
+    public boolean createFolder(String uuid, String path, String localPath) {
         throw new UnsupportedOperationException("A folder cannot be created in this configuration.");
     }
 
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ManagedNfsStorageAdaptor.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ManagedNfsStorageAdaptor.java
index 6db2f82beb4..0a219d4a477 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ManagedNfsStorageAdaptor.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ManagedNfsStorageAdaptor.java
@@ -294,11 +294,6 @@ public class ManagedNfsStorageAdaptor implements StorageAdaptor {
         throw new UnsupportedOperationException("Copying a disk is not supported in this configuration.");
     }
 
-    @Override
-    public KVMPhysicalDisk createDiskFromSnapshot(KVMPhysicalDisk snapshot, String snapshotName, String name, KVMStoragePool destPool, int timeout) {
-        throw new UnsupportedOperationException("Creating a disk from a snapshot is not supported in this configuration.");
-    }
-
     @Override
     public boolean refresh(KVMStoragePool pool) {
         return true;
@@ -306,6 +301,11 @@ public class ManagedNfsStorageAdaptor implements StorageAdaptor {
 
     @Override
     public boolean createFolder(String uuid, String path) {
+        return createFolder(uuid, path, null);
+    }
+
+    @Override
+    public boolean createFolder(String uuid, String path, String localPath) {
         String mountPoint = _mountPoint + File.separator + uuid;
         File f = new File(mountPoint + File.separator + path);
         if (!f.exists()) {
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptor.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptor.java
index 6151c906f78..fcaa5272ebb 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptor.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/ScaleIOStorageAdaptor.java
@@ -288,11 +288,6 @@ public class ScaleIOStorageAdaptor implements StorageAdaptor {
         return destDisk;
     }
 
-    @Override
-    public KVMPhysicalDisk createDiskFromSnapshot(KVMPhysicalDisk snapshot, String snapshotName, String name, KVMStoragePool destPool, int timeout) {
-        return null;
-    }
-
     @Override
     public boolean refresh(KVMStoragePool pool) {
         return true;
@@ -305,9 +300,15 @@ public class ScaleIOStorageAdaptor implements StorageAdaptor {
 
     @Override
     public boolean createFolder(String uuid, String path) {
+        return createFolder(uuid, path, null);
+    }
+
+    @Override
+    public boolean createFolder(String uuid, String path, String localPath) {
         return true;
     }
 
+
     @Override
     public KVMPhysicalDisk createDiskFromTemplateBacking(KVMPhysicalDisk template, String name, QemuImg.PhysicalDiskFormat format, long size, KVMStoragePool destPool, int timeout) {
         return null;
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/StorageAdaptor.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/StorageAdaptor.java
index 570c2070c75..bc048d6cf26 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/StorageAdaptor.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/storage/StorageAdaptor.java
@@ -66,14 +66,14 @@ public interface StorageAdaptor {
 
     public KVMPhysicalDisk copyPhysicalDisk(KVMPhysicalDisk disk, String name, KVMStoragePool destPools, int timeout);
 
-    public KVMPhysicalDisk createDiskFromSnapshot(KVMPhysicalDisk snapshot, String snapshotName, String name, KVMStoragePool destPool, int timeout);
-
     public boolean refresh(KVMStoragePool pool);
 
     public boolean deleteStoragePool(KVMStoragePool pool);
 
     public boolean createFolder(String uuid, String path);
 
+    public boolean createFolder(String uuid, String path, String localPath);
+
     /**
      * Creates disk using template backing.
      * Precondition: Template is on destPool
diff --git a/plugins/hypervisors/kvm/src/main/java/org/apache/cloudstack/utils/qemu/QemuImg.java b/plugins/hypervisors/kvm/src/main/java/org/apache/cloudstack/utils/qemu/QemuImg.java
index 53974f6f70a..351ec1031e3 100644
--- a/plugins/hypervisors/kvm/src/main/java/org/apache/cloudstack/utils/qemu/QemuImg.java
+++ b/plugins/hypervisors/kvm/src/main/java/org/apache/cloudstack/utils/qemu/QemuImg.java
@@ -25,10 +25,13 @@ import com.cloud.storage.Storage;
 import com.cloud.utils.script.OutputInterpreter;
 import com.cloud.utils.script.Script;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
 import org.apache.commons.lang.NotImplementedException;
 import org.libvirt.LibvirtException;
 
 public class QemuImg {
+    private Logger logger = Logger.getLogger(this.getClass());
+
     public final static String BACKING_FILE = "backing_file";
     public final static String BACKING_FILE_FORMAT = "backing_file_format";
     public final static String CLUSTER_SIZE = "cluster_size";
@@ -276,14 +279,7 @@ public class QemuImg {
             script.add(optionsStr);
         }
 
-        if (StringUtils.isNotBlank(snapshotName)) {
-            if (!forceSourceFormat) {
-                script.add("-f");
-                script.add(srcFile.getFormat().toString());
-            }
-            script.add("-s");
-            script.add(snapshotName);
-        }
+        addSnapshotToConvertCommand(srcFile.getFormat().toString(), snapshotName, forceSourceFormat, script, version);
 
         script.add(srcFile.getFileName());
         script.add(destFile.getFileName());
@@ -298,6 +294,39 @@ public class QemuImg {
         }
     }
 
+    /**
+     * Qemu version 2.0.0 added (via commit <a href="https://github.com/qemu/qemu/commit/ef80654d0dc1edf2dd2a51feff8cc3e1102a6583">ef80654d0dc1edf2dd2a51feff8cc3e1102a6583</a>) the
+     * flag "-l" to inform the snapshot name or ID
+     */
+    private static final int QEMU_VERSION_THAT_ADDS_FLAG_L_TO_CONVERT_SNAPSHOT = 2000000;
+
+    /**
+     * Adds a flag to inform snapshot name or ID on conversion. If the QEMU version is less than {@link QemuImg#QEMU_VERSION_THAT_ADDS_FLAG_L_TO_CONVERT_SNAPSHOT), adds the
+     * flag "-s", otherwise, adds the flag "-l".
+     */
+    protected void addSnapshotToConvertCommand(String srcFormat, String snapshotName, boolean forceSourceFormat, Script script, Long qemuVersion) {
+        if (StringUtils.isBlank(snapshotName)) {
+            return;
+        }
+
+        if (qemuVersion >= QEMU_VERSION_THAT_ADDS_FLAG_L_TO_CONVERT_SNAPSHOT) {
+            script.add("-l");
+            script.add(String.format("snapshot.name=%s", snapshotName));
+            return;
+        }
+
+        logger.debug(String.format("Current QEMU version [%s] does not support flag \"-l\" (added on version >= 2.0.0) to inform the snapshot name or ID on conversion."
+                + " Adding the old flag \"-s\" instead.", qemuVersion));
+
+        if (!forceSourceFormat) {
+            script.add("-f");
+            script.add(srcFormat);
+        }
+
+        script.add("-s");
+        script.add(snapshotName);
+    }
+
     /**
      * Convert a image from source to destination
      *
diff --git a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRevertSnapshotCommandWrapperTest.java b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRevertSnapshotCommandWrapperTest.java
new file mode 100644
index 00000000000..8333c6e3746
--- /dev/null
+++ b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtRevertSnapshotCommandWrapperTest.java
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.CopyOption;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.cloudstack.storage.to.SnapshotObjectTO;
+import org.apache.cloudstack.storage.to.VolumeObjectTO;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.cloud.agent.api.to.DataStoreTO;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePool;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+@RunWith(PowerMockRunner.class)
+public class LibvirtRevertSnapshotCommandWrapperTest {
+
+    LibvirtRevertSnapshotCommandWrapper libvirtRevertSnapshotCommandWrapperSpy = Mockito.spy(LibvirtRevertSnapshotCommandWrapper.class);
+
+    @Mock
+    KVMStoragePool kvmStoragePoolPrimaryMock;
+
+    @Mock
+    KVMStoragePool kvmStoragePoolSecondaryMock;
+
+    @Mock
+    Path pathMock;
+
+    @Mock
+    SnapshotObjectTO snapshotObjectToPrimaryMock;
+
+    @Mock
+    SnapshotObjectTO snapshotObjectToSecondaryMock;
+
+    @Mock
+    Pair<String, SnapshotObjectTO> pairStringSnapshotObjectToMock;
+
+    @Mock
+    DataStoreTO dataStoreToMock;
+
+    @Mock
+    VolumeObjectTO volumeObjectToMock;
+
+    @Test
+    public void validateGetFullPathAccordingToStorage() {
+        String snapshotPath = "snapshotPath";
+        String storagePath = "storagePath";
+        String expectedResult = String.format("%s%s%s", storagePath, File.separator, snapshotPath);
+
+        Mockito.doReturn(storagePath).when(kvmStoragePoolPrimaryMock).getLocalPath();
+        String result = libvirtRevertSnapshotCommandWrapperSpy.getFullPathAccordingToStorage(kvmStoragePoolPrimaryMock, snapshotPath);
+
+        Assert.assertEquals(expectedResult, result);
+    }
+
+    @Test
+    @PrepareForTest(LibvirtRevertSnapshotCommandWrapper.class)
+    public void validateReplaceVolumeWithSnapshotReplaceFiles() throws IOException {
+        PowerMockito.mockStatic(Files.class);
+        PowerMockito.when(Files.copy(Mockito.any(Path.class), Mockito.any(Path.class), Mockito.any(CopyOption.class))).thenReturn(pathMock);
+        libvirtRevertSnapshotCommandWrapperSpy.replaceVolumeWithSnapshot("test", "test");
+    }
+
+    @Test (expected = IOException.class)
+    @PrepareForTest(LibvirtRevertSnapshotCommandWrapper.class)
+    public void validateReplaceVolumeWithSnapshotThrowsIOException() throws IOException {
+        PowerMockito.mockStatic(Files.class);
+        PowerMockito.when(Files.copy(Mockito.any(Path.class), Mockito.any(Path.class), Mockito.any(CopyOption.class))).thenThrow(IOException.class);
+        libvirtRevertSnapshotCommandWrapperSpy.replaceVolumeWithSnapshot("test", "test");
+    }
+
+    @Test
+    @PrepareForTest(LibvirtRevertSnapshotCommandWrapper.class)
+    public void validateGetSnapshotPathExistsOnPrimaryStorage() {
+        String snapshotPath = "test";
+        Pair<String, SnapshotObjectTO> expectedResult = new Pair<>(snapshotPath, snapshotObjectToPrimaryMock);
+
+        Mockito.doReturn(snapshotPath).when(snapshotObjectToPrimaryMock).getPath();
+
+        PowerMockito.mockStatic(Files.class);
+        PowerMockito.when(Files.exists(Mockito.any(Path.class), Mockito.any())).thenReturn(true);
+
+        Pair<String, SnapshotObjectTO> result = libvirtRevertSnapshotCommandWrapperSpy.getSnapshot(snapshotObjectToPrimaryMock, snapshotObjectToSecondaryMock,
+                kvmStoragePoolPrimaryMock, kvmStoragePoolSecondaryMock);
+
+        Assert.assertEquals(expectedResult.first(), result.first());
+        Assert.assertEquals(expectedResult.second(), result.second());
+    }
+
+    @Test
+    @PrepareForTest(LibvirtRevertSnapshotCommandWrapper.class)
+    public void validateGetSnapshotPathExistsOnSecondaryStorage() {
+        String snapshotPath = "test";
+        Pair<String, SnapshotObjectTO> expectedResult = new Pair<>(snapshotPath, snapshotObjectToSecondaryMock);
+
+        PowerMockito.mockStatic(Files.class, Paths.class);
+        PowerMockito.when(Paths.get(Mockito.any(), Mockito.any())).thenReturn(pathMock);
+        PowerMockito.when(Files.exists(Mockito.any(Path.class), Mockito.any())).thenReturn(false);
+
+        Mockito.doReturn(snapshotPath).when(snapshotObjectToSecondaryMock).getPath();
+        Mockito.doReturn(snapshotPath).when(libvirtRevertSnapshotCommandWrapperSpy).getFullPathAccordingToStorage(Mockito.any(), Mockito.any());
+
+        Pair<String, SnapshotObjectTO> result = libvirtRevertSnapshotCommandWrapperSpy.getSnapshot(snapshotObjectToPrimaryMock, snapshotObjectToSecondaryMock,
+                kvmStoragePoolPrimaryMock, kvmStoragePoolSecondaryMock);
+
+        Assert.assertEquals(expectedResult.first(), result.first());
+        Assert.assertEquals(expectedResult.second(), result.second());
+    }
+
+    @Test (expected = CloudRuntimeException.class)
+    @PrepareForTest(LibvirtRevertSnapshotCommandWrapper.class)
+    public void validateGetSnapshotPathDoesNotExistsOnSecondaryStorageThrows() {
+        PowerMockito.mockStatic(Files.class, Paths.class);
+        PowerMockito.when(Paths.get(Mockito.any(), Mockito.any())).thenReturn(pathMock);
+        PowerMockito.when(Files.exists(Mockito.any(Path.class), Mockito.any())).thenReturn(false);
+
+        Mockito.doReturn(null).when(snapshotObjectToSecondaryMock).getPath();
+
+        libvirtRevertSnapshotCommandWrapperSpy.getSnapshot(snapshotObjectToPrimaryMock, snapshotObjectToSecondaryMock,
+                kvmStoragePoolPrimaryMock, kvmStoragePoolSecondaryMock);
+    }
+
+    @Test
+    public void validateRevertVolumeToSnapshotReplaceSuccessfully() throws IOException {
+        Mockito.doReturn(volumeObjectToMock).when(snapshotObjectToSecondaryMock).getVolume();
+        Mockito.doReturn("").when(libvirtRevertSnapshotCommandWrapperSpy).getFullPathAccordingToStorage(Mockito.any(), Mockito.anyString());
+        Mockito.doReturn(pairStringSnapshotObjectToMock).when(libvirtRevertSnapshotCommandWrapperSpy).getSnapshot(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+        Mockito.doNothing().when(libvirtRevertSnapshotCommandWrapperSpy).replaceVolumeWithSnapshot(Mockito.any(), Mockito.any());
+        libvirtRevertSnapshotCommandWrapperSpy.revertVolumeToSnapshot(snapshotObjectToPrimaryMock, snapshotObjectToSecondaryMock, dataStoreToMock, kvmStoragePoolPrimaryMock,
+                kvmStoragePoolSecondaryMock);
+    }
+
+    @Test (expected = CloudRuntimeException.class)
+    public void validateRevertVolumeToSnapshotReplaceVolumeThrowsIOException() throws IOException {
+        Mockito.doReturn(volumeObjectToMock).when(snapshotObjectToSecondaryMock).getVolume();
+        Mockito.doReturn("").when(libvirtRevertSnapshotCommandWrapperSpy).getFullPathAccordingToStorage(Mockito.any(), Mockito.anyString());
+        Mockito.doReturn(pairStringSnapshotObjectToMock).when(libvirtRevertSnapshotCommandWrapperSpy).getSnapshot(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+        Mockito.doThrow(IOException.class).when(libvirtRevertSnapshotCommandWrapperSpy).replaceVolumeWithSnapshot(Mockito.any(), Mockito.any());
+        libvirtRevertSnapshotCommandWrapperSpy.revertVolumeToSnapshot(snapshotObjectToPrimaryMock, snapshotObjectToSecondaryMock, dataStoreToMock, kvmStoragePoolPrimaryMock,
+                kvmStoragePoolSecondaryMock);
+    }
+}
diff --git a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUtilitiesHelperTest.java b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUtilitiesHelperTest.java
index a43b342228f..bd8a55983d4 100644
--- a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUtilitiesHelperTest.java
+++ b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtUtilitiesHelperTest.java
@@ -16,42 +16,63 @@
 // under the License.
 package com.cloud.hypervisor.kvm.resource.wrapper;
 
-import java.io.File;
-import java.util.UUID;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.libvirt.Connect;
+import org.libvirt.LibvirtException;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
 
-import com.cloud.utils.script.Script;
+import com.cloud.utils.Pair;
 
 import junit.framework.TestCase;
 
+@RunWith(PowerMockRunner.class)
 public class LibvirtUtilitiesHelperTest extends TestCase {
 
-    public void testGenerateUUID() {
-        LibvirtUtilitiesHelper helper = new LibvirtUtilitiesHelper();
-        UUID uuid = UUID.fromString(helper.generateUUIDName());
-        assertEquals(4, uuid.version());
+    LibvirtUtilitiesHelper libvirtUtilitiesHelperSpy = Mockito.spy(LibvirtUtilitiesHelper.class);
+
+    @Mock
+    Connect connectMock;
+
+    @Test
+    public void validateIsLibvirtVersionEqualOrHigherThanVersionInParameterExceptionOnRetrievingLibvirtVersionReturnsFalse() throws LibvirtException {
+        Mockito.doThrow(LibvirtException.class).when(connectMock).getLibVirVersion();
+        Pair<String, Boolean> result = LibvirtUtilitiesHelper.isLibvirtVersionEqualOrHigherThanVersionInParameter(connectMock, 0l);
+
+        Assert.assertEquals("Unknow due to [null]", result.first());
+        Assert.assertFalse(result.second());
     }
 
-    public void testSSHKeyPaths() {
-        LibvirtUtilitiesHelper helper = new LibvirtUtilitiesHelper();
-        /* These paths are hardcoded in LibvirtComputingResource and we should
-         * verify that they do not change.
-         * Hardcoded paths are not what we want in the longer run
-         */
-        assertEquals("/root/.ssh", helper.retrieveSshKeysPath());
-        assertEquals("/root/.ssh" + File.separator + "id_rsa.pub.cloud", helper.retrieveSshPubKeyPath());
-        assertEquals("/root/.ssh" + File.separator + "id_rsa.cloud", helper.retrieveSshPrvKeyPath());
+    @Test
+    public void validateIsLibvirtVersionEqualOrHigherThanVersionInParameterLibvirtVersionIsLowerThanParameterReturnsFalse() throws LibvirtException {
+        long libvirtVersion = 9l;
+        Mockito.doReturn(libvirtVersion).when(connectMock).getLibVirVersion();
+        Pair<String, Boolean> result = LibvirtUtilitiesHelper.isLibvirtVersionEqualOrHigherThanVersionInParameter(connectMock, 10l);
+
+        Assert.assertEquals(String.valueOf(libvirtVersion), result.first());
+        Assert.assertFalse(result.second());
     }
 
-    public void testBashScriptPath() {
-        LibvirtUtilitiesHelper helper = new LibvirtUtilitiesHelper();
-        assertEquals("/bin/bash", helper.retrieveBashScriptPath());
+    @Test
+    public void validateIsLibvirtVersionEqualOrHigherThanVersionInParameterLibvirtVersionIsEqualsToParameterReturnsTrue() throws LibvirtException {
+        long libvirtVersion = 10l;
+        Mockito.doReturn(libvirtVersion).when(connectMock).getLibVirVersion();
+        Pair<String, Boolean> result = LibvirtUtilitiesHelper.isLibvirtVersionEqualOrHigherThanVersionInParameter(connectMock, 10l);
+
+        Assert.assertEquals(String.valueOf(libvirtVersion), result.first());
+        Assert.assertTrue(result.second());
     }
 
-    public void testBuildScript() {
-        LibvirtUtilitiesHelper helper = new LibvirtUtilitiesHelper();
-        String path = "/path/to/my/script";
-        Script script = helper.buildScript(path);
-        assertEquals(path + " ", script.toString());
-        assertEquals(LibvirtUtilitiesHelper.TIMEOUT, script.getTimeout());
+    @Test
+    public void validateIsLibvirtVersionEqualOrHigherThanVersionInParameterLibvirtVersionIsHigherThanParameterReturnsTrue() throws LibvirtException {
+        long libvirtVersion = 11l;
+        Mockito.doReturn(libvirtVersion).when(connectMock).getLibVirVersion();
+        Pair<String, Boolean> result = LibvirtUtilitiesHelper.isLibvirtVersionEqualOrHigherThanVersionInParameter(connectMock, 10l);
+
+        Assert.assertEquals(String.valueOf(libvirtVersion), result.first());
+        Assert.assertTrue(result.second());
     }
 }
diff --git a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/storage/KVMStorageProcessorTest.java b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/storage/KVMStorageProcessorTest.java
index 36d957038a2..9f6a46a3c7e 100644
--- a/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/storage/KVMStorageProcessorTest.java
+++ b/plugins/hypervisors/kvm/src/test/java/com/cloud/hypervisor/kvm/storage/KVMStorageProcessorTest.java
@@ -19,18 +19,37 @@
 package com.cloud.hypervisor.kvm.storage;
 
 import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.resource.LibvirtVMDef;
+import com.cloud.hypervisor.kvm.resource.wrapper.LibvirtUtilitiesHelper;
+import com.cloud.storage.template.TemplateConstants;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
 import javax.naming.ConfigurationException;
 
 import com.cloud.utils.script.Script;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.cloudstack.storage.to.SnapshotObjectTO;
+import org.apache.cloudstack.storage.to.VolumeObjectTO;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.libvirt.Connect;
+import org.libvirt.Domain;
+import org.libvirt.LibvirtException;
 import org.mockito.InjectMocks;
 import org.mockito.Matchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -47,6 +66,27 @@ public class KVMStorageProcessorTest {
     @InjectMocks
     private KVMStorageProcessor storageProcessor;
 
+    @Spy
+    KVMStorageProcessor storageProcessorSpy = new KVMStorageProcessor(storagePoolManager, resource);
+
+    @Mock
+    Pair<String, Set<String>> diskToSnapshotAndDisksToAvoidMock;
+
+    @Mock
+    Domain domainMock;
+
+    @Mock
+    KVMStoragePool kvmStoragePoolMock;
+
+    @Mock
+    VolumeObjectTO volumeObjectToMock;
+
+    @Mock
+    SnapshotObjectTO snapshotObjectToMock;
+
+    @Mock
+    Connect connectMock;
+
     private static final String directDownloadTemporaryPath = "/var/lib/libvirt/images/dd";
     private static final long templateSize = 80000L;
 
@@ -54,18 +94,20 @@ public class KVMStorageProcessorTest {
     public void setUp() throws ConfigurationException {
         MockitoAnnotations.initMocks(this);
         storageProcessor = new KVMStorageProcessor(storagePoolManager, resource);
-        PowerMockito.mockStatic(Script.class);
-        Mockito.when(resource.getDirectDownloadTemporaryDownloadPath()).thenReturn(directDownloadTemporaryPath);
     }
 
     @Test
     public void testIsEnoughSpaceForDownloadTemplateOnTemporaryLocationAssumeEnoughSpaceWhenNotProvided() {
+        PowerMockito.mockStatic(Script.class);
+        Mockito.when(resource.getDirectDownloadTemporaryDownloadPath()).thenReturn(directDownloadTemporaryPath);
         boolean result = storageProcessor.isEnoughSpaceForDownloadTemplateOnTemporaryLocation(null);
         Assert.assertTrue(result);
     }
 
     @Test
     public void testIsEnoughSpaceForDownloadTemplateOnTemporaryLocationNotEnoughSpace() {
+        PowerMockito.mockStatic(Script.class);
+        Mockito.when(resource.getDirectDownloadTemporaryDownloadPath()).thenReturn(directDownloadTemporaryPath);
         String output = String.valueOf(templateSize - 30000L);
         Mockito.when(Script.runSimpleBashScript(Matchers.anyString())).thenReturn(output);
         boolean result = storageProcessor.isEnoughSpaceForDownloadTemplateOnTemporaryLocation(templateSize);
@@ -74,6 +116,8 @@ public class KVMStorageProcessorTest {
 
     @Test
     public void testIsEnoughSpaceForDownloadTemplateOnTemporaryLocationEnoughSpace() {
+        PowerMockito.mockStatic(Script.class);
+        Mockito.when(resource.getDirectDownloadTemporaryDownloadPath()).thenReturn(directDownloadTemporaryPath);
         String output = String.valueOf(templateSize + 30000L);
         Mockito.when(Script.runSimpleBashScript(Matchers.anyString())).thenReturn(output);
         boolean result = storageProcessor.isEnoughSpaceForDownloadTemplateOnTemporaryLocation(templateSize);
@@ -82,9 +126,226 @@ public class KVMStorageProcessorTest {
 
     @Test
     public void testIsEnoughSpaceForDownloadTemplateOnTemporaryLocationNotExistingLocation() {
+        PowerMockito.mockStatic(Script.class);
+        Mockito.when(resource.getDirectDownloadTemporaryDownloadPath()).thenReturn(directDownloadTemporaryPath);
         String output = String.format("df: ā€˜%sā€™: No such file or directory", directDownloadTemporaryPath);
         Mockito.when(Script.runSimpleBashScript(Matchers.anyString())).thenReturn(output);
         boolean result = storageProcessor.isEnoughSpaceForDownloadTemplateOnTemporaryLocation(templateSize);
         Assert.assertFalse(result);
     }
+
+    @Test
+    public void validateGetSnapshotTemporaryPath(){
+        String path = "/path/to/disk";
+        String snapshotName = "snapshot";
+        String expectedResult = "/path/to/snapshot";
+
+        String result = storageProcessor.getSnapshotTemporaryPath(path, snapshotName);
+        Assert.assertEquals(expectedResult, result);
+    }
+
+    @Test
+    public void validateGetSnapshotPathInPrimaryStorage(){
+        String path = "/path/to/disk";
+        String snapshotName = "snapshot";
+        String expectedResult = String.format("%s%s%s%s%s", path, File.separator, TemplateConstants.DEFAULT_SNAPSHOT_ROOT_DIR, File.separator, snapshotName);
+
+        String result = storageProcessor.getSnapshotPathInPrimaryStorage(path, snapshotName);
+        Assert.assertEquals(expectedResult, result);
+    }
+
+    @Test (expected = CloudRuntimeException.class)
+    public void validateValidateAvailableSizeOnPoolToTakeVolumeSnapshotAvailabeSizeLessThanMinRateThrowCloudRuntimeException(){
+        KVMPhysicalDisk kvmPhysicalDiskMock = Mockito.mock(KVMPhysicalDisk.class);
+
+        Mockito.doReturn(104l).when(kvmStoragePoolMock).getAvailable();
+        Mockito.doReturn(100l).when(kvmPhysicalDiskMock).getSize();
+
+        storageProcessor.validateAvailableSizeOnPoolToTakeVolumeSnapshot(kvmStoragePoolMock, kvmPhysicalDiskMock);
+    }
+
+    @Test
+    public void validateValidateAvailableSizeOnPoolToTakeVolumeSnapshotAvailabeSizeEqualOrHigherThanMinRateDoNothing(){
+        KVMPhysicalDisk kvmPhysicalDiskMock = Mockito.mock(KVMPhysicalDisk.class);
+
+        Mockito.doReturn(105l, 106l).when(kvmStoragePoolMock).getAvailable();
+        Mockito.doReturn(100l).when(kvmPhysicalDiskMock).getSize();
+
+        storageProcessor.validateAvailableSizeOnPoolToTakeVolumeSnapshot(kvmStoragePoolMock, kvmPhysicalDiskMock);
+        storageProcessor.validateAvailableSizeOnPoolToTakeVolumeSnapshot(kvmStoragePoolMock, kvmPhysicalDiskMock);
+    }
+
+    private List<LibvirtVMDef.DiskDef> createDiskDefs(int iterations, boolean duplicatePath) {
+        List<LibvirtVMDef.DiskDef> disks = new ArrayList<>();
+
+        for (int i = 1; i <= iterations; i++) {
+            LibvirtVMDef.DiskDef disk = new LibvirtVMDef.DiskDef();
+            disk.defFileBasedDisk(String.format("path%s", duplicatePath ? "" : i), String.format("label%s", i), LibvirtVMDef.DiskDef.DiskBus.USB, LibvirtVMDef.DiskDef.DiskFmtType.RAW);
+            disks.add(disk);
+        }
+
+        return disks;
+    }
+
+    @Test (expected = CloudRuntimeException.class)
+    public void validateGetDiskToSnapshotAndDisksToAvoidDuplicatePathThrowsCloudRuntimeException() throws LibvirtException{
+        List<LibvirtVMDef.DiskDef> disks = createDiskDefs(2, true);
+
+        storageProcessor.getDiskToSnapshotAndDisksToAvoid(disks, "path", domainMock);
+    }
+
+    @Test (expected = CloudRuntimeException.class)
+    public void validateGetDiskToSnapshotAndDisksToAvoidPathNotFoundThrowsCloudRuntimeException() throws LibvirtException{
+        List<LibvirtVMDef.DiskDef> disks = createDiskDefs(5, false);
+
+        storageProcessor.getDiskToSnapshotAndDisksToAvoid(disks, "path6", domainMock);
+    }
+
+    @Test
+    public void validateGetDiskToSnapshotAndDisksToAvoidPathFoundReturnLabels() throws LibvirtException{
+        List<LibvirtVMDef.DiskDef> disks = createDiskDefs(5, false);
+
+        String expectedLabelResult = "label2";
+        long expectedDisksSizeResult = disks.size() - 1;
+
+        Pair<String, Set<String>> result = storageProcessor.getDiskToSnapshotAndDisksToAvoid(disks, "path2", domainMock);
+
+        Assert.assertEquals(expectedLabelResult, result.first());
+        Assert.assertEquals(expectedDisksSizeResult, result.second().size());
+    }
+
+    @Test (expected = LibvirtException.class)
+    public void validateTakeVolumeSnapshotFailToCreateSnapshotThrowLibvirtException() throws LibvirtException{
+        Mockito.doReturn(diskToSnapshotAndDisksToAvoidMock).when(storageProcessorSpy).getDiskToSnapshotAndDisksToAvoid(Mockito.any(), Mockito.anyString(), Mockito.any());
+        Mockito.doReturn("").when(domainMock).getName();
+        Mockito.doReturn(new HashSet<>()).when(diskToSnapshotAndDisksToAvoidMock).second();
+        Mockito.doThrow(LibvirtException.class).when(domainMock).snapshotCreateXML(Mockito.anyString(), Mockito.anyInt());
+
+        storageProcessorSpy.takeVolumeSnapshot(new ArrayList<>(), "", "", domainMock);
+    }
+
+    @Test
+    public void validateTakeVolumeSnapshotSuccessReturnDiskLabel() throws LibvirtException{
+        String expectedResult = "label";
+
+        Mockito.doReturn(diskToSnapshotAndDisksToAvoidMock).when(storageProcessorSpy).getDiskToSnapshotAndDisksToAvoid(Mockito.any(), Mockito.anyString(), Mockito.any());
+        Mockito.doReturn("").when(domainMock).getName();
+        Mockito.doReturn(expectedResult).when(diskToSnapshotAndDisksToAvoidMock).first();
+        Mockito.doReturn(new HashSet<>()).when(diskToSnapshotAndDisksToAvoidMock).second();
+        Mockito.doReturn(null).when(domainMock).snapshotCreateXML(Mockito.anyString(), Mockito.anyInt());
+
+        String result = storageProcessorSpy.takeVolumeSnapshot(new ArrayList<>(), "", "", domainMock);
+
+        Assert.assertEquals(expectedResult, result);
+    }
+
+    @Test
+    @PrepareForTest(KVMStorageProcessor.class)
+    public void validateCopySnapshotToPrimaryStorageDirFailToCopyReturnErrorMessage() throws Exception {
+        String baseFile = "baseFile";
+        String snapshotPath = "snapshotPath";
+        String errorMessage = "error";
+        String expectedResult = String.format("Unable to copy %s snapshot [%s] to [%s] due to [%s].", volumeObjectToMock, baseFile, snapshotPath, errorMessage);
+
+        Mockito.doReturn(true).when(kvmStoragePoolMock).createFolder(Mockito.anyString());
+        PowerMockito.mockStatic(Files.class);
+        PowerMockito.when(Files.copy(Mockito.any(Path.class), Mockito.any(Path.class), Mockito.any())).thenThrow(new IOException(errorMessage));
+
+        String result = storageProcessorSpy.copySnapshotToPrimaryStorageDir(kvmStoragePoolMock, baseFile, snapshotPath, volumeObjectToMock);
+
+        Assert.assertEquals(expectedResult, result);
+    }
+
+    @Test
+    @PrepareForTest(KVMStorageProcessor.class)
+    public void validateCopySnapshotToPrimaryStorageDirCopySuccessReturnNull() throws Exception {
+        String baseFile = "baseFile";
+        String snapshotPath = "snapshotPath";
+
+        Mockito.doReturn(true).when(kvmStoragePoolMock).createFolder(Mockito.anyString());
+        PowerMockito.mockStatic(Files.class);
+        PowerMockito.when(Files.copy(Mockito.any(Path.class), Mockito.any(Path.class), Mockito.any())).thenReturn(null);
+
+        String result = storageProcessorSpy.copySnapshotToPrimaryStorageDir(kvmStoragePoolMock, baseFile, snapshotPath, volumeObjectToMock);
+
+        Assert.assertNull(result);
+    }
+
+    @Test (expected = CloudRuntimeException.class)
+    @PrepareForTest({Script.class, LibvirtUtilitiesHelper.class})
+    public void validateMergeSnapshotIntoBaseFileErrorOnMergeThrowCloudRuntimeException() throws Exception {
+        PowerMockito.mockStatic(Script.class, LibvirtUtilitiesHelper.class);
+        PowerMockito.when(Script.runSimpleBashScript(Mockito.anyString())).thenReturn("");
+        PowerMockito.when(LibvirtUtilitiesHelper.isLibvirtSupportingFlagDeleteOnCommandVirshBlockcommit(Mockito.any())).thenReturn(true);
+
+        storageProcessorSpy.mergeSnapshotIntoBaseFile(domainMock, "", "", "", volumeObjectToMock, connectMock);
+    }
+
+    @Test
+    @PrepareForTest({Script.class, LibvirtUtilitiesHelper.class})
+    public void validateMergeSnapshotIntoBaseFileMergeSuccessDoNothing() throws Exception {
+        PowerMockito.mockStatic(Script.class, LibvirtUtilitiesHelper.class);
+        PowerMockito.when(Script.runSimpleBashScript(Mockito.anyString())).thenReturn(null);
+        PowerMockito.when(LibvirtUtilitiesHelper.isLibvirtSupportingFlagDeleteOnCommandVirshBlockcommit(Mockito.any())).thenReturn(true);
+        Mockito.doNothing().when(storageProcessorSpy).manuallyDeleteUnusedSnapshotFile(Mockito.anyBoolean(), Mockito.any());
+
+        storageProcessorSpy.mergeSnapshotIntoBaseFile(domainMock, "", "", "", volumeObjectToMock, connectMock);
+    }
+
+    @Test (expected = CloudRuntimeException.class)
+    @PrepareForTest(KVMStorageProcessor.class)
+    public void validateManuallyDeleteUnusedSnapshotFileLibvirtDoesNotSupportsFlagDeleteExceptionOnFileDeletionThrowsException() throws IOException {
+        Mockito.doReturn("").when(snapshotObjectToMock).getPath();
+        PowerMockito.mockStatic(Files.class);
+        PowerMockito.when(Files.deleteIfExists(Mockito.any(Path.class))).thenThrow(IOException.class);
+
+        storageProcessorSpy.manuallyDeleteUnusedSnapshotFile(false, "");
+    }
+
+    @Test
+    public void validateIsAvailablePoolSizeDividedByDiskSizeLesserThanMinRate(){
+        Assert.assertTrue(storageProcessorSpy.isAvailablePoolSizeDividedByDiskSizeLesserThanMinRate(10499l, 10000l));
+        Assert.assertFalse(storageProcessorSpy.isAvailablePoolSizeDividedByDiskSizeLesserThanMinRate(10500l, 10000l));
+        Assert.assertFalse(storageProcessorSpy.isAvailablePoolSizeDividedByDiskSizeLesserThanMinRate(10501l, 10000l));
+    }
+
+    @Test
+    public void validateValidateCopyResultResultIsNullReturn() throws CloudRuntimeException, IOException{
+        storageProcessorSpy.validateCopyResult(null, "");
+    }
+
+    @Test (expected = IOException.class)
+    public void validateValidateCopyResultFailToDeleteThrowIOException() throws CloudRuntimeException, IOException{
+        PowerMockito.mockStatic(Files.class);
+        PowerMockito.when(Files.deleteIfExists(Mockito.any())).thenThrow(new IOException(""));
+        storageProcessorSpy.validateCopyResult("", "");
+    }
+
+    @Test (expected = CloudRuntimeException.class)
+    @PrepareForTest(KVMStorageProcessor.class)
+    public void validateValidateCopyResulResultNotNullThrowCloudRuntimeException() throws CloudRuntimeException, IOException{
+        PowerMockito.mockStatic(Files.class);
+        PowerMockito.when(Files.deleteIfExists(Mockito.any())).thenReturn(true);
+        storageProcessorSpy.validateCopyResult("", "");
+    }
+
+    @Test (expected = CloudRuntimeException.class)
+    @PrepareForTest(KVMStorageProcessor.class)
+    public void validateDeleteSnapshotFileErrorOnDeleteThrowsCloudRuntimeException() throws Exception {
+        Mockito.doReturn("").when(snapshotObjectToMock).getPath();
+        PowerMockito.mockStatic(Files.class);
+        PowerMockito.when(Files.deleteIfExists(Mockito.any(Path.class))).thenThrow(IOException.class);
+
+        storageProcessorSpy.deleteSnapshotFile(snapshotObjectToMock);
+    }
+
+    @Test
+    @PrepareForTest(KVMStorageProcessor.class)
+    public void validateDeleteSnapshotFileSuccess () throws IOException {
+        Mockito.doReturn("").when(snapshotObjectToMock).getPath();
+        PowerMockito.mockStatic(Files.class);
+        PowerMockito.when(Files.deleteIfExists(Mockito.any(Path.class))).thenReturn(true);
+
+        storageProcessorSpy.deleteSnapshotFile(snapshotObjectToMock);
+    }
 }
diff --git a/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/driver/CloudStackPrimaryDataStoreDriverImpl.java b/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/driver/CloudStackPrimaryDataStoreDriverImpl.java
index fff15fd0ca9..56d2c3b7b2f 100644
--- a/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/driver/CloudStackPrimaryDataStoreDriverImpl.java
+++ b/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/driver/CloudStackPrimaryDataStoreDriverImpl.java
@@ -379,8 +379,7 @@ public class CloudStackPrimaryDataStoreDriverImpl implements PrimaryDataStoreDri
 
     @Override
     public void revertSnapshot(SnapshotInfo snapshot, SnapshotInfo snapshotOnPrimaryStore, AsyncCompletionCallback<CommandResult> callback) {
-        SnapshotObjectTO snapshotTO = (SnapshotObjectTO)snapshot.getTO();
-        RevertSnapshotCommand cmd = new RevertSnapshotCommand(snapshotTO);
+        RevertSnapshotCommand cmd = new RevertSnapshotCommand((SnapshotObjectTO)snapshot.getTO(), (SnapshotObjectTO)snapshotOnPrimaryStore.getTO());
 
         CommandResult result = new CommandResult();
         try {
diff --git a/scripts/storage/qcow2/managesnapshot.sh b/scripts/storage/qcow2/managesnapshot.sh
index c5831ef494e..b1bf73251a8 100755
--- a/scripts/storage/qcow2/managesnapshot.sh
+++ b/scripts/storage/qcow2/managesnapshot.sh
@@ -226,34 +226,10 @@ backup_snapshot() {
       return 2
     fi
   elif [ -f ${disk} ]; then
-    # Does the snapshot exist?
-    qemuimg_ret=$($qemu_img snapshot $forceShareFlag -l $disk 2>&1)
-    ret_code=$?
-    if [ $ret_code -gt 0 ] && [[ $qemuimg_ret == *"snapshot: invalid option -- 'U'"* ]]
-    then
-      forceShareFlag=""
-      qemuimg_ret=$($qemu_img snapshot $forceShareFlag -l $disk)
-      ret_code=$?
-    fi
-    if [ $ret_code -gt 0 ] || [[ ! $qemuimg_ret == *"$snapshotname"* ]]
-    then
-      printf "there is no $snapshotname on disk $disk\n" >&2
-      return 1
-    fi
 
-    qemuimg_ret=$($qemu_img convert $forceShareFlag -f qcow2 -O qcow2 -l snapshot.name=$snapshotname $disk $destPath/$destName 2>&1 > /dev/null)
+    cp "$disk" "${destPath}/${destName}"
     ret_code=$?
-    if [ $ret_code -gt 0 ] && [[ $qemuimg_ret == *"convert: invalid option -- 'U'"* ]]
-    then
-      forceShareFlag=""
-      qemuimg_ret=$($qemu_img convert $forceShareFlag -f qcow2 -O qcow2 -l snapshot.name=$snapshotname $disk $destPath/$destName 2>&1 > /dev/null)
-      ret_code=$?
-    fi
-    if [ $ret_code -gt 0 ] && [[ $qemuimg_ret == *"convert: invalid option -- 'l'"* ]]
-    then
-      $qemu_img convert $forceShareFlag -f qcow2 -O qcow2 -s $snapshotname $disk $destPath/$destName >& /dev/null
-      ret_code=$?
-    fi
+
     if [ $ret_code -gt 0 ]
     then
       printf "Failed to backup $snapshotname for disk $disk to $destPath\n" >&2
diff --git a/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java
index b573af7dba1..c31daf3693c 100644
--- a/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java
+++ b/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java
@@ -83,6 +83,7 @@ import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
 import org.apache.cloudstack.jobs.JobInfo;
 import org.apache.cloudstack.resourcedetail.DiskOfferingDetailVO;
 import org.apache.cloudstack.resourcedetail.dao.DiskOfferingDetailsDao;
+import org.apache.cloudstack.snapshot.SnapshotHelper;
 import org.apache.cloudstack.storage.command.AttachAnswer;
 import org.apache.cloudstack.storage.command.AttachCommand;
 import org.apache.cloudstack.storage.command.DettachCommand;
@@ -99,6 +100,8 @@ import org.apache.cloudstack.utils.imagestore.ImageStoreUtil;
 import org.apache.cloudstack.utils.volume.VirtualMachineDiskInfo;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -304,6 +307,9 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
     @Inject
     private ManagementService managementService;
 
+    @Inject
+    protected SnapshotHelper snapshotHelper;
+
     protected Gson _gson;
 
     private static final List<HypervisorType> SupportedHypervisorsForVolResize = Arrays.asList(HypervisorType.KVM, HypervisorType.XenServer,
@@ -2804,6 +2810,13 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
             throw new InvalidParameterValueException("Cannot migrate volume " + vol + "to the destination storage pool " + destPool.getName() + " as the storage pool is in maintenance mode.");
         }
 
+        try {
+            snapshotHelper.checkKvmVolumeSnapshotsOnlyInPrimaryStorage(vol, _volsDao.getHypervisorType(vol.getId()));
+        } catch (CloudRuntimeException ex) {
+            throw new CloudRuntimeException(String.format("Unable to migrate %s to the destination storage pool [%s] due to [%s]", vol,
+                    new ToStringBuilder(destPool, ToStringStyle.JSON_STYLE).append("uuid", destPool.getUuid()).append("name", destPool.getName()).toString(), ex.getMessage()), ex);
+        }
+
         DiskOfferingVO diskOffering = _diskOfferingDao.findById(vol.getDiskOfferingId());
         if (diskOffering == null) {
             throw new CloudRuntimeException("volume '" + vol.getUuid() + "', has no diskoffering. Migration target cannot be checked.");
diff --git a/server/src/main/java/com/cloud/storage/snapshot/SnapshotManagerImpl.java b/server/src/main/java/com/cloud/storage/snapshot/SnapshotManagerImpl.java
index 049754a04d8..d98125e3896 100755
--- a/server/src/main/java/com/cloud/storage/snapshot/SnapshotManagerImpl.java
+++ b/server/src/main/java/com/cloud/storage/snapshot/SnapshotManagerImpl.java
@@ -38,7 +38,6 @@ import org.apache.cloudstack.api.command.user.snapshot.ListSnapshotsCmd;
 import org.apache.cloudstack.api.command.user.snapshot.UpdateSnapshotPolicyCmd;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
-import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreCapabilities;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
 import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
 import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector;
@@ -56,6 +55,7 @@ import org.apache.cloudstack.framework.config.ConfigKey;
 import org.apache.cloudstack.framework.config.Configurable;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
+import org.apache.cloudstack.snapshot.SnapshotHelper;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
@@ -101,7 +101,6 @@ import com.cloud.storage.SnapshotScheduleVO;
 import com.cloud.storage.SnapshotVO;
 import com.cloud.storage.Storage;
 import com.cloud.storage.Storage.ImageFormat;
-import com.cloud.storage.Storage.StoragePoolType;
 import com.cloud.storage.StorageManager;
 import com.cloud.storage.StoragePool;
 import com.cloud.storage.VMTemplateVO;
@@ -210,6 +209,9 @@ public class SnapshotManagerImpl extends MutualExclusiveIdsManagerBase implement
     @Inject
     private AnnotationDao annotationDao;
 
+    @Inject
+    protected SnapshotHelper snapshotHelper;
+
     private int _totalRetries;
     private int _pauseInterval;
     private int snapshotBackupRetries, snapshotBackupRetryInterval;
@@ -312,7 +314,7 @@ public class SnapshotManagerImpl extends MutualExclusiveIdsManagerBase implement
             }
         }
 
-        DataStoreRole dataStoreRole = getDataStoreRole(snapshot);
+        DataStoreRole dataStoreRole = snapshotHelper.getDataStoreRole(snapshot);
 
         SnapshotInfo snapshotInfo = snapshotFactory.getSnapshot(snapshotId, dataStoreRole);
 
@@ -596,7 +598,7 @@ public class SnapshotManagerImpl extends MutualExclusiveIdsManagerBase implement
             return false;
         }
 
-        DataStoreRole dataStoreRole = getDataStoreRole(snapshotCheck);
+        DataStoreRole dataStoreRole = snapshotHelper.getDataStoreRole(snapshotCheck);
 
         SnapshotDataStoreVO snapshotStoreRef = _snapshotStoreDao.findBySnapshot(snapshotId, dataStoreRole);
 
@@ -1247,7 +1249,7 @@ public class SnapshotManagerImpl extends MutualExclusiveIdsManagerBase implement
             try {
                 postCreateSnapshot(volume.getId(), snapshotId, payload.getSnapshotPolicyId());
 
-                DataStoreRole dataStoreRole = getDataStoreRole(snapshot);
+                DataStoreRole dataStoreRole = snapshotHelper.getDataStoreRole(snapshot);
 
                 SnapshotDataStoreVO snapshotStoreRef = _snapshotStoreDao.findBySnapshot(snapshotId, dataStoreRole);
                 if (snapshotStoreRef == null) {
@@ -1337,37 +1339,6 @@ public class SnapshotManagerImpl extends MutualExclusiveIdsManagerBase implement
         }
     }
 
-    private DataStoreRole getDataStoreRole(Snapshot snapshot) {
-        SnapshotDataStoreVO snapshotStore = _snapshotStoreDao.findBySnapshot(snapshot.getId(), DataStoreRole.Primary);
-
-        if (snapshotStore == null) {
-            return DataStoreRole.Image;
-        }
-
-        long storagePoolId = snapshotStore.getDataStoreId();
-        DataStore dataStore = dataStoreMgr.getDataStore(storagePoolId, DataStoreRole.Primary);
-
-        Map<String, String> mapCapabilities = dataStore.getDriver().getCapabilities();
-
-        if (mapCapabilities != null) {
-            String value = mapCapabilities.get(DataStoreCapabilities.STORAGE_SYSTEM_SNAPSHOT.toString());
-            Boolean supportsStorageSystemSnapshots = Boolean.valueOf(value);
-
-            if (supportsStorageSystemSnapshots) {
-                return DataStoreRole.Primary;
-            }
-        }
-
-        StoragePoolVO storagePoolVO = _storagePoolDao.findById(storagePoolId);
-        if (storagePoolVO.getPoolType() == StoragePoolType.RBD) {
-            return DataStoreRole.Primary;
-        } else if (snapshot.getSnapshotType() == Type.GROUP.ordinal()) {
-            return DataStoreRole.Primary;
-        }
-
-        return DataStoreRole.Image;
-    }
-
     @Override
     public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
 
diff --git a/server/src/main/java/com/cloud/template/TemplateManagerImpl.java b/server/src/main/java/com/cloud/template/TemplateManagerImpl.java
index 527be5d8cd0..3c70a2e7a0f 100755
--- a/server/src/main/java/com/cloud/template/TemplateManagerImpl.java
+++ b/server/src/main/java/com/cloud/template/TemplateManagerImpl.java
@@ -66,8 +66,6 @@ import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.Scope;
 import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
 import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
-import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy;
-import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy.SnapshotOperation;
 import org.apache.cloudstack.engine.subsystem.api.storage.StorageCacheManager;
 import org.apache.cloudstack.engine.subsystem.api.storage.StorageStrategyFactory;
 import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
@@ -115,7 +113,6 @@ import com.cloud.agent.api.to.DiskTO;
 import com.cloud.agent.api.to.NfsTO;
 import com.cloud.agent.api.to.VirtualMachineTO;
 import com.cloud.api.ApiDBUtils;
-import com.cloud.api.ApiResponseHelper;
 import com.cloud.api.query.dao.UserVmJoinDao;
 import com.cloud.api.query.vo.UserVmJoinVO;
 import com.cloud.configuration.Config;
@@ -209,6 +206,7 @@ import com.cloud.vm.dao.VMInstanceDao;
 import com.google.common.base.Joiner;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import org.apache.cloudstack.snapshot.SnapshotHelper;
 
 public class TemplateManagerImpl extends ManagerBase implements TemplateManager, TemplateApiService, Configurable {
     private final static Logger s_logger = Logger.getLogger(TemplateManagerImpl.class);
@@ -300,6 +298,9 @@ public class TemplateManagerImpl extends ManagerBase implements TemplateManager,
     @Inject
     private EndPointSelector selector;
 
+    @Inject
+    protected SnapshotHelper snapshotHelper;
+
     private TemplateAdapter getAdapter(HypervisorType type) {
         TemplateAdapter adapter = null;
         if (type == HypervisorType.BareMetal) {
@@ -1613,6 +1614,8 @@ public class TemplateManagerImpl extends ManagerBase implements TemplateManager,
         SnapshotVO snapshot = null;
         VolumeVO volume = null;
         Account caller = CallContext.current().getCallingAccount();
+        boolean kvmSnapshotOnlyInPrimaryStorage = false;
+        SnapshotInfo snapInfo = null;
 
         try {
             TemplateInfo tmplInfo = _tmplFactory.getTemplate(templateId, DataStoreRole.Image);
@@ -1631,26 +1634,12 @@ public class TemplateManagerImpl extends ManagerBase implements TemplateManager,
             AsyncCallFuture<TemplateApiResult> future = null;
 
             if (snapshotId != null) {
-                DataStoreRole dataStoreRole = ApiResponseHelper.getDataStoreRole(snapshot, _snapshotStoreDao, _dataStoreMgr);
-
-                SnapshotInfo snapInfo = _snapshotFactory.getSnapshot(snapshotId, dataStoreRole);
+                DataStoreRole dataStoreRole = snapshotHelper.getDataStoreRole(snapshot);
+                kvmSnapshotOnlyInPrimaryStorage = snapshotHelper.isKvmSnapshotOnlyInPrimaryStorage(snapshot, dataStoreRole);
 
-                if (dataStoreRole == DataStoreRole.Image) {
-                    if (snapInfo == null) {
-                        snapInfo = _snapshotFactory.getSnapshot(snapshotId, DataStoreRole.Primary);
-                        if(snapInfo == null) {
-                            throw new CloudRuntimeException("Cannot find snapshot "+snapshotId);
-                        }
-                        // We need to copy the snapshot onto secondary.
-                        SnapshotStrategy snapshotStrategy = _storageStrategyFactory.getSnapshotStrategy(snapshot, SnapshotOperation.BACKUP);
-                        snapshotStrategy.backupSnapshot(snapInfo);
-
-                        // Attempt to grab it again.
-                        snapInfo = _snapshotFactory.getSnapshot(snapshotId, dataStoreRole);
-                        if(snapInfo == null) {
-                            throw new CloudRuntimeException("Cannot find snapshot " + snapshotId + " on secondary and could not create backup");
-                        }
-                    }
+                snapInfo = _snapshotFactory.getSnapshot(snapshotId, dataStoreRole);
+                if (dataStoreRole == DataStoreRole.Image || kvmSnapshotOnlyInPrimaryStorage) {
+                    snapInfo = snapshotHelper.backupSnapshotToSecondaryStorageIfNotExists(snapInfo, dataStoreRole, snapshot, kvmSnapshotOnlyInPrimaryStorage);
                     _accountMgr.checkAccess(caller, null, true, snapInfo);
                     DataStore snapStore = snapInfo.getDataStore();
 
@@ -1734,6 +1723,10 @@ public class TemplateManagerImpl extends ManagerBase implements TemplateManager,
                 });
 
             }
+
+            if (snapshotId != null) {
+                snapshotHelper.expungeTemporarySnapshot(kvmSnapshotOnlyInPrimaryStorage, snapInfo);
+            }
         }
 
         if (privateTemplate != null) {
diff --git a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
index e8733e618aa..dca068559ab 100644
--- a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
+++ b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
@@ -112,6 +112,7 @@ import org.apache.cloudstack.framework.config.Configurable;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
 import org.apache.cloudstack.query.QueryService;
+import org.apache.cloudstack.snapshot.SnapshotHelper;
 import org.apache.cloudstack.storage.command.DeleteCommand;
 import org.apache.cloudstack.storage.command.DettachCommand;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
@@ -124,6 +125,8 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.log4j.Logger;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -559,6 +562,9 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
     @Inject
     private StatsCollector statsCollector;
 
+    @Inject
+    protected SnapshotHelper snapshotHelper;
+
     private ScheduledExecutorService _executor = null;
     private ScheduledExecutorService _vmIpFetchExecutor = null;
     private int _expungeInterval;
@@ -6693,6 +6699,14 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
                     volToPoolObjectMap.put(volume.getId(), pool.getId());
                 }
                 HypervisorType hypervisorType = _volsDao.getHypervisorType(volume.getId());
+
+                try {
+                    snapshotHelper.checkKvmVolumeSnapshotsOnlyInPrimaryStorage(volume, hypervisorType);
+                } catch (CloudRuntimeException ex) {
+                    throw new CloudRuntimeException(String.format("Unable to migrate %s to the destination storage pool [%s] due to [%s]", volume,
+                            new ToStringBuilder(pool, ToStringStyle.JSON_STYLE).append("uuid", pool.getUuid()).append("name", pool.getName()).toString(), ex.getMessage()), ex);
+                }
+
                 if (hypervisorType.equals(HypervisorType.VMware)) {
                     try {
                         DiskOffering diskOffering = _diskOfferingDao.findById(volume.getDiskOfferingId());
diff --git a/server/src/main/java/org/apache/cloudstack/snapshot/SnapshotHelper.java b/server/src/main/java/org/apache/cloudstack/snapshot/SnapshotHelper.java
new file mode 100644
index 00000000000..dc4b17cfc06
--- /dev/null
+++ b/server/src/main/java/org/apache/cloudstack/snapshot/SnapshotHelper.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.snapshot;
+
+import com.cloud.hypervisor.Hypervisor;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.Snapshot;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.Storage.StoragePoolType;
+import com.cloud.storage.dao.SnapshotDao;
+
+import static com.cloud.storage.snapshot.SnapshotManager.BackupSnapshotAfterTakingSnapshot;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.inject.Inject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreCapabilities;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotService;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy;
+import org.apache.cloudstack.engine.subsystem.api.storage.StorageStrategyFactory;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.log4j.Logger;
+
+public class SnapshotHelper {
+    private final Logger logger = Logger.getLogger(this.getClass());
+
+    @Inject
+    protected SnapshotDataStoreDao snapshotDataStoreDao;
+
+    @Inject
+    protected SnapshotDataFactory snapshotFactory;
+
+    @Inject
+    protected SnapshotService snapshotService;
+
+    @Inject
+    protected StorageStrategyFactory storageStrategyFactory;
+
+    @Inject
+    protected DataStoreManager dataStorageManager;
+
+    @Inject
+    protected SnapshotDao snapshotDao;
+
+    @Inject
+    protected PrimaryDataStoreDao primaryDataStoreDao;
+
+    protected boolean backupSnapshotAfterTakingSnapshot = BackupSnapshotAfterTakingSnapshot.value();
+
+    protected final Set<StoragePoolType> storagePoolTypesToValidateWithBackupSnapshotAfterTakingSnapshot = new HashSet<>(Arrays.asList(StoragePoolType.RBD,
+            StoragePoolType.PowerFlex));
+
+     /**
+     * If the snapshot is a backup from a KVM snapshot that should be kept only in primary storage, expunges it from secondary storage.
+     * @param snapInfo the snapshot info to delete.
+     */
+    public void expungeTemporarySnapshot(boolean kvmSnapshotOnlyInPrimaryStorage, SnapshotInfo snapInfo) {
+        if (!kvmSnapshotOnlyInPrimaryStorage) {
+            logger.trace(String.format("Snapshot [%s] is not a temporary backup to create a volume from snapshot. Not expunging it.", snapInfo.getId()));
+            return;
+        }
+
+        if (snapInfo == null) {
+            logger.warn("Unable to expunge snapshot due to its info is null.");
+            return;
+        }
+
+        logger.debug(String.format("Expunging snapshot [%s] due to it is a temporary backup to create a volume from snapshot. It is occurring because the global setting [%s]"
+          + " has the value [%s].", snapInfo.getId(), BackupSnapshotAfterTakingSnapshot.key(), backupSnapshotAfterTakingSnapshot));
+
+        try {
+            snapshotService.deleteSnapshot(snapInfo);
+        } catch (CloudRuntimeException ex) {
+            logger.warn(String.format("Unable to delete the temporary snapshot [%s] on secondary storage due to [%s]. We still will expunge the database reference, consider"
+              + " manually deleting the file [%s].", snapInfo.getId(), ex.getMessage(), snapInfo.getPath()), ex);
+        }
+
+        snapshotDataStoreDao.expungeReferenceBySnapshotIdAndDataStoreRole(snapInfo.getId(), DataStoreRole.Image);
+    }
+
+    /**
+     * Backup the snapshot to secondary storage if it should be backed up and was not yet or it is a temporary backup to create a volume.
+     * @return The parameter snapInfo if the snapshot is not backupable, else backs up the snapshot to secondary storage and returns its info.
+     * @throws CloudRuntimeException
+     */
+    public SnapshotInfo backupSnapshotToSecondaryStorageIfNotExists(SnapshotInfo snapInfo, DataStoreRole dataStoreRole, Snapshot snapshot, boolean kvmSnapshotOnlyInPrimaryStorage) throws CloudRuntimeException {
+        if (!isSnapshotBackupable(snapInfo, dataStoreRole, kvmSnapshotOnlyInPrimaryStorage)) {
+            logger.trace(String.format("Snapshot [%s] is already on secondary storage or is not a KVM snapshot that is only kept in primary storage. Therefore, we do not back it up."
+              + " up.", snapInfo.getId()));
+
+            return snapInfo;
+        }
+
+        snapInfo = getSnapshotInfoByIdAndRole(snapshot.getId(), DataStoreRole.Primary);
+
+        SnapshotStrategy snapshotStrategy = storageStrategyFactory.getSnapshotStrategy(snapshot, SnapshotStrategy.SnapshotOperation.BACKUP);
+        snapshotStrategy.backupSnapshot(snapInfo);
+
+        return getSnapshotInfoByIdAndRole(snapshot.getId(), kvmSnapshotOnlyInPrimaryStorage ? DataStoreRole.Image : dataStoreRole);
+    }
+
+    /**
+     * Search for the snapshot info by the snapshot id and {@link DataStoreRole}.
+     * @return The snapshot info if it exists, else throws an exception.
+     * @throws CloudRuntimeException
+     */
+    protected SnapshotInfo getSnapshotInfoByIdAndRole(long snapshotId, DataStoreRole dataStoreRole) throws CloudRuntimeException{
+        SnapshotInfo snapInfo = snapshotFactory.getSnapshot(snapshotId, dataStoreRole);
+
+        if (snapInfo != null) {
+            return snapInfo;
+        }
+
+        throw new CloudRuntimeException(String.format("Could not find snapshot [%s] in %s storage. Therefore, we do not back it up.", snapshotId, dataStoreRole));
+    }
+
+    /**
+     * Verifies if the snapshot is backupable.
+     * @return true if snapInfo is null and dataStoreRole is {@link DataStoreRole#Image} or is a KVM snapshot that is only kept in primary storage, else false.
+     */
+    protected boolean isSnapshotBackupable(SnapshotInfo snapInfo, DataStoreRole dataStoreRole, boolean kvmSnapshotOnlyInPrimaryStorage) {
+        return (snapInfo == null && dataStoreRole == DataStoreRole.Image) || kvmSnapshotOnlyInPrimaryStorage;
+    }
+
+    /**
+     * Verifies if the snapshot was took on KVM and is kept in primary storage.
+     * @return true if hypervisor is {@link  HypervisorType#KVM} and data store role is {@link  DataStoreRole#Primary} and global setting "snapshot.backup.to.secondary" is false,
+     * else false.
+     */
+    public boolean isKvmSnapshotOnlyInPrimaryStorage(Snapshot snapshot, DataStoreRole dataStoreRole){
+        return snapshot.getHypervisorType() == Hypervisor.HypervisorType.KVM && dataStoreRole == DataStoreRole.Primary && !backupSnapshotAfterTakingSnapshot;
+    }
+
+    public DataStoreRole getDataStoreRole(Snapshot snapshot) {
+        SnapshotDataStoreVO snapshotStore = snapshotDataStoreDao.findBySnapshot(snapshot.getId(), DataStoreRole.Primary);
+
+        if (snapshotStore == null) {
+            return DataStoreRole.Image;
+        }
+
+        long storagePoolId = snapshotStore.getDataStoreId();
+
+        StoragePoolVO storagePoolVO = primaryDataStoreDao.findById(storagePoolId);
+        if ((storagePoolTypesToValidateWithBackupSnapshotAfterTakingSnapshot.contains(storagePoolVO.getPoolType()) || snapshot.getHypervisorType() == HypervisorType.KVM)
+                && !backupSnapshotAfterTakingSnapshot) {
+            return DataStoreRole.Primary;
+        }
+
+        DataStore dataStore = dataStorageManager.getDataStore(storagePoolId, DataStoreRole.Primary);
+
+        if (dataStore == null) {
+            return DataStoreRole.Image;
+        }
+
+        Map<String, String> mapCapabilities = dataStore.getDriver().getCapabilities();
+
+        if (MapUtils.isNotEmpty(mapCapabilities) && BooleanUtils.toBoolean(mapCapabilities.get(DataStoreCapabilities.STORAGE_SYSTEM_SNAPSHOT.toString()))) {
+            return DataStoreRole.Primary;
+        }
+
+        return DataStoreRole.Image;
+    }
+
+    /**
+     * Verifies if it is a KVM volume that has snapshots only in primary storage.
+     * @throws CloudRuntimeException If it is a KVM volume and has at least one snapshot only in primary storage.
+     */
+    public void checkKvmVolumeSnapshotsOnlyInPrimaryStorage(VolumeVO volumeVo, HypervisorType hypervisorType) throws CloudRuntimeException {
+        if (HypervisorType.KVM != hypervisorType) {
+            logger.trace(String.format("The %s hypervisor [%s] is not KVM, therefore we will not check if the snapshots are only in primary storage.", volumeVo, hypervisorType));
+            return;
+        }
+
+        Set<Long> snapshotIdsOnlyInPrimaryStorage = getSnapshotIdsOnlyInPrimaryStorage(volumeVo.getId());
+
+        if (CollectionUtils.isEmpty(snapshotIdsOnlyInPrimaryStorage)) {
+            logger.trace(String.format("%s is a KVM volume and all its snapshots exists in the secondary storage, therefore this volume is able for migration.", volumeVo));
+            return;
+        }
+
+        throwCloudRuntimeExceptionOfSnapshotsOnlyInPrimaryStorage(volumeVo, snapshotIdsOnlyInPrimaryStorage);
+    }
+
+    /**
+     * Throws a CloudRuntimeException with the volume and the snapshots only in primary storage.
+     */
+    protected void throwCloudRuntimeExceptionOfSnapshotsOnlyInPrimaryStorage(VolumeVO volumeVo, Set<Long> snapshotIdsOnlyInPrimaryStorage) throws CloudRuntimeException {
+        List<SnapshotVO> snapshots = snapshotDao.listByIds(snapshotIdsOnlyInPrimaryStorage.toArray());
+
+        String message = String.format("%s is a KVM volume and has snapshots only in primary storage. Snapshots [%s].%s", volumeVo,
+                snapshots.stream().map(snapshot -> new ToStringBuilder(snapshot, ToStringStyle.JSON_STYLE).append("uuid", snapshot.getUuid()).append("name", snapshot.getName())
+                        .build()).collect(Collectors.joining(", ")), backupSnapshotAfterTakingSnapshot ? "" : " Consider excluding them to migrate the volume to another storage.");
+
+        logger.error(message);
+        throw new CloudRuntimeException(message);
+    }
+
+    /**
+     * Retrieves the ids of the ready snapshots of the volume that only exists in primary storage.
+     * @param volumeId volume id to retrieve the snapshots.
+     * @return The ids of the ready snapshots of the volume that only exists in primary storage
+     */
+    protected Set<Long> getSnapshotIdsOnlyInPrimaryStorage(long volumeId) {
+        List<SnapshotDataStoreVO> snapshotsReferences = snapshotDataStoreDao.listReadyByVolumeId(volumeId);
+        Map<Long, List<SnapshotDataStoreVO>> referencesGroupBySnapshotId = snapshotsReferences.stream().collect(Collectors.groupingBy(reference -> reference.getSnapshotId()));
+
+        Set<Long> snapshotIdsOnlyInPrimaryStorage = new HashSet<>();
+        for (var reference: referencesGroupBySnapshotId.entrySet()) {
+            List<SnapshotDataStoreVO> listReferencesBySnapshotId = reference.getValue();
+
+            if  (!listReferencesBySnapshotId.stream().anyMatch(ref -> DataStoreRole.Image == ref.getRole())) {
+                snapshotIdsOnlyInPrimaryStorage.add(reference.getKey());
+            }
+        }
+
+        return snapshotIdsOnlyInPrimaryStorage;
+    }
+}
diff --git a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
index 7d17ec7de77..0b7d55d58c5 100644
--- a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
+++ b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
@@ -196,6 +196,8 @@
         <property name="templateAdapters" value="#{templateAdapterRegistry.registered}" />
     </bean>
 
+    <bean id="snapshotHelper" class="org.apache.cloudstack.snapshot.SnapshotHelper" />
+
     <bean id="uploadMonitorImpl" class="com.cloud.storage.upload.UploadMonitorImpl" />
     <bean id="usageServiceImpl" class="com.cloud.usage.UsageServiceImpl" />
     
diff --git a/server/src/test/java/com/cloud/storage/snapshot/SnapshotManagerTest.java b/server/src/test/java/com/cloud/storage/snapshot/SnapshotManagerTest.java
index cb0ed928b61..f17d300596a 100755
--- a/server/src/test/java/com/cloud/storage/snapshot/SnapshotManagerTest.java
+++ b/server/src/test/java/com/cloud/storage/snapshot/SnapshotManagerTest.java
@@ -20,6 +20,7 @@ import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -36,6 +37,7 @@ import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
 import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotService;
 import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy;
 import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy.SnapshotOperation;
+import org.apache.cloudstack.snapshot.SnapshotHelper;
 import org.apache.cloudstack.engine.subsystem.api.storage.StorageStrategyFactory;
 import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
 import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
@@ -149,6 +151,8 @@ public class SnapshotManagerTest {
     SnapshotDataStoreVO snapshotStoreMock;
     @Mock
     SnapshotService snapshotSrv;
+    @Mock
+    SnapshotHelper snapshotHelperMock;
 
     @Mock
     GlobalLock globalLockMock;
@@ -199,6 +203,7 @@ public class SnapshotManagerTest {
         _snapshotMgr._resourceMgr = _resourceMgr;
         _snapshotMgr._vmSnapshotDao = _vmSnapshotDao;
         _snapshotMgr._snapshotStoreDao = snapshotStoreDao;
+        _snapshotMgr.snapshotHelper = snapshotHelperMock;
         _snapshotMgr._snapshotPolicyDao = snapshotPolicyDaoMock;
         _snapshotMgr._snapSchedMgr = snapshotSchedulerMock;
         _snapshotMgr.taggedResourceService = taggedResourceServiceMock;
@@ -331,6 +336,7 @@ public class SnapshotManagerTest {
         when(vmMock.getState()).thenReturn(State.Stopped);
         when(vmMock.getHypervisorType()).thenReturn(Hypervisor.HypervisorType.XenServer);
         when(volumeMock.getFormat()).thenReturn(ImageFormat.VHD);
+        doReturn(DataStoreRole.Image).when(snapshotHelperMock).getDataStoreRole(Mockito.any());
         Snapshot snapshot = _snapshotMgr.revertSnapshot(TEST_SNAPSHOT_ID);
         Assert.assertNull(snapshot);
     }
@@ -344,6 +350,7 @@ public class SnapshotManagerTest {
         when(volumeMock.getFormat()).thenReturn(ImageFormat.QCOW2);
         when (snapshotStrategy.revertSnapshot(Mockito.any(SnapshotInfo.class))).thenReturn(true);
         when(_volumeDao.update(anyLong(), any(VolumeVO.class))).thenReturn(true);
+        doReturn(DataStoreRole.Image).when(snapshotHelperMock).getDataStoreRole(Mockito.any());
         Snapshot snapshot = _snapshotMgr.revertSnapshot(TEST_SNAPSHOT_ID);
         Assert.assertNotNull(snapshot);
     }
diff --git a/server/src/test/java/com/cloud/template/TemplateManagerImplTest.java b/server/src/test/java/com/cloud/template/TemplateManagerImplTest.java
index b6883373490..0a00d283292 100755
--- a/server/src/test/java/com/cloud/template/TemplateManagerImplTest.java
+++ b/server/src/test/java/com/cloud/template/TemplateManagerImplTest.java
@@ -118,6 +118,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotService;
+import org.apache.cloudstack.snapshot.SnapshotHelper;
 
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.nullable;
@@ -705,6 +707,16 @@ public class TemplateManagerImplTest {
             return Mockito.mock(HypervisorGuruManager.class);
         }
 
+        @Bean
+        public SnapshotHelper snapshotHelper() {
+            return Mockito.mock(SnapshotHelper.class);
+        }
+
+        @Bean
+        public SnapshotService snapshotService() {
+            return Mockito.mock(SnapshotService.class);
+        }
+
         public static class Library implements TypeFilter {
             @Override
             public boolean match(MetadataReader mdr, MetadataReaderFactory arg1) throws IOException {
diff --git a/server/src/test/java/org/apache/cloudstack/snapshot/SnapshotHelperTest.java b/server/src/test/java/org/apache/cloudstack/snapshot/SnapshotHelperTest.java
new file mode 100644
index 00000000000..555df93fce7
--- /dev/null
+++ b/server/src/test/java/org/apache/cloudstack/snapshot/SnapshotHelperTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.snapshot;
+
+import com.cloud.hypervisor.Hypervisor;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotService;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy;
+import org.apache.cloudstack.engine.subsystem.api.storage.StorageStrategyFactory;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SnapshotHelperTest {
+
+    SnapshotHelper snapshotHelperSpy = Mockito.spy(SnapshotHelper.class);
+
+    @Mock
+    SnapshotService snapshotServiceMock;
+
+    @Mock
+    SnapshotDataStoreDao snapshotDataStoreDaoMock;
+
+    @Mock
+    SnapshotInfo snapshotInfoMock;
+
+    @Mock
+    SnapshotInfo snapshotInfoMock2;
+
+    @Mock
+    SnapshotDataFactory snapshotDataFactoryMock;
+
+    @Mock
+    StorageStrategyFactory storageStrategyFactoryMock;
+
+    @Mock
+    SnapshotStrategy snapshotStrategyMock;
+
+    @Mock
+    SnapshotDao snapshotDaoMock;
+
+    @Mock
+    VolumeVO volumeVoMock;
+
+    List<DataStoreRole> dataStoreRoles = Arrays.asList(DataStoreRole.values());
+
+    @Before
+    public void init() {
+        snapshotHelperSpy.snapshotService = snapshotServiceMock;
+        snapshotHelperSpy.snapshotDataStoreDao = snapshotDataStoreDaoMock;
+        snapshotHelperSpy.snapshotFactory = snapshotDataFactoryMock;
+        snapshotHelperSpy.storageStrategyFactory = storageStrategyFactoryMock;
+        snapshotHelperSpy.snapshotDao = snapshotDaoMock;
+    }
+
+    @Test
+    public void validateExpungeTemporarySnapshotNotAKvmSnapshotOnPrimaryStorageDoNothing() {
+        snapshotHelperSpy.expungeTemporarySnapshot(false, snapshotInfoMock);
+        Mockito.verifyNoInteractions(snapshotServiceMock, snapshotDataStoreDaoMock);
+    }
+
+    @Test
+    public void validateExpungeTemporarySnapshotKvmSnapshotOnPrimaryStorageExpungesSnapshot() {
+        Mockito.doReturn(true).when(snapshotServiceMock).deleteSnapshot(Mockito.any());
+        Mockito.doReturn(true).when(snapshotDataStoreDaoMock).expungeReferenceBySnapshotIdAndDataStoreRole(Mockito.anyLong(), Mockito.any());
+
+        snapshotHelperSpy.expungeTemporarySnapshot(true, snapshotInfoMock);
+
+        Mockito.verify(snapshotServiceMock).deleteSnapshot(Mockito.any());
+        Mockito.verify(snapshotDataStoreDaoMock).expungeReferenceBySnapshotIdAndDataStoreRole(Mockito.anyLong(), Mockito.any());
+    }
+
+    @Test
+    public void validateIsKvmSnapshotOnlyInPrimaryStorageBackupToSecondaryTrue() {
+        List<Hypervisor.HypervisorType> hypervisorTypes = Arrays.asList(Hypervisor.HypervisorType.values());
+        snapshotHelperSpy.backupSnapshotAfterTakingSnapshot = true;
+
+        hypervisorTypes.forEach(type -> {
+            Mockito.doReturn(type).when(snapshotInfoMock).getHypervisorType();
+            dataStoreRoles.forEach(role -> {
+                Assert.assertFalse(snapshotHelperSpy.isKvmSnapshotOnlyInPrimaryStorage(snapshotInfoMock, role));
+            });
+        });
+    }
+
+    @Test
+    public void validateIsKvmSnapshotOnlyInPrimaryStorageBackupToSecondaryFalse() {
+        List<Hypervisor.HypervisorType> hypervisorTypes = Arrays.asList(Hypervisor.HypervisorType.values());
+        snapshotHelperSpy.backupSnapshotAfterTakingSnapshot = false;
+
+        hypervisorTypes.forEach(type -> {
+            Mockito.doReturn(type).when(snapshotInfoMock).getHypervisorType();
+            dataStoreRoles.forEach(role -> {
+                if (type == Hypervisor.HypervisorType.KVM && role == DataStoreRole.Primary) {
+                    Assert.assertTrue(snapshotHelperSpy.isKvmSnapshotOnlyInPrimaryStorage(snapshotInfoMock, role));
+                } else {
+                    Assert.assertFalse(snapshotHelperSpy.isKvmSnapshotOnlyInPrimaryStorage(snapshotInfoMock, role));
+                }
+            });
+        });
+    }
+
+    @Test
+    public void validateGetSnapshotInfoByIdAndRoleSnapInfoFoundReturnIt() {
+        Mockito.doReturn(snapshotInfoMock).when(snapshotDataFactoryMock).getSnapshot(Mockito.anyLong(), Mockito.any(DataStoreRole.class));
+
+        dataStoreRoles.forEach(role -> {
+            SnapshotInfo result = snapshotHelperSpy.getSnapshotInfoByIdAndRole(0, role);
+            Assert.assertEquals(snapshotInfoMock, result);
+        });
+    }
+
+    @Test(expected = CloudRuntimeException.class)
+    public void validateGetSnapshotInfoByIdAndRoleSnapInfoNotFoundThrowCloudRuntimeException() {
+        Mockito.doReturn(null).when(snapshotDataFactoryMock).getSnapshot(Mockito.anyLong(), Mockito.any(DataStoreRole.class));
+
+        dataStoreRoles.forEach(role -> {
+            snapshotHelperSpy.getSnapshotInfoByIdAndRole(0, role);
+        });
+    }
+
+    @Test
+    public void validateIsSnapshotBackupableSnapInfoNullAndAllRolesAndKvmSnapshotOnlyInPrimaryStorageFalse() {
+        dataStoreRoles.forEach(role -> {
+            if (role == DataStoreRole.Image) {
+                Assert.assertTrue(snapshotHelperSpy.isSnapshotBackupable(null, role, false));
+            } else {
+                Assert.assertFalse(snapshotHelperSpy.isSnapshotBackupable(null, role, false));
+            }
+        });
+    }
+
+    @Test
+    public void validateIsSnapshotBackupableSnapInfoNotNullAndAllRolesAndKvmSnapshotOnlyInPrimaryStorageFalse() {
+        dataStoreRoles.forEach(role -> {
+            Assert.assertFalse(snapshotHelperSpy.isSnapshotBackupable(snapshotInfoMock, role, false));
+        });
+    }
+
+    @Test
+    public void validateIsSnapshotBackupableSnapInfoNullAndAllRolesAndKvmSnapshotOnlyInPrimaryStorageTrue() {
+        dataStoreRoles.forEach(role -> {
+            Assert.assertTrue(snapshotHelperSpy.isSnapshotBackupable(null, role, true));
+        });
+    }
+
+    @Test
+    public void validateIsSnapshotBackupableSnapInfoNotNullAndAllRolesAndKvmSnapshotOnlyInPrimaryStorageTrue() {
+        dataStoreRoles.forEach(role -> {
+            Assert.assertTrue(snapshotHelperSpy.isSnapshotBackupable(snapshotInfoMock, role, true));
+        });
+    }
+
+    @Test
+    public void validateBackupSnapshotToSecondaryStorageIfNotExistsSnapshotIsNotBackupable(){
+        Mockito.doReturn(false).when(snapshotHelperSpy).isSnapshotBackupable(Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+        SnapshotInfo result = snapshotHelperSpy.backupSnapshotToSecondaryStorageIfNotExists(snapshotInfoMock, DataStoreRole.Image, snapshotInfoMock, true);
+        Assert.assertEquals(snapshotInfoMock, result);
+    }
+
+    @Test (expected = CloudRuntimeException.class)
+    public void validateBackupSnapshotToSecondaryStorageIfNotExistsGetSnapshotThrowsCloudRuntimeException(){
+        Mockito.doReturn(true).when(snapshotHelperSpy).isSnapshotBackupable(Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+        Mockito.doThrow(CloudRuntimeException.class).when(snapshotHelperSpy).getSnapshotInfoByIdAndRole(Mockito.anyLong(), Mockito.any());
+
+        snapshotHelperSpy.backupSnapshotToSecondaryStorageIfNotExists(snapshotInfoMock, DataStoreRole.Image, snapshotInfoMock, true);
+    }
+
+    @Test
+    public void validateBackupSnapshotToSecondaryStorageIfNotExistsReturnSnapshotInfo(){
+        Mockito.doReturn(true).when(snapshotHelperSpy).isSnapshotBackupable(Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+        Mockito.doReturn(snapshotInfoMock, snapshotInfoMock2).when(snapshotHelperSpy).getSnapshotInfoByIdAndRole(Mockito.anyLong(), Mockito.any());
+        Mockito.doReturn(snapshotStrategyMock).when(storageStrategyFactoryMock).getSnapshotStrategy(Mockito.any(), Mockito.any());
+        Mockito.doReturn(null).when(snapshotStrategyMock).backupSnapshot(Mockito.any());
+
+        SnapshotInfo result = snapshotHelperSpy.backupSnapshotToSecondaryStorageIfNotExists(snapshotInfoMock, DataStoreRole.Image, snapshotInfoMock, true);
+
+        Assert.assertEquals(snapshotInfoMock2, result);
+    }
+
+    private SnapshotDataStoreVO createSnapshotDataStoreVo(long snapshotId, DataStoreRole role) {
+        SnapshotDataStoreVO snapshotDataStoreVo = new SnapshotDataStoreVO(0, snapshotId);
+        snapshotDataStoreVo.setRole(role);
+        return snapshotDataStoreVo;
+    }
+
+    @Test
+    public void validateGetSnapshotIdsOnlyInPrimaryStorageAllSnapshotsInSecondaryStorageReturnEmpty() {
+        List<SnapshotDataStoreVO> snapshotDataStoreVos = new ArrayList<>();
+        snapshotDataStoreVos.add(createSnapshotDataStoreVo(1, DataStoreRole.Primary));
+        snapshotDataStoreVos.add(createSnapshotDataStoreVo(1, DataStoreRole.Image));
+        snapshotDataStoreVos.add(createSnapshotDataStoreVo(2, DataStoreRole.Primary));
+        snapshotDataStoreVos.add(createSnapshotDataStoreVo(2, DataStoreRole.Image));
+        snapshotDataStoreVos.add(createSnapshotDataStoreVo(3, DataStoreRole.Primary));
+        snapshotDataStoreVos.add(createSnapshotDataStoreVo(3, DataStoreRole.Image));
+
+        Mockito.doReturn(snapshotDataStoreVos).when(snapshotDataStoreDaoMock).listReadyByVolumeId(Mockito.anyLong());
+
+        Set<Long> result = snapshotHelperSpy.getSnapshotIdsOnlyInPrimaryStorage(0);
+
+        Assert.assertTrue(result.isEmpty());
+    }
+
+    @Test
+    public void validateGetSnapshotIdsOnlyInPrimaryStorageEmptySnapshotsReturnEmpty() {
+        List<SnapshotDataStoreVO> snapshotDataStoreVos = new ArrayList<>();
+
+        Mockito.doReturn(snapshotDataStoreVos).when(snapshotDataStoreDaoMock).listReadyByVolumeId(Mockito.anyLong());
+
+        Set<Long> result = snapshotHelperSpy.getSnapshotIdsOnlyInPrimaryStorage(0);
+
+        Assert.assertTrue(result.isEmpty());
+    }
+
+    @Test
+    public void validateGetSnapshotIdsOnlyInPrimaryStorageSomeSnapshotsNotInSecondaryStorageReturnSnapshotIds() {
+        List<SnapshotDataStoreVO> snapshotDataStoreVos = new ArrayList<>();
+        snapshotDataStoreVos.add(createSnapshotDataStoreVo(1, DataStoreRole.Primary));
+        snapshotDataStoreVos.add(createSnapshotDataStoreVo(2, DataStoreRole.Primary));
+        snapshotDataStoreVos.add(createSnapshotDataStoreVo(2, DataStoreRole.Image));
+        snapshotDataStoreVos.add(createSnapshotDataStoreVo(3, DataStoreRole.Primary));
+
+        Mockito.doReturn(snapshotDataStoreVos).when(snapshotDataStoreDaoMock).listReadyByVolumeId(Mockito.anyLong());
+
+        Set<Long> result = snapshotHelperSpy.getSnapshotIdsOnlyInPrimaryStorage(0);
+
+        Assert.assertEquals(2, result.size());
+        Assert.assertTrue(result.contains(1l));
+        Assert.assertFalse(result.contains(2l));
+        Assert.assertTrue(result.contains(3l));
+    }
+
+    @Test (expected = CloudRuntimeException.class)
+    public void validateThrowCloudRuntimeExceptionOfSnapshotsOnlyInPrimaryStorage() {
+        Mockito.doReturn(new ArrayList<>()).when(snapshotDaoMock).listByIds(Mockito.any());
+        snapshotHelperSpy.throwCloudRuntimeExceptionOfSnapshotsOnlyInPrimaryStorage(null, new HashSet<>());
+    }
+
+    @Test
+    public void checkKvmVolumeSnapshotsOnlyInPrimaryStorageTestNotKvmDoNothing() {
+        Arrays.asList(HypervisorType.values()).forEach(hypervisorType -> {
+            if (hypervisorType == HypervisorType.KVM) {
+                return;
+            }
+
+            snapshotHelperSpy.checkKvmVolumeSnapshotsOnlyInPrimaryStorage(null, hypervisorType);
+        });
+
+        Mockito.verify(snapshotHelperSpy, Mockito.never()).getSnapshotIdsOnlyInPrimaryStorage(Mockito.anyLong());
+    }
+
+    @Test
+    public void checkKvmVolumeSnapshotsOnlyInPrimaryStorageTestAllSnapshotsInSecondaryStorageDoNothing() {
+        Mockito.doReturn(new HashSet<>()).when(snapshotHelperSpy).getSnapshotIdsOnlyInPrimaryStorage(Mockito.anyLong());
+
+        snapshotHelperSpy.checkKvmVolumeSnapshotsOnlyInPrimaryStorage(volumeVoMock, HypervisorType.KVM);
+    }
+
+    @Test (expected = CloudRuntimeException.class)
+    public void checkKvmVolumeSnapshotsOnlyInPrimaryStorageTestSomeSnapshotsNotInSecondaryStorageThrowsCloudRuntimeException() {
+        Mockito.doReturn(new HashSet<>(Arrays.asList(1l, 2l))).when(snapshotHelperSpy).getSnapshotIdsOnlyInPrimaryStorage(Mockito.anyLong());
+
+        snapshotHelperSpy.checkKvmVolumeSnapshotsOnlyInPrimaryStorage(volumeVoMock, HypervisorType.KVM);
+    }
+}