You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by wi...@apache.org on 2013/02/22 15:01:12 UTC

[18/58] [abbrv] refactor snapshot, move existing snapshot code into its own snapshotstrategy

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/ff047e75/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java b/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java
index 58ca9a4..7df99d6 100755
--- a/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java
+++ b/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java
@@ -32,20 +32,21 @@ import org.apache.cloudstack.api.command.user.snapshot.DeleteSnapshotPoliciesCmd
 import org.apache.cloudstack.api.command.user.snapshot.ListSnapshotPoliciesCmd;
 import org.apache.cloudstack.api.command.user.snapshot.ListSnapshotsCmd;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
 
 import com.cloud.agent.AgentManager;
 import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.BackupSnapshotAnswer;
-import com.cloud.agent.api.BackupSnapshotCommand;
 import com.cloud.agent.api.Command;
 import com.cloud.agent.api.DeleteSnapshotBackupCommand;
 import com.cloud.agent.api.DeleteSnapshotsDirCommand;
 import com.cloud.agent.api.DownloadSnapshotFromS3Command;
-import com.cloud.agent.api.ManageSnapshotAnswer;
-import com.cloud.agent.api.ManageSnapshotCommand;
 import com.cloud.agent.api.downloadSnapshotFromSwiftCommand;
 import com.cloud.agent.api.to.S3TO;
 import com.cloud.agent.api.to.SwiftTO;
@@ -65,7 +66,6 @@ import com.cloud.event.EventTypes;
 import com.cloud.event.EventVO;
 import com.cloud.event.UsageEventUtils;
 import com.cloud.event.dao.EventDao;
-import com.cloud.event.dao.UsageEventDao;
 import com.cloud.exception.InvalidParameterValueException;
 import com.cloud.exception.PermissionDeniedException;
 import com.cloud.exception.ResourceAllocationException;
@@ -75,7 +75,6 @@ import com.cloud.host.dao.HostDao;
 import com.cloud.hypervisor.Hypervisor.HypervisorType;
 import com.cloud.org.Grouping;
 import com.cloud.projects.Project.ListProjectResourcesCriteria;
-import com.cloud.resource.ResourceManager;
 import com.cloud.server.ResourceTag.TaggedResourceType;
 import com.cloud.storage.Snapshot;
 import com.cloud.storage.Snapshot.Type;
@@ -83,7 +82,6 @@ import com.cloud.storage.SnapshotPolicyVO;
 import com.cloud.storage.SnapshotScheduleVO;
 import com.cloud.storage.SnapshotVO;
 import com.cloud.storage.Storage;
-import com.cloud.storage.Storage.StoragePoolType;
 import com.cloud.storage.StorageManager;
 import com.cloud.storage.StoragePool;
 import com.cloud.storage.VMTemplateVO;
@@ -97,7 +95,6 @@ import com.cloud.storage.dao.SnapshotScheduleDao;
 import com.cloud.storage.dao.StoragePoolDao;
 import com.cloud.storage.dao.VMTemplateDao;
 import com.cloud.storage.dao.VolumeDao;
-import com.cloud.storage.listener.SnapshotStateListener;
 import com.cloud.storage.s3.S3Manager;
 import com.cloud.storage.secondary.SecondaryStorageVmManager;
 import com.cloud.storage.swift.SwiftManager;
@@ -123,14 +120,8 @@ import com.cloud.utils.db.Filter;
 import com.cloud.utils.db.JoinBuilder;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
-import com.cloud.utils.db.Transaction;
 import com.cloud.utils.exception.CloudRuntimeException;
-import com.cloud.utils.fsm.NoTransitionException;
-import com.cloud.utils.fsm.StateMachine2;
-import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VMInstanceVO;
-import com.cloud.vm.VirtualMachine;
-import com.cloud.vm.VirtualMachine.State;
 import com.cloud.vm.dao.UserVmDao;
 
 import org.apache.cloudstack.api.command.user.snapshot.CreateSnapshotPolicyCmd;
@@ -189,8 +180,6 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
     @Inject
     protected ClusterDao _clusterDao;
     @Inject
-    private UsageEventDao _usageEventDao;
-    @Inject
     private ResourceLimitService _resourceLimitMgr;
     @Inject
     private SwiftManager _swiftMgr;
@@ -199,12 +188,8 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
     @Inject 
     private SecondaryStorageVmManager _ssvmMgr;
     @Inject
-    private ResourceManager _resourceMgr;
-    @Inject
     private DomainManager _domainMgr;
     @Inject
-    private VolumeDao _volumeDao;
-    @Inject
     private ResourceTagDao _resourceTagDao;
     @Inject
     private ConfigurationDao _configDao;
@@ -216,21 +201,20 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
     @Inject TemplateManager templateMgr;
     @Inject VolumeManager volumeMgr;
     @Inject DataStoreManager dataStoreMgr;
+    @Inject List<SnapshotStrategy> snapshotStrategies;
+    @Inject VolumeDataFactory volFactory;
+    @Inject SnapshotDataFactory snapshotFactory;
     
 
     private int _totalRetries;
     private int _pauseInterval;
-    private int _deltaSnapshotMax;
     private int _backupsnapshotwait;
 
-    private StateMachine2<Snapshot.State, Snapshot.Event, Snapshot> _snapshotFsm;
-
     protected SearchBuilder<SnapshotVO> PolicySnapshotSearch;
     protected SearchBuilder<SnapshotPolicyVO> PoliciesForSnapSearch;
-
-    
     
-    protected Answer sendToPool(Volume vol, Command cmd) {
+    @Override
+    public Answer sendToPool(Volume vol, Command cmd) {
         StoragePool pool = (StoragePool)dataStoreMgr.getPrimaryDataStore(vol.getPoolId());
         long[] hostIdsToTryFirst = null;
         
@@ -283,126 +267,10 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
     }
 
     @Override
-    public SnapshotVO createSnapshotOnPrimary(VolumeVO volume, Long policyId, Long snapshotId) {
-        SnapshotVO snapshot = _snapshotDao.findById(snapshotId);
-        if (snapshot == null) {
-            throw new CloudRuntimeException("Can not find snapshot " + snapshotId);
-        }
-
-        try {
-            stateTransitTo(snapshot, Snapshot.Event.CreateRequested);
-        } catch (NoTransitionException nte) {
-            s_logger.debug("Failed to update snapshot state due to " + nte.getMessage());
-        }
-
-        // Send a ManageSnapshotCommand to the agent
-        String vmName = this.volumeMgr.getVmNameOnVolume(volume);
-        long volumeId = volume.getId();
-        long preId = _snapshotDao.getLastSnapshot(volumeId, snapshotId);
-
-        String preSnapshotPath = null;
-        SnapshotVO preSnapshotVO = null;
-        if (preId != 0 && !(volume.getLastPoolId() != null && !volume.getLastPoolId().equals(volume.getPoolId()))) {
-            preSnapshotVO = _snapshotDao.findByIdIncludingRemoved(preId);
-            if (preSnapshotVO != null && preSnapshotVO.getBackupSnapshotId() != null) {
-                preSnapshotPath = preSnapshotVO.getPath();
-            }
-        }
-        StoragePool srcPool = (StoragePool)dataStoreMgr.getPrimaryDataStore(volume.getPoolId());
-        // RBD volumes do not support snapshotting in the way CloudStack does it.
-        // For now we leave the snapshot feature disabled for RBD volumes
-        if (srcPool.getPoolType() == StoragePoolType.RBD) {
-            throw new CloudRuntimeException("RBD volumes do not support snapshotting");
-        }
-
-        ManageSnapshotCommand cmd = new ManageSnapshotCommand(snapshotId, volume.getPath(), srcPool, preSnapshotPath, snapshot.getName(), vmName);
-      
-        ManageSnapshotAnswer answer = (ManageSnapshotAnswer) sendToPool(volume, cmd);
-        // Update the snapshot in the database
-        if ((answer != null) && answer.getResult()) {
-            // The snapshot was successfully created
-            if (preSnapshotPath != null && preSnapshotPath.equals(answer.getSnapshotPath())) {
-                // empty snapshot
-                s_logger.debug("CreateSnapshot: this is empty snapshot ");
-                try {
-                snapshot.setPath(preSnapshotPath);
-                snapshot.setBackupSnapshotId(preSnapshotVO.getBackupSnapshotId());
-                snapshot.setSwiftId(preSnapshotVO.getSwiftId());
-                snapshot.setPrevSnapshotId(preId);
-                snapshot.setSecHostId(preSnapshotVO.getSecHostId());
-                    stateTransitTo(snapshot, Snapshot.Event.OperationNotPerformed);
-                }  catch (NoTransitionException nte) {
-                    s_logger.debug("CreateSnapshot: failed to update state of snapshot due to " + nte.getMessage());
-                }
-            } else {
-                long preSnapshotId = 0;
-               
-                if (preSnapshotVO != null && preSnapshotVO.getBackupSnapshotId() != null) {
-                    preSnapshotId = preId;
-                    // default delta snap number is 16
-                    int deltaSnap = _deltaSnapshotMax;
-
-                    int i;
-                    for (i = 1; i < deltaSnap; i++) {
-                        String prevBackupUuid = preSnapshotVO.getBackupSnapshotId();
-                        // previous snapshot doesn't have backup, create a full snapshot
-                        if (prevBackupUuid == null) {
-                            preSnapshotId = 0;
-                            break;
-                        }
-                        long preSSId = preSnapshotVO.getPrevSnapshotId();
-                        if (preSSId == 0) {
-                            break;
-                        }
-                        preSnapshotVO = _snapshotDao.findByIdIncludingRemoved(preSSId);
-                    }
-                    if (i >= deltaSnap) {
-                        preSnapshotId = 0;
-                    }
-                }
-                
-                //If the volume is moved around, backup a full snapshot to secondary storage
-                if (volume.getLastPoolId() != null && !volume.getLastPoolId().equals(volume.getPoolId())) {
-                	preSnapshotId = 0;
-                	volume.setLastPoolId(volume.getPoolId());
-                	_volumeDao.update(volume.getId(), volume);
-                }
-                snapshot = updateDBOnCreate(snapshotId, answer.getSnapshotPath(), preSnapshotId);
-            }
-            // Get the snapshot_schedule table entry for this snapshot and
-            // policy id.
-            // Set the snapshotId to retrieve it back later.
-            if (policyId != Snapshot.MANUAL_POLICY_ID) {
-                SnapshotScheduleVO snapshotSchedule = _snapshotScheduleDao.getCurrentSchedule(volumeId, policyId, true);
-                assert snapshotSchedule != null;
-                snapshotSchedule.setSnapshotId(snapshotId);
-                _snapshotScheduleDao.update(snapshotSchedule.getId(), snapshotSchedule);
-            }
-
-        } else {
-            if (answer != null) {
-                s_logger.error(answer.getDetails());
-            }
-            try {
-                stateTransitTo(snapshot, Snapshot.Event.OperationFailed);
-            } catch (NoTransitionException nte) {
-                s_logger.debug("Failed to update snapshot state due to " + nte.getMessage());
-            }
-            throw new CloudRuntimeException("Creating snapshot for volume " + volumeId + " on primary storage failed.");
-        }
-
-        return snapshot;
-    }
-
-    public SnapshotVO createSnapshotImpl(long volumeId, long policyId) throws ResourceAllocationException {
-        return null;
-    }
-
-    @Override
     @DB
     @ActionEvent(eventType = EventTypes.EVENT_SNAPSHOT_CREATE, eventDescription = "creating snapshot", async = true)
-    public SnapshotVO createSnapshot(Long volumeId, Long policyId, Long snapshotId, Account snapshotOwner) {
-        VolumeVO volume = _volsDao.findById(volumeId);   
+    public Snapshot createSnapshot(Long volumeId, Long policyId, Long snapshotId, Account snapshotOwner) {
+        VolumeInfo volume = this.volFactory.getVolume(volumeId);
         if (volume == null) {
         	throw new InvalidParameterValueException("No such volume exist");
         }
@@ -411,120 +279,50 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
         	throw new InvalidParameterValueException("Volume is not in ready state");
         }
         
-        SnapshotVO snapshot = null;
+        SnapshotInfo snapshot = null;
      
         boolean backedUp = false;
-        UserVmVO uservm = null;
         // does the caller have the authority to act on this volume
         _accountMgr.checkAccess(UserContext.current().getCaller(), null, true, volume);
         
-        try {
-    
-            Long poolId = volume.getPoolId();
-            if (poolId == null) {
-                throw new CloudRuntimeException("You cannot take a snapshot of a volume until it has been attached to an instance");
-            }
-
-            if (_volsDao.getHypervisorType(volume.getId()).equals(HypervisorType.KVM)) {
-            	uservm = _vmDao.findById(volume.getInstanceId());
-            	if (uservm != null && uservm.getType() != VirtualMachine.Type.User) {
-            		throw new CloudRuntimeException("Can't take a snapshot on system vm ");
-            	}
-            	
-                StoragePoolVO storagePool = _storagePoolDao.findById(volume.getPoolId());
-                ClusterVO cluster = _clusterDao.findById(storagePool.getClusterId());
-                List<HostVO> hosts = _resourceMgr.listAllHostsInCluster(cluster.getId());
-                if (hosts != null && !hosts.isEmpty()) {
-                    HostVO host = hosts.get(0);
-                    if (!hostSupportSnapsthot(host)) {
-                        throw new CloudRuntimeException("KVM Snapshot is not supported on cluster: " + host.getId());
-                    }
-                }
-            }
-
-            // if volume is attached to a vm in destroyed or expunging state; disallow
-            // if volume is attached to a vm in taking vm snapshot; disallow
-            if (volume.getInstanceId() != null) {
-                UserVmVO userVm = _vmDao.findById(volume.getInstanceId());
-                if (userVm != null) {
-                    if (userVm.getState().equals(State.Destroyed) || userVm.getState().equals(State.Expunging)) {
-                        throw new CloudRuntimeException("Creating snapshot failed due to volume:" + volumeId + " is associated with vm:" + userVm.getInstanceName() + " is in "
-                                + userVm.getState().toString() + " state");
-                    }
-                    
-                    if(userVm.getHypervisorType() == HypervisorType.VMware || userVm.getHypervisorType() == HypervisorType.KVM) {
-                        List<SnapshotVO> activeSnapshots = _snapshotDao.listByInstanceId(volume.getInstanceId(), Snapshot.State.Creating,  Snapshot.State.CreatedOnPrimary,  Snapshot.State.BackingUp);
-                        if(activeSnapshots.size() > 1)
-                            throw new CloudRuntimeException("There is other active snapshot tasks on the instance to which the volume is attached, please try again later");
-                    }
-                    /*List<VMSnapshotVO> activeVMSnapshots = _vmSnapshotDao.listByInstanceId(userVm.getId(),
-                            VMSnapshot.State.Creating, VMSnapshot.State.Reverting, VMSnapshot.State.Expunging);
-                    if (activeVMSnapshots.size() > 0) {
-                        throw new CloudRuntimeException(
-                                "There is other active vm snapshot tasks on the instance to which the volume is attached, please try again later");
-                    }			*/
-                }
-            }
-
-            snapshot = createSnapshotOnPrimary(volume, policyId, snapshotId);
-            if (snapshot != null) {
-                if (snapshot.getState() == Snapshot.State.CreatedOnPrimary) {
-                    backedUp = backupSnapshotToSecondaryStorage(snapshot);
-                } else if (snapshot.getState() == Snapshot.State.BackedUp) {
-                    // For empty snapshot we set status to BackedUp in createSnapshotOnPrimary
-                    backedUp = true;
-                } else {
-                    throw new CloudRuntimeException("Failed to create snapshot: " + snapshot + " on primary storage");
-                }
-                if (!backedUp) {
-                    throw new CloudRuntimeException("Created snapshot: " + snapshot + " on primary but failed to backup on secondary");
-                }
-            } else {
-                throw new CloudRuntimeException("Failed to create snapshot: " + snapshot + " on primary storage");
-            }
-        } finally {
-            // Cleanup jobs to do after the snapshot has been created; decrement resource count
-            if (snapshot != null) {
-                postCreateSnapshot(volumeId, snapshot.getId(), policyId, backedUp);
-                //Check if the snapshot was removed while backingUp. If yes, do not log snapshot create usage event
-                SnapshotVO freshSnapshot = _snapshotDao.findById(snapshot.getId());
-                if ((freshSnapshot != null) && backedUp) {
-                    UsageEventUtils.publishUsageEvent(EventTypes.EVENT_SNAPSHOT_CREATE, snapshot.getAccountId(),
-                            snapshot.getDataCenterId(), snapshotId, snapshot.getName(), null, null,
-                            volume.getSize(), snapshot.getClass().getName(), snapshot.getUuid());
-                }
-                if( !backedUp ) {
-
-                } else {
-                    _resourceLimitMgr.incrementResourceCount(snapshotOwner.getId(), ResourceType.snapshot);
-                }
-            }
-
-            /*
-            try {
-            	_storageMgr.stateTransitTo(volume, Volume.Event.OperationSucceeded);
-            } catch (NoTransitionException e) {
-            	s_logger.debug("Failed to transit volume state: " + e.toString());
-            }*/
 
+        SnapshotInfo snap = this.snapshotFactory.getSnapshot(snapshotId);
+        SnapshotStrategy strategy = null;
+        for (SnapshotStrategy st : snapshotStrategies) {
+        	if (st.canHandle(snap)) {
+        		strategy = st;
+        		break;
+        	}
         }
 
-        return snapshot;
-    }
-
-    private SnapshotVO updateDBOnCreate(Long id, String snapshotPath, long preSnapshotId) {
-        SnapshotVO createdSnapshot = _snapshotDao.findByIdIncludingRemoved(id);
-        createdSnapshot.setPath(snapshotPath);
-        createdSnapshot.setPrevSnapshotId(preSnapshotId);
         try {
-            stateTransitTo(createdSnapshot, Snapshot.Event.OperationSucceeded);
-        } catch (NoTransitionException nte) {
-            s_logger.debug("Faile to update state of snapshot due to " + nte.getMessage());
+        	snapshot = strategy.takeSnapshot(volume, snapshotId);
+        	if (snapshot != null) {
+        		postCreateSnapshot(volumeId, snapshot.getId(), policyId);
+        		//Check if the snapshot was removed while backingUp. If yes, do not log snapshot create usage event
+        		SnapshotVO freshSnapshot = _snapshotDao.findById(snapshot.getId());
+        		if ((freshSnapshot != null) && backedUp) {
+        			UsageEventUtils.publishUsageEvent(EventTypes.EVENT_SNAPSHOT_CREATE, snapshot.getAccountId(),
+        					snapshot.getDataCenterId(), snapshotId, snapshot.getName(), null, null,
+        					volume.getSize(), snapshot.getClass().getName(), snapshot.getUuid());
+        		}
+
+        		_resourceLimitMgr.incrementResourceCount(snapshotOwner.getId(), ResourceType.snapshot);
+        	}
+        	
+        	Boolean backup = Boolean.parseBoolean(this._configDao.getValue(Config.BackupSnapshotAferTakingSnapshot.toString()));
+        	if (backup) {
+        		this.backupSnapshot(snapshotId);
+        	}
+        } catch(Exception e) {
+        	s_logger.debug("Failed to create snapshot", e);
+        	throw new CloudRuntimeException("Failed to create snapshot", e);
         }
-        return createdSnapshot;
+
+        return snapshot;
     }
 
-    private static void checkObjectStorageConfiguration(SwiftTO swift, S3TO s3) {
+    private void checkObjectStorageConfiguration(SwiftTO swift, S3TO s3) {
 
         if (swift != null && s3 != null) {
             throw new CloudRuntimeException(
@@ -534,26 +332,6 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
     }
 
     @Override
-    public void deleteSnapshotsForVolume (String secondaryStoragePoolUrl, Long dcId, Long accountId, Long volumeId ){
-        SwiftTO swift = _swiftMgr.getSwiftTO();
-        S3TO s3 = _s3Mgr.getS3TO();
-
-        checkObjectStorageConfiguration(swift, s3);
-
-        DeleteSnapshotBackupCommand cmd = new DeleteSnapshotBackupCommand(
-                swift, s3, secondaryStoragePoolUrl, dcId, accountId, volumeId,
-                null, true);
-        try {
-            Answer ans = _agentMgr.sendToSSVM(dcId, cmd);
-            if ( ans == null || !ans.getResult() ) {
-                s_logger.warn("DeleteSnapshotBackupCommand failed due to " + ans.getDetails() + " volume id: " + volumeId);
-            }
-        } catch (Exception e) {
-            s_logger.warn("DeleteSnapshotBackupCommand failed due to" + e.toString() + " volume id: " + volumeId);
-        }
-    }
-
-    @Override
     public void deleteSnapshotsDirForVolume(String secondaryStoragePoolUrl, Long dcId, Long accountId, Long volumeId) {
         DeleteSnapshotsDirCommand cmd = new DeleteSnapshotsDirCommand(secondaryStoragePoolUrl, dcId, accountId, volumeId);
         try {
@@ -566,6 +344,23 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
         }
     }
 
+    @Override
+    public Snapshot backupSnapshot(Long snapshotId) {
+    	 SnapshotInfo snapshot = this.snapshotFactory.getSnapshot(snapshotId);
+    	 if (snapshot == null) {
+    		 throw new CloudRuntimeException("Can't find snapshot:" + snapshotId);
+    	 }
+    	 
+    	 SnapshotStrategy strategy = null;
+         for (SnapshotStrategy st : snapshotStrategies) {
+         	if (st.canHandle(snapshot)) {
+         		strategy = st;
+         		break;
+         	}
+         }
+         
+         return strategy.backupSnapshot(snapshot);
+    }
 
     @Override
     public void downloadSnapshotsFromSwift(SnapshotVO ss) {
@@ -652,133 +447,17 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
         }
 
     }
-
+    
     @Override
-    @DB
-    public boolean backupSnapshotToSecondaryStorage(SnapshotVO ss) {
-        long snapshotId = ss.getId();
-        SnapshotVO snapshot = _snapshotDao.acquireInLockTable(snapshotId);
-        if (snapshot == null) {
-            throw new CloudRuntimeException("Can not acquire lock for snapshot: " + ss);
-        }
-        try {
-            try {
-                stateTransitTo(snapshot, Snapshot.Event.BackupToSecondary);
-            } catch (NoTransitionException nte) {
-                s_logger.debug("Failed to update the state of snapshot while backing up snapshot");
-            }
-
-            long volumeId = snapshot.getVolumeId();
-            VolumeVO volume = _volsDao.lockRow(volumeId, true);
-
-            Long dcId = volume.getDataCenterId();
-            Long accountId = volume.getAccountId();
-            
-            HostVO secHost = getSecHost(volumeId, volume.getDataCenterId());
-            
-            String secondaryStoragePoolUrl = secHost.getStorageUrl();
-            String snapshotUuid = snapshot.getPath();
-            // In order to verify that the snapshot is not empty,
-            // we check if the parent of the snapshot is not the same as the parent of the previous snapshot.
-            // We pass the uuid of the previous snapshot to the plugin to verify this.
-            SnapshotVO prevSnapshot = null;
-            String prevSnapshotUuid = null;
-            String prevBackupUuid = null;
-
-
-            SwiftTO swift = _swiftMgr.getSwiftTO();
-            S3TO s3 = _s3Mgr.getS3TO();
-
-            checkObjectStorageConfiguration(swift, s3);
-            
-            long prevSnapshotId = snapshot.getPrevSnapshotId();
-            if (prevSnapshotId > 0) {
-                prevSnapshot = _snapshotDao.findByIdIncludingRemoved(prevSnapshotId);
-                if ( prevSnapshot.getBackupSnapshotId() != null && swift == null) {
-                    if (prevSnapshot.getVersion() != null && prevSnapshot.getVersion().equals("2.2")) {                   
-                        prevBackupUuid = prevSnapshot.getBackupSnapshotId();
-                        prevSnapshotUuid = prevSnapshot.getPath();
-                    }
-                } else if ((prevSnapshot.getSwiftId() != null && swift != null)
-                        || (prevSnapshot.getS3Id() != null && s3 != null)) {
-                    prevBackupUuid = prevSnapshot.getBackupSnapshotId();
-                    prevSnapshotUuid = prevSnapshot.getPath();
-                }
-            }
-            boolean isVolumeInactive = this.volumeMgr.volumeInactive(volume);
-            String vmName = this.volumeMgr.getVmNameOnVolume(volume);
-            StoragePool srcPool = (StoragePool)dataStoreMgr.getPrimaryDataStore(volume.getPoolId());
-            BackupSnapshotCommand backupSnapshotCommand = new BackupSnapshotCommand(secondaryStoragePoolUrl, dcId, accountId, volumeId, snapshot.getId(), volume.getPath(), srcPool, snapshotUuid,
-                    snapshot.getName(), prevSnapshotUuid, prevBackupUuid, isVolumeInactive, vmName, _backupsnapshotwait);
-
-            if ( swift != null ) {
-                backupSnapshotCommand.setSwift(swift);
-            } else if (s3 != null) {
-                backupSnapshotCommand.setS3(s3);
-            }
-            
-            String backedUpSnapshotUuid = null;
-            // By default, assume failed.
-            boolean backedUp = false;
-            BackupSnapshotAnswer answer = (BackupSnapshotAnswer) sendToPool(volume, backupSnapshotCommand);
-            if (answer != null && answer.getResult()) {
-                backedUpSnapshotUuid = answer.getBackupSnapshotName();
-                if (backedUpSnapshotUuid != null) {
-                    backedUp = true;
-                }
-            } else if (answer != null) {
-                s_logger.error(answer.getDetails());
-            }
-            // Update the status in all cases.
-            Transaction txn = Transaction.currentTxn();
-            txn.start();
-
-            if (backedUp) {
-                if (backupSnapshotCommand.getSwift() != null ) {
-                    snapshot.setSwiftId(swift.getId());
-                    snapshot.setBackupSnapshotId(backedUpSnapshotUuid);
-                } else if (backupSnapshotCommand.getS3() != null) {
-                    snapshot.setS3Id(s3.getId());
-                    snapshot.setBackupSnapshotId(backedUpSnapshotUuid);
-                } else {
-                    snapshot.setSecHostId(secHost.getId());
-                    snapshot.setBackupSnapshotId(backedUpSnapshotUuid);
-                }
-                if (answer.isFull()) {
-                    snapshot.setPrevSnapshotId(0);
-                }
-                try {
-                    stateTransitTo(snapshot, Snapshot.Event.OperationSucceeded);
-                } catch (NoTransitionException nte) {
-                    s_logger.debug("Failed to update the state of snapshot while backing up snapshot");
-                }
-
-            } else {
-                try {
-                    stateTransitTo(snapshot, Snapshot.Event.OperationFailed);
-                } catch (NoTransitionException nte) {
-                    s_logger.debug("Failed to update the state of snapshot while backing up snapshot");
-                }
-                s_logger.warn("Failed to back up snapshot on secondary storage, deleting the record from the DB");
-                _snapshotDao.remove(snapshotId);
-            }
-            txn.commit();
-
-            return backedUp;
-        } finally {
-            if (snapshot != null) {
-                _snapshotDao.releaseFromLockTable(snapshotId);
-            }
-        }
-
-    }
-
-    private HostVO getSecHost(long volumeId, long dcId) {
-        Long id = _snapshotDao.getSecHostId(volumeId);
-        if ( id != null) { 
-            return _hostDao.findById(id);
-        }
-        return this.templateMgr.getSecondaryStorageHost(dcId);
+    public SnapshotVO getParentSnapshot(VolumeInfo volume, Snapshot snapshot) {
+    	 long preId = _snapshotDao.getLastSnapshot(volume.getId(), snapshot.getId());
+
+         SnapshotVO preSnapshotVO = null;
+         if (preId != 0 && !(volume.getLastPoolId() != null && !volume.getLastPoolId().equals(volume.getPoolId()))) {
+             preSnapshotVO = _snapshotDao.findByIdIncludingRemoved(preId);
+         }
+         
+         return preSnapshotVO;
     }
 
     private Long getSnapshotUserId() {
@@ -789,11 +468,15 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
         return userId;
     }
 
-    @Override
-    @DB
-    public void postCreateSnapshot(Long volumeId, Long snapshotId, Long policyId, boolean backedUp) {
+    private void postCreateSnapshot(Long volumeId, Long snapshotId, Long policyId) {
         Long userId = getSnapshotUserId();
         SnapshotVO snapshot = _snapshotDao.findById(snapshotId);
+        if (policyId != Snapshot.MANUAL_POLICY_ID) {
+            SnapshotScheduleVO snapshotSchedule = _snapshotScheduleDao.getCurrentSchedule(volumeId, policyId, true);
+            assert snapshotSchedule != null;
+            snapshotSchedule.setSnapshotId(snapshotId);
+            _snapshotScheduleDao.update(snapshotSchedule.getId(), snapshotSchedule);
+        }
         
         if (snapshot != null && snapshot.isRecursive()) {
             postCreateRecurringSnapshotForPolicy(userId, volumeId, snapshotId, policyId);
@@ -803,7 +486,7 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
     private void postCreateRecurringSnapshotForPolicy(long userId, long volumeId, long snapshotId, long policyId) {
         // Use count query
         SnapshotVO spstVO = _snapshotDao.findById(snapshotId);
-        Type type = spstVO.getType();
+        Type type = spstVO.getRecurringType();
         int maxSnaps = type.getMax();
 
         List<SnapshotVO> snaps = listSnapsforVolumeType(volumeId, type);
@@ -815,7 +498,7 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
             SnapshotVO oldestSnapshot = snaps.get(0);
             long oldSnapId = oldestSnapshot.getId();
             s_logger.debug("Max snaps: " + policy.getMaxSnaps() + " exceeded for snapshot policy with Id: " + policyId + ". Deleting oldest snapshot: " + oldSnapId);
-            if(deleteSnapshotInternal(oldSnapId)){
+            if(deleteSnapshot(oldSnapId)){
             	//log Snapshot delete event
                 ActionEventUtils.onCompletedActionEvent(User.UID_SYSTEM, oldestSnapshot.getAccountId(), EventVO.LEVEL_INFO, EventTypes.EVENT_SNAPSHOT_DELETE, "Successfully deleted oldest snapshot: " + oldSnapId, 0);
             }
@@ -830,98 +513,38 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
         Account caller = UserContext.current().getCaller();
 
         // Verify parameters
-        Snapshot snapshotCheck = _snapshotDao.findById(snapshotId);
+        SnapshotInfo snapshotCheck = this.snapshotFactory.getSnapshot(snapshotId);
         if (snapshotCheck == null) {
             throw new InvalidParameterValueException("unable to find a snapshot with id " + snapshotId);
         }
         
         _accountMgr.checkAccess(caller, null, true, snapshotCheck);
         
-        if( !Snapshot.State.BackedUp.equals(snapshotCheck.getState() ) ) {
-            throw new InvalidParameterValueException("Can't delete snapshotshot " + snapshotId + " due to it is not in BackedUp Status");
-        }
-        
-        return deleteSnapshotInternal(snapshotId);
-    }
-
-    @DB
-    private boolean deleteSnapshotInternal(Long snapshotId) {
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Calling deleteSnapshot for snapshotId: " + snapshotId);
-        }
-        SnapshotVO lastSnapshot = null;
-        SnapshotVO snapshot = _snapshotDao.findById(snapshotId);
-        if (snapshot.getBackupSnapshotId() != null) {
-            List<SnapshotVO> snaps = _snapshotDao.listByBackupUuid(snapshot.getVolumeId(), snapshot.getBackupSnapshotId());
-            if (snaps != null && snaps.size() > 1) {
-                snapshot.setBackupSnapshotId(null);
-                _snapshotDao.update(snapshot.getId(), snapshot);
-            }
-        }
-
-        Transaction txn = Transaction.currentTxn();
-        txn.start();
-        _snapshotDao.remove(snapshotId);
-        if (snapshot.getState() == Snapshot.State.BackedUp) {
-            UsageEventUtils.publishUsageEvent(EventTypes.EVENT_SNAPSHOT_DELETE, snapshot.getAccountId(),
-                    snapshot.getDataCenterId(), snapshotId, snapshot.getName(), null, null, 0L,
-                    snapshot.getClass().getName(), snapshot.getUuid());
+        SnapshotStrategy strategy = null;
+        for (SnapshotStrategy st : snapshotStrategies) {
+        	if (st.canHandle(snapshotCheck)) {
+        		strategy = st;
+        		break;
+        	}
         }
-        _resourceLimitMgr.decrementResourceCount(snapshot.getAccountId(), ResourceType.snapshot);
-        txn.commit();
-
-        long lastId = snapshotId;
-        boolean destroy = false;
-        while (true) {
-            lastSnapshot = _snapshotDao.findNextSnapshot(lastId);
-            if (lastSnapshot == null) {
-                // if all snapshots after this snapshot in this chain are removed, remove those snapshots.
-                destroy = true;
-                break;
-            }
-            if (lastSnapshot.getRemoved() == null) {
-                // if there is one child not removed, then can not remove back up snapshot.
-                break;
-            }
-            lastId = lastSnapshot.getId();
-        }
-        if (destroy) {
-            lastSnapshot = _snapshotDao.findByIdIncludingRemoved(lastId);
-            while (lastSnapshot.getRemoved() != null) {
-                String BackupSnapshotId = lastSnapshot.getBackupSnapshotId();
-                if (BackupSnapshotId != null) {
-                    List<SnapshotVO> snaps = _snapshotDao.listByBackupUuid(lastSnapshot.getVolumeId(), BackupSnapshotId);
-                    if (snaps != null && snaps.size() > 1) {
-                        lastSnapshot.setBackupSnapshotId(null);
-                        _snapshotDao.update(lastSnapshot.getId(), lastSnapshot);
-                    } else {
-                        if (destroySnapshotBackUp(lastId)) {
-
-                        } else {
-                            s_logger.debug("Destroying snapshot backup failed " + lastSnapshot);
-                            break;
-                        }
-                    }
-                }
-                lastId = lastSnapshot.getPrevSnapshotId();
-                if (lastId == 0) {
-                    break;
-                }
-                lastSnapshot = _snapshotDao.findByIdIncludingRemoved(lastId);
-            }
+        try {
+        	boolean result = strategy.deleteSnapshot(snapshotCheck);
+        	if (result) {
+        		if (snapshotCheck.getState() == Snapshot.State.BackedUp) {
+        			UsageEventUtils.publishUsageEvent(EventTypes.EVENT_SNAPSHOT_DELETE, snapshotCheck.getAccountId(),
+        					snapshotCheck.getDataCenterId(), snapshotId, snapshotCheck.getName(), null, null, 0L,
+        					snapshotCheck.getClass().getName(), snapshotCheck.getUuid());
+        		}
+        		_resourceLimitMgr.decrementResourceCount(snapshotCheck.getAccountId(), ResourceType.snapshot);
+        	}
+        	return result;
+        } catch (Exception e) {
+        	s_logger.debug("Failed to delete snapshot: " + snapshotCheck.getId() + ":" + e.toString());
+        	throw new CloudRuntimeException("Failed to delete snapshot:" + e.toString());
         }
-        return true;
-    }
-
-    @Override
-    @DB
-    public boolean destroySnapshot(long userId, long snapshotId, long policyId) {
-        return true;
     }
 
-
-    @Override
-    public HostVO getSecondaryStorageHost(SnapshotVO snapshot) {
+    private HostVO getSecondaryStorageHost(SnapshotVO snapshot) {
         HostVO secHost = null;
         if( snapshot.getSwiftId() == null || snapshot.getSwiftId() == 0) {
             secHost = _hostDao.findById(snapshot.getSecHostId());
@@ -942,51 +565,6 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
     }
 
     @Override
-    @DB
-    public boolean destroySnapshotBackUp(long snapshotId) {
-        boolean success = false;
-        String details;
-        SnapshotVO snapshot = _snapshotDao.findByIdIncludingRemoved(snapshotId);
-        if (snapshot == null) {
-            throw new CloudRuntimeException("Destroying snapshot " + snapshotId + " backup failed due to unable to find snapshot ");
-        }
-        String secondaryStoragePoolUrl = getSecondaryStorageURL(snapshot);
-        Long dcId = snapshot.getDataCenterId();
-        Long accountId = snapshot.getAccountId();
-        Long volumeId = snapshot.getVolumeId();
-
-        String backupOfSnapshot = snapshot.getBackupSnapshotId();
-        if (backupOfSnapshot == null) {
-            return true;
-        }
-        SwiftTO swift = _swiftMgr.getSwiftTO(snapshot.getSwiftId());
-        S3TO s3 = _s3Mgr.getS3TO();
-
-        checkObjectStorageConfiguration(swift, s3);
-
-        DeleteSnapshotBackupCommand cmd = new DeleteSnapshotBackupCommand(
-                swift, s3, secondaryStoragePoolUrl, dcId, accountId, volumeId,
-                backupOfSnapshot, false);
-        Answer answer = _agentMgr.sendToSSVM(dcId, cmd);
-
-        if ((answer != null) && answer.getResult()) {
-            snapshot.setBackupSnapshotId(null);
-            _snapshotDao.update(snapshotId, snapshot);
-            success = true;
-            details = "Successfully deleted snapshot " + snapshotId + " for volumeId: " + volumeId;
-            s_logger.debug(details);
-        } else if (answer != null) {
-            details = "Failed to destroy snapshot id:" + snapshotId + " for volume: " + volumeId + " due to ";
-            if (answer.getDetails() != null) {
-                details += answer.getDetails();
-            }
-            s_logger.error(details);
-        }
-        return success;
-
-    }
-
-    @Override
     public Pair<List<? extends Snapshot>, Integer> listSnapshots(ListSnapshotsCmd cmd) {
         Long volumeId = cmd.getVolumeId();
         String name = cmd.getSnapshotName();
@@ -1161,7 +739,7 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
             List<SnapshotVO> snapshots = listSnapsforVolume(volumeId);
             for (SnapshotVO snapshot : snapshots) {
                 if (_snapshotDao.expunge(snapshot.getId())) {
-                    if (snapshot.getType() == Type.MANUAL) {
+                    if (snapshot.getRecurringType() == Type.MANUAL) {
                         _resourceLimitMgr.decrementResourceCount(accountId, ResourceType.snapshot);
                     }
 
@@ -1271,8 +849,7 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
         return policy;
     }
 
-    @Override
-    public boolean deletePolicy(long userId, Long policyId) {
+    protected boolean deletePolicy(long userId, Long policyId) {
         SnapshotPolicyVO snapshotPolicy = _snapshotPolicyDao.findById(policyId);
         _snapSchedMgr.removeSchedule(snapshotPolicy.getVolumeId(), snapshotPolicy.getId());
         return _snapshotPolicyDao.remove(policyId);
@@ -1290,31 +867,16 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
         return new Pair<List<? extends SnapshotPolicy>, Integer>(result.first(), result.second());
     }
 
-    @Override
-    public List<SnapshotPolicyVO> listPoliciesforVolume(long volumeId) {
+    
+    private List<SnapshotPolicyVO> listPoliciesforVolume(long volumeId) {
         return _snapshotPolicyDao.listByVolumeId(volumeId);
     }
-
-    @Override
-    public List<SnapshotPolicyVO> listPoliciesforSnapshot(long snapshotId) {
-        SearchCriteria<SnapshotPolicyVO> sc = PoliciesForSnapSearch.create();
-        sc.setJoinParameters("policyRef", "snapshotId", snapshotId);
-        return _snapshotPolicyDao.search(sc, null);
-    }
-
-    @Override
-    public List<SnapshotVO> listSnapsforPolicy(long policyId, Filter filter) {
-        SearchCriteria<SnapshotVO> sc = PolicySnapshotSearch.create();
-        sc.setJoinParameters("policy", "policyId", policyId);
-        return _snapshotDao.search(sc, filter);
-    }
-
-    @Override
-    public List<SnapshotVO> listSnapsforVolume(long volumeId) {
+    
+    private List<SnapshotVO> listSnapsforVolume(long volumeId) {
         return _snapshotDao.listByVolumeId(volumeId);
     }
 
-    public List<SnapshotVO> listSnapsforVolumeType(long volumeId, Type type) {
+    private List<SnapshotVO> listSnapsforVolumeType(long volumeId, Type type) {
         return _snapshotDao.listByVolumeIdType(volumeId, type);
     }
 
@@ -1333,9 +895,6 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
         }
     }
 
-    /**
-     * {@inheritDoc}
-     */
     @Override
     public List<SnapshotScheduleVO> findRecurringSnapshotSchedule(ListRecurringSnapshotScheduleCmd cmd) {
         Long volumeId = cmd.getVolumeId();
@@ -1374,12 +933,7 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
         return snapshotSchedules;
     }
 
-    @Override
-    public SnapshotPolicyVO getPolicyForVolume(long volumeId) {
-        return _snapshotPolicyDao.findOneByVolume(volumeId);
-    }
-
-    public Type getSnapshotType(Long policyId) {
+    private Type getSnapshotType(Long policyId) {
         if (policyId.equals(Snapshot.MANUAL_POLICY_ID)) {
             return Type.MANUAL;
         } else {
@@ -1389,7 +943,7 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
         }
     }
 
-    public Type getSnapshotType(IntervalType intvType) {
+    private Type getSnapshotType(IntervalType intvType) {
         if (intvType.equals(IntervalType.HOURLY)) {
             return Type.HOURLY;
         } else if (intvType.equals(IntervalType.DAILY)) {
@@ -1489,15 +1043,11 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
         Type.DAILY.setMax(NumbersUtil.parseInt(_configDao.getValue("snapshot.max.daily"), DAILYMAX));
         Type.WEEKLY.setMax(NumbersUtil.parseInt(_configDao.getValue("snapshot.max.weekly"), WEEKLYMAX));
         Type.MONTHLY.setMax(NumbersUtil.parseInt(_configDao.getValue("snapshot.max.monthly"), MONTHLYMAX));
-        _deltaSnapshotMax = NumbersUtil.parseInt(_configDao.getValue("snapshot.delta.max"), DELTAMAX);
         _totalRetries = NumbersUtil.parseInt(_configDao.getValue("total.retries"), 4);
         _pauseInterval = 2 * NumbersUtil.parseInt(_configDao.getValue("ping.interval"), 60);
 
         s_logger.info("Snapshot Manager is configured.");
 
-        _snapshotFsm = Snapshot.State.getStateMachine();
-        _snapshotFsm.registerListener(new SnapshotStateListener());
-
         return true;
     }
 
@@ -1558,24 +1108,6 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
 
         return success;
     }
-
-    private boolean hostSupportSnapsthot(HostVO host) {
-        if (host.getHypervisorType() != HypervisorType.KVM) {
-            return true;
-        }
-        // Determine host capabilities
-        String caps = host.getCapabilities();
-
-        if (caps != null) {
-            String[] tokens = caps.split(",");
-            for (String token : tokens) {
-                if (token.contains("snapshot")) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
     
     @Override
     public boolean canOperateOnVolume(Volume volume) {
@@ -1586,8 +1118,4 @@ public class SnapshotManagerImpl extends ManagerBase implements SnapshotManager,
     	}
     	return true;
     }
-
-    protected boolean stateTransitTo(Snapshot snapshot, Snapshot.Event e) throws NoTransitionException {
-        return _snapshotFsm.transitTo(snapshot, e, null, _snapshotDao);
-    }
 }