You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/06/25 01:52:14 UTC

[38/50] [abbrv] merge is complete

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7b7db056/server/src/com/cloud/storage/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/storage/VolumeManagerImpl.java
index 8726465,4e7b335..235adf6
--- a/server/src/com/cloud/storage/VolumeManagerImpl.java
+++ b/server/src/com/cloud/storage/VolumeManagerImpl.java
@@@ -36,6 -31,6 +31,7 @@@ import java.util.concurrent.ExecutionEx
  import javax.inject.Inject;
  import javax.naming.ConfigurationException;
  
++import org.apache.commons.lang.StringUtils;
  import org.apache.log4j.Logger;
  import org.springframework.stereotype.Component;
  
@@@ -43,12 -38,13 +39,12 @@@ import org.apache.cloudstack.api.BaseCm
  import org.apache.cloudstack.api.command.user.volume.AttachVolumeCmd;
  import org.apache.cloudstack.api.command.user.volume.CreateVolumeCmd;
  import org.apache.cloudstack.api.command.user.volume.DetachVolumeCmd;
+ import org.apache.cloudstack.api.command.user.volume.ExtractVolumeCmd;
  import org.apache.cloudstack.api.command.user.volume.MigrateVolumeCmd;
  import org.apache.cloudstack.api.command.user.volume.ResizeVolumeCmd;
 +import org.apache.cloudstack.api.command.user.volume.UpdateVolumeCmd;
  import org.apache.cloudstack.api.command.user.volume.UploadVolumeCmd;
 -
 -import com.cloud.storage.dao.*;
 -import org.apache.cloudstack.api.command.user.volume.*;
 +import org.apache.cloudstack.context.CallContext;
- import org.apache.cloudstack.engine.subsystem.api.storage.CommandResult;
  import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
  import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
  import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreProviderManager;
@@@ -66,20 -60,32 +60,30 @@@ import org.apache.cloudstack.engine.sub
  import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService;
  import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService.VolumeApiResult;
  import org.apache.cloudstack.framework.async.AsyncCallFuture;
 +import org.apache.cloudstack.framework.jobs.AsyncJob;
 +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
 +import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+ import org.apache.cloudstack.storage.command.AttachAnswer;
+ import org.apache.cloudstack.storage.command.AttachCommand;
+ import org.apache.cloudstack.storage.command.CommandResult;
+ import org.apache.cloudstack.storage.command.DettachCommand;
  import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
  import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+ import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+ import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+ import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+ import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+ import org.apache.cloudstack.storage.image.datastore.ImageStoreEntity;
  
  import com.cloud.agent.AgentManager;
  import com.cloud.agent.api.Answer;
- import com.cloud.agent.api.AttachVolumeAnswer;
- import com.cloud.agent.api.AttachVolumeCommand;
+ import com.cloud.agent.api.storage.CreateVolumeOVAAnswer;
+ import com.cloud.agent.api.storage.CreateVolumeOVACommand;
+ import com.cloud.agent.api.to.DataTO;
+ import com.cloud.agent.api.to.DiskTO;
  import com.cloud.agent.api.to.VirtualMachineTO;
- import com.cloud.agent.api.to.VolumeTO;
  import com.cloud.alert.AlertManager;
  import com.cloud.api.ApiDBUtils;
 -import com.cloud.async.AsyncJobExecutor;
 -import com.cloud.async.AsyncJobManager;
 -import com.cloud.async.AsyncJobVO;
 -import com.cloud.async.BaseAsyncJobExecutor;
  import com.cloud.capacity.CapacityManager;
  import com.cloud.capacity.dao.CapacityDao;
  import com.cloud.configuration.Config;
@@@ -135,8 -140,6 +138,7 @@@ import com.cloud.storage.dao.VMTemplate
  import com.cloud.storage.dao.VMTemplateS3Dao;
  import com.cloud.storage.dao.VMTemplateSwiftDao;
  import com.cloud.storage.dao.VolumeDao;
 +import com.cloud.storage.dao.VolumeDetailsDao;
- import com.cloud.storage.dao.VolumeHostDao;
  import com.cloud.storage.download.DownloadMonitor;
  import com.cloud.storage.s3.S3Manager;
  import com.cloud.storage.secondary.SecondaryStorageVmManager;
@@@ -333,8 -341,7 +339,7 @@@ public class VolumeManagerImpl extends 
      private final int _customDiskOfferingMaxSize = 1024;
      private long _maxVolumeSizeInGb;
      private boolean _recreateSystemVmEnabled;
-     protected SearchBuilder<VMTemplateHostVO> HostTemplateStatesSearch;
 -
 +    
      public VolumeManagerImpl() {
          _volStateMachine = Volume.State.getStateMachine();
      }
@@@ -509,9 -516,10 +514,10 @@@
          newVol.setDeviceId(oldVol.getDeviceId());
          newVol.setInstanceId(oldVol.getInstanceId());
          newVol.setRecreatable(oldVol.isRecreatable());
+         newVol.setFormat(oldVol.getFormat());
          return _volsDao.persist(newVol);
      }
 -
 +    
      @DB
      protected VolumeInfo createVolumeFromSnapshot(VolumeVO volume,
              SnapshotVO snapshot) {
@@@ -538,13 -546,13 +544,13 @@@
              while ((pool = storageMgr.findStoragePool(dskCh, dc, pod.first(), null, null,
                      null, poolsToAvoid)) != null) {
                  break;
 -
 +                
              }
          }
 -
 +        
          VolumeInfo vol = volFactory.getVolume(volume.getId());
          DataStore store = dataStoreMgr.getDataStore(pool.getId(), DataStoreRole.Primary);
-         SnapshotInfo snapInfo = snapshotFactory.getSnapshot(snapshot.getId());
+         SnapshotInfo snapInfo = snapshotFactory.getSnapshot(snapshot.getId(), DataStoreRole.Image);
          AsyncCallFuture<VolumeApiResult> future = volService.createVolumeFromSnapshot(vol, store, snapInfo);
          try {
              VolumeApiResult result = future.get();
@@@ -742,15 -744,11 +742,11 @@@
          // volume.setSize(size);
          volume.setInstanceId(null);
          volume.setUpdated(new Date());
- 
+         volume.setDomainId((owner == null) ? Domain.ROOT_DOMAIN : owner
+                 .getDomainId());
+         volume.setFormat(ImageFormat.valueOf(format));
          volume = _volsDao.persist(volume);
-         try {
-             stateTransitTo(volume, Event.UploadRequested);
-         } catch (NoTransitionException e) {
-             // TODO Auto-generated catch block
-             e.printStackTrace();
-         }
 -        UserContext.current().setEventDetails("Volume Id: " + volume.getId());
 +        CallContext.current().setEventDetails("Volume Id: " + volume.getId());
  
          // Increment resource count during allocation; if actual creation fails,
          // decrement it
@@@ -1370,6 -1369,7 +1367,7 @@@
              vol.setDeviceId(1l);
          }
  
 -        vol.setFormat(this.getSupportedImageFormatForCluster(vm.getHypervisorType()));
++        vol.setFormat(getSupportedImageFormatForCluster(vm.getHypervisorType()));
          vol = _volsDao.persist(vol);
  
          // Save usage event and update resource count for user vm volumes
@@@ -1399,6 -1399,7 +1397,7 @@@
  
          VolumeVO vol = new VolumeVO(type, name, vm.getDataCenterId(),
                  owner.getDomainId(), owner.getId(), offering.getId(), size);
 -        vol.setFormat(this.getSupportedImageFormatForCluster(template.getHypervisorType()));
++        vol.setFormat(getSupportedImageFormatForCluster(template.getHypervisorType()));
          if (vm != null) {
              vol.setInstanceId(vm.getId());
          }
@@@ -1437,18 -1438,16 +1436,16 @@@
          }
          return toDiskProfile(vol, offering);
      }
 -
 + 
-     private String getSupportedImageFormatForCluster(Long clusterId) {
-         ClusterVO cluster = ApiDBUtils.findClusterById(clusterId);
- 
-         if (cluster.getHypervisorType() == HypervisorType.XenServer) {
-             return "vhd";
-         } else if (cluster.getHypervisorType() == HypervisorType.KVM) {
-             return "qcow2";
-         } else if (cluster.getHypervisorType() == HypervisorType.VMware) {
-             return "ova";
-         } else if (cluster.getHypervisorType() == HypervisorType.Ovm) {
-             return "raw";
+     private  ImageFormat getSupportedImageFormatForCluster(HypervisorType hyperType) {
+         if (hyperType == HypervisorType.XenServer) {
+             return ImageFormat.VHD;
+         } else if (hyperType == HypervisorType.KVM) {
+             return ImageFormat.QCOW2;
+         } else if (hyperType == HypervisorType.VMware) {
+             return ImageFormat.OVA;
+         } else if (hyperType == HypervisorType.Ovm) {
+             return ImageFormat.RAW;
          } else {
              return null;
          }
@@@ -1513,7 -1510,11 +1508,11 @@@
                          ResourceType.primary_storage, new Long(volume.getSize()));
              }
          }
-         return vol;
+ 
 -        VolumeVO volVO = this._volsDao.findById(vol.getId());
 -        volVO.setFormat(this.getSupportedImageFormatForCluster(rootDiskHyperType));
 -        this._volsDao.update(volVO.getId(), volVO);
 -        return this.volFactory.getVolume(volVO.getId());
++        VolumeVO volVO = _volsDao.findById(vol.getId());
++        volVO.setFormat(getSupportedImageFormatForCluster(rootDiskHyperType));
++        _volsDao.update(volVO.getId(), volVO);
++        return volFactory.getVolume(volVO.getId());
      }
  
      private boolean needMoveVolume(VolumeVO rootVolumeOfVm, VolumeInfo volume) {
@@@ -2246,20 -2239,14 +2226,14 @@@
          if (vm.getType() == VirtualMachine.Type.User) {
              UserVmVO userVM = (UserVmVO) vm.getVirtualMachine();
              if (userVM.getIsoId() != null) {
-                 Pair<String, String> isoPathPair = _tmpltMgr.getAbsoluteIsoPath(
-                         userVM.getIsoId(), userVM.getDataCenterId());
-                 if (isoPathPair != null) {
-                     String isoPath = isoPathPair.first();
-                     VolumeTO iso = new VolumeTO(vm.getId(), Volume.Type.ISO,
-                             StoragePoolType.ISO, null, null, null, isoPath, 0,
-                             null, null);
+                 DataTO dataTO = tmplFactory.getTemplate(userVM.getIsoId(), DataStoreRole.Image, userVM.getDataCenterId()).getTO();
+                 DiskTO iso = new DiskTO(dataTO, 3L, Volume.Type.ISO);
 -                vm.addDisk(iso);
 +                    vm.addDisk(iso);
 +                }
              }
          }
--    }
 -
  
 +   
  
      private static enum VolumeTaskType {
          RECREATE,
@@@ -2470,10 -2457,12 +2444,12 @@@
                  pool = (StoragePool)dataStoreMgr.getDataStore(result.second().getId(), DataStoreRole.Primary);
                  vol = result.first();
              }
-             vm.addDisk(new VolumeTO(vol, pool));
+             DataTO volumeTO = volFactory.getVolume(vol.getId()).getTO();
+             DiskTO disk = new DiskTO(volumeTO, vol.getDeviceId(), vol.getVolumeType());
+             vm.addDisk(disk);
          }
      }
 -
 +    
      private Long getDeviceId(long vmId, Long deviceId) {
          // allocate deviceId
          List<VolumeVO> vols = _volsDao.findByInstance(vmId);
@@@ -2509,49 -2498,9 +2485,9 @@@
          return _volStateMachine.transitTo(vol, event, null, _volsDao);
      }
  
 +    
-     private String validateUrl(String url) {
-         try {
-             URI uri = new URI(url);
-             if ((uri.getScheme() == null)
-                     || (!uri.getScheme().equalsIgnoreCase("http")
-                             && !uri.getScheme().equalsIgnoreCase("https") && !uri
-                             .getScheme().equalsIgnoreCase("file"))) {
-                 throw new IllegalArgumentException(
-                         "Unsupported scheme for url: " + url);
-             }
  
-             int port = uri.getPort();
-             if (!(port == 80 || port == 443 || port == -1)) {
-                 throw new IllegalArgumentException(
-                         "Only ports 80 and 443 are allowed");
-             }
-             String host = uri.getHost();
-             try {
-                 InetAddress hostAddr = InetAddress.getByName(host);
-                 if (hostAddr.isAnyLocalAddress()
-                         || hostAddr.isLinkLocalAddress()
-                         || hostAddr.isLoopbackAddress()
-                         || hostAddr.isMulticastAddress()) {
-                     throw new IllegalArgumentException(
-                             "Illegal host specified in url");
-                 }
-                 if (hostAddr instanceof Inet6Address) {
-                     throw new IllegalArgumentException(
-                             "IPV6 addresses not supported ("
-                                     + hostAddr.getHostAddress() + ")");
-                 }
-             } catch (UnknownHostException uhe) {
-                 throw new IllegalArgumentException("Unable to resolve " + host);
-             }
--
-             return uri.toString();
-         } catch (URISyntaxException e) {
-             throw new IllegalArgumentException("Invalid URL " + url);
-         }
--
-     }
 +    
      @Override
      public boolean canVmRestartOnAnotherServer(long vmId) {
          List<VolumeVO> vols = _volsDao.findCreatedByInstance(vmId);
@@@ -2628,6 -2562,188 +2549,188 @@@
  
  
      @Override
+     public Snapshot takeSnapshot(Long volumeId, Long policyId, Long snapshotId, Account account) throws ResourceAllocationException {
+         // Takes a snapshot of the given volume by delegating to volService.takeSnapshot.
+         // The volume is resolved through the volume factory and must be in the Ready state;
+         // otherwise an InvalidParameterValueException is thrown. The snapshot id, snapshot
+         // policy id and account are handed to the volume service as a payload on the volume.
+         VolumeInfo volume = volFactory.getVolume(volumeId);
+         if (volume == null) {
+             throw new InvalidParameterValueException("Creating snapshot failed due to volume:" + volumeId + " doesn't exist");
+         }
+ 
+         if (volume.getState() != Volume.State.Ready) {
+             throw new InvalidParameterValueException("VolumeId: " + volumeId + " is not in " + Volume.State.Ready + " state but " + volume.getState() + ". Cannot take snapshot.");
+         }
+ 
+         // Bundle the request parameters and attach them to the volume before delegating.
+         CreateSnapshotPayload payload = new CreateSnapshotPayload();
+         payload.setSnapshotId(snapshotId);
+         payload.setSnapshotPolicyId(policyId);
+         payload.setAccount(account);
+         volume.addPayload(payload);
+         return volService.takeSnapshot(volume);
+     }
+ 
+     /**
+      * Validates that a snapshot may be allocated for the given volume and, if so,
+      * delegates the allocation to {@code snapshotMgr.allocSnapshot}.
+      * <p>
+      * Checks performed, in order: the volume exists; its zone exists; the zone is
+      * not disabled (unless the calling account is a root admin); the volume is in
+      * the {@code Ready} state; the volume's template, if any, is not a
+      * {@code SYSTEM} template; and the volume has a data store.
+      *
+      * @param volumeId id of the volume to snapshot
+      * @param policyId passed through unchanged to {@code snapshotMgr.allocSnapshot}
+      * @return the result of {@code snapshotMgr.allocSnapshot(volumeId, policyId)}
+      * @throws InvalidParameterValueException if the volume, zone, state, template or data-store check fails
+      * @throws PermissionDeniedException if the zone is disabled and the caller is not a root admin
+      * @throws ResourceAllocationException declared by the signature; not thrown directly in this body
+      */
+     @Override
+     public Snapshot allocSnapshot(Long volumeId, Long policyId) throws ResourceAllocationException {
 -        Account caller = UserContext.current().getCaller();
++        Account caller = CallContext.current().getCallingAccount();
+ 
+         VolumeInfo volume = volFactory.getVolume(volumeId);
+         if (volume == null) {
+             throw new InvalidParameterValueException("Creating snapshot failed due to volume:" + volumeId + " doesn't exist");
+         }
+         DataCenter zone = _dcDao.findById(volume.getDataCenterId());
+         if (zone == null) {
+             throw new InvalidParameterValueException("Can't find zone by id " + volume.getDataCenterId());
+         }
+ 
+         // Only root admins may operate in a zone whose allocation state is Disabled.
+         if (Grouping.AllocationState.Disabled == zone.getAllocationState() && !_accountMgr.isRootAdmin(caller.getType())) {
+             throw new PermissionDeniedException("Cannot perform this operation, Zone is currently disabled: " + zone.getName());
+         }
+ 
+         if (volume.getState() != Volume.State.Ready) {
+             throw new InvalidParameterValueException("VolumeId: " + volumeId + " is not in " + Volume.State.Ready + " state but " + volume.getState() + ". Cannot take snapshot.");
+         }
+ 
+         // Volumes created from a SYSTEM template (system VM volumes) cannot be snapshotted.
+         if ( volume.getTemplateId() != null ) {
+             VMTemplateVO  template = _templateDao.findById(volume.getTemplateId());
+             if( template != null && template.getTemplateType() == Storage.TemplateType.SYSTEM ) {
+                 throw new InvalidParameterValueException("VolumeId: " + volumeId + " is for System VM , Creating snapshot against System VM volumes is not supported");
+             }
+         }
+ 
+         // The pool itself is not used further; it is only checked for presence.
+         StoragePool storagePool = (StoragePool)volume.getDataStore();
+         if (storagePool == null) {
+             throw new InvalidParameterValueException("VolumeId: " + volumeId + " please attach this volume to a VM before create snapshot for it");
+         }
+ 
+         return snapshotMgr.allocSnapshot(volumeId, policyId);
+     }
+ 
+ 
+     /**
+      * Copies a volume to the zone's image store and returns a URL from which it
+      * can be extracted.
+      * <p>
+      * Steps: validate the caller's permissions and the request (volume, zone,
+      * storage pool, VM state, template extractability, extract mode); copy the
+      * volume to the zone's image store; for pools whose format is "ova", ask the
+      * source pool to create the OVA file; finally ask the image store for the
+      * extract URL of the copied volume.
+      *
+      * @param cmd the API command carrying the volume id, zone id and extract mode
+      * @return the extract URL created by the image store for the copied volume
+      * @throws PermissionDeniedException if extraction is disabled for a non-root-admin caller, the
+      *         volume is attached to a VM that is not stopped, or the volume's template is not
+      *         extractable and the caller is not an admin account
+      * @throws InvalidParameterValueException if the volume, zone, storage pool or extract mode is invalid
+      * @throws CloudRuntimeException if the zone has no image store or the copy to secondary storage fails
+      */
+     @Override
+     @ActionEvent(eventType = EventTypes.EVENT_VOLUME_EXTRACT, eventDescription = "extracting volume", async = true)
+     public String extractVolume(ExtractVolumeCmd cmd) {
+         Long volumeId = cmd.getId();
+         Long zoneId = cmd.getZoneId();
+         String mode = cmd.getMode();
 -        Account account = UserContext.current().getCaller();
++        Account account = CallContext.current().getCallingAccount();
+ 
+         if (!_accountMgr.isRootAdmin(account.getType()) && ApiDBUtils.isExtractionDisabled()) {
+             throw new PermissionDeniedException("Extraction has been disabled by admin");
+         }
+ 
+         VolumeVO volume = _volumeDao.findById(volumeId);
+         if (volume == null) {
+             InvalidParameterValueException ex = new InvalidParameterValueException("Unable to find volume with specified volumeId");
+             ex.addProxyObject(volumeId.toString(), "volumeId");
+             throw ex;
+         }
+ 
+         // perform permission check
+         _accountMgr.checkAccess(account, null, true, volume);
+ 
+         if (_dcDao.findById(zoneId) == null) {
+             throw new InvalidParameterValueException("Please specify a valid zone.");
+         }
+         if (volume.getPoolId() == null) {
+             throw new InvalidParameterValueException("The volume doesnt belong to a storage pool so cant extract it");
+         }
+         // Extract activity only for detached volumes or for volumes whose
+         // instance is stopped
+         if (volume.getInstanceId() != null && ApiDBUtils.findVMInstanceById(volume.getInstanceId()).getState() != State.Stopped) {
+             s_logger.debug("Invalid state of the volume with ID: " + volumeId
+                     + ". It should be either detached or the VM should be in stopped state.");
+             PermissionDeniedException ex = new PermissionDeniedException(
+                     "Invalid state of the volume with specified ID. It should be either detached or the VM should be in stopped state.");
+             ex.addProxyObject(volume.getUuid(), "volumeId");
+             throw ex;
+         }
+ 
+         if (volume.getVolumeType() != Volume.Type.DATADISK) {
+             // Data disks don't have any template dependence.
+ 
+             VMTemplateVO template = ApiDBUtils.findTemplateById(volume.getTemplateId());
+             if (template != null) { // For ISO based volumes template = null and
+                 // we allow extraction of all ISO based
+                 // volumes
+                 boolean isExtractable = template.isExtractable() && template.getTemplateType() != Storage.TemplateType.SYSTEM;
+                 if (!isExtractable && account != null && account.getType() != Account.ACCOUNT_TYPE_ADMIN) {
+                     // Global admins are always allowed to extract
+                     PermissionDeniedException ex = new PermissionDeniedException("The volume with specified volumeId is not allowed to be extracted");
+                     ex.addProxyObject(volume.getUuid(), "volumeId");
+                     throw ex;
+                 }
+             }
+         }
+ 
+         Upload.Mode extractMode;
+         if (mode == null || (!mode.equals(Upload.Mode.FTP_UPLOAD.toString()) && !mode.equals(Upload.Mode.HTTP_DOWNLOAD.toString()))) {
+             throw new InvalidParameterValueException("Please specify a valid extract Mode ");
+         } else {
+             extractMode = mode.equals(Upload.Mode.FTP_UPLOAD.toString()) ? Upload.Mode.FTP_UPLOAD : Upload.Mode.HTTP_DOWNLOAD;
+         }
+ 
+         // Clean up code to remove all those previous uploadVO and uploadMonitor code. Previous code is trying to fake an async operation purely in
+         // db table with uploadVO and async_job entry, but internal implementation is actually synchronous.
 -        StoragePool srcPool = (StoragePool) this.dataStoreMgr.getPrimaryDataStore(volume.getPoolId());
 -        ImageStoreEntity secStore = (ImageStoreEntity) this.dataStoreMgr.getImageStore(zoneId);
++        StoragePool srcPool = (StoragePool) dataStoreMgr.getPrimaryDataStore(volume.getPoolId());
++        ImageStoreEntity secStore = (ImageStoreEntity) dataStoreMgr.getImageStore(zoneId);
+         if (secStore == null) {
+             // Fail with a meaningful error instead of a NullPointerException when the zone has no image store.
+             throw new CloudRuntimeException("No secondary storage available in zone " + zoneId + ", cannot extract volume");
+         }
+         String secondaryStorageURL = secStore.getUri();
+ 
 -        String value = this._configDao.getValue(Config.CopyVolumeWait.toString());
++        String value = _configDao.getValue(Config.CopyVolumeWait.toString());
+         int copyvolumewait = NumbersUtil.parseInt(value, Integer.parseInt(Config.CopyVolumeWait.getDefaultValue()));
+         // Copy volume from primary to secondary storage
 -        VolumeInfo srcVol = this.volFactory.getVolume(volume.getId());
 -        AsyncCallFuture<VolumeApiResult> cvAnswer = this.volService.copyVolume(srcVol, secStore);
++        VolumeInfo srcVol = volFactory.getVolume(volume.getId());
++        AsyncCallFuture<VolumeApiResult> cvAnswer = volService.copyVolume(srcVol, secStore);
+         // Check if you got a valid answer.
+         VolumeApiResult cvResult = null;
+         try {
+             cvResult = cvAnswer.get();
+         } catch (InterruptedException e1) {
+             // Restore the interrupt flag so callers further up can still observe the interruption.
+             Thread.currentThread().interrupt();
+             s_logger.debug("failed copy volume", e1);
+             throw new CloudRuntimeException("Failed to copy volume", e1);
+         } catch (ExecutionException e1) {
+             s_logger.debug("failed copy volume", e1);
+             throw new CloudRuntimeException("Failed to copy volume", e1);
+         }
+         if (cvResult == null || cvResult.isFailed()) {
+             String errorString = "Failed to copy the volume from the source primary storage pool to secondary storage.";
+             throw new CloudRuntimeException(errorString);
+         }
+ 
+         VolumeInfo vol = cvResult.getVolume();
+         String volumeLocalPath = vol.getPath();
+         String volumeName = StringUtils.substringBeforeLast(StringUtils.substringAfterLast(volumeLocalPath, "/"), ".");
+         // Handle the ova special case. Compare by value: getFormatForPool may return null,
+         // and reference equality on strings only works by accident of literal interning.
+         if ("ova".equals(getFormatForPool(srcPool))) {
+             // TODO: need to handle this for S3 as secondary storage
+             CreateVolumeOVACommand cvOVACmd = new CreateVolumeOVACommand(secondaryStorageURL, volumeLocalPath, volumeName, srcPool, copyvolumewait);
+             CreateVolumeOVAAnswer OVAanswer = null;
+ 
+             try {
+                 cvOVACmd.setContextParam("hypervisor", HypervisorType.VMware.toString());
+                 // for extract volume, create the ova file here;
+                 OVAanswer = (CreateVolumeOVAAnswer) storageMgr.sendToPool(srcPool, cvOVACmd);
+             } catch (StorageUnavailableException e) {
+                 // Best effort, as before: do not abort the extraction, but keep the cause visible in the log.
+                 s_logger.warn("Storage unavailable, could not create the OVA file for volume " + volumeId, e);
+             }
+         }
+         return secStore.createEntityExtractUrl(vol.getPath(), vol.getFormat());
+     }
+ 
+     /**
+      * Maps the hypervisor type of the cluster referenced by {@code pool} to the
+      * name of a volume image format.
+      *
+      * @param pool storage pool whose cluster id is used for the cluster lookup
+      * @return "vhd" for XenServer, "qcow2" for KVM, "ova" for VMware, "raw" for Ovm;
+      *         {@code null} for any other hypervisor type or when no cluster is found
+      */
+     private String getFormatForPool(StoragePool pool) {
+         ClusterVO cluster = ApiDBUtils.findClusterById(pool.getClusterId());
+         if (cluster == null) {
+             // No cluster resolved for this pool: report "no known format" rather than
+             // failing with a NullPointerException.
+             return null;
+         }
+ 
+         HypervisorType hyperType = cluster.getHypervisorType();
+         if (hyperType == HypervisorType.XenServer) {
+             return "vhd";
+         } else if (hyperType == HypervisorType.KVM) {
+             return "qcow2";
+         } else if (hyperType == HypervisorType.VMware) {
+             return "ova";
+         } else if (hyperType == HypervisorType.Ovm) {
+             return "raw";
+         } else {
+             return null;
+         }
+     }
+ 
+     @Override
      public String getVmNameFromVolumeId(long volumeId) {
          Long instanceId;
          VolumeVO volume = _volsDao.findById(volumeId);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7b7db056/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
index d4e360a,954c7e9..59aa6b3
--- a/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
+++ b/server/src/com/cloud/storage/secondary/SecondaryStorageManagerImpl.java
@@@ -29,9 -29,12 +29,14 @@@ import javax.ejb.Local
  import javax.inject.Inject;
  import javax.naming.ConfigurationException;
  
 +import org.apache.log4j.Logger;
 +
 +import org.apache.cloudstack.context.CallContext;
+ import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+ import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+ import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
+ import org.apache.cloudstack.storage.datastore.db.ImageStoreDao;
+ import org.apache.cloudstack.storage.datastore.db.ImageStoreVO;
 -import org.apache.log4j.Logger;
  
  import com.cloud.agent.AgentManager;
  import com.cloud.agent.api.Answer;
@@@ -47,7 -50,9 +52,7 @@@ import com.cloud.agent.api.StartupSecon
  import com.cloud.agent.api.StopAnswer;
  import com.cloud.agent.api.check.CheckSshAnswer;
  import com.cloud.agent.api.check.CheckSshCommand;
+ import com.cloud.agent.api.to.NfsTO;
 -import com.cloud.agent.api.to.NicTO;
 -import com.cloud.agent.api.to.VirtualMachineTO;
  import com.cloud.agent.manager.Commands;
  import com.cloud.capacity.dao.CapacityDao;
  import com.cloud.cluster.ClusterManager;
@@@ -108,9 -112,8 +107,9 @@@ import com.cloud.utils.DateUtil
  import com.cloud.utils.NumbersUtil;
  import com.cloud.utils.Pair;
  import com.cloud.utils.component.ManagerBase;
++import com.cloud.utils.db.EntityManager;
  import com.cloud.utils.db.GlobalLock;
  import com.cloud.utils.db.SearchCriteria.Op;
- import com.cloud.utils.db.EntityManager;
  import com.cloud.utils.db.SearchCriteria2;
  import com.cloud.utils.db.SearchCriteriaService;
  import com.cloud.utils.events.SubscriptionMgr;
@@@ -227,9 -223,15 +219,15 @@@ public class SecondaryStorageManagerImp
      protected IPAddressDao _ipAddressDao = null;
      @Inject
      protected RulesManager _rulesMgr;
+     @Inject
+     TemplateManager templateMgr;
 -
 +    
      @Inject
      KeystoreManager _keystoreMgr;
+     @Inject
+     DataStoreManager _dataStoreMgr;
+     @Inject
+     ImageStoreDao _imageStoreDao;
      private long _capacityScanInterval = DEFAULT_CAPACITY_SCAN_INTERVAL;
      private int _secStorageVmMtuSize;
  
@@@ -285,24 -292,30 +283,30 @@@
                  return false;
              }
  
-             List<HostVO> ssHosts = _ssvmMgr.listSecondaryStorageHostsInOneZone(zoneId);
-             for( HostVO ssHost : ssHosts ) {
-                 String secUrl = ssHost.getStorageUrl();
 -            List<DataStore> ssStores = this._dataStoreMgr.getImageStoresByScope(new ZoneScope(zoneId));
++            List<DataStore> ssStores = _dataStoreMgr.getImageStoresByScope(new ZoneScope(zoneId));
+              for( DataStore ssStore : ssStores ) {
+                  if (!(ssStore.getTO() instanceof NfsTO ))
+                      continue; // only do this for Nfs
+                 String secUrl = ssStore.getUri();
                  SecStorageSetupCommand setupCmd = null;
                  if (!_useSSlCopy) {
-                 	setupCmd = new SecStorageSetupCommand(secUrl, null);
+                 	setupCmd = new SecStorageSetupCommand(ssStore.getTO(), secUrl, null);
                  } else {
                  	Certificates certs = _keystoreMgr.getCertificates(ConsoleProxyManager.CERTIFICATE_NAME);
-                 	setupCmd = new SecStorageSetupCommand(secUrl, certs);
+                 	setupCmd = new SecStorageSetupCommand(ssStore.getTO(), secUrl, certs);
                  }
 -
 +                
                  Answer answer = _agentMgr.easySend(ssHostId, setupCmd);
                  if (answer != null && answer.getResult()) {
                      SecStorageSetupAnswer an = (SecStorageSetupAnswer) answer;
-                     ssHost.setParent(an.get_dir());
-                     _hostDao.update(ssHost.getId(), ssHost);
+                     if (an.get_dir() != null){
+                         // update the parent path in image_store table for this image store
 -                        ImageStoreVO svo = this._imageStoreDao.findById(ssStore.getId());
++                        ImageStoreVO svo = _imageStoreDao.findById(ssStore.getId());
+                         svo.setParent(an.get_dir());
+                         _imageStoreDao.update(ssStore.getId(), svo);
+                     }
                      if (s_logger.isDebugEnabled()) {
-                         s_logger.debug("Successfully programmed secondary storage " + ssHost.getName() + " in secondary storage VM " + secStorageVm.getInstanceName());
+                         s_logger.debug("Successfully programmed secondary storage " + ssStore.getName() + " in secondary storage VM " + secStorageVm.getInstanceName());
                      }
                  } else {
                      if (s_logger.isDebugEnabled()) {
@@@ -330,31 -345,13 +336,13 @@@
                  }
              }
          }
+         */
          return true;
      }
 -
 -
 -
 -
 +    
 +    
-     @Override
-     public boolean deleteHost(Long hostId) {
-         List<SnapshotVO> snapshots = _snapshotDao.listByHostId(hostId);
-         if( snapshots != null && !snapshots.isEmpty()) {
-             throw new CloudRuntimeException("Can not delete this secondary storage since it contains atleast one or more snapshots ");
-         }
-         if (!_swiftMgr.isSwiftEnabled()) {
-             List<Long> list = _templateDao.listPrivateTemplatesByHost(hostId);
-             if (list != null && !list.isEmpty()) {
-                 throw new CloudRuntimeException("Can not delete this secondary storage since it contains private templates ");
-             }
-         }
-         _vmTemplateHostDao.deleteByHost(hostId);
-         HostVO host = _hostDao.findById(hostId);
-         host.setGuid(null);
-         _hostDao.update(hostId, host);
-         _hostDao.remove(hostId);
 +        
-         return true;
-     }
 +    
      @Override
      public boolean generateVMSetupCommand(Long ssAHostId) {
          HostVO ssAHost = _hostDao.findById(ssAHostId);
@@@ -515,8 -512,8 +503,8 @@@
      }
  
      protected Map<String, Object> createSecStorageVmInstance(long dataCenterId, SecondaryStorageVm.Role role) {
-         HostVO secHost = findSecondaryStorageHost(dataCenterId);
-         if (secHost == null) {
 -        DataStore secStore = this._dataStoreMgr.getImageStore(dataCenterId);
++        DataStore secStore = _dataStoreMgr.getImageStore(dataCenterId);
+         if (secStore == null) {
              String msg = "No secondary storage available in zone " + dataCenterId + ", cannot create secondary storage vm";
              s_logger.warn(msg);
              throw new CloudRuntimeException(msg);
@@@ -729,33 -719,30 +714,30 @@@
                  return false;
              }
  
-             boolean templateReady = false;
-             if (template != null) {
-                 VMTemplateHostVO templateHostRef = _vmTemplateHostDao.findByHostTemplate(secHost.getId(), template.getId());
-                 templateReady = (templateHostRef != null) && (templateHostRef.getDownloadState() == Status.DOWNLOADED);
 -            List<DataStore> stores = this._dataStoreMgr.getImageStoresByScope(new ZoneScope(dataCenterId));
++            List<DataStore> stores = _dataStoreMgr.getImageStoresByScope(new ZoneScope(dataCenterId));
+             if (stores.size() < 1) {
+                 s_logger.debug("No image store added  in zone " + dataCenterId + ", wait until it is ready to launch secondary storage vm");
+                 return false;
              }
  
-             if (templateReady) {
+             DataStore store = templateMgr.getImageStore(dataCenterId, template.getId());
+             if (store == null) {
+                 if (s_logger.isDebugEnabled()) {
+                     s_logger.debug("No secondary storage available in zone " + dataCenterId + ", wait until it is ready to launch secondary storage vm");
+                 }
+                 return false;
+             }
  
 -            List<Pair<Long, Integer>> l = _storagePoolHostDao.getDatacenterStoragePoolHostInfo(dataCenterId, !_useLocalStorage);
 -            if (l != null && l.size() > 0 && l.get(0).second().intValue() > 0) {
 -                return true;
 -            } else {
 -                if (s_logger.isDebugEnabled()) {
 -                    s_logger.debug("Primary storage is not ready, wait until it is ready to launch secondary storage vm. dcId: " + dataCenterId + " system.vm.use.local.storage: " + _useLocalStorage +
 -                            "If you want to use local storage to start ssvm, need to set system.vm.use.local.storage to true");
 +                List<Pair<Long, Integer>> l = _storagePoolHostDao.getDatacenterStoragePoolHostInfo(dataCenterId, !_useLocalStorage);
 +                if (l != null && l.size() > 0 && l.get(0).second().intValue() > 0) {
- 
 +                    return true;
 +                } else {
 +                    if (s_logger.isDebugEnabled()) {
 +                        s_logger.debug("Primary storage is not ready, wait until it is ready to launch secondary storage vm. dcId: " + dataCenterId + " system.vm.use.local.storage: " + _useLocalStorage +
 +                        		"If you want to use local storage to start ssvm, need to set system.vm.use.local.storage to true");
 +                    }
                  }
-             } else {
-                 if (s_logger.isDebugEnabled()) {
-                     if (template == null) {
-                         s_logger.debug("Zone host is ready, but secondary storage vm template does not exist");
-                     } else {
-                         s_logger.debug("Zone host is ready, but secondary storage vm template: " + template.getId() + " is not ready on secondary storage: " + secHost.getId());
-                     }
-                 }
--            }
+ 
          }
          return false;
      }
@@@ -1011,31 -1006,33 +993,31 @@@
          return "secStorageVm." + id;
      }
  
 -    @Override
 -    public SecondaryStorageVmVO findByName(String name) {
 -        if (!VirtualMachineName.isValidSecStorageVmName(name, null)) {
 -            return null;
 -        }
 -        return findById(VirtualMachineName.getSystemVmId(name));
 -    }
 -
 -    @Override
 -    public SecondaryStorageVmVO findById(long id) {
 -        return _secStorageVmDao.findById(id);
 -    }
 -
 -    @Override
 -    public SecondaryStorageVmVO persist(SecondaryStorageVmVO vm) {
 -        return _secStorageVmDao.persist(vm);
 -    }
 +//    @Override
 +//    public SecondaryStorageVmVO findByName(String name) {
 +//        if (!VirtualMachineName.isValidSecStorageVmName(name, null)) {
 +//            return null;
 +//        }
 +//        return findById(VirtualMachineName.getSystemVmId(name));
 +//    }
 +
 +//    @Override
 +//    public SecondaryStorageVmVO findById(long id) {
 +//        return _secStorageVmDao.findById(id);
 +//    }
 +//
 +//    @Override
 +//    public SecondaryStorageVmVO persist(SecondaryStorageVmVO vm) {
 +//        return _secStorageVmDao.persist(vm);
 +//    }
  
      @Override
 -    public boolean finalizeVirtualMachineProfile(VirtualMachineProfile<SecondaryStorageVmVO> profile, DeployDestination dest, ReservationContext context) {
 -
 -    	SecondaryStorageVmVO vm = profile.getVirtualMachine();
 -        Map<String, String> details = _vmDetailsDao.findDetails(vm.getId());
 -        vm.setDetails(details);
 +    public boolean finalizeVirtualMachineProfile(VirtualMachineProfile profile, DeployDestination dest, ReservationContext context) {
  
 -        DataStore secStore = this._dataStoreMgr.getImageStore(dest.getDataCenter().getId());
 +        SecondaryStorageVmVO ssVm = _secStorageVmDao.findById(profile.getId());
 +    	
-         HostVO secHost = _ssvmMgr.findSecondaryStorageHost(dest.getDataCenter().getId());
-         assert (secHost != null);
++        DataStore secStore = _dataStoreMgr.getImageStore(dest.getDataCenter().getId());
+         assert (secStore != null);
  
          StringBuilder buf = profile.getBootArgsBuilder();
          buf.append(" template=domP type=secstorage");
@@@ -1281,9 -1277,9 +1263,9 @@@
          List<SecondaryStorageVmVO> ssVms = _secStorageVmDao.getSecStorageVmListInStates(SecondaryStorageVm.Role.templateProcessor, dataCenterId, State.Running, State.Migrating,
                  State.Starting,  State.Stopped, State.Stopping );
          int vmSize = (ssVms == null)? 0 : ssVms.size();
-         List<HostVO> ssHosts = _ssvmMgr.listSecondaryStorageHostsInOneZone(dataCenterId);
-         int hostSize = (ssHosts == null)? 0 : ssHosts.size();
-         if ( hostSize > vmSize ) {
 -        List<DataStore> ssStores = this._dataStoreMgr.getImageStoresByScope(new ZoneScope(dataCenterId));
++        List<DataStore> ssStores = _dataStoreMgr.getImageStoresByScope(new ZoneScope(dataCenterId));
+         int storeSize = (ssStores == null)? 0 : ssStores.size();
+         if ( storeSize > vmSize ) {
              s_logger.info("No secondary storage vms found in datacenter id=" + dataCenterId + ", starting a new one");
              return new Pair<AfterScanAction, Object>(AfterScanAction.expand, SecondaryStorageVm.Role.templateProcessor);
          }
@@@ -1366,57 -1364,11 +1350,11 @@@
  
  	@Override
      public DeleteHostAnswer deleteHost(HostVO host, boolean isForced, boolean isForceDeleteStorage) throws UnableDeleteHostException {
-         if (host.getType() == Host.Type.SecondaryStorage) {
-             deleteHost(host.getId());
-             return new DeleteHostAnswer(false);
-         }
-         return null;
-     }
- 
- 	@Override
-     public HostVO findSecondaryStorageHost(long dcId) {
- 		SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
- 	    sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.SecondaryStorage);
- 	    sc.addAnd(sc.getEntity().getDataCenterId(), Op.EQ, dcId);
- 	    List<HostVO> storageHosts = sc.list();
- 	    if (storageHosts == null || storageHosts.size() < 1) {
+ 	    // Since secondary storage is moved out of host table, this class should not handle delete secondary storage anymore.
 -        return null;
 +            return null;
-         } else {
-             Collections.shuffle(storageHosts);
-             return storageHosts.get(0);
-         }
-     }
- 
- 	@Override
-     public List<HostVO> listSecondaryStorageHostsInAllZones() {
- 		SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
- 	    sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.SecondaryStorage);
- 	    return sc.list();
      }
  
- 	@Override
-     public List<HostVO> listSecondaryStorageHostsInOneZone(long dataCenterId) {
- 		SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
- 		sc.addAnd(sc.getEntity().getDataCenterId(), Op.EQ, dataCenterId);
- 		sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.SecondaryStorage);
- 	    return sc.list();
-     }
  
- 	@Override
-     public List<HostVO> listLocalSecondaryStorageHostsInOneZone(long dataCenterId) {
- 		SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
- 		sc.addAnd(sc.getEntity().getDataCenterId(), Op.EQ, dataCenterId);
- 		sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.LocalSecondaryStorage);
- 	    return sc.list();
-     }
- 
- 	@Override
-     public List<HostVO> listAllTypesSecondaryStorageHostsInOneZone(long dataCenterId) {
- 		SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
- 		sc.addAnd(sc.getEntity().getDataCenterId(), Op.EQ, dataCenterId);
- 		sc.addAnd(sc.getEntity().getType(), Op.IN, Host.Type.LocalSecondaryStorage, Host.Type.SecondaryStorage);
- 	    return sc.list();
-     }
  
  	@Override
      public List<HostVO> listUpAndConnectingSecondaryStorageVmHost(Long dcId) {
@@@ -1444,49 -1396,26 +1382,50 @@@
          }
          return null;
      }
 -
 -
 -    @Override
 -    public boolean plugNic(Network network, NicTO nic, VirtualMachineTO vm,
 -            ReservationContext context, DeployDestination dest) throws ConcurrentOperationException, ResourceUnavailableException,
 -            InsufficientCapacityException {
 -        //not supported
 -        throw new UnsupportedOperationException("Plug nic is not supported for vm of type " + vm.getType());
 -    }
 -
 -
 -    @Override
 -    public boolean unplugNic(Network network, NicTO nic, VirtualMachineTO vm,
 -            ReservationContext context, DeployDestination dest) throws ConcurrentOperationException, ResourceUnavailableException {
 -        //not supported
 -        throw new UnsupportedOperationException("Unplug nic is not supported for vm of type " + vm.getType());
 -    }
 +	
+ 
  	@Override
 -	public void prepareStop(VirtualMachineProfile<SecondaryStorageVmVO> profile) {
 -
 +    public void prepareStop(VirtualMachineProfile profile) {
  	}
 +	
 +//    @Override
 +//    public void vmWorkStart(VmWork work) {
 +//    	assert(work instanceof VmWorkStart);
 +//
 +//        SecondaryStorageVmVO vm = _secStorageVmDao.findById(work.getVmId());
 +//
 +//    	UserVO user = _entityMgr.findById(UserVO.class, work.getUserId());
 +//    	AccountVO account = _entityMgr.findById(AccountVO.class, work.getAccountId());
 +//
 +//    	try {
 +//	    	_itMgr.processVmStartWork(vm, ((VmWorkStart)work).getParams(),
 +//	    		user, account,  ((VmWorkStart)work).getPlan());
 +//
 +//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
 +//    	} catch(Exception e) {
 +//    		s_logger.error("Exception in process VM-start work", e);
 +//    		String result = SerializerHelper.toObjectSerializedString(e);
 +//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
 +//    	}
 +//    }
 +//
 +//    @Override
 +//    public void vmWorkStop(VmWork work) {
 +//    	assert(work instanceof VmWorkStop);
 +//
 +//        SecondaryStorageVmVO vm = _secStorageVmDao.findById(work.getVmId());
 +//
 +//    	UserVO user = _entityMgr.findById(UserVO.class, work.getUserId());
 +//    	AccountVO account = _entityMgr.findById(AccountVO.class, work.getAccountId());
 +//
 +//    	try {
 +//	    	_itMgr.processVmStopWork(vm, ((VmWorkStop)work).isForceStop(), user, account);
 +//
 +//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.SUCCEEDED, null);
 +//    	} catch(Exception e) {
 +//    		s_logger.error("Exception in process VM-stop work", e);
 +//    		String result = SerializerHelper.toObjectSerializedString(e);
 +//    		AsyncJobExecutionContext.getCurrentExecutionContext().completeJobAndJoin(JobInfo.Status.FAILED, result);
 +//    	}
 +//    }
  }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/7b7db056/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java
----------------------------------------------------------------------
diff --cc server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java
index 0c78c4c,ebe06e7..a1871c0
--- a/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java
+++ b/server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java
@@@ -27,21 -26,26 +26,28 @@@ import javax.ejb.Local
  import javax.inject.Inject;
  import javax.naming.ConfigurationException;
  
++import org.apache.log4j.Logger;
++import org.springframework.stereotype.Component;
++
  import org.apache.cloudstack.api.command.user.snapshot.CreateSnapshotPolicyCmd;
  import org.apache.cloudstack.api.command.user.snapshot.DeleteSnapshotPoliciesCmd;
  import org.apache.cloudstack.api.command.user.snapshot.ListSnapshotPoliciesCmd;
  import org.apache.cloudstack.api.command.user.snapshot.ListSnapshotsCmd;
 +import org.apache.cloudstack.context.CallContext;
+ import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
  import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+ import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
+ import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector;
  import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
  import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+ import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotService;
  import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotStrategy;
  import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
  import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
+ import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
  import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
- import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
- import org.apache.log4j.Logger;
- import org.springframework.stereotype.Component;
+ import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+ import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
 -import org.apache.log4j.Logger;
 -import org.springframework.stereotype.Component;
  
  import com.cloud.agent.AgentManager;
  import com.cloud.agent.api.Answer;
@@@ -58,7 -57,7 +59,6 @@@ import com.cloud.configuration.Config
  import com.cloud.configuration.Resource.ResourceType;
  import com.cloud.configuration.dao.ConfigurationDao;
  import com.cloud.dc.ClusterVO;
- import com.cloud.dc.DataCenter;
 -import com.cloud.dc.DataCenterVO;
  import com.cloud.dc.dao.ClusterDao;
  import com.cloud.dc.dao.DataCenterDao;
  import com.cloud.domain.dao.DomainDao;
@@@ -75,11 -74,13 +75,13 @@@ import com.cloud.exception.StorageUnava
  import com.cloud.host.HostVO;
  import com.cloud.host.dao.HostDao;
  import com.cloud.hypervisor.Hypervisor.HypervisorType;
- import com.cloud.org.Grouping;
  import com.cloud.projects.Project.ListProjectResourcesCriteria;
+ import com.cloud.resource.ResourceManager;
  import com.cloud.server.ResourceTag.TaggedResourceType;
+ import com.cloud.storage.CreateSnapshotPayload;
++import com.cloud.storage.DataStoreRole;
  import com.cloud.storage.Snapshot;
  import com.cloud.storage.Snapshot.Type;
 -import com.cloud.storage.DataStoreRole;
  import com.cloud.storage.SnapshotPolicyVO;
  import com.cloud.storage.SnapshotScheduleVO;
  import com.cloud.storage.SnapshotVO;
@@@ -187,11 -196,15 +196,15 @@@ public class SnapshotManagerImpl extend
      @Inject TemplateManager templateMgr;
      @Inject VolumeManager volumeMgr;
      @Inject DataStoreManager dataStoreMgr;
-     @Inject List<SnapshotStrategy> snapshotStrategies;
+     @Inject SnapshotService snapshotSrv;
      @Inject VolumeDataFactory volFactory;
      @Inject SnapshotDataFactory snapshotFactory;
+     @Inject EndPointSelector _epSelector;
+ 	@Inject
+ 	private ResourceManager _resourceMgr;
+ 	@Inject
+ 	protected List<SnapshotStrategy> snapshotStrategies;
 -
 +    
  
      private int _totalRetries;
      private int _pauseInterval;
@@@ -258,7 -271,7 +271,7 @@@
      @DB
      @ActionEvent(eventType = EventTypes.EVENT_SNAPSHOT_CREATE, eventDescription = "creating snapshot", async = true)
      public Snapshot createSnapshot(Long volumeId, Long policyId, Long snapshotId, Account snapshotOwner) {
--        VolumeInfo volume = this.volFactory.getVolume(volumeId);
++        VolumeInfo volume = volFactory.getVolume(volumeId);
          if (volume == null) {
          	throw new InvalidParameterValueException("No such volume exist");
          }
@@@ -266,40 -279,26 +279,26 @@@
          if (volume.getState() != Volume.State.Ready) {
          	throw new InvalidParameterValueException("Volume is not in ready state");
          }
 -
 -
 +        
-         SnapshotInfo snapshot = null;
 +     
          boolean backedUp = false;
          // does the caller have the authority to act on this volume
 -        _accountMgr.checkAccess(UserContext.current().getCaller(), null, true, volume);
 -
 -        SnapshotInfo snapshot = this.snapshotFactory.getSnapshot(snapshotId, DataStoreRole.Primary);
 +        _accountMgr.checkAccess(CallContext.current().getCallingAccount(), null, true, volume);
 +        
- 
-         SnapshotInfo snap = this.snapshotFactory.getSnapshot(snapshotId);
-         SnapshotStrategy strategy = null;
-         for (SnapshotStrategy st : snapshotStrategies) {
-         	if (st.canHandle(snap)) {
-         		strategy = st;
-         		break;
-         	}
-         }
++        SnapshotInfo snapshot = snapshotFactory.getSnapshot(snapshotId, DataStoreRole.Primary);
  
          try {
-         	snapshot = strategy.takeSnapshot(volume, snapshotId);
-         	if (snapshot != null) {
 -        	postCreateSnapshot(volumeId, snapshot.getId(), policyId);
 -        	//Check if the snapshot was removed while backingUp. If yes, do not log snapshot create usage event
 -        	SnapshotVO freshSnapshot = _snapshotDao.findById(snapshot.getId());
 -        	if ((freshSnapshot != null) && backedUp) {
 -        		UsageEventUtils.publishUsageEvent(EventTypes.EVENT_SNAPSHOT_CREATE, snapshot.getAccountId(),
 -        				snapshot.getDataCenterId(), snapshotId, snapshot.getName(), null, null,
 -        				volume.getSize(), snapshot.getClass().getName(), snapshot.getUuid());
 -        	}
 +        		postCreateSnapshot(volumeId, snapshot.getId(), policyId);
 +        		//Check if the snapshot was removed while backingUp. If yes, do not log snapshot create usage event
 +        		SnapshotVO freshSnapshot = _snapshotDao.findById(snapshot.getId());
 +        		if ((freshSnapshot != null) && backedUp) {
 +        			UsageEventUtils.publishUsageEvent(EventTypes.EVENT_SNAPSHOT_CREATE, snapshot.getAccountId(),
 +        					snapshot.getDataCenterId(), snapshotId, snapshot.getName(), null, null,
 +        					volume.getSize(), snapshot.getClass().getName(), snapshot.getUuid());
 +        		}
  
 -        	_resourceLimitMgr.incrementResourceCount(snapshotOwner.getId(), ResourceType.snapshot);
 +                _resourceLimitMgr.incrementResourceCount(snapshotOwner.getId(), ResourceType.snapshot);
-         	}
-         	if (backup) {
-         		this.backupSnapshot(snapshotId);
-         	}
+ 
          } catch(Exception e) {
              s_logger.debug("Failed to create snapshot", e);
              if (backup) {
@@@ -325,42 -318,18 +318,18 @@@
  
      @Override
      public Snapshot backupSnapshot(Long snapshotId) {
-     	 SnapshotInfo snapshot = this.snapshotFactory.getSnapshot(snapshotId);
-     	 if (snapshot == null) {
-     		 throw new CloudRuntimeException("Can't find snapshot:" + snapshotId);
-     	 }
-     	 
-     	 if (snapshot.getState() == Snapshot.State.BackedUp) {
-     	     return snapshot;
-     	 }
-     	 
-     	 SnapshotStrategy strategy = null;
-          for (SnapshotStrategy st : snapshotStrategies) {
-          	if (st.canHandle(snapshot)) {
-          		strategy = st;
-          		break;
-          	}
 -    	 SnapshotInfo snapshot = this.snapshotFactory.getSnapshot(snapshotId, DataStoreRole.Image);
++    	 SnapshotInfo snapshot = snapshotFactory.getSnapshot(snapshotId, DataStoreRole.Image);
+     	 if (snapshot != null) {
+     		 throw new CloudRuntimeException("Already in the backup snapshot:" + snapshotId);
 -    	 }
 -
 -         return this.snapshotSrv.backupSnapshot(snapshot);
 +         }
 +         
-          return strategy.backupSnapshot(snapshot);
++         return snapshotSrv.backupSnapshot(snapshot);
      }
  
+     /*
      @Override
      public void downloadSnapshotsFromSwift(SnapshotVO ss) {
+ 
          long volumeId = ss.getVolumeId();
          VolumeVO volume = _volsDao.findById(volumeId);
          Long dcId = volume.getDataCenterId();
@@@ -443,11 -415,11 +415,11 @@@
                      e);
          }
  
-     }
+     }*/
 -
 +    
      @Override
-     public SnapshotVO getParentSnapshot(VolumeInfo volume, Snapshot snapshot) {
-     	 long preId = _snapshotDao.getLastSnapshot(volume.getId(), snapshot.getId());
+     public SnapshotVO getParentSnapshot(VolumeInfo volume) {
+     	 long preId = _snapshotDao.getLastSnapshot(volume.getId(), DataStoreRole.Primary);
  
           SnapshotVO preSnapshotVO = null;
           if (preId != 0 && !(volume.getLastPoolId() != null && !volume.getLastPoolId().equals(volume.getPoolId()))) {
@@@ -507,20 -479,19 +479,19 @@@
      @DB
      @ActionEvent(eventType = EventTypes.EVENT_SNAPSHOT_DELETE, eventDescription = "deleting snapshot", async = true)
      public boolean deleteSnapshot(long snapshotId) {
 -        Account caller = UserContext.current().getCaller();
 +        Account caller = CallContext.current().getCallingAccount();
  
          // Verify parameters
-         SnapshotInfo snapshotCheck = this.snapshotFactory.getSnapshot(snapshotId);
 -        SnapshotVO snapshotCheck = this._snapshotDao.findById(snapshotId);
++        SnapshotVO snapshotCheck = _snapshotDao.findById(snapshotId);
          if (snapshotCheck == null) {
              throw new InvalidParameterValueException("unable to find a snapshot with id " + snapshotId);
          }
 -
 +        
          _accountMgr.checkAccess(caller, null, true, snapshotCheck);
-         
-         SnapshotStrategy strategy = null;
-         for (SnapshotStrategy st : snapshotStrategies) {
-         	if (st.canHandle(snapshotCheck)) {
-         		strategy = st;
+         SnapshotStrategy snapshotStrategy = null;
 -        for (SnapshotStrategy strategy : this.snapshotStrategies) {
++        for (SnapshotStrategy strategy : snapshotStrategies) {
+         	if (strategy.canHandle(snapshotCheck)) {
+         		snapshotStrategy = strategy;
          		break;
          	}
          }
@@@ -546,21 -517,14 +517,14 @@@
  
      @Override
      public String getSecondaryStorageURL(SnapshotVO snapshot) {
-         HostVO secHost = getSecondaryStorageHost(snapshot);
-         if (secHost != null) {
-             return secHost.getStorageUrl();
 -        SnapshotDataStoreVO snapshotStore = this._snapshotStoreDao.findBySnapshot(snapshot.getId(), DataStoreRole.Image);
++        SnapshotDataStoreVO snapshotStore = _snapshotStoreDao.findBySnapshot(snapshot.getId(), DataStoreRole.Image);
+         if (snapshotStore != null){
 -            DataStore store = this.dataStoreMgr.getDataStore(snapshotStore.getDataStoreId(), DataStoreRole.Image);
++            DataStore store = dataStoreMgr.getDataStore(snapshotStore.getDataStoreId(), DataStoreRole.Image);
+             if ( store != null ){
+                 return store.getUri();
 -            }
 +        }
-         throw new CloudRuntimeException("Can not find secondary storage");
+         }
+         throw new CloudRuntimeException("Can not find secondary storage hosting the snapshot");
      }
  
      @Override
@@@ -589,8 -552,8 +553,8 @@@
         _accountMgr.buildACLSearchParameters(caller, id, cmd.getAccountName(), cmd.getProjectId(), permittedAccounts, domainIdRecursiveListProject, cmd.listAll(), false);
         Long domainId = domainIdRecursiveListProject.first();
         Boolean isRecursive = domainIdRecursiveListProject.second();
-        ListProjectResourcesCriteria listProjectResourcesCriteria = domainIdRecursiveListProject.third();        
+        ListProjectResourcesCriteria listProjectResourcesCriteria = domainIdRecursiveListProject.third();
 -
 +        
          Filter searchFilter = new Filter(SnapshotVO.class, "created", false, cmd.getStartIndex(), cmd.getPageSizeVal());
          SearchBuilder<SnapshotVO> sb = _snapshotDao.createSearchBuilder();
          _accountMgr.buildACLSearchBuilder(sb, domainId, isRecursive, permittedAccounts, listProjectResourcesCriteria);
@@@ -651,7 -616,7 +617,7 @@@
          }
  
          if (snapshotTypeStr != null) {
--            Type snapshotType = SnapshotVO.getSnapshotType((String) snapshotTypeStr);
++            Type snapshotType = SnapshotVO.getSnapshotType(snapshotTypeStr);
              if (snapshotType == null) {
                  throw new InvalidParameterValueException("Unsupported snapshot type " + snapshotTypeStr);
              }
@@@ -661,7 -626,7 +627,7 @@@
                  sc.setParameters("snapshotTypeEQ", snapshotType.ordinal());
              }
          } else if (intervalTypeStr != null && volumeId != null) {
--            Type type = SnapshotVO.getSnapshotType((String) intervalTypeStr);
++            Type type = SnapshotVO.getSnapshotType(intervalTypeStr);
              if (type == null) {
                  throw new InvalidParameterValueException("Unsupported snapstho interval type " + intervalTypeStr);
              }
@@@ -694,42 -659,12 +660,12 @@@
                  // This volume doesn't have any snapshots. Nothing do delete.
                  continue;
              }
-             List<HostVO> ssHosts = _ssvmMgr.listSecondaryStorageHostsInOneZone(dcId);
-             SwiftTO swift = _swiftMgr.getSwiftTO();
-             S3TO s3 = _s3Mgr.getS3TO();
- 
-             checkObjectStorageConfiguration(swift, s3);
-             StoragePoolVO pool = _primaryDataStoreDao.findById(volume.getPoolId());
-             if (swift == null && s3 == null) {
-                 for (HostVO ssHost : ssHosts) {
-                     DeleteSnapshotBackupCommand cmd = new DeleteSnapshotBackupCommand(
-                             pool,null, null, ssHost.getStorageUrl(), dcId,
-                             accountId, volumeId, "", true);
-                     Answer answer = null;
-                     try {
-                         answer = _agentMgr.sendToSSVM(dcId, cmd);
-                     } catch (Exception e) {
-                         s_logger.warn("Failed to delete all snapshot for volume " + volumeId + " on secondary storage " + ssHost.getStorageUrl());
-                     }
-                     if ((answer != null) && answer.getResult()) {
-                         s_logger.debug("Deleted all snapshots for volume: " + volumeId + " under account: " + accountId);
-                     } else {
-                         success = false;
-                         if (answer != null) {
-                             s_logger.error(answer.getDetails());
-                         }
-                     }
-                 }
-             } else {
-                 DeleteSnapshotBackupCommand cmd = new DeleteSnapshotBackupCommand(
-                         pool,swift, s3, "", dcId, accountId, volumeId, "", true);
-                 Answer answer = null;
-                 try {
-                     answer = _agentMgr.sendToSSVM(dcId, cmd);
-                 } catch (Exception e) {
-                     final String storeType = s3 != null ? "S3" : "swift";
-                     s_logger.warn("Failed to delete all snapshot for volume " + volumeId + " on " + storeType);
-                 }
 -            List<DataStore> ssHosts = this.dataStoreMgr.getImageStoresByScope(new ZoneScope(dcId));
++            List<DataStore> ssHosts = dataStoreMgr.getImageStoresByScope(new ZoneScope(dcId));
+             for (DataStore ssHost : ssHosts) {
+                 String snapshotDir = TemplateConstants.DEFAULT_SNAPSHOT_ROOT_DIR + "/" + accountId + "/" + volumeId;
+                 DeleteSnapshotsDirCommand cmd = new DeleteSnapshotsDirCommand(ssHost.getTO(), snapshotDir);
+                 EndPoint ep = _epSelector.select(ssHost);
+                 Answer answer = ep.sendMessage(cmd);
                  if ((answer != null) && answer.getResult()) {
                      s_logger.debug("Deleted all snapshots for volume: " + volumeId + " under account: " + accountId);
                  } else {
@@@ -963,97 -899,121 +900,119 @@@
          return null;
      }
  
-     @Override
-     public SnapshotVO allocSnapshot(Long volumeId, Long policyId) throws ResourceAllocationException {
-         Account caller = CallContext.current().getCallingAccount();
-         
-         VolumeVO volume = _volsDao.findById(volumeId);
-         if (volume == null) {
-             throw new InvalidParameterValueException("Creating snapshot failed due to volume:" + volumeId + " doesn't exist");
-         }
-         DataCenter zone = _dcDao.findById(volume.getDataCenterId());
-         if (zone == null) {
-             throw new InvalidParameterValueException("Can't find zone by id " + volume.getDataCenterId());
 -
 -
+     private boolean hostSupportSnapsthot(HostVO host) {
+ 		if (host.getHypervisorType() != HypervisorType.KVM) {
+ 			return true;
 -		}
 +        }
+ 		// Determine host capabilities
+ 		String caps = host.getCapabilities();
 -
 +        
-         if (Grouping.AllocationState.Disabled == zone.getAllocationState() && !_accountMgr.isRootAdmin(caller.getType())) {
-             throw new PermissionDeniedException("Cannot perform this operation, Zone is currently disabled: " + zone.getName());
+ 		if (caps != null) {
+ 			String[] tokens = caps.split(",");
+ 			for (String token : tokens) {
+ 				if (token.contains("snapshot")) {
+ 					return true;
+ 				}
+ 			}
+ 		}
+ 		return false;
 -	}
 -
 +        }
 +        
-         if (volume.getState() != Volume.State.Ready) {
-             throw new InvalidParameterValueException("VolumeId: " + volumeId + " is not in " + Volume.State.Ready + " state but " + volume.getState() + ". Cannot take snapshot.");
+     private boolean supportedByHypervisor(VolumeInfo volume) {
+     	StoragePool storagePool = (StoragePool)volume.getDataStore();
+         ClusterVO cluster = _clusterDao.findById(storagePool.getClusterId());
+         if (cluster != null && cluster.getHypervisorType() == HypervisorType.Ovm) {
+             throw new InvalidParameterValueException("Ovm won't support taking snapshot");
          }
  
-         if ( volume.getTemplateId() != null ) {
-             VMTemplateVO  template = _templateDao.findById(volume.getTemplateId());
-             if( template != null && template.getTemplateType() == Storage.TemplateType.SYSTEM ) {
-                 throw new InvalidParameterValueException("VolumeId: " + volumeId + " is for System VM , Creating snapshot against System VM volumes is not supported");
+ 		if (volume.getHypervisorType().equals(HypervisorType.KVM)) {
+ 			List<HostVO> hosts = _resourceMgr.listAllHostsInCluster(cluster.getId());
+ 			if (hosts != null && !hosts.isEmpty()) {
+ 				HostVO host = hosts.get(0);
+ 				if (!hostSupportSnapsthot(host)) {
+ 					throw new CloudRuntimeException("KVM Snapshot is not supported on cluster: " + host.getId());
+ 				}
 -			}
 -		}
 -
 +            }
 +        }
 +        
-         StoragePoolVO storagePoolVO = _storagePoolDao.findById(volume.getPoolId());
-         if (storagePoolVO == null) {
-             throw new InvalidParameterValueException("VolumeId: " + volumeId + " please attach this volume to a VM before create snapshot for it");
-         }
- 
-         ClusterVO cluster = _clusterDao.findById(storagePoolVO.getClusterId());
-         if (cluster != null && cluster.getHypervisorType() == HypervisorType.Ovm) {
-             throw new InvalidParameterValueException("Ovm won't support taking snapshot");
-         }
- 
-         // Verify permissions
-         _accountMgr.checkAccess(caller, null, true, volume);
-         Type snapshotType = getSnapshotType(policyId);
-         Account owner = _accountMgr.getAccount(volume.getAccountId());
+ 		// if volume is attached to a vm in destroyed or expunging state; disallow
+ 		if (volume.getInstanceId() != null) {
+ 			UserVmVO userVm = _vmDao.findById(volume.getInstanceId());
+ 			if (userVm != null) {
+                 if (userVm.getState().equals(State.Destroyed) || userVm.getState().equals(State.Expunging)) {
+                     throw new CloudRuntimeException("Creating snapshot failed due to volume:" + volume.getId()
+                             + " is associated with vm:" + userVm.getInstanceName() + " is in "
+                             + userVm.getState().toString() + " state");
 -                }
++        }
+ 
+                 if (userVm.getHypervisorType() == HypervisorType.VMware
+                         || userVm.getHypervisorType() == HypervisorType.KVM) {
+                     List<SnapshotVO> activeSnapshots = _snapshotDao.listByInstanceId(volume.getInstanceId(),
+                             Snapshot.State.Creating, Snapshot.State.CreatedOnPrimary, Snapshot.State.BackingUp);
+                     if (activeSnapshots.size() > 1)
+                         throw new CloudRuntimeException(
+                                 "There is other active snapshot tasks on the instance to which the volume is attached, please try again later");
 -                }
++        }
+ 
+                 List<VMSnapshotVO> activeVMSnapshots = _vmSnapshotDao.listByInstanceId(userVm.getId(),
+                         VMSnapshot.State.Creating, VMSnapshot.State.Reverting, VMSnapshot.State.Expunging);
+                 if (activeVMSnapshots.size() > 0) {
+                     throw new CloudRuntimeException(
+                             "There is other active vm snapshot tasks on the instance to which the volume is attached, please try again later");
+                 }
+ 			}
+ 		}
  
-         try{
-             _resourceLimitMgr.checkResourceLimit(owner, ResourceType.snapshot);
-             if (backup) {
-                 _resourceLimitMgr.checkResourceLimit(owner, ResourceType.secondary_storage, new Long(volume.getSize()));
-             } else {
-                 _resourceLimitMgr.checkResourceLimit(owner, ResourceType.primary_storage, new Long(volume.getSize()));
+ 		return true;
 -	}
 +            }
-         } catch (ResourceAllocationException e) {
-             if (snapshotType != Type.MANUAL){
-                 String msg = "Snapshot resource limit exceeded for account id : " + owner.getId() + ". Failed to create recurring snapshots";
-                 s_logger.warn(msg);
-                 _alertMgr.sendAlert(AlertManager.ALERT_TYPE_UPDATE_RESOURCE_COUNT, 0L, 0L, msg,
-                         "Snapshot resource limit exceeded for account id : " + owner.getId() + ". Failed to create recurring snapshots; please use updateResourceLimit to increase the limit");
+     @Override
+     public SnapshotInfo takeSnapshot(VolumeInfo volume) throws ResourceAllocationException {
+         CreateSnapshotPayload payload = (CreateSnapshotPayload)volume.getpayload();
+         Long snapshotId = payload.getSnapshotId();
+         Account snapshotOwner = payload.getAccount();
 -        SnapshotInfo snapshot = this.snapshotFactory.getSnapshot(snapshotId, volume.getDataStore());
++        SnapshotInfo snapshot = snapshotFactory.getSnapshot(snapshotId, volume.getDataStore());
+         boolean processed = false;
+ 
+         try {
+             for (SnapshotStrategy strategy : snapshotStrategies) {
+                 if (strategy.canHandle(snapshot)) {
+                     processed = true;
+                     snapshot = strategy.takeSnapshot(snapshot);
+                     break;
 -                }
              }
-             throw e;
 +        }
+             if (!processed) {
+                 throw new CloudRuntimeException("Can't find snapshot strategy to deal with snapshot:" + snapshotId);
+             }
+             postCreateSnapshot(volume.getId(), snapshotId, payload.getSnapshotPolicyId());
  
-         // Determine the name for this snapshot
-         // Snapshot Name: VMInstancename + volumeName + timeString
-         String timeString = DateUtil.getDateDisplayString(DateUtil.GMT_TIMEZONE, new Date(), DateUtil.YYYYMMDD_FORMAT);
+             UsageEventUtils.publishUsageEvent(EventTypes.EVENT_SNAPSHOT_CREATE, snapshot.getAccountId(),
+                     snapshot.getDataCenterId(), snapshotId, snapshot.getName(), null, null,
+                     volume.getSize(), snapshot.getClass().getName(), snapshot.getUuid());
  
-         String snapshotName = volume.getUuid() + "_" + timeString;
  
-         // Create the Snapshot object and save it so we can return it to the
-         // user        
-         HypervisorType hypervisorType = this._volsDao.getHypervisorType(volumeId);
-         SnapshotVO snapshotVO = new SnapshotVO(volume.getDataCenterId(), volume.getAccountId(), volume.getDomainId(), volume.getId(), volume.getDiskOfferingId(), null, snapshotName,
-                 (short) snapshotType.ordinal(), snapshotType.name(), volume.getSize(), hypervisorType);
-         SnapshotVO snapshot = _snapshotDao.persist(snapshotVO);
-         if (snapshot == null) {
-             throw new CloudRuntimeException("Failed to create snapshot for volume: "+volumeId);
-         }
+             _resourceLimitMgr.incrementResourceCount(snapshotOwner.getId(), ResourceType.snapshot);
+ 
+         } catch(Exception e) {
+             s_logger.debug("Failed to create snapshot", e);
 -            if (backup) {
 +        if (backup) {
-             _resourceLimitMgr.incrementResourceCount(volume.getAccountId(), ResourceType.secondary_storage,
+                 _resourceLimitMgr.decrementResourceCount(snapshotOwner.getId(), ResourceType.secondary_storage,
 -                        new Long(volume.getSize()));
 -            } else {
 +                    new Long(volume.getSize()));
 +        } else {
-             _resourceLimitMgr.incrementResourceCount(volume.getAccountId(), ResourceType.primary_storage,
+                 _resourceLimitMgr.decrementResourceCount(snapshotOwner.getId(), ResourceType.primary_storage,
 -                        new Long(volume.getSize()));
 -            }
 +                    new Long(volume.getSize()));
 +        }
+             throw new CloudRuntimeException("Failed to create snapshot", e);
+         }
          return snapshot;
      }
  
      @Override
      public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
 -
 + 
          String value = _configDao.getValue(Config.BackupSnapshotWait.toString());
          _backupsnapshotwait = NumbersUtil.parseInt(value, Integer.parseInt(Config.BackupSnapshotWait.getDefaultValue()));
--        backup = Boolean.parseBoolean(this._configDao.getValue(Config.BackupSnapshotAferTakingSnapshot.toString()));
++        backup = Boolean.parseBoolean(_configDao.getValue(Config.BackupSnapshotAferTakingSnapshot.toString()));
  
          Type.HOURLY.setMax(NumbersUtil.parseInt(_configDao.getValue("snapshot.max.hourly"), HOURLYMAX));
          Type.DAILY.setMax(NumbersUtil.parseInt(_configDao.getValue("snapshot.max.daily"), DAILYMAX));
@@@ -1134,4 -1094,61 +1093,61 @@@
      	}
      	return true;
      }
+ 
+     @Override
+     public Snapshot allocSnapshot(Long volumeId, Long policyId) throws ResourceAllocationException {
 -        Account caller = UserContext.current().getCaller();
 -        VolumeInfo volume = this.volFactory.getVolume(volumeId);
++        Account caller = CallContext.current().getCallingAccount();
++        VolumeInfo volume = volFactory.getVolume(volumeId);
+         supportedByHypervisor(volume);
+ 
+         // Verify permissions
+         _accountMgr.checkAccess(caller, null, true, volume);
+         Type snapshotType = getSnapshotType(policyId);
+         Account owner = _accountMgr.getAccount(volume.getAccountId());
+ 
+         try{
+             _resourceLimitMgr.checkResourceLimit(owner, ResourceType.snapshot);
+             if (backup) {
+                 _resourceLimitMgr.checkResourceLimit(owner, ResourceType.secondary_storage, new Long(volume.getSize()));
+             } else {
+                 _resourceLimitMgr.checkResourceLimit(owner, ResourceType.primary_storage, new Long(volume.getSize()));
+             }
+         } catch (ResourceAllocationException e) {
+             if (snapshotType != Type.MANUAL){
+                 String msg = "Snapshot resource limit exceeded for account id : " + owner.getId() + ". Failed to create recurring snapshots";
+                 s_logger.warn(msg);
+                 _alertMgr.sendAlert(AlertManager.ALERT_TYPE_UPDATE_RESOURCE_COUNT, 0L, 0L, msg,
+                         "Snapshot resource limit exceeded for account id : " + owner.getId() + ". Failed to create recurring snapshots; please use updateResourceLimit to increase the limit");
+             }
+             throw e;
+         }
+ 
+         // Determine the name for this snapshot
+         // Snapshot Name: VMInstancename + volumeName + timeString
+         String timeString = DateUtil.getDateDisplayString(DateUtil.GMT_TIMEZONE, new Date(), DateUtil.YYYYMMDD_FORMAT);
+ 
+         VMInstanceVO vmInstance = _vmDao.findById(volume.getInstanceId());
+         String vmDisplayName = "detached";
+         if (vmInstance != null) {
+             vmDisplayName = vmInstance.getHostName();
+         }
+         String snapshotName = vmDisplayName + "_" + volume.getName() + "_" + timeString;
+ 
+         HypervisorType hypervisorType = volume.getHypervisorType();
+         SnapshotVO snapshotVO = new SnapshotVO(volume.getDataCenterId(), volume.getAccountId(), volume.getDomainId(), volume.getId(), volume.getDiskOfferingId(), snapshotName,
+                 (short) snapshotType.ordinal(), snapshotType.name(), volume.getSize(), hypervisorType);
+ 
+         SnapshotVO snapshot = _snapshotDao.persist(snapshotVO);
+         if (snapshot == null) {
+             throw new CloudRuntimeException("Failed to create snapshot for volume: " + volume.getId());
+         }
+         if (backup) {
+             _resourceLimitMgr.incrementResourceCount(volume.getAccountId(), ResourceType.secondary_storage,
+                     new Long(volume.getSize()));
+         } else {
+             _resourceLimitMgr.incrementResourceCount(volume.getAccountId(), ResourceType.primary_storage,
+                     new Long(volume.getSize()));
+         }
+         return snapshot;
+     }
  }