Posted to commits@cloudstack.apache.org by nv...@apache.org on 2022/04/08 00:42:18 UTC

[cloudstack] branch main updated: Storage-based Snapshots for KVM VMs (#3724)

This is an automated email from the ASF dual-hosted git repository.

nvazquez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b075ed39e2 Storage-based Snapshots for KVM VMs (#3724)
2b075ed39e2 is described below

commit 2b075ed39e22714b1984ecb98151edffdc03f500
Author: slavkap <51...@users.noreply.github.com>
AuthorDate: Fri Apr 8 03:42:12 2022 +0300

    Storage-based Snapshots for KVM VMs (#3724)
    
    * VM snapshots of a running KVM instance using storage provider plugins for disk snapshots
    
    Added a new virtual machine snapshot strategy that uses storage provider plugins to take/revert/delete snapshots.
    You can take a VM snapshot without VM memory on a KVM instance, using the storage providers' implementations for disk snapshots.
    Revert and delete are also supported. Added a Freeze/Thaw command for KVM instances.
    The snapshots are consistent because the VM is frozen while its disks are snapshotted. Backup to secondary storage runs after
    the VM is thawed, and only if it is enabled in global settings.
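    
    A minimal sketch of the freeze, snapshot, thaw ordering described above. The GuestAgent and
    DiskSnapshotter interfaces are hypothetical stand-ins for the agent command and the storage
    provider plugin; they are not CloudStack APIs, and the real strategy in this commit handles
    more cases (state transitions, usage events, timing logs).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the freeze/thaw agent command (not a CloudStack API).
interface GuestAgent {
    boolean freeze(); // true if the guest filesystems were frozen
    boolean thaw();   // true if the guest filesystems were thawed
}

// Hypothetical stand-in for a storage provider plugin (not a CloudStack API).
interface DiskSnapshotter {
    String snapshot(String volume);   // returns a snapshot id, or null on failure
    void rollback(String snapshotId); // removes a snapshot taken earlier
}

class FreezeSnapshotThawSketch {
    /** Snapshots every volume while the guest is frozen; rolls back on any failure. */
    static List<String> snapshotAll(GuestAgent agent, DiskSnapshotter storage, List<String> volumes) {
        if (!agent.freeze()) {
            throw new IllegalStateException("Could not freeze VM");
        }
        List<String> taken = new ArrayList<>();
        boolean success = false;
        try {
            for (String volume : volumes) {
                String id = storage.snapshot(volume);
                if (id == null) {
                    throw new IllegalStateException("Could not take snapshot for volume " + volume);
                }
                taken.add(id);
            }
            success = true;
            return taken;
        } finally {
            agent.thaw(); // always thaw, even when a snapshot failed
            if (!success) {
                // Undo the snapshots that did succeed so no partial group is left behind.
                for (String id : taken) {
                    storage.rollback(id);
                }
            }
        }
    }
}
```

    Thawing in the finally block keeps the guest from staying frozen when a disk snapshot fails.
    Any backup to secondary storage would start only after this method returns.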
    
    * Removed duplicated functionality
    
    Made a few methods in DefaultVMSnapshotStrategy protected so they can be reused
    without duplicating the code. Removed code that is not actually needed
    
    * Added requirements to the description of global setting kvm.vmstoragesnapshot.enabled
    
    Added more information to the kvm.vmstoragesnapshot.enabled global setting:
    when the option is enabled, it requires
    - qemu version 1.6+
    - qemu-guest-agent installed on the guest virtual machine
    
    * Added Apache license header
    
    * Removed commented code
    
    * If "kvm.vmstoragesnapshot.enabled" is null, it should be considered false
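    
    A minimal sketch of null-safe handling for such a flag, assuming the setting arrives as a raw
    string. SettingSketch and isEnabled are hypothetical names; how CloudStack actually reads the
    setting is not shown in this message.

```java
class SettingSketch {
    /** Treats a missing (null) value, or any value other than "true", as disabled. */
    static boolean isEnabled(String rawValue) {
        // Boolean.parseBoolean returns false for null instead of throwing.
        return Boolean.parseBoolean(rawValue);
    }
}
```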
    
    * removed unused imports, replaced default template
    
    Removed unused imports that were causing failures and replaced the template with
    CentOS8
    
    * "kvm.vmstoragesnapshot.enabled" set to dynamic
    
    * Check the status of freeze/thaw commands, not the return code
    
    Checks the reported status to see whether the freeze/thaw of the guest VM succeeded, rather than
    looking at the return code. Code refactoring
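    
    A minimal sketch of a status-based check, assuming the guest agent reply is JSON with a
    "return" field holding "frozen" or "thawed" (the values of the FREEZE and THAW constants added
    in this commit). FreezeStatusSketch is a hypothetical helper; the parsing done by the actual
    wrapper is not shown here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class FreezeStatusSketch {
    // Matches the "return" field of a reply such as {"return":"frozen"} (assumed format).
    private static final Pattern RETURN_FIELD =
            Pattern.compile("\"return\"\\s*:\\s*\"([a-z]+)\"");

    /** Returns the reported status, or null when the reply carries none. */
    static String parseStatus(String agentReply) {
        if (agentReply == null) {
            return null;
        }
        Matcher m = RETURN_FIELD.matcher(agentReply);
        return m.find() ? m.group(1) : null;
    }

    /** True only when the reported status equals the expected one. */
    static boolean succeeded(String agentReply, String expectedStatus) {
        return expectedStatus.equals(parseStatus(agentReply));
    }
}
```

    Comparing against the reported status catches the case where the command exits cleanly but
    the guest filesystems did not actually change state.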
    
    * removed "CreatingKVM" VMsnapshot state and events related to it
    
    * renamed AllocatedKVM to AllocatedVM
    
    the states should not be associated to a hypervisor type
    
    * logging the result of "drive-backup" command
    
    * Check which VM snapshot strategy could handle the vm snapshots
    
    Gets the best-matching VM snapshot strategy that can handle the VM
    snapshots on KVM.
    Other storage plugins can integrate with this functionality to support group snapshots
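    
    A minimal sketch of best-match selection, assuming each strategy reports a priority and the
    highest one wins. The Priority enum and Strategy interface below are simplified, hypothetical
    versions of StrategyPriority and VMSnapshotStrategy, not the real types.

```java
import java.util.List;

class StrategySelectionSketch {
    // Simplified priority scale; declaration order is the ranking (lowest first).
    enum Priority { CANT_HANDLE, DEFAULT, HIGHEST }

    // Hypothetical, trimmed-down strategy interface for illustration only.
    interface Strategy {
        String name();
        Priority canHandle(long vmId, long rootPoolId, boolean snapshotMemory);
    }

    /** Returns the strategy with the highest priority, or null if none can handle the VM. */
    static Strategy bestMatch(List<Strategy> strategies, long vmId, long rootPoolId, boolean snapshotMemory) {
        Strategy best = null;
        Priority bestPriority = Priority.CANT_HANDLE;
        for (Strategy candidate : strategies) {
            Priority priority = candidate.canHandle(vmId, rootPoolId, snapshotMemory);
            if (priority.compareTo(bestPriority) > 0) {
                best = candidate;
                bestPriority = priority;
            }
        }
        return best;
    }
}
```

    A storage plugin that supports group snapshots would answer with a higher priority than the
    default strategy for the VMs it can handle, and CANT_HANDLE otherwise.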
    
    * Added poolId in canHandle for KVM hypervisors
    
    Added poolId into canHandle method used to check if all volumes are on
    the same PowerFlex's storage pool
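    
    A minimal sketch of the same-pool check, assuming pool IDs are boxed Long values. SamePoolSketch
    is a hypothetical helper; it compares with Objects.equals because == on boxed Long values
    compares references, not numbers.

```java
import java.util.List;
import java.util.Objects;

class SamePoolSketch {
    /** True when the list is non-empty and every volume's pool ID equals the root pool ID. */
    static boolean allOnPool(List<Long> volumePoolIds, Long rootPoolId) {
        if (volumePoolIds == null || volumePoolIds.isEmpty()) {
            return false;
        }
        for (Long poolId : volumePoolIds) {
            // Value comparison that also tolerates null on either side.
            if (!Objects.equals(poolId, rootPoolId)) {
                return false;
            }
        }
        return true;
    }
}
```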
    
    * skip smoke tests if the hypervisor's OS type is CentOS
    
    This PR works with functionality included in qemu-kvm-ev which
    does not come by default on CentOS. The smoke tests will be skipped if
    the hypervisor OS is CentOS
    
    * Added missed import in smoke test
    
    * Suggested change to use `org.apache.commons.lang.StringUtils.isNotBlank`
    
    * Fix getting device on Ubuntu
    
    On Ubuntu the device isn't provided and we have to get it from the
    node-name parameter. The drive-backup command (on Ubuntu) also needs job-id, which
    is set to the value of node-name (this extra parameter works on both Ubuntu and CentOS).
    
    * Removed new snapshot states and functionality for NFS
    
    * throw CloudRuntimeException
    
    provide a more appropriate error message when deleting a VM snapshot fails
    
    * exclude GROUP snapshots when listing snapshots
    
    * Skip tests if there is pool with NFS/Local
    
    * address comments
---
 api/src/main/java/com/cloud/storage/Snapshot.java  |   2 +-
 .../com/cloud/agent/api/FreezeThawVMAnswer.java    |  39 ++
 .../com/cloud/agent/api/FreezeThawVMCommand.java   |  56 +++
 .../api/storage/StorageStrategyFactory.java        |   8 +
 .../subsystem/api/storage/VMSnapshotStrategy.java  |   8 +
 .../vmsnapshot/DefaultVMSnapshotStrategy.java      |  22 +-
 .../vmsnapshot/ScaleIOVMSnapshotStrategy.java      |  30 ++
 .../vmsnapshot/StorageVMSnapshotStrategy.java      | 481 +++++++++++++++++++++
 ...ing-engine-storage-snapshot-storage-context.xml |   3 +
 .../vmsnapshot/VMSnapshotStrategyKVMTest.java      | 436 +++++++++++++++++++
 .../storage/helper/StorageStrategyFactoryImpl.java |  10 +
 .../wrapper/LibvirtFreezeThawVMCommandWrapper.java | 103 +++++
 .../apache/cloudstack/utils/qemu/QemuCommand.java  |  51 +++
 .../cloud/storage/snapshot/SnapshotManager.java    |   2 +
 .../storage/snapshot/SnapshotManagerImpl.java      |  17 +-
 .../cloud/vm/snapshot/VMSnapshotManagerImpl.java   |  61 +--
 test/integration/smoke/test_vm_snapshot_kvm.py     | 326 ++++++++++++++
 17 files changed, 1606 insertions(+), 49 deletions(-)

diff --git a/api/src/main/java/com/cloud/storage/Snapshot.java b/api/src/main/java/com/cloud/storage/Snapshot.java
index 6b87de63e9a..5b25843f48b 100644
--- a/api/src/main/java/com/cloud/storage/Snapshot.java
+++ b/api/src/main/java/com/cloud/storage/Snapshot.java
@@ -26,7 +26,7 @@ import java.util.Date;
 
 public interface Snapshot extends ControlledEntity, Identity, InternalIdentity, StateObject<Snapshot.State> {
     public enum Type {
-        MANUAL, RECURRING, TEMPLATE, HOURLY, DAILY, WEEKLY, MONTHLY;
+        MANUAL, RECURRING, TEMPLATE, HOURLY, DAILY, WEEKLY, MONTHLY, GROUP;
         private int max = 8;
 
         public void setMax(int max) {
diff --git a/core/src/main/java/com/cloud/agent/api/FreezeThawVMAnswer.java b/core/src/main/java/com/cloud/agent/api/FreezeThawVMAnswer.java
new file mode 100644
index 00000000000..8e2b40996e0
--- /dev/null
+++ b/core/src/main/java/com/cloud/agent/api/FreezeThawVMAnswer.java
@@ -0,0 +1,39 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.agent.api;
+
+public class FreezeThawVMAnswer extends Answer {
+
+    public FreezeThawVMAnswer() {
+        super();
+    }
+
+    public FreezeThawVMAnswer(FreezeThawVMCommand command, boolean success, String details) {
+        super(command, success, details);
+    }
+
+    public FreezeThawVMAnswer(FreezeThawVMCommand command, Exception e) {
+        super(command, e);
+    }
+
+    public FreezeThawVMAnswer(FreezeThawVMCommand command) {
+        super(command);
+    }
+}
diff --git a/core/src/main/java/com/cloud/agent/api/FreezeThawVMCommand.java b/core/src/main/java/com/cloud/agent/api/FreezeThawVMCommand.java
new file mode 100644
index 00000000000..b39955379d6
--- /dev/null
+++ b/core/src/main/java/com/cloud/agent/api/FreezeThawVMCommand.java
@@ -0,0 +1,56 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.agent.api;
+
+public class FreezeThawVMCommand extends Command{
+
+    public static final String FREEZE = "frozen";
+    public static final String THAW = "thawed";
+    public static final String STATUS = "status";
+
+    private String vmName;
+    private String option;
+
+    public FreezeThawVMCommand(String vmName) {
+        this.vmName = vmName;
+    }
+
+    public String getVmName() {
+        return vmName;
+    }
+
+    public void setVmName(String vmName) {
+        this.vmName = vmName;
+    }
+
+    public String getOption() {
+        return option;
+    }
+
+    public void setOption(String option) {
+        this.option = option;
+    }
+
+    @Override
+    public boolean executeInSequence() {
+        return false;
+    }
+
+}
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/StorageStrategyFactory.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/StorageStrategyFactory.java
index 91bcc1fc12f..eac9a313278 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/StorageStrategyFactory.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/StorageStrategyFactory.java
@@ -36,4 +36,12 @@ public interface StorageStrategyFactory {
 
     VMSnapshotStrategy getVmSnapshotStrategy(VMSnapshot vmSnapshot);
 
+    /**
+     * Used only for KVM hypervisors when allocating a VM snapshot
+     * @param vmId the ID of the virtual machine
+     * @param snapshotMemory for VM snapshots with memory
+     * @return VMSnapshotStrategy
+     */
+    VMSnapshotStrategy getVmSnapshotStrategy(Long vmId, Long rootPoolId, boolean snapshotMemory);
+
 }
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/VMSnapshotStrategy.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/VMSnapshotStrategy.java
index a009a940a74..223229b5ee8 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/VMSnapshotStrategy.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/VMSnapshotStrategy.java
@@ -29,6 +29,14 @@ public interface VMSnapshotStrategy {
 
     StrategyPriority canHandle(VMSnapshot vmSnapshot);
 
+    /**
+     * Used only for KVM hypervisors when allocating a VM snapshot
+     * @param vmId the ID of the virtual machine
+     * @param snapshotMemory for VM snapshots with memory
+     * @return StrategyPriority
+     */
+    StrategyPriority canHandle(Long vmId, Long poolId, boolean snapshotMemory);
+
     /**
      * Delete vm snapshot only from database. Introduced as a Vmware optimization in which vm snapshots are deleted when
      * the vm gets deleted on hypervisor (no need to delete each vm snapshot before deleting vm, just mark them as deleted on DB)
diff --git a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/DefaultVMSnapshotStrategy.java b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/DefaultVMSnapshotStrategy.java
index b7d565f3df2..c647b11d22c 100644
--- a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/DefaultVMSnapshotStrategy.java
+++ b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/DefaultVMSnapshotStrategy.java
@@ -53,6 +53,7 @@ import com.cloud.storage.DiskOfferingVO;
 import com.cloud.storage.GuestOSHypervisorVO;
 import com.cloud.storage.GuestOSVO;
 import com.cloud.storage.VolumeVO;
+import com.cloud.storage.Storage.ImageFormat;
 import com.cloud.storage.dao.DiskOfferingDao;
 import com.cloud.storage.dao.GuestOSDao;
 import com.cloud.storage.dao.GuestOSHypervisorDao;
@@ -67,6 +68,7 @@ import com.cloud.utils.db.TransactionStatus;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.fsm.NoTransitionException;
 import com.cloud.vm.UserVmVO;
+import com.cloud.vm.VirtualMachine.State;
 import com.cloud.vm.dao.UserVmDao;
 import com.cloud.vm.snapshot.VMSnapshot;
 import com.cloud.vm.snapshot.VMSnapshotVO;
@@ -332,7 +334,7 @@ public class DefaultVMSnapshotStrategy extends ManagerBase implements VMSnapshot
         }
     }
 
-    private void publishUsageEvent(String type, VMSnapshot vmSnapshot, UserVm userVm, VolumeObjectTO volumeTo) {
+    protected void publishUsageEvent(String type, VMSnapshot vmSnapshot, UserVm userVm, VolumeObjectTO volumeTo) {
         VolumeVO volume = volumeDao.findById(volumeTo.getId());
         Long diskOfferingId = volume.getDiskOfferingId();
         Long offeringId = null;
@@ -350,7 +352,7 @@ public class DefaultVMSnapshotStrategy extends ManagerBase implements VMSnapshot
                 volumeTo.getSize(), VMSnapshot.class.getName(), vmSnapshot.getUuid(), details);
     }
 
-    private void publishUsageEvent(String type, VMSnapshot vmSnapshot, UserVm userVm, Long vmSnapSize, Long virtualSize) {
+    protected void publishUsageEvent(String type, VMSnapshot vmSnapshot, UserVm userVm, Long vmSnapSize, Long virtualSize) {
         try {
             Map<String, String> details = new HashMap<>();
             if (vmSnapshot != null) {
@@ -449,4 +451,20 @@ public class DefaultVMSnapshotStrategy extends ManagerBase implements VMSnapshot
         }
         return vmSnapshotDao.remove(vmSnapshot.getId());
     }
+
+    @Override
+    public StrategyPriority canHandle(Long vmId, Long rootPoolId, boolean snapshotMemory) {
+        UserVmVO vm = userVmDao.findById(vmId);
+        if (vm.getState() == State.Running && !snapshotMemory) {
+            return StrategyPriority.CANT_HANDLE;
+        }
+
+        List<VolumeVO> volumes = volumeDao.findByInstance(vmId);
+        for (VolumeVO volume : volumes) {
+            if (volume.getFormat() != ImageFormat.QCOW2) {
+                return StrategyPriority.CANT_HANDLE;
+            }
+        }
+        return StrategyPriority.DEFAULT;
+    }
 }
diff --git a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/ScaleIOVMSnapshotStrategy.java b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/ScaleIOVMSnapshotStrategy.java
index 34ea4908b7e..50afa647dc3 100644
--- a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/ScaleIOVMSnapshotStrategy.java
+++ b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/ScaleIOVMSnapshotStrategy.java
@@ -37,6 +37,7 @@ import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.cloudstack.storage.datastore.util.ScaleIOUtil;
 import org.apache.cloudstack.storage.to.VolumeObjectTO;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.log4j.Logger;
 
 import com.cloud.agent.api.VMSnapshotTO;
@@ -47,6 +48,7 @@ import com.cloud.event.UsageEventVO;
 import com.cloud.server.ManagementServerImpl;
 import com.cloud.storage.DiskOfferingVO;
 import com.cloud.storage.Storage;
+import com.cloud.storage.Storage.ImageFormat;
 import com.cloud.storage.VolumeVO;
 import com.cloud.storage.dao.DiskOfferingDao;
 import com.cloud.storage.dao.VolumeDao;
@@ -105,6 +107,13 @@ public class ScaleIOVMSnapshotStrategy extends ManagerBase implements VMSnapshot
             throw new CloudRuntimeException("Failed to get the volumes for the vm snapshot: " + vmSnapshot.getUuid());
         }
 
+        if (!VMSnapshot.State.Allocated.equals(vmSnapshot.getState())) {
+            List<VMSnapshotDetailsVO> vmDetails = vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), "SnapshotGroupId" );
+            if (CollectionUtils.isEmpty(vmDetails)) {
+                return StrategyPriority.CANT_HANDLE;
+            }
+        }
+
         if (volumeTOs != null && !volumeTOs.isEmpty()) {
             for (VolumeObjectTO volumeTO: volumeTOs) {
                 Long poolId  = volumeTO.getPoolId();
@@ -118,6 +127,27 @@ public class ScaleIOVMSnapshotStrategy extends ManagerBase implements VMSnapshot
         return StrategyPriority.HIGHEST;
     }
 
+    @Override
+    public StrategyPriority canHandle(Long vmId, Long rootPoolId, boolean snapshotMemory) {
+        if (snapshotMemory) {
+            return StrategyPriority.CANT_HANDLE;
+        }
+        List<VolumeObjectTO> volumeTOs = vmSnapshotHelper.getVolumeTOList(vmId);
+        if (volumeTOs == null || volumeTOs.isEmpty()) {
+            return StrategyPriority.CANT_HANDLE;
+        }
+
+        for (VolumeObjectTO volumeTO : volumeTOs) {
+            Long poolId = volumeTO.getPoolId();
+            Storage.StoragePoolType poolType = vmSnapshotHelper.getStoragePoolType(poolId);
+            if (poolType != Storage.StoragePoolType.PowerFlex || volumeTO.getFormat() != ImageFormat.RAW || !java.util.Objects.equals(poolId, rootPoolId)) {
+                return StrategyPriority.CANT_HANDLE;
+            }
+        }
+
+        return StrategyPriority.HIGHEST;
+    }
+
     @Override
     public VMSnapshot takeVMSnapshot(VMSnapshot vmSnapshot) {
         UserVm userVm = userVmDao.findById(vmSnapshot.getVmId());
diff --git a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/StorageVMSnapshotStrategy.java b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/StorageVMSnapshotStrategy.java
new file mode 100644
index 00000000000..958290085fc
--- /dev/null
+++ b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/vmsnapshot/StorageVMSnapshotStrategy.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.storage.vmsnapshot;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreProviderManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy.SnapshotOperation;
+import org.apache.cloudstack.engine.subsystem.api.storage.StorageStrategyFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.StrategyPriority;
+import org.apache.cloudstack.engine.subsystem.api.storage.VMSnapshotOptions;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.to.VolumeObjectTO;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.log4j.Logger;
+
+import com.cloud.agent.api.CreateVMSnapshotAnswer;
+import com.cloud.agent.api.CreateVMSnapshotCommand;
+import com.cloud.agent.api.DeleteVMSnapshotAnswer;
+import com.cloud.agent.api.DeleteVMSnapshotCommand;
+import com.cloud.agent.api.FreezeThawVMAnswer;
+import com.cloud.agent.api.FreezeThawVMCommand;
+import com.cloud.agent.api.RevertToVMSnapshotAnswer;
+import com.cloud.agent.api.RevertToVMSnapshotCommand;
+import com.cloud.agent.api.VMSnapshotTO;
+import com.cloud.event.EventTypes;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.hypervisor.Hypervisor;
+import com.cloud.storage.CreateSnapshotPayload;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.GuestOSVO;
+import com.cloud.storage.Snapshot;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.Storage;
+import com.cloud.storage.VolumeApiService;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.snapshot.SnapshotApiService;
+import com.cloud.storage.snapshot.SnapshotManager;
+import com.cloud.user.AccountService;
+import com.cloud.uservm.UserVm;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.fsm.NoTransitionException;
+import com.cloud.vm.UserVmVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.snapshot.VMSnapshot;
+import com.cloud.vm.snapshot.VMSnapshotDetailsVO;
+import com.cloud.vm.snapshot.VMSnapshotVO;
+import com.cloud.vm.snapshot.dao.VMSnapshotDetailsDao;
+
+public class StorageVMSnapshotStrategy extends DefaultVMSnapshotStrategy {
+    private static final Logger s_logger = Logger.getLogger(StorageVMSnapshotStrategy.class);
+    @Inject
+    VolumeApiService volumeService;
+    @Inject
+    AccountService accountService;
+    @Inject
+    VolumeDataFactory volumeDataFactory;
+    @Inject
+    SnapshotDao snapshotDao;
+    @Inject
+    StorageStrategyFactory storageStrategyFactory;
+    @Inject
+    SnapshotDataFactory snapshotDataFactory;
+    @Inject
+    PrimaryDataStoreDao storagePool;
+    @Inject
+    DataStoreProviderManager dataStoreProviderMgr;
+    @Inject
+    SnapshotApiService snapshotApiService;
+    @Inject
+    VMSnapshotDetailsDao vmSnapshotDetailsDao;
+
+    private static final String STORAGE_SNAPSHOT = "kvmStorageSnapshot";
+
+    @Override
+    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+       return super.configure(name, params);
+    }
+
+    @Override
+    public VMSnapshot takeVMSnapshot(VMSnapshot vmSnapshot) {
+        Long hostId = vmSnapshotHelper.pickRunningHost(vmSnapshot.getVmId());
+        UserVm userVm = userVmDao.findById(vmSnapshot.getVmId());
+        VMSnapshotVO vmSnapshotVO = (VMSnapshotVO) vmSnapshot;
+
+        CreateVMSnapshotAnswer answer = null;
+        FreezeThawVMCommand freezeCommand = null;
+        FreezeThawVMAnswer freezeAnswer = null;
+        FreezeThawVMCommand thawCmd = null;
+        FreezeThawVMAnswer thawAnswer = null;
+        List<SnapshotInfo> forRollback = new ArrayList<>();
+        long startFreeze = 0;
+        try {
+            vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshotVO, VMSnapshot.Event.CreateRequested);
+        } catch (NoTransitionException e) {
+            throw new CloudRuntimeException(e.getMessage());
+        }
+
+        boolean result = false;
+        try {
+            GuestOSVO guestOS = guestOSDao.findById(userVm.getGuestOSId());
+            List<VolumeObjectTO> volumeTOs = vmSnapshotHelper.getVolumeTOList(userVm.getId());
+
+            long prev_chain_size = 0;
+            long virtual_size = 0;
+
+            VMSnapshotTO current = null;
+            VMSnapshotVO currentSnapshot = vmSnapshotDao.findCurrentSnapshotByVmId(userVm.getId());
+            if (currentSnapshot != null) {
+                current = vmSnapshotHelper.getSnapshotWithParents(currentSnapshot);
+            }
+            VMSnapshotOptions options = ((VMSnapshotVO) vmSnapshot).getOptions();
+            boolean quiescevm = true;
+            if (options != null) {
+                quiescevm = options.needQuiesceVM();
+            }
+            VMSnapshotTO target = new VMSnapshotTO(vmSnapshot.getId(), vmSnapshot.getName(), vmSnapshot.getType(), null, vmSnapshot.getDescription(), false, current, quiescevm);
+            if (current == null) {
+                vmSnapshotVO.setParent(null);
+            } else {
+                vmSnapshotVO.setParent(current.getId());
+            }
+            CreateVMSnapshotCommand ccmd = new CreateVMSnapshotCommand(userVm.getInstanceName(), userVm.getUuid(), target, volumeTOs,  guestOS.getDisplayName());
+            s_logger.info("Creating VM snapshot for KVM hypervisor without memory");
+
+            List<VolumeInfo> vinfos = new ArrayList<>();
+            for (VolumeObjectTO volumeObjectTO : volumeTOs) {
+                vinfos.add(volumeDataFactory.getVolume(volumeObjectTO.getId()));
+                virtual_size += volumeObjectTO.getSize();
+                VolumeVO volumeVO = volumeDao.findById(volumeObjectTO.getId());
+                prev_chain_size += volumeVO.getVmSnapshotChainSize() == null ? 0 : volumeVO.getVmSnapshotChainSize();
+            }
+
+            freezeCommand = new FreezeThawVMCommand(userVm.getInstanceName());
+            freezeCommand.setOption(FreezeThawVMCommand.FREEZE);
+            freezeAnswer = (FreezeThawVMAnswer) agentMgr.send(hostId, freezeCommand);
+            startFreeze = System.nanoTime();
+
+            thawCmd = new FreezeThawVMCommand(userVm.getInstanceName());
+            thawCmd.setOption(FreezeThawVMCommand.THAW);
+            if (freezeAnswer != null && freezeAnswer.getResult()) {
+                s_logger.info("The virtual machine is frozen");
+                for (VolumeInfo vol : vinfos) {
+                    long startSnapshtot = System.nanoTime();
+                    SnapshotInfo snapInfo = createDiskSnapshot(vmSnapshot, forRollback, vol);
+
+                    if (snapInfo == null) {
+                        thawAnswer = (FreezeThawVMAnswer) agentMgr.send(hostId, thawCmd);
+                        throw new CloudRuntimeException("Could not take snapshot for volume with id=" + vol.getId());
+                    }
+                    s_logger.info(String.format("Snapshot with id=%s, took %s milliseconds", snapInfo.getId(),
+                            TimeUnit.MILLISECONDS.convert(elapsedTime(startSnapshtot), TimeUnit.NANOSECONDS)));
+                }
+                answer = new CreateVMSnapshotAnswer(ccmd, true, "");
+                answer.setVolumeTOs(volumeTOs);
+                thawAnswer = (FreezeThawVMAnswer) agentMgr.send(hostId, thawCmd);
+                if (thawAnswer != null && thawAnswer.getResult()) {
+                    s_logger.info(String.format(
+                            "Virtual machine is thawed. The freeze of virtual machine took %s milliseconds.",
+                            TimeUnit.MILLISECONDS.convert(elapsedTime(startFreeze), TimeUnit.NANOSECONDS)));
+                }
+            } else {
+                throw new CloudRuntimeException("Could not freeze VM." + (freezeAnswer != null ? " " + freezeAnswer.getDetails() : ""));
+            }
+            if (answer != null && answer.getResult()) {
+                processAnswer(vmSnapshotVO, userVm, answer, null);
+                s_logger.debug("Create vm snapshot " + vmSnapshot.getName() + " succeeded for vm: " + userVm.getInstanceName());
+                long new_chain_size = 0;
+                for (VolumeObjectTO volumeTo : answer.getVolumeTOs()) {
+                    publishUsageEvent(EventTypes.EVENT_VM_SNAPSHOT_CREATE, vmSnapshot, userVm, volumeTo);
+                    new_chain_size += volumeTo.getSize();
+                }
+                publishUsageEvent(EventTypes.EVENT_VM_SNAPSHOT_ON_PRIMARY, vmSnapshot, userVm, new_chain_size - prev_chain_size, virtual_size);
+                result = true;
+                return vmSnapshot;
+            } else {
+                String errMsg = "Creating VM snapshot: " + vmSnapshot.getName() + " failed";
+                s_logger.error(errMsg);
+                throw new CloudRuntimeException(errMsg);
+            }
+        } catch (OperationTimedoutException e) {
+            s_logger.debug("Creating VM snapshot: " + vmSnapshot.getName() + " failed: " + e.toString());
+            throw new CloudRuntimeException(
+                    "Creating VM snapshot: " + vmSnapshot.getName() + " failed: " + e.toString());
+        } catch (AgentUnavailableException e) {
+            s_logger.debug("Creating VM snapshot: " + vmSnapshot.getName() + " failed", e);
+            throw new CloudRuntimeException(
+                    "Creating VM snapshot: " + vmSnapshot.getName() + " failed: " + e.toString());
+        } catch (CloudRuntimeException e) {
+            throw new CloudRuntimeException(e.getMessage());
+        } finally {
+            if (thawAnswer == null && freezeAnswer != null) {
+                s_logger.info(String.format("Freeze of virtual machine took %s milliseconds.", TimeUnit.MILLISECONDS
+                                                .convert(elapsedTime(startFreeze), TimeUnit.NANOSECONDS)));
+                try {
+                    thawAnswer = (FreezeThawVMAnswer) agentMgr.send(hostId, thawCmd);
+                } catch (AgentUnavailableException | OperationTimedoutException e) {
+                    s_logger.debug("Could not unfreeze the VM due to " + e);
+                }
+            }
+            if (!result) {
+                for (SnapshotInfo snapshotInfo : forRollback) {
+                    rollbackDiskSnapshot(snapshotInfo);
+                }
+                try {
+                    List<VMSnapshotDetailsVO> vmSnapshotDetails = vmSnapshotDetailsDao.listDetails(vmSnapshot.getId());
+                    for (VMSnapshotDetailsVO vmSnapshotDetailsVO : vmSnapshotDetails) {
+                        if (vmSnapshotDetailsVO.getName().equals(STORAGE_SNAPSHOT)) {
+                            vmSnapshotDetailsDao.remove(vmSnapshotDetailsVO.getId());
+                        }
+                    }
+                    vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshot, VMSnapshot.Event.OperationFailed);
+                } catch (NoTransitionException e1) {
+                    s_logger.error("Cannot set vm snapshot state due to: " + e1.getMessage());
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean deleteVMSnapshot(VMSnapshot vmSnapshot) {
+        UserVmVO userVm = userVmDao.findById(vmSnapshot.getVmId());
+        VMSnapshotVO vmSnapshotVO = (VMSnapshotVO) vmSnapshot;
+        try {
+            vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshot, VMSnapshot.Event.ExpungeRequested);
+        } catch (NoTransitionException e) {
+            s_logger.debug("Failed to change vm snapshot state with event ExpungeRequested");
+            throw new CloudRuntimeException(
+                    "Failed to change vm snapshot state with event ExpungeRequested: " + e.getMessage());
+        }
+
+        List<VolumeObjectTO> volumeTOs = vmSnapshotHelper.getVolumeTOList(vmSnapshot.getVmId());
+
+        String vmInstanceName = userVm.getInstanceName();
+        VMSnapshotTO parent = vmSnapshotHelper.getSnapshotWithParents(vmSnapshotVO).getParent();
+        VMSnapshotTO vmSnapshotTO = new VMSnapshotTO(vmSnapshot.getId(), vmSnapshot.getName(), vmSnapshot.getType(),
+                vmSnapshot.getCreated().getTime(), vmSnapshot.getDescription(), vmSnapshot.getCurrent(), parent, true);
+        GuestOSVO guestOS = guestOSDao.findById(userVm.getGuestOSId());
+        DeleteVMSnapshotCommand deleteSnapshotCommand = new DeleteVMSnapshotCommand(vmInstanceName, vmSnapshotTO,
+                volumeTOs, guestOS.getDisplayName());
+
+        try {
+            deleteDiskSnapshot(vmSnapshot);
+            processAnswer(vmSnapshotVO, userVm, new DeleteVMSnapshotAnswer(deleteSnapshotCommand, volumeTOs), null);
+            long fullChainSize = 0;
+            for (VolumeObjectTO volumeTo : volumeTOs) {
+                publishUsageEvent(EventTypes.EVENT_VM_SNAPSHOT_DELETE, vmSnapshot, userVm, volumeTo);
+                fullChainSize += volumeTo.getSize();
+            }
+            publishUsageEvent(EventTypes.EVENT_VM_SNAPSHOT_OFF_PRIMARY, vmSnapshot, userVm, fullChainSize, 0L);
+            return true;
+        } catch (CloudRuntimeException err) {
+            // If the delete fails, the per-volume snapshots are no longer consistent as a group, so expose each one as a MANUAL snapshot that can be deleted or reverted individually
+            List<VMSnapshotDetailsVO> listSnapshots = vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), STORAGE_SNAPSHOT);
+            for (VMSnapshotDetailsVO vmSnapshotDetailsVO : listSnapshots) {
+                SnapshotVO snapshot = snapshotDao.findById(Long.parseLong(vmSnapshotDetailsVO.getValue()));
+                if (snapshot != null) {
+                    snapshot.setSnapshotType((short) Snapshot.Type.MANUAL.ordinal());
+                    snapshot.setTypeDescription("MANUAL");
+                    snapshotDao.update(snapshot.getId(), snapshot);
+                    vmSnapshotDetailsDao.remove(vmSnapshotDetailsVO.getId());
+                }
+            }
+            String errMsg = String.format("Delete of VM snapshot [%s] of VM [%s] failed due to [%s]", vmSnapshot.getName(), userVm.getUuid(), err);
+            s_logger.error(errMsg, err);
+            throw new CloudRuntimeException(errMsg, err);
+        }
+    }
+
+    @Override
+    public boolean revertVMSnapshot(VMSnapshot vmSnapshot) {
+        VMSnapshotVO vmSnapshotVO = (VMSnapshotVO) vmSnapshot;
+        UserVmVO userVm = userVmDao.findById(vmSnapshot.getVmId());
+        try {
+            vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshotVO, VMSnapshot.Event.RevertRequested);
+        } catch (NoTransitionException e) {
+            throw new CloudRuntimeException(e.getMessage());
+        }
+
+        boolean result = false;
+        try {
+            List<VolumeObjectTO> volumeTOs = vmSnapshotHelper.getVolumeTOList(userVm.getId());
+            String vmInstanceName = userVm.getInstanceName();
+            VMSnapshotTO parent = vmSnapshotHelper.getSnapshotWithParents(vmSnapshotVO).getParent();
+
+            VMSnapshotTO vmSnapshotTO = new VMSnapshotTO(vmSnapshotVO.getId(), vmSnapshotVO.getName(), vmSnapshotVO.getType(),
+                                       vmSnapshotVO.getCreated().getTime(), vmSnapshotVO.getDescription(), vmSnapshotVO.getCurrent(), parent, true);
+            GuestOSVO guestOS = guestOSDao.findById(userVm.getGuestOSId());
+            RevertToVMSnapshotCommand revertToSnapshotCommand = new RevertToVMSnapshotCommand(vmInstanceName,
+                    userVm.getUuid(), vmSnapshotTO, volumeTOs, guestOS.getDisplayName());
+            List<VolumeInfo> volumeInfos = new ArrayList<>();
+            for (VolumeObjectTO volumeObjectTO : volumeTOs) {
+                volumeInfos.add(volumeDataFactory.getVolume(volumeObjectTO.getId()));
+            }
+            revertDiskSnapshot(vmSnapshot);
+            RevertToVMSnapshotAnswer answer = new RevertToVMSnapshotAnswer(revertToSnapshotCommand, true, "");
+            answer.setVolumeTOs(volumeTOs);
+            processAnswer(vmSnapshotVO, userVm, answer, null);
+            result = true;
+        } catch (CloudRuntimeException e) {
+            s_logger.error(e);
+            throw new CloudRuntimeException(e);
+        } finally {
+            if (!result) {
+                try {
+                    vmSnapshotHelper.vmSnapshotStateTransitTo(vmSnapshot, VMSnapshot.Event.OperationFailed);
+                } catch (NoTransitionException e1) {
+                    s_logger.error("Cannot set vm snapshot state due to: " + e1.getMessage());
+                }
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public StrategyPriority canHandle(VMSnapshot vmSnapshot) {
+       UserVmVO userVm = userVmDao.findById(vmSnapshot.getVmId());
+       if (!VMSnapshot.State.Allocated.equals(vmSnapshot.getState())) {
+           List<VMSnapshotDetailsVO> vmSnapshotDetails = vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), STORAGE_SNAPSHOT);
+           if (CollectionUtils.isEmpty(vmSnapshotDetails)) {
+               return StrategyPriority.CANT_HANDLE;
+           }
+       }
+
+       if (SnapshotManager.VmStorageSnapshotKvm.value() && userVm.getHypervisorType() == Hypervisor.HypervisorType.KVM
+                    && vmSnapshot.getType() == VMSnapshot.Type.Disk) {
+           return StrategyPriority.HYPERVISOR;
+       }
+       return StrategyPriority.CANT_HANDLE;
+    }
+
+    @Override
+    public StrategyPriority canHandle(Long vmId, Long rootPoolId, boolean snapshotMemory) {
+        //This check could be removed when PR #5297 is merged
+        if (vmHasNFSOrLocalVolumes(vmId)) {
+            return StrategyPriority.CANT_HANDLE;
+        }
+        if (SnapshotManager.VmStorageSnapshotKvm.value() && !snapshotMemory) {
+            UserVmVO vm = userVmDao.findById(vmId);
+            if (vm.getState() == VirtualMachine.State.Running) {
+                return StrategyPriority.HYPERVISOR;
+            }
+        }
+        return StrategyPriority.CANT_HANDLE;
+    }
+
+    @Override
+    public boolean deleteVMSnapshotFromDB(VMSnapshot vmSnapshot, boolean unmanage) {
+       return super.deleteVMSnapshotFromDB(vmSnapshot, unmanage);
+    }
+
+    private long elapsedTime(long startTime) {
+        long endTime = System.nanoTime();
+        return endTime - startTime;
+    }
+
+    // Roll back a disk snapshot that was already taken when another disk snapshot fails
+    protected void rollbackDiskSnapshot(SnapshotInfo snapshotInfo) {
+        Long snapshotID = snapshotInfo.getId();
+        SnapshotVO snapshot = snapshotDao.findById(snapshotID);
+        deleteSnapshotByStrategy(snapshot);
+        s_logger.debug("Rollback executed: deleted snapshot with id: " + snapshotID);
+    }
+
+    protected void deleteSnapshotByStrategy(SnapshotVO snapshot) {
+        // Disk snapshots that belong to a VM snapshot cannot be deleted individually; they are marked as BackedUp so that they can be removed here when the VM snapshot is deleted or rolled back
+        SnapshotStrategy strategy = storageStrategyFactory.getSnapshotStrategy(snapshot, SnapshotOperation.DELETE);
+        if (strategy != null) {
+            boolean snapshotForDelete = strategy.deleteSnapshot(snapshot.getId());
+            if (!snapshotForDelete) {
+                throw new CloudRuntimeException("Failed to delete snapshot");
+            }
+        } else {
+            throw new CloudRuntimeException("Could not find a strategy to delete the snapshot");
+        }
+    }
+
+    protected void deleteDiskSnapshot(VMSnapshot vmSnapshot) {
+        // The disk snapshots that belong to vmSnapshot are recorded in the vm_snapshot_details table
+        List<VMSnapshotDetailsVO> listSnapshots = vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), STORAGE_SNAPSHOT);
+        for (VMSnapshotDetailsVO vmSnapshotDetailsVO : listSnapshots) {
+            SnapshotVO snapshot = snapshotDao.findById(Long.parseLong(vmSnapshotDetailsVO.getValue()));
+            if (snapshot == null) {
+                throw new CloudRuntimeException("Could not find snapshot for VM snapshot");
+            }
+            deleteSnapshotByStrategy(snapshot);
+            vmSnapshotDetailsDao.remove(vmSnapshotDetailsVO.getId());
+        }
+    }
+
+    protected void revertDiskSnapshot(VMSnapshot vmSnapshot) {
+        List<VMSnapshotDetailsVO> listSnapshots = vmSnapshotDetailsDao.findDetails(vmSnapshot.getId(), STORAGE_SNAPSHOT);
+        for (VMSnapshotDetailsVO vmSnapshotDetailsVO : listSnapshots) {
+            SnapshotInfo sInfo = snapshotDataFactory.getSnapshot(Long.parseLong(vmSnapshotDetailsVO.getValue()), DataStoreRole.Primary);
+            SnapshotStrategy snapshotStrategy = storageStrategyFactory.getSnapshotStrategy(sInfo, SnapshotOperation.REVERT);
+            if (snapshotStrategy == null) {
+                throw new CloudRuntimeException(String.format("Could not find strategy for snapshot uuid [%s]", sInfo.getUuid()));
+            }
+            if (!snapshotStrategy.revertSnapshot(sInfo)) {
+                throw new CloudRuntimeException("Failed to revert snapshot");
+            }
+        }
+    }
+
+    protected SnapshotInfo createDiskSnapshot(VMSnapshot vmSnapshot, List<SnapshotInfo> forRollback, VolumeInfo vol) {
+        String snapshotName = vmSnapshot.getId() + "_" + vol.getUuid();
+        SnapshotVO snapshot = new SnapshotVO(vol.getDataCenterId(), vol.getAccountId(), vol.getDomainId(), vol.getId(), vol.getDiskOfferingId(),
+                              snapshotName, (short) Snapshot.Type.GROUP.ordinal(), Snapshot.Type.GROUP.name(), vol.getSize(), vol.getMinIops(), vol.getMaxIops(), Hypervisor.HypervisorType.KVM, null);
+
+        snapshot = snapshotDao.persist(snapshot);
+        vol.addPayload(setPayload(vol, snapshot));
+        SnapshotInfo snapshotInfo = snapshotDataFactory.getSnapshot(snapshot.getId(), vol.getDataStore());
+        snapshotInfo.addPayload(vol.getpayload());
+        SnapshotStrategy snapshotStrategy = storageStrategyFactory.getSnapshotStrategy(snapshotInfo, SnapshotOperation.TAKE);
+        if (snapshotStrategy == null) {
+            throw new CloudRuntimeException("Could not find strategy for snapshot uuid:" + snapshotInfo.getUuid());
+        }
+        snapshotInfo = snapshotStrategy.takeSnapshot(snapshotInfo);
+        if (snapshotInfo == null) {
+            throw new CloudRuntimeException("Failed to create snapshot");
+        } else {
+            forRollback.add(snapshotInfo);
+        }
+        vmSnapshotDetailsDao.persist(new VMSnapshotDetailsVO(vmSnapshot.getId(), STORAGE_SNAPSHOT, String.valueOf(snapshot.getId()), true));
+        snapshotInfo.markBackedUp();
+        return snapshotInfo;
+    }
+
+    protected CreateSnapshotPayload setPayload(VolumeInfo vol, SnapshotVO snapshotCreate) {
+        CreateSnapshotPayload payload = new CreateSnapshotPayload();
+        payload.setSnapshotId(snapshotCreate.getId());
+        payload.setSnapshotPolicyId(SnapshotVO.MANUAL_POLICY_ID);
+        payload.setLocationType(snapshotCreate.getLocationType());
+        payload.setAccount(accountService.getAccount(vol.getAccountId()));
+        payload.setAsyncBackup(false);
+        payload.setQuiescevm(false);
+        return payload;
+    }
+
+    private boolean vmHasNFSOrLocalVolumes(long vmId) {
+        List<VolumeObjectTO> volumeTOs = vmSnapshotHelper.getVolumeTOList(vmId);
+
+        for (VolumeObjectTO volumeTO : volumeTOs) {
+            Long poolId = volumeTO.getPoolId();
+            Storage.StoragePoolType poolType = vmSnapshotHelper.getStoragePoolType(poolId);
+            if (poolType == Storage.StoragePoolType.NetworkFilesystem || poolType == Storage.StoragePoolType.Filesystem) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
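
Note on the rollback path above: createDiskSnapshot adds every successful disk snapshot to forRollback, and the failure branch calls rollbackDiskSnapshot for each entry. The following is a minimal, self-contained sketch of that collect-then-undo pattern. It is not the CloudStack implementation; RollbackSketch, snapshotAll, and the take/delete callbacks are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class RollbackSketch {
    /**
     * Takes one snapshot per volume. If any attempt fails, deletes the
     * snapshots already taken (in the order they were taken) and rethrows.
     * "take" and "delete" are hypothetical callbacks standing in for the
     * storage provider calls.
     */
    public static <V, S> List<S> snapshotAll(List<V> volumes, Function<V, S> take, Consumer<S> delete) {
        List<S> forRollback = new ArrayList<>();
        try {
            for (V volume : volumes) {
                S snapshot = take.apply(volume);
                if (snapshot == null) {
                    // A null result is treated as a failure, as in createDiskSnapshot
                    throw new IllegalStateException("Failed to create snapshot for " + volume);
                }
                forRollback.add(snapshot);
            }
            return forRollback;
        } catch (RuntimeException e) {
            // Undo the partial work so no orphaned disk snapshots remain
            for (S taken : forRollback) {
                delete.accept(taken);
            }
            throw e;
        }
    }
}
```

In this sketch the caller sees either a complete list of snapshots or an exception with all partial snapshots already removed, which is the property the VM snapshot needs to stay consistent across disks.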
diff --git a/engine/storage/snapshot/src/main/resources/META-INF/cloudstack/storage/spring-engine-storage-snapshot-storage-context.xml b/engine/storage/snapshot/src/main/resources/META-INF/cloudstack/storage/spring-engine-storage-snapshot-storage-context.xml
index 2084ce26f69..56c6fff63e0 100644
--- a/engine/storage/snapshot/src/main/resources/META-INF/cloudstack/storage/spring-engine-storage-snapshot-storage-context.xml
+++ b/engine/storage/snapshot/src/main/resources/META-INF/cloudstack/storage/spring-engine-storage-snapshot-storage-context.xml
@@ -42,6 +42,9 @@
     <bean id="DefaultVMSnapshotStrategy"
         class="org.apache.cloudstack.storage.vmsnapshot.DefaultVMSnapshotStrategy" />
 
+    <bean id="StorageVMSnapshotStrategy"
+        class="org.apache.cloudstack.storage.vmsnapshot.StorageVMSnapshotStrategy" />
+
     <bean id="ScaleIOVMSnapshotStrategy"
           class="org.apache.cloudstack.storage.vmsnapshot.ScaleIOVMSnapshotStrategy" />
 
diff --git a/engine/storage/snapshot/src/test/java/org/apache/cloudstack/storage/vmsnapshot/VMSnapshotStrategyKVMTest.java b/engine/storage/snapshot/src/test/java/org/apache/cloudstack/storage/vmsnapshot/VMSnapshotStrategyKVMTest.java
new file mode 100644
index 00000000000..7ba14c9ed15
--- /dev/null
+++ b/engine/storage/snapshot/src/test/java/org/apache/cloudstack/storage/vmsnapshot/VMSnapshotStrategyKVMTest.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.storage.vmsnapshot;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreProviderManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy.SnapshotOperation;
+import org.apache.cloudstack.engine.subsystem.api.storage.StorageStrategyFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.storage.to.VolumeObjectTO;
+import org.apache.cloudstack.test.utils.SpringUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.FilterType;
+import org.springframework.core.type.classreading.MetadataReader;
+import org.springframework.core.type.classreading.MetadataReaderFactory;
+import org.springframework.core.type.filter.TypeFilter;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import org.springframework.test.context.support.AnnotationConfigContextLoader;
+
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.DeleteVMSnapshotAnswer;
+import com.cloud.agent.api.RevertToVMSnapshotAnswer;
+import com.cloud.agent.api.VMSnapshotTO;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.host.HostVO;
+import com.cloud.host.dao.HostDao;
+import com.cloud.hypervisor.Hypervisor;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.storage.GuestOSHypervisorVO;
+import com.cloud.storage.GuestOSVO;
+import com.cloud.storage.Snapshot;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.VolumeApiService;
+import com.cloud.storage.dao.DiskOfferingDao;
+import com.cloud.storage.dao.GuestOSDao;
+import com.cloud.storage.dao.GuestOSHypervisorDao;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.VolumeDao;
+import com.cloud.storage.snapshot.SnapshotApiService;
+import com.cloud.user.AccountService;
+import com.cloud.utils.component.ComponentContext;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.net.NetUtils;
+import com.cloud.vm.UserVmVO;
+import com.cloud.vm.dao.UserVmDao;
+import com.cloud.vm.snapshot.VMSnapshot;
+import com.cloud.vm.snapshot.VMSnapshotDetailsVO;
+import com.cloud.vm.snapshot.VMSnapshotVO;
+import com.cloud.vm.snapshot.dao.VMSnapshotDao;
+import com.cloud.vm.snapshot.dao.VMSnapshotDetailsDao;
+
+import junit.framework.TestCase;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(loader = AnnotationConfigContextLoader.class)
+public class VMSnapshotStrategyKVMTest extends TestCase {
+    List<StoragePoolVO> storage;
+    @Inject
+    VMSnapshotHelper vmSnapshotHelper;
+    @Inject
+    GuestOSDao guestOSDao;
+    @Inject
+    GuestOSHypervisorDao guestOsHypervisorDao;
+    @Inject
+    UserVmDao userVmDao;
+    @Inject
+    VMSnapshotDao vmSnapshotDao;
+    @Inject
+    ConfigurationDao configurationDao;
+    @Inject
+    AgentManager agentMgr;
+    @Inject
+    VolumeDao volumeDao;
+    @Inject
+    DiskOfferingDao diskOfferingDao;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    VolumeApiService _volumeService;
+    @Inject
+    AccountService _accountService;
+    @Inject
+    VolumeDataFactory volumeDataFactory;
+    @Inject
+    SnapshotApiService _snapshotService;
+    @Inject
+    SnapshotDao _snapshotDao;
+    @Inject
+    StorageStrategyFactory _storageStrategyFactory;
+    @Inject
+    SnapshotDataFactory _snapshotDataFactory;
+    @Inject
+    PrimaryDataStoreDao primaryDataStoreDao;
+    @Inject
+    DataStoreManager _dataStoreMgr;
+    @Inject
+    StorageVMSnapshotStrategy vmStrategy;
+    @Inject
+    VMSnapshotDetailsDao vmSnapshotDetailsDao;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        ComponentContext.initComponentsLifeCycle();
+    }
+
+    @Test
+    public void testCreateDiskSnapshotBasedOnStrategy() throws Exception {
+        VMSnapshot vmSnapshot = Mockito.mock(VMSnapshot.class);
+        List<SnapshotInfo> forRollback = new ArrayList<>();
+        VolumeInfo vol = Mockito.mock(VolumeInfo.class);
+        SnapshotInfo snapshotInfo = Mockito.mock(SnapshotInfo.class);
+        SnapshotStrategy strategy = Mockito.mock(SnapshotStrategy.class);
+        DataStore dataStore = Mockito.mock(DataStore.class);
+        String volUuid = UUID.randomUUID().toString();
+        String vmUuid = UUID.randomUUID().toString();
+        SnapshotVO snapshot = new SnapshotVO(vol.getDataCenterId(), vol.getAccountId(), vol.getDomainId(),
+                               vol.getId(), vol.getDiskOfferingId(), vmUuid + "_" + volUuid, (short) SnapshotVO.MANUAL_POLICY_ID,
+                               "MANUAL", vol.getSize(), vol.getMinIops(), vol.getMaxIops(), Hypervisor.HypervisorType.KVM, null);
+        PowerMockito.whenNew(SnapshotVO.class).withAnyArguments().thenReturn(snapshot);
+        when(vmSnapshot.getUuid()).thenReturn(vmUuid);
+        when(vol.getUuid()).thenReturn(volUuid);
+        when(_snapshotDao.persist(any())).thenReturn(snapshot);
+        when(vol.getDataStore()).thenReturn(dataStore);
+        when(_snapshotDataFactory.getSnapshot(snapshot.getId(), vol.getDataStore())).thenReturn(snapshotInfo);
+        when(_storageStrategyFactory.getSnapshotStrategy(snapshotInfo, SnapshotOperation.TAKE)).thenReturn(strategy);
+
+        SnapshotInfo info = null;
+
+        when(strategy.takeSnapshot(any())).thenReturn(snapshotInfo);
+        VMSnapshotDetailsVO vmDetails = new VMSnapshotDetailsVO(vmSnapshot.getId(), volUuid, String.valueOf(snapshot.getId()), false);
+        PowerMockito.whenNew(VMSnapshotDetailsVO.class).withAnyArguments().thenReturn(vmDetails);
+        when(vmSnapshotDetailsDao.persist(any())).thenReturn(vmDetails);
+
+        info = vmStrategy.createDiskSnapshot(vmSnapshot, forRollback, vol);
+        assertNotNull(info);
+    }
+
+    @Test
+    public void testRevertVMsnapshot() throws AgentUnavailableException, OperationTimedoutException {
+        Long hostId = 1L;
+        Long vmId = 1L;
+        Long guestOsId = 1L;
+        HypervisorType hypervisorType = HypervisorType.KVM;
+        String hypervisorVersion = "default";
+        String guestOsName = "Other";
+        List<VolumeObjectTO> volumeObjectTOs = new ArrayList<VolumeObjectTO>();
+        VMSnapshotVO vmSnapshot = Mockito.mock(VMSnapshotVO.class);
+        UserVmVO userVmVO = Mockito.mock(UserVmVO.class);
+        Mockito.when(userVmVO.getGuestOSId()).thenReturn(guestOsId);
+        Mockito.when(vmSnapshot.getVmId()).thenReturn(vmId);
+        Mockito.when(vmSnapshotHelper.pickRunningHost(Matchers.anyLong())).thenReturn(hostId);
+        Mockito.when(vmSnapshotHelper.getVolumeTOList(Matchers.anyLong())).thenReturn(volumeObjectTOs);
+        Mockito.when(userVmDao.findById(Matchers.anyLong())).thenReturn(userVmVO);
+        GuestOSVO guestOSVO = Mockito.mock(GuestOSVO.class);
+        Mockito.when(guestOSDao.findById(Matchers.anyLong())).thenReturn(guestOSVO);
+        GuestOSHypervisorVO guestOSHypervisorVO = Mockito.mock(GuestOSHypervisorVO.class);
+        Mockito.when(guestOSHypervisorVO.getGuestOsName()).thenReturn(guestOsName);
+        Mockito.when(guestOsHypervisorDao.findById(Matchers.anyLong())).thenReturn(guestOSHypervisorVO);
+        Mockito.when(guestOsHypervisorDao.findByOsIdAndHypervisor(Matchers.anyLong(), Matchers.anyString(), Matchers.anyString())).thenReturn(guestOSHypervisorVO);
+        VMSnapshotTO vmSnapshotTO = Mockito.mock(VMSnapshotTO.class);
+        Mockito.when(vmSnapshotHelper.getSnapshotWithParents(Matchers.any(VMSnapshotVO.class))).thenReturn(vmSnapshotTO);
+        Mockito.when(vmSnapshotDao.findById(Matchers.anyLong())).thenReturn(vmSnapshot);
+        Mockito.when(vmSnapshot.getId()).thenReturn(1L);
+        Mockito.when(vmSnapshot.getCreated()).thenReturn(new Date());
+        HostVO hostVO = Mockito.mock(HostVO.class);
+        Mockito.when(hostDao.findById(Matchers.anyLong())).thenReturn(hostVO);
+        Mockito.when(hostVO.getHypervisorType()).thenReturn(hypervisorType);
+        Mockito.when(hostVO.getHypervisorVersion()).thenReturn(hypervisorVersion);
+
+        RevertToVMSnapshotAnswer answer = Mockito.mock(RevertToVMSnapshotAnswer.class);
+        Mockito.when(answer.getResult()).thenReturn(Boolean.TRUE);
+        boolean result = vmStrategy.revertVMSnapshot(vmSnapshot);
+        assertTrue(result);
+    }
+
+    @Test
+    public void testRevertDiskSnapshot() throws Exception {
+        VMSnapshot vmSnapshot = Mockito.mock(VMSnapshot.class);
+        VolumeInfo vol = Mockito.mock(VolumeInfo.class);
+        SnapshotVO snapshotVO = Mockito.mock(SnapshotVO.class);
+        Snapshot snap = Mockito.mock(Snapshot.class);
+        DataStore dataStore = Mockito.mock(DataStore.class);
+
+        String volUuid = UUID.randomUUID().toString();
+        String vmUuid = UUID.randomUUID().toString();
+        String name = vmUuid + "_" + volUuid;
+        when(vol.getUuid()).thenReturn(volUuid);
+        when(vmSnapshot.getUuid()).thenReturn(vmUuid);
+        when(vol.getDataStore()).thenReturn(dataStore);
+        when(snapshotVO.getId()).thenReturn(1L);
+        when(_snapshotService.revertSnapshot(snapshotVO.getId())).thenReturn(snap);
+    //    testFindSnapshotByName(name);
+        vmStrategy.revertDiskSnapshot(vmSnapshot);
+    }
+
+    @Test
+    public void testDeleteDiskSnapshot() {
+        VMSnapshot vmSnapshot = Mockito.mock(VMSnapshot.class);
+        VolumeInfo vol = Mockito.mock(VolumeInfo.class);
+        SnapshotVO snapshotVO = Mockito.mock(SnapshotVO.class);
+        Snapshot snap = Mockito.mock(Snapshot.class);
+        SnapshotInfo info = Mockito.mock(SnapshotInfo.class);
+        SnapshotStrategy strategy = Mockito.mock(SnapshotStrategy.class);
+        String volUuid = UUID.randomUUID().toString();
+        String vmUuid = UUID.randomUUID().toString();
+        String name = vmUuid + "_" + volUuid;
+        when(vol.getUuid()).thenReturn(volUuid);
+        when(vmSnapshot.getUuid()).thenReturn(vmUuid);
+        when(snapshotVO.getId()).thenReturn(1L);
+        when(_snapshotDataFactory.getSnapshot(snapshotVO.getId(), vol.getDataStore())).thenReturn(info);
+        when(_storageStrategyFactory.getSnapshotStrategy(info, SnapshotOperation.DELETE)).thenReturn(strategy);
+        testFindSnapshotByName(name);
+        vmStrategy.deleteDiskSnapshot(vmSnapshot);
+    }
+
+    @Test
+    public void testDeleteVMsnapshot() throws AgentUnavailableException, OperationTimedoutException {
+        Long hostId = 1L;
+        Long vmId = 1L;
+        Long guestOsId = 1L;
+        HypervisorType hypervisorType = HypervisorType.KVM;
+        String hypervisorVersion = "default";
+        String guestOsName = "Other";
+        List<VolumeObjectTO> volumeObjectTOs = new ArrayList<VolumeObjectTO>();
+        VMSnapshotVO vmSnapshot = Mockito.mock(VMSnapshotVO.class);
+        UserVmVO userVmVO = Mockito.mock(UserVmVO.class);
+        Mockito.when(userVmVO.getGuestOSId()).thenReturn(guestOsId);
+        Mockito.when(vmSnapshot.getVmId()).thenReturn(vmId);
+        Mockito.when(vmSnapshotHelper.pickRunningHost(Matchers.anyLong())).thenReturn(hostId);
+        Mockito.when(vmSnapshotHelper.getVolumeTOList(Matchers.anyLong())).thenReturn(volumeObjectTOs);
+        Mockito.when(userVmDao.findById(Matchers.anyLong())).thenReturn(userVmVO);
+        GuestOSVO guestOSVO = Mockito.mock(GuestOSVO.class);
+        Mockito.when(guestOSDao.findById(Matchers.anyLong())).thenReturn(guestOSVO);
+        GuestOSHypervisorVO guestOSHypervisorVO = Mockito.mock(GuestOSHypervisorVO.class);
+        Mockito.when(guestOSHypervisorVO.getGuestOsName()).thenReturn(guestOsName);
+        Mockito.when(guestOsHypervisorDao.findById(Matchers.anyLong())).thenReturn(guestOSHypervisorVO);
+        Mockito.when(guestOsHypervisorDao.findByOsIdAndHypervisor(Matchers.anyLong(), Matchers.anyString(), Matchers.anyString())).thenReturn(guestOSHypervisorVO);
+        VMSnapshotTO vmSnapshotTO = Mockito.mock(VMSnapshotTO.class);
+        Mockito.when(vmSnapshotHelper.getSnapshotWithParents(Matchers.any(VMSnapshotVO.class))).thenReturn(vmSnapshotTO);
+        Mockito.when(vmSnapshotDao.findById(Matchers.anyLong())).thenReturn(vmSnapshot);
+        Mockito.when(vmSnapshot.getId()).thenReturn(1L);
+        Mockito.when(vmSnapshot.getCreated()).thenReturn(new Date());
+        HostVO hostVO = Mockito.mock(HostVO.class);
+        Mockito.when(hostDao.findById(Matchers.anyLong())).thenReturn(hostVO);
+        Mockito.when(hostVO.getHypervisorType()).thenReturn(hypervisorType);
+        Mockito.when(hostVO.getHypervisorVersion()).thenReturn(hypervisorVersion);
+        DeleteVMSnapshotAnswer answer = Mockito.mock(DeleteVMSnapshotAnswer.class);
+        Mockito.when(answer.getResult()).thenReturn(true);
+
+        boolean result = vmStrategy.deleteVMSnapshot(vmSnapshot);
+        assertTrue(result);
+    }
+
+    @SuppressWarnings("unchecked")
+    private SnapshotVO testFindSnapshotByName(String snapshotName) {
+        SearchBuilder<SnapshotVO> sb = Mockito.mock(SearchBuilder.class);
+        when(_snapshotDao.createSearchBuilder()).thenReturn(sb);
+        SearchCriteria<SnapshotVO> sc = Mockito.mock(SearchCriteria.class);
+        when(sb.create()).thenReturn(sc);
+        SnapshotVO snap = Mockito.mock(SnapshotVO.class);
+        when(_snapshotDao.findOneBy(sc)).thenReturn(snap);
+        return snap;
+    }
+
+    @Configuration
+    @ComponentScan(basePackageClasses = {NetUtils.class, StorageVMSnapshotStrategy.class}, includeFilters = {
+            @ComponentScan.Filter(value = TestConfiguration.Library.class, type = FilterType.CUSTOM)}, useDefaultFilters = false)
+    public static class TestConfiguration extends SpringUtils.CloudStackTestConfiguration {
+
+        public static class Library implements TypeFilter {
+            @Override
+            public boolean match(MetadataReader mdr, MetadataReaderFactory arg1) throws IOException {
+                mdr.getClassMetadata().getClassName();
+                ComponentScan cs = TestConfiguration.class.getAnnotation(ComponentScan.class);
+                return SpringUtils.includedInBasePackageClasses(mdr.getClassMetadata().getClassName(), cs);
+            }
+        }
+
+        @Bean
+        public VMSnapshotHelper vmSnapshotHelper() {
+            return Mockito.mock(VMSnapshotHelper.class);
+        }
+
+        @Bean
+        public GuestOSDao guestOSDao() {
+            return Mockito.mock(GuestOSDao.class);
+        }
+
+        @Bean
+        public GuestOSHypervisorDao guestOsHypervisorDao() {
+            return Mockito.mock(GuestOSHypervisorDao.class);
+        }
+
+        @Bean
+        public UserVmDao userVmDao() {
+            return Mockito.mock(UserVmDao.class);
+        }
+
+        @Bean
+        public VMSnapshotDao vmSnapshotDao() {
+            return Mockito.mock(VMSnapshotDao.class);
+        }
+
+        @Bean
+        public ConfigurationDao configurationDao() {
+            return Mockito.mock(ConfigurationDao.class);
+        }
+
+        @Bean
+        public AgentManager agentManager() {
+            return Mockito.mock(AgentManager.class);
+        }
+
+        @Bean
+        public VolumeDao volumeDao() {
+            return Mockito.mock(VolumeDao.class);
+        }
+
+        @Bean
+        public DiskOfferingDao diskOfferingDao() {
+            return Mockito.mock(DiskOfferingDao.class);
+        }
+
+        @Bean
+        public HostDao hostDao() {
+            return Mockito.mock(HostDao.class);
+        }
+
+        @Bean
+        public VolumeApiService volumeApiService() {
+            return Mockito.mock(VolumeApiService.class);
+        }
+
+        @Bean
+        public AccountService accountService() {
+            return Mockito.mock(AccountService.class);
+        }
+
+        @Bean
+        public VolumeDataFactory volumeDataFactory() {
+            return Mockito.mock(VolumeDataFactory.class);
+        }
+
+        @Bean
+        public SnapshotApiService snapshotApiService() {
+            return Mockito.mock(SnapshotApiService.class);
+        }
+
+        @Bean
+        public SnapshotDao snapshotDao() {
+            return Mockito.mock(SnapshotDao.class);
+        }
+
+        @Bean
+        public StorageStrategyFactory storageStrategyFactory() {
+            return Mockito.mock(StorageStrategyFactory.class);
+        }
+
+        @Bean
+        public SnapshotDataFactory snapshotDataFactory() {
+            return Mockito.mock(SnapshotDataFactory.class);
+        }
+
+        @Bean
+        public VMSnapshotVO vmSnapshotVO() {
+            return Mockito.mock(VMSnapshotVO.class);
+        }
+
+        @Bean
+        protected PrimaryDataStoreDao storagePool() {
+            return Mockito.mock(PrimaryDataStoreDao.class);
+        }
+
+        @Bean
+        public DataStoreManager dataStoreMgr() {
+            return Mockito.mock(DataStoreManager.class);
+        }
+
+        @Bean
+        public DataStoreProviderManager manager() {
+            return Mockito.mock(DataStoreProviderManager.class);
+        }
+
+        @Bean
+        public VMSnapshotDetailsDao vmSnapshotDetailsDao() {
+            return Mockito.mock(VMSnapshotDetailsDao.class);
+        }
+    }
+}
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/helper/StorageStrategyFactoryImpl.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/helper/StorageStrategyFactoryImpl.java
index 6d578852220..9dbaf13010a 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/helper/StorageStrategyFactoryImpl.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/helper/StorageStrategyFactoryImpl.java
@@ -84,6 +84,16 @@ public class StorageStrategyFactoryImpl implements StorageStrategyFactory {
         });
     }
 
+    @Override
+    public VMSnapshotStrategy getVmSnapshotStrategy(final Long vmId, Long rootPoolId, boolean snapshotMemory) {
+        return bestMatch(vmSnapshotStrategies, new CanHandle<VMSnapshotStrategy>() {
+            @Override
+            public StrategyPriority canHandle(VMSnapshotStrategy strategy) {
+                return strategy.canHandle(vmId, rootPoolId, snapshotMemory);
+            }
+        });
+    }
+
     private static <T> T bestMatch(Collection<T> collection, final CanHandle<T> canHandle) {
         if (collection.size() == 0)
             return null;
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtFreezeThawVMCommandWrapper.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtFreezeThawVMCommandWrapper.java
new file mode 100644
index 00000000000..808d3a20bfb
--- /dev/null
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtFreezeThawVMCommandWrapper.java
@@ -0,0 +1,103 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import org.apache.cloudstack.utils.qemu.QemuCommand;
+import org.apache.log4j.Logger;
+import org.libvirt.Connect;
+import org.libvirt.Domain;
+import org.libvirt.DomainInfo.DomainState;
+import org.libvirt.LibvirtException;
+
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.FreezeThawVMAnswer;
+import com.cloud.agent.api.FreezeThawVMCommand;
+import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.resource.CommandWrapper;
+import com.cloud.resource.ResourceWrapper;
+import com.google.gson.JsonParser;
+
+@ResourceWrapper(handles = FreezeThawVMCommand.class)
+public class LibvirtFreezeThawVMCommandWrapper extends CommandWrapper<FreezeThawVMCommand, Answer, LibvirtComputingResource> {
+
+    private static final Logger s_logger = Logger.getLogger(LibvirtFreezeThawVMCommandWrapper.class);
+
+    @Override
+    public Answer execute(FreezeThawVMCommand command, LibvirtComputingResource serverResource) {
+        String vmName = command.getVmName();
+        Domain domain = null;
+
+        try {
+            final LibvirtUtilitiesHelper libvirtUtilitiesHelper = serverResource.getLibvirtUtilitiesHelper();
+            Connect connect = libvirtUtilitiesHelper.getConnection();
+            domain = serverResource.getDomain(connect, vmName);
+            if (domain == null) {
+                return new FreezeThawVMAnswer(command, false, String.format("Failed to %s because VM %s was not found",
+                        command.getOption(), vmName));
+            }
+            DomainState domainState = domain.getInfo().state;
+            if (domainState != DomainState.VIR_DOMAIN_RUNNING) {
+                return new FreezeThawVMAnswer(command, false,
+                        String.format("%s of VM failed because VM %s is in %s state", command.getOption(),
+                                vmName, domainState));
+            }
+
+            String result = getResultOfQemuCommand(command.getOption(), domain);
+            s_logger.debug(String.format("Result of %s command is %s", command.getOption(), result));
+            if (result == null || (result.startsWith("error"))) {
+                return new FreezeThawVMAnswer(command, false, String.format("Failed to %s VM %s; result status is: %s",
+                        command.getOption(), vmName, result));
+            }
+            String status = getResultOfQemuCommand(FreezeThawVMCommand.STATUS, domain);
+            s_logger.debug(String.format("Status of %s command is %s", command.getOption(), status));
+            if (status != null && new JsonParser().parse(status).isJsonObject()) {
+                String statusResult = new JsonParser().parse(status).getAsJsonObject().get("return").getAsString();
+                if (statusResult.equals(command.getOption())) {
+                    return new FreezeThawVMAnswer(command, true, String.format("%s of VM - %s is successful", command.getOption(), vmName));
+                }
+            }
+            return new FreezeThawVMAnswer(command, false, String.format("Failed to %s VM %s; result status is: %s",
+                    command.getOption(), vmName, status));
+        } catch (LibvirtException libvirtException) {
+            return new FreezeThawVMAnswer(command, false, String.format("Failed to %s VM - %s due to %s",
+                    command.getOption(), vmName, libvirtException.getMessage()));
+        } finally {
+            if (domain != null) {
+                try {
+                    domain.free();
+                } catch (LibvirtException e) {
+                    s_logger.trace("Ignore error ", e);
+                }
+            }
+        }
+    }
+
+    private String getResultOfQemuCommand(String cmd, Domain domain) throws LibvirtException {
+        String result = null;
+        if (cmd.equals(FreezeThawVMCommand.FREEZE)) {
+            result = domain.qemuAgentCommand(QemuCommand.buildQemuCommand(QemuCommand.AGENT_FREEZE, null), 10, 0);
+        } else if (cmd.equals(FreezeThawVMCommand.THAW)) {
+            result = domain.qemuAgentCommand(QemuCommand.buildQemuCommand(QemuCommand.AGENT_THAW, null), 10, 0);
+        } else if (cmd.equals(FreezeThawVMCommand.STATUS)) {
+            result = domain.qemuAgentCommand(QemuCommand.buildQemuCommand(QemuCommand.AGENT_FREEZE_STATUS, null), 10, 0);
+        }
+        return result;
+    }
+}
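
A note on the status check in the wrapper above: after sending the freeze or thaw command, it queries guest-fsfreeze-status and compares the "return" field of the JSON reply with command.getOption(). The sketch below restates that comparison in Python, the language of the smoke test in this commit. The function name and the sample replies are illustrative assumptions, not code from this change. The QEMU guest agent reports either "frozen" or "thawed" for this command.

```python
import json


def freeze_thaw_succeeded(status_reply, expected_state):
    """Return True when the agent's status reply reports the expected state.

    status_reply is the raw JSON string returned for guest-fsfreeze-status,
    for example '{"return":"frozen"}'; expected_state is "frozen" or "thawed".
    """
    if status_reply is None:
        return False
    try:
        parsed = json.loads(status_reply)
    except ValueError:
        # Not valid JSON, so the state cannot be confirmed.
        return False
    if not isinstance(parsed, dict):
        return False
    return parsed.get("return") == expected_state


if __name__ == "__main__":
    print(freeze_thaw_succeeded('{"return":"frozen"}', "frozen"))
    print(freeze_thaw_succeeded('{"return":"thawed"}', "frozen"))
```

Treating a missing or malformed reply as a failure matches the wrapper's behaviour of answering false unless the reported state equals the requested one.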
diff --git a/plugins/hypervisors/kvm/src/main/java/org/apache/cloudstack/utils/qemu/QemuCommand.java b/plugins/hypervisors/kvm/src/main/java/org/apache/cloudstack/utils/qemu/QemuCommand.java
new file mode 100644
index 00000000000..56d44ff51ba
--- /dev/null
+++ b/plugins/hypervisors/kvm/src/main/java/org/apache/cloudstack/utils/qemu/QemuCommand.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.utils.qemu;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import com.google.gson.Gson;
+
+public class QemuCommand {
+    //Qemu agent commands
+    public static final String AGENT_FREEZE = "guest-fsfreeze-freeze";
+    public static final String AGENT_THAW = "guest-fsfreeze-thaw";
+    public static final String AGENT_FREEZE_STATUS = "guest-fsfreeze-status";
+
+    public static final String QEMU_CMD = "execute";
+
+    /**
+     * Used to build a command for qemu-agent-command/qemu-monitor-command<p>
+     * Examples:<p>
+     *  {"execute": "eject", "arguments": {"device": "ide1-cd0"}}<p>
+     *  {"execute":"guest-fsfreeze-status"}
+     * @param command The command that will be executed with virDomainQemuAgentCommand/virDomainQemuMonitorCommand
+     * @param args The arguments needed for the command
+     * @return String command in a Json format
+     */
+    public static String buildQemuCommand(String command, Map<String, String> args) {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put(QEMU_CMD, command);
+        if (args != null) {
+            params.put("arguments", args);
+        }
+        return new Gson().toJson(params);
+    }
+}
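
To illustrate the payload that buildQemuCommand produces, here is a minimal Python sketch that builds the same JSON shape with the standard library. The function name build_qemu_command is a hypothetical stand-in for the Java helper above. The two sample commands are the ones quoted in its Javadoc.

```python
import json


def build_qemu_command(command, args=None):
    """Build a JSON command string of the form {"execute": ..., "arguments": ...}.

    The "arguments" key is only added when args is given, mirroring the
    null check in the Java helper.
    """
    params = {"execute": command}
    if args is not None:
        params["arguments"] = args
    # Compact separators produce output without extra whitespace.
    return json.dumps(params, separators=(",", ":"))


if __name__ == "__main__":
    print(build_qemu_command("guest-fsfreeze-status"))
    print(build_qemu_command("eject", {"device": "ide1-cd0"}))
```

The "execute" key is inserted first so that it precedes "arguments" in the output, which is the same reason the Java version uses a LinkedHashMap.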
diff --git a/server/src/main/java/com/cloud/storage/snapshot/SnapshotManager.java b/server/src/main/java/com/cloud/storage/snapshot/SnapshotManager.java
index 4bad3469235..1012deba16f 100644
--- a/server/src/main/java/com/cloud/storage/snapshot/SnapshotManager.java
+++ b/server/src/main/java/com/cloud/storage/snapshot/SnapshotManager.java
@@ -59,6 +59,8 @@ public interface SnapshotManager extends Configurable {
     public static final ConfigKey<Boolean> BackupSnapshotAfterTakingSnapshot = new ConfigKey<Boolean>(Boolean.class, "snapshot.backup.to.secondary",  "Snapshots", "true",
             "Indicates whether to always backup primary storage snapshot to secondary storage. Keeping snapshots only on Primary storage is applicable for KVM + Ceph only.", false, ConfigKey.Scope.Global, null);
 
+    public static final ConfigKey<Boolean> VmStorageSnapshotKvm = new ConfigKey<>(Boolean.class, "kvm.vmstoragesnapshot.enabled", "Snapshots", "false", "Enables live snapshots, without memory, of virtual machine instances on the KVM hypervisor. Requires qemu version 1.6+ (on NFS or local file system) and qemu-guest-agent installed on the guest VM", true, ConfigKey.Scope.Global, null);
+
     void deletePoliciesForVolume(Long volumeId);
 
     /**
diff --git a/server/src/main/java/com/cloud/storage/snapshot/SnapshotManagerImpl.java b/server/src/main/java/com/cloud/storage/snapshot/SnapshotManagerImpl.java
index 63adac2d5e0..049754a04d8 100755
--- a/server/src/main/java/com/cloud/storage/snapshot/SnapshotManagerImpl.java
+++ b/server/src/main/java/com/cloud/storage/snapshot/SnapshotManagerImpl.java
@@ -224,7 +224,7 @@ public class SnapshotManagerImpl extends MutualExclusiveIdsManagerBase implement
     @Override
     public ConfigKey<?>[] getConfigKeys() {
         return new ConfigKey<?>[] {BackupRetryAttempts, BackupRetryInterval, SnapshotHourlyMax, SnapshotDailyMax, SnapshotMonthlyMax, SnapshotWeeklyMax, usageSnapshotSelection,
-                BackupSnapshotAfterTakingSnapshot};
+                BackupSnapshotAfterTakingSnapshot, VmStorageSnapshotKvm};
     }
 
     @Override
@@ -288,6 +288,9 @@ public class SnapshotManagerImpl extends MutualExclusiveIdsManagerBase implement
             throw new InvalidParameterValueException("No such snapshot");
         }
 
+        if (Type.GROUP.name().equals(snapshot.getTypeDescription())) {
+            throw new InvalidParameterValueException(String.format("The snapshot [%s] is part of a [%s] snapshot and cannot be reverted separately", snapshotId, snapshot.getTypeDescription()));
+        }
         VolumeVO volume = _volsDao.findById(snapshot.getVolumeId());
         if (volume.getState() != Volume.State.Ready) {
             throw new InvalidParameterValueException("The volume is not in Ready state.");
@@ -304,7 +307,7 @@ public class SnapshotManagerImpl extends MutualExclusiveIdsManagerBase implement
             }
             // If target VM has associated VM snapshots then don't allow to revert from snapshot
             List<VMSnapshotVO> vmSnapshots = _vmSnapshotDao.findByVm(instanceId);
-            if (vmSnapshots.size() > 0) {
+            if (vmSnapshots.size() > 0 && !Type.GROUP.name().equals(snapshot.getTypeDescription())) {
                 throw new InvalidParameterValueException("Unable to revert snapshot for VM, please remove VM snapshots before reverting VM from snapshot");
             }
         }
@@ -575,6 +578,10 @@ public class SnapshotManagerImpl extends MutualExclusiveIdsManagerBase implement
             throw new InvalidParameterValueException("unable to find a snapshot with id " + snapshotId);
         }
 
+        if (Type.GROUP.name().equals(snapshotCheck.getTypeDescription())) {
+            throw new InvalidParameterValueException(String.format("The snapshot [%s] is part of a [%s] snapshot and cannot be deleted separately", snapshotId, snapshotCheck.getTypeDescription()));
+        }
+
         if (snapshotCheck.getState() == Snapshot.State.Destroyed) {
             throw new InvalidParameterValueException("Snapshot with id: " + snapshotId + " is already destroyed");
         }
@@ -675,7 +682,7 @@ public class SnapshotManagerImpl extends MutualExclusiveIdsManagerBase implement
         sb.and("id", sb.entity().getId(), SearchCriteria.Op.EQ);
         sb.and("idIN", sb.entity().getId(), SearchCriteria.Op.IN);
         sb.and("snapshotTypeEQ", sb.entity().getSnapshotType(), SearchCriteria.Op.IN);
-        sb.and("snapshotTypeNEQ", sb.entity().getSnapshotType(), SearchCriteria.Op.NEQ);
+        sb.and("snapshotTypeNEQ", sb.entity().getSnapshotType(), SearchCriteria.Op.NIN);
         sb.and("dataCenterId", sb.entity().getDataCenterId(), SearchCriteria.Op.EQ);
 
         if (tags != null && !tags.isEmpty()) {
@@ -747,7 +754,7 @@ public class SnapshotManagerImpl extends MutualExclusiveIdsManagerBase implement
             sc.setParameters("snapshotTypeEQ", type.ordinal());
         } else {
             // Show only MANUAL and RECURRING snapshot types
-            sc.setParameters("snapshotTypeNEQ", Snapshot.Type.TEMPLATE.ordinal());
+            sc.setParameters("snapshotTypeNEQ", Snapshot.Type.TEMPLATE.ordinal(), Snapshot.Type.GROUP.ordinal());
         }
 
         Pair<List<SnapshotVO>, Integer> result = _snapshotDao.searchAndCount(sc, searchFilter);
@@ -1354,6 +1361,8 @@ public class SnapshotManagerImpl extends MutualExclusiveIdsManagerBase implement
         StoragePoolVO storagePoolVO = _storagePoolDao.findById(storagePoolId);
         if (storagePoolVO.getPoolType() == StoragePoolType.RBD) {
             return DataStoreRole.Primary;
+        } else if (snapshot.getSnapshotType() == Type.GROUP.ordinal()) {
+            return DataStoreRole.Primary;
         }
 
         return DataStoreRole.Image;
diff --git a/server/src/main/java/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java b/server/src/main/java/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
index bd66fe89ec6..41c4d491a22 100644
--- a/server/src/main/java/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
+++ b/server/src/main/java/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
@@ -81,7 +81,6 @@ import com.cloud.storage.GuestOSVO;
 import com.cloud.storage.Snapshot;
 import com.cloud.storage.SnapshotVO;
 import com.cloud.storage.Storage;
-import com.cloud.storage.Storage.ImageFormat;
 import com.cloud.storage.Volume;
 import com.cloud.storage.Volume.Type;
 import com.cloud.storage.VolumeVO;
@@ -116,7 +115,6 @@ import com.cloud.vm.UserVmManager;
 import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.VirtualMachine;
-import com.cloud.vm.VirtualMachine.State;
 import com.cloud.vm.VirtualMachineManager;
 import com.cloud.vm.VirtualMachineProfile;
 import com.cloud.vm.VmDetailConstants;
@@ -378,21 +376,15 @@ public class VMSnapshotManagerImpl extends MutualExclusiveIdsManagerBase impleme
             throw new CloudRuntimeException("Unable to find root volume storage pool for the user vm:" + userVmVo.getUuid());
         }
 
-        // for KVM, only allow snapshot with memory when VM is in running state
         if (userVmVo.getHypervisorType() == HypervisorType.KVM) {
-            if (rootVolumePool.getPoolType() != Storage.StoragePoolType.PowerFlex) {
-                if (userVmVo.getState() == State.Running && !snapshotMemory) {
-                    throw new InvalidParameterValueException("KVM VM does not allow to take a disk-only snapshot when VM is in running state");
-                }
-            } else {
-                if (snapshotMemory) {
-                    throw new InvalidParameterValueException("Can not snapshot memory for PowerFlex storage pool");
-                }
-
-                // All volumes should be on the same PowerFlex storage pool for VM Snapshot
-                if (!isVolumesOfUserVmOnSameStoragePool(userVmVo.getId(), rootVolumePool.getId())) {
-                    throw new InvalidParameterValueException("All volumes of the VM: " + userVmVo.getUuid() + " should be on the same PowerFlex storage pool");
-                }
+            //DefaultVMSnapshotStrategy - allows snapshots with memory when the VM is in Running state; all volumes have to be in QCOW2 format
+            //ScaleIOVMSnapshotStrategy - allows group snapshots without memory; all of the VM's volumes should be on the same storage pool; the VM state can be Running or Stopped; only the RAW image format is supported
+            //StorageVMSnapshotStrategy - allows volume snapshots without memory; the VM has to be in Running state; no limitation on the image format if the storage plugin supports volume snapshots; "kvm.vmstoragesnapshot.enabled" has to be enabled
+            //Other storage volume plugins could integrate this with their own functionality for group snapshots
+            VMSnapshotStrategy snapshotStrategy = storageStrategyFactory.getVmSnapshotStrategy(userVmVo.getId(), rootVolumePool.getId(), snapshotMemory);
+
+            if (snapshotStrategy == null) {
+                throw new CloudRuntimeException("Could not find snapshot strategy for VM snapshot");
             }
         }
 
@@ -412,15 +404,6 @@ public class VMSnapshotManagerImpl extends MutualExclusiveIdsManagerBase impleme
             if (activeSnapshots.size() > 0) {
                 throw new CloudRuntimeException("There is other active volume snapshot tasks on the instance to which the volume is attached, please try again later.");
             }
-            if (userVmVo.getHypervisorType() == HypervisorType.KVM) {
-                if (volume.getPoolType() != Storage.StoragePoolType.PowerFlex) {
-                    if (volume.getFormat() != ImageFormat.QCOW2) {
-                        throw new CloudRuntimeException("We only support create vm snapshots from vm with QCOW2 image");
-                    }
-                } else if (volume.getFormat() != ImageFormat.RAW) {
-                    throw new CloudRuntimeException("Only support create vm snapshots for volumes on PowerFlex with RAW image");
-                }
-            }
         }
 
         // check if there are other active VM snapshot tasks
@@ -445,21 +428,6 @@ public class VMSnapshotManagerImpl extends MutualExclusiveIdsManagerBase impleme
         return null;
     }
 
-    private boolean isVolumesOfUserVmOnSameStoragePool(Long userVmId, Long poolId) {
-        List<VolumeVO> volumesOfVm = _volumeDao.findCreatedByInstance(userVmId);
-        if (volumesOfVm == null || volumesOfVm.isEmpty()) {
-            throw new CloudRuntimeException("Unable to find volumes for the user vm:" + userVmId);
-        }
-
-        for (VolumeVO volume : volumesOfVm) {
-            if (volume == null || volume.getPoolId() != poolId) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-
     /**
      * Create, persist and return vm snapshot for userVmVo with given parameters.
      * Persistence and support for custom service offerings are done on the same transaction
@@ -566,6 +534,8 @@ public class VMSnapshotManagerImpl extends MutualExclusiveIdsManagerBase impleme
             if (jobResult != null) {
                 if (jobResult instanceof ConcurrentOperationException)
                     throw (ConcurrentOperationException)jobResult;
+                else if (jobResult instanceof CloudRuntimeException)
+                    throw (CloudRuntimeException)jobResult;
                 else if (jobResult instanceof Throwable)
                     throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
             }
@@ -602,8 +572,9 @@ public class VMSnapshotManagerImpl extends MutualExclusiveIdsManagerBase impleme
             VMSnapshot snapshot = strategy.takeVMSnapshot(vmSnapshot);
             return snapshot;
         } catch (Exception e) {
-            s_logger.debug("Failed to create vm snapshot: " + vmSnapshotId, e);
-            throw new CloudRuntimeException("Failed to create vm snapshot: " + vmSnapshotId, e);
+            String errMsg = String.format("Failed to create vm snapshot: [%s] due to: %s", vmSnapshotId, e.getMessage());
+            s_logger.debug(errMsg, e);
+            throw new CloudRuntimeException(errMsg, e);
         }
     }
 
@@ -671,6 +642,8 @@ public class VMSnapshotManagerImpl extends MutualExclusiveIdsManagerBase impleme
             if (jobResult != null) {
                 if (jobResult instanceof ConcurrentOperationException)
                     throw (ConcurrentOperationException)jobResult;
+                else if (jobResult instanceof CloudRuntimeException)
+                    throw (CloudRuntimeException)jobResult;
                 else if (jobResult instanceof Throwable)
                     throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
             }
@@ -798,6 +771,8 @@ public class VMSnapshotManagerImpl extends MutualExclusiveIdsManagerBase impleme
                     throw (InsufficientCapacityException)jobResult;
                 else if (jobResult instanceof ResourceUnavailableException)
                     throw (ResourceUnavailableException)jobResult;
+                else if (jobResult instanceof CloudRuntimeException)
+                    throw (CloudRuntimeException)jobResult;
                 else if (jobResult instanceof Throwable)
                     throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
             }
@@ -1047,6 +1022,8 @@ public class VMSnapshotManagerImpl extends MutualExclusiveIdsManagerBase impleme
                     throw (ConcurrentOperationException)jobResult;
                 else if (jobResult instanceof InvalidParameterValueException)
                     throw (InvalidParameterValueException)jobResult;
+                else if (jobResult instanceof CloudRuntimeException)
+                    throw (CloudRuntimeException)jobResult;
                 else if (jobResult instanceof Throwable)
                     throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
             }
diff --git a/test/integration/smoke/test_vm_snapshot_kvm.py b/test/integration/smoke/test_vm_snapshot_kvm.py
new file mode 100644
index 00000000000..1186f8e32ce
--- /dev/null
+++ b/test/integration/smoke/test_vm_snapshot_kvm.py
@@ -0,0 +1,326 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Import Local Modules
+from marvin.codes import FAILED, KVM, PASS, XEN_SERVER, RUNNING
+from nose.plugins.attrib import attr
+from marvin.cloudstackTestCase import cloudstackTestCase
+from marvin.lib.utils import random_gen, cleanup_resources, validateList, is_snapshot_on_nfs, isAlmostEqual
+from marvin.lib.base import (Account,
+                             Configurations,
+                             ServiceOffering,
+                             StoragePool,
+                             Template,
+                             VirtualMachine,
+                             VmSnapshot,
+                             Host)
+from marvin.lib.common import (get_zone,
+                               get_domain,
+                               get_template,
+                               list_snapshots,
+                               list_virtual_machines,
+                               list_configurations)
+from marvin.cloudstackAPI import (listTemplates)
+import time
+import unittest
+
+class TestVmSnapshot(cloudstackTestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        testClient = super(TestVmSnapshot, cls).getClsTestClient()
+        cls.apiclient = testClient.getApiClient()
+        cls._cleanup = []
+        cls.unsupportedHypervisor = False
+        cls.hypervisor = testClient.getHypervisorInfo()
+        if cls.hypervisor.lower() != "kvm":
+            cls.unsupportedHypervisor = True
+            return
+
+        cls.services = testClient.getParsedTestDataConfig()
+        # Get Zone, Domain and templates
+        cls.domain = get_domain(cls.apiclient)
+        cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
+
+        hosts = Host.list(
+            cls.apiclient,
+            zoneid=cls.zone.id,
+            type='Routing',
+            hypervisor='KVM')
+
+        pools = StoragePool.list(
+            cls.apiclient,
+            zoneid=cls.zone.id)
+
+        for pool in pools:
+            if pool.type == "NetworkFilesystem" or pool.type == "Filesystem":
+                raise unittest.SkipTest("Storage-based snapshots functionality is not supported for NFS/Local primary storage")
+
+        for host in hosts:
+            if host.details['Host.OS'] in ['CentOS']:
+                raise unittest.SkipTest("The standard `qemu-kvm`, which is the default for CentOS, does not support the new functionality. `qemu-kvm-ev` has to be installed instead")
+
+        Configurations.update(cls.apiclient,
+            name = "kvm.vmstoragesnapshot.enabled",
+            value = "true")
+        #The version of CentOS has to be supported
+        templ = {
+            "name": "CentOS8",
+            "displaytext": "CentOS 8",
+            "format": "QCOW2",
+            "url": "http://download.cloudstack.org/releases/4.14/default-tmpl-centos8.0.qcow2.bz2",
+            "ispublic": "True",
+            "isextractable": "True",
+            "hypervisor": cls.hypervisor,
+            "zoneid": cls.zone.id,
+            "ostype": "CentOS 8",
+            "directdownload": True,
+        }
+
+        template = Template.register(cls.apiclient, templ, zoneid=cls.zone.id, hypervisor=cls.hypervisor)
+        if template == FAILED:
+            assert False, "Template.register() failed to register template %s" \
+                    % templ["name"]
+
+        cls.services["domainid"] = cls.domain.id
+        cls.services["small"]["zoneid"] = cls.zone.id
+        cls.services["templates"]["ostypeid"] = template.ostypeid
+        cls.services["zoneid"] = cls.zone.id
+
+        cls.account = Account.create(
+            cls.apiclient,
+            cls.services["account"],
+            domainid=cls.domain.id
+        )
+        cls._cleanup.append(cls.account)
+
+        service_offerings_nfs = {
+            "name": "nfs",
+            "displaytext": "nfs",
+            "cpunumber": 1,
+            "cpuspeed": 500,
+            "memory": 512,
+            "storagetype": "shared",
+            "customizediops": False,
+        }
+
+        cls.service_offering = ServiceOffering.create(
+            cls.apiclient,
+            service_offerings_nfs,
+        )
+
+        cls._cleanup.append(cls.service_offering)
+
+        cls.virtual_machine = VirtualMachine.create(
+            cls.apiclient,
+            cls.services,
+            zoneid=cls.zone.id,
+            templateid=template.id,
+            accountid=cls.account.name,
+            domainid=cls.account.domainid,
+            serviceofferingid=cls.service_offering.id,
+            mode=cls.zone.networktype,
+            hypervisor=cls.hypervisor,
+            rootdisksize=20,
+        )
+        cls.random_data_0 = random_gen(size=100)
+        cls.test_dir = "/tmp"
+        cls.random_data = "random.data"
+        return
+
+    @classmethod
+    def tearDownClass(cls):
+        try:
+            Configurations.update(cls.apiclient,
+            name = "kvm.vmstoragesnapshot.enabled",
+            value = "false")
+            # Cleanup resources used
+            cleanup_resources(cls.apiclient, cls._cleanup)
+        except Exception as e:
+            raise Exception("Warning: Exception during cleanup : %s" % e)
+        return
+
+    def setUp(self):
+        self.apiclient = self.testClient.getApiClient()
+        self.dbclient = self.testClient.getDbConnection()
+
+        if self.unsupportedHypervisor:
+            self.skipTest("Skipping test because unsupported hypervisor %s" %
+                          self.hypervisor)
+        return
+
+    def tearDown(self):
+        return
+
+    @attr(tags=["advanced", "advancedns", "smoke"], required_hardware="true")
+    def test_01_create_vm_snapshots(self):
+        """Test to create VM snapshots
+        """
+        try:
+            # Login to VM and write data to file system
+            ssh_client = self.virtual_machine.get_ssh_client()
+
+            cmds = [
+                "echo %s > %s/%s" %
+                (self.random_data_0, self.test_dir, self.random_data),
+                "cat %s/%s" %
+                (self.test_dir, self.random_data)]
+
+            for c in cmds:
+                self.debug(c)
+                result = ssh_client.execute(c)
+                self.debug(result)
+        except Exception:
+            self.fail("SSH failed for Virtual machine: %s" %
+                      self.virtual_machine.ipaddress)
+        self.assertEqual(
+            self.random_data_0,
+            result[0],
+            "Check the random data has been written into temp file!"
+        )
+
+        time.sleep(30)
+
+        MemorySnapshot = False
+
+        vm_snapshot = VmSnapshot.create(
+            self.apiclient,
+            self.virtual_machine.id,
+            MemorySnapshot,
+            "TestSnapshot",
+            "Display Text"
+        )
+        self.assertEqual(
+            vm_snapshot.state,
+            "Ready",
+            "Check the snapshot of vm is ready!"
+        )
+
+        return
+
+    @attr(tags=["advanced", "advancedns", "smoke"], required_hardware="true")
+    def test_02_revert_vm_snapshots(self):
+        """Test to revert VM snapshots
+        """
+
+        try:
+            ssh_client = self.virtual_machine.get_ssh_client()
+
+            cmds = [
+                "rm -rf %s/%s" % (self.test_dir, self.random_data),
+                "ls %s/%s" % (self.test_dir, self.random_data)
+            ]
+
+            for c in cmds:
+                self.debug(c)
+                result = ssh_client.execute(c)
+                self.debug(result)
+
+        except Exception:
+            self.fail("SSH failed for Virtual machine: %s" %
+                      self.virtual_machine.ipaddress)
+
+        # str.index() raises ValueError instead of returning -1, so use "in"
+        if "No such file or directory" not in str(result[0]):
+            self.fail("Check the random data has been deleted from temp file!")
+
+        time.sleep(30)
+
+        list_snapshot_response = VmSnapshot.list(
+            self.apiclient,
+            virtualmachineid=self.virtual_machine.id,
+            listall=True)
+
+        self.assertEqual(
+            isinstance(list_snapshot_response, list),
+            True,
+            "Check list response returns a valid list"
+        )
+        self.assertNotEqual(
+            list_snapshot_response,
+            None,
+            "Check if snapshot exists in ListSnapshot"
+        )
+
+        self.assertEqual(
+            list_snapshot_response[0].state,
+            "Ready",
+            "Check the snapshot of vm is ready!"
+        )
+
+        self.virtual_machine.stop(self.apiclient)
+
+        VmSnapshot.revertToSnapshot(
+            self.apiclient,
+            list_snapshot_response[0].id)
+
+        self.virtual_machine.start(self.apiclient)
+
+        try:
+            ssh_client = self.virtual_machine.get_ssh_client(reconnect=True)
+
+            cmds = [
+                "cat %s/%s" % (self.test_dir, self.random_data)
+            ]
+
+            for c in cmds:
+                self.debug(c)
+                result = ssh_client.execute(c)
+                self.debug(result)
+
+        except Exception:
+            self.fail("SSH failed for Virtual machine: %s" %
+                      self.virtual_machine.ipaddress)
+
+        self.assertEqual(
+            self.random_data_0,
+            result[0],
+            "Check the random data matches the content of the random file!"
+        )
+
+    @attr(tags=["advanced", "advancedns", "smoke"], required_hardware="true")
+    def test_03_delete_vm_snapshots(self):
+        """Test to delete VM snapshots
+        """
+
+        list_snapshot_response = VmSnapshot.list(
+            self.apiclient,
+            virtualmachineid=self.virtual_machine.id,
+            listall=True)
+
+        self.assertEqual(
+            isinstance(list_snapshot_response, list),
+            True,
+            "Check list response returns a valid list"
+        )
+        self.assertNotEqual(
+            list_snapshot_response,
+            None,
+            "Check if snapshot exists in ListSnapshot"
+        )
+        VmSnapshot.deleteVMSnapshot(
+            self.apiclient,
+            list_snapshot_response[0].id)
+
+        time.sleep(30)
+
+        list_snapshot_response = VmSnapshot.list(
+            self.apiclient,
+            virtualmachineid=self.virtual_machine.id,
+            listall=False)
+        self.debug('list_snapshot_response -------------------- %s' % list_snapshot_response)
+
+        self.assertIsNone(list_snapshot_response, "Check the VM snapshot has been deleted")