You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by bf...@apache.org on 2013/02/21 22:45:20 UTC

[28/51] [abbrv] squash changes into one giant patch

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/a22403ed/server/src/com/cloud/storage/StorageManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/StorageManagerImpl.java b/server/src/com/cloud/storage/StorageManagerImpl.java
index 05e0cfe..f2d92e5 100755
--- a/server/src/com/cloud/storage/StorageManagerImpl.java
+++ b/server/src/com/cloud/storage/StorageManagerImpl.java
@@ -17,8 +17,6 @@
 package com.cloud.storage;
 
 import java.math.BigDecimal;
-import java.net.Inet6Address;
-import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
@@ -27,15 +25,13 @@ import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -48,17 +44,41 @@ import org.apache.cloudstack.api.command.admin.storage.CancelPrimaryStorageMaint
 import org.apache.cloudstack.api.command.admin.storage.CreateStoragePoolCmd;
 import org.apache.cloudstack.api.command.admin.storage.DeletePoolCmd;
 import org.apache.cloudstack.api.command.admin.storage.UpdateStoragePoolCmd;
-import org.apache.cloudstack.api.command.user.volume.CreateVolumeCmd;
-import org.apache.cloudstack.api.command.user.volume.UploadVolumeCmd;
-import org.apache.cloudstack.api.command.user.volume.ResizeVolumeCmd;
+import org.apache.cloudstack.engine.subsystem.api.storage.ClusterScope;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreLifeCycle;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreProvider;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreProviderManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreRole;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreStatus;
+import org.apache.cloudstack.engine.subsystem.api.storage.HostScope;
+import org.apache.cloudstack.engine.subsystem.api.storage.HypervisorHostListener;
+import org.apache.cloudstack.engine.subsystem.api.storage.ImageDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.ScopeType;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService.VolumeApiResult;
+import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
+import org.apache.cloudstack.framework.async.AsyncCallFuture;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
 
 import com.cloud.agent.AgentManager;
-import com.cloud.agent.api.*;
-import com.cloud.agent.api.storage.*;
-import com.cloud.agent.api.to.StorageFilerTO;
-import com.cloud.agent.api.to.VolumeTO;
+
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.BackupSnapshotCommand;
+import com.cloud.agent.api.CleanupSnapshotBackupCommand;
+import com.cloud.agent.api.Command;
+import com.cloud.agent.api.ManageSnapshotCommand;
+import com.cloud.agent.api.StoragePoolInfo;
+import com.cloud.agent.api.storage.DeleteTemplateCommand;
+import com.cloud.agent.api.storage.DeleteVolumeCommand;
+
 import com.cloud.agent.manager.Commands;
 import com.cloud.alert.AlertManager;
 import com.cloud.api.ApiDBUtils;
@@ -72,46 +92,61 @@ import com.cloud.cluster.ClusterManagerListener;
 import com.cloud.cluster.ManagementServerHostVO;
 import com.cloud.configuration.Config;
 import com.cloud.configuration.ConfigurationManager;
-import com.cloud.configuration.Resource.ResourceType;
 import com.cloud.configuration.dao.ConfigurationDao;
 import com.cloud.consoleproxy.ConsoleProxyManager;
 import com.cloud.dc.ClusterVO;
 import com.cloud.dc.DataCenterVO;
 import com.cloud.dc.HostPodVO;
-import com.cloud.dc.Pod;
 import com.cloud.dc.dao.ClusterDao;
 import com.cloud.dc.dao.DataCenterDao;
 import com.cloud.dc.dao.HostPodDao;
-import com.cloud.deploy.DeployDestination;
-import com.cloud.domain.Domain;
 import com.cloud.domain.dao.DomainDao;
-import com.cloud.event.ActionEvent;
-import com.cloud.event.EventTypes;
-import com.cloud.event.UsageEventUtils;
+
 import com.cloud.event.dao.EventDao;
-import com.cloud.exception.*;
+import com.cloud.event.dao.UsageEventDao;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.ConnectionException;
+import com.cloud.exception.InsufficientCapacityException;
+import com.cloud.exception.InvalidParameterValueException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.exception.PermissionDeniedException;
+import com.cloud.exception.ResourceInUseException;
+import com.cloud.exception.ResourceUnavailableException;
+import com.cloud.exception.StorageUnavailableException;
+
 import com.cloud.host.Host;
 import com.cloud.host.HostVO;
 import com.cloud.host.Status;
 import com.cloud.host.dao.HostDao;
 import com.cloud.hypervisor.Hypervisor.HypervisorType;
 import com.cloud.hypervisor.HypervisorGuruManager;
+import com.cloud.hypervisor.dao.HypervisorCapabilitiesDao;
 import com.cloud.network.NetworkModel;
-import com.cloud.offering.ServiceOffering;
 import com.cloud.org.Grouping;
 import com.cloud.org.Grouping.AllocationState;
 import com.cloud.resource.ResourceManager;
 import com.cloud.resource.ResourceState;
 import com.cloud.server.ManagementServer;
 import com.cloud.server.StatsCollector;
-import com.cloud.service.ServiceOfferingVO;
 import com.cloud.service.dao.ServiceOfferingDao;
 import com.cloud.storage.Storage.ImageFormat;
 import com.cloud.storage.Storage.StoragePoolType;
-import com.cloud.storage.Volume.Event;
 import com.cloud.storage.Volume.Type;
 import com.cloud.storage.allocator.StoragePoolAllocator;
-import com.cloud.storage.dao.*;
+
+import com.cloud.storage.dao.DiskOfferingDao;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.SnapshotPolicyDao;
+import com.cloud.storage.dao.StoragePoolHostDao;
+import com.cloud.storage.dao.StoragePoolWorkDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.storage.dao.VMTemplateHostDao;
+import com.cloud.storage.dao.VMTemplatePoolDao;
+import com.cloud.storage.dao.VMTemplateS3Dao;
+import com.cloud.storage.dao.VMTemplateSwiftDao;
+import com.cloud.storage.dao.VolumeDao;
+import com.cloud.storage.dao.VolumeHostDao;
+
 import com.cloud.storage.download.DownloadMonitor;
 import com.cloud.storage.listener.StoragePoolMonitor;
 import com.cloud.storage.listener.VolumeStateListener;
@@ -124,23 +159,24 @@ import com.cloud.template.TemplateManager;
 import com.cloud.user.*;
 import com.cloud.user.dao.AccountDao;
 import com.cloud.user.dao.UserDao;
-import com.cloud.uservm.UserVm;
-import com.cloud.utils.EnumUtils;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.Pair;
 import com.cloud.utils.UriUtils;
 import com.cloud.utils.component.ComponentContext;
-import com.cloud.utils.component.Manager;
 import com.cloud.utils.component.ManagerBase;
 import com.cloud.utils.concurrency.NamedThreadFactory;
 import com.cloud.utils.db.*;
 import com.cloud.utils.db.JoinBuilder.JoinType;
 import com.cloud.utils.db.SearchCriteria.Op;
 import com.cloud.utils.exception.CloudRuntimeException;
-import com.cloud.utils.exception.ExecutionException;
-import com.cloud.utils.fsm.NoTransitionException;
-import com.cloud.utils.fsm.StateMachine2;
-import com.cloud.vm.*;
+
+import com.cloud.vm.DiskProfile;
+import com.cloud.vm.UserVmManager;
+import com.cloud.vm.VMInstanceVO;
+import com.cloud.vm.VirtualMachineManager;
+import com.cloud.vm.VirtualMachineProfile;
+import com.cloud.vm.VirtualMachineProfileImpl;
+
 import com.cloud.vm.VirtualMachine.State;
 import com.cloud.vm.dao.*;
 
@@ -173,6 +209,8 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
     @Inject
     protected NetworkModel _networkMgr;
     @Inject
+    protected ServiceOfferingDao _serviceOfferingDao;
+    @Inject
     protected VolumeDao _volsDao;
     @Inject
     protected HostDao _hostDao;
@@ -209,7 +247,7 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
     @Inject
     protected VMInstanceDao _vmInstanceDao;
     @Inject
-    protected StoragePoolDao _storagePoolDao = null;
+    protected PrimaryDataStoreDao _storagePoolDao = null;
     @Inject
     protected CapacityDao _capacityDao;
     @Inject
@@ -262,14 +300,30 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
     protected ResourceTagDao _resourceTagDao;
     @Inject
     protected List<StoragePoolAllocator> _storagePoolAllocators;
-    @Inject ConfigurationDao _configDao;
-    @Inject ManagementServer _msServer;
+    @Inject
+    ConfigurationDao _configDao;
+    @Inject
+    ManagementServer _msServer;
+    @Inject
+    DataStoreManager dataStoreMgr;
+    @Inject
+    DataStoreProviderManager dataStoreProviderMgr;
+    @Inject
+    VolumeService volService;
+    @Inject
+    VolumeDataFactory volFactory;
+    @Inject
+    ImageDataFactory tmplFactory;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    protected HypervisorCapabilitiesDao _hypervisorCapabilitiesDao;
 
-    // TODO : we don't have any instantiated pool discover, disable injection temporarily
+    // TODO : we don't have any instantiated pool discover, disable injection
+    // temporarily
     // @Inject
     protected List<StoragePoolDiscoverer> _discoverers;
 
-
     protected SearchBuilder<VMTemplateHostVO> HostTemplateStatesSearch;
     protected GenericSearchBuilder<StoragePoolHostVO, Long> UpHostsInPoolSearch;
     protected SearchBuilder<VMInstanceVO> StoragePoolSearch;
@@ -288,32 +342,39 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
     protected BigDecimal _overProvisioningFactor = new BigDecimal(1);
     private long _maxVolumeSizeInGb;
     private long _serverId;
-    private final StateMachine2<Volume.State, Volume.Event, Volume> _volStateMachine;
+
     private int _customDiskOfferingMinSize = 1;
     private int _customDiskOfferingMaxSize = 1024;
     private double _storageUsedThreshold = 1.0d;
     private double _storageAllocatedThreshold = 1.0d;
     protected BigDecimal _storageOverprovisioningFactor = new BigDecimal(1);
+    private Map<String, HypervisorHostListener> hostListeners = new HashMap<String, HypervisorHostListener>();
 
     private boolean _recreateSystemVmEnabled;
 
-    public boolean share(VMInstanceVO vm, List<VolumeVO> vols, HostVO host, boolean cancelPreviousShare) throws StorageUnavailableException {
+    public boolean share(VMInstanceVO vm, List<VolumeVO> vols, HostVO host,
+            boolean cancelPreviousShare) throws StorageUnavailableException {
 
         // if pool is in maintenance and it is the ONLY pool available; reject
-        List<VolumeVO> rootVolForGivenVm = _volsDao.findByInstanceAndType(vm.getId(), Type.ROOT);
+        List<VolumeVO> rootVolForGivenVm = _volsDao.findByInstanceAndType(
+                vm.getId(), Type.ROOT);
         if (rootVolForGivenVm != null && rootVolForGivenVm.size() > 0) {
-            boolean isPoolAvailable = isPoolAvailable(rootVolForGivenVm.get(0).getPoolId());
+            boolean isPoolAvailable = isPoolAvailable(rootVolForGivenVm.get(0)
+                    .getPoolId());
             if (!isPoolAvailable) {
-                throw new StorageUnavailableException("Can not share " + vm, rootVolForGivenVm.get(0).getPoolId());
+                throw new StorageUnavailableException("Can not share " + vm,
+                        rootVolForGivenVm.get(0).getPoolId());
             }
         }
 
         // this check is done for maintenance mode for primary storage
         // if any one of the volume is unusable, we return false
-        // if we return false, the allocator will try to switch to another PS if available
+        // if we return false, the allocator will try to switch to another PS if
+        // available
         for (VolumeVO vol : vols) {
             if (vol.getRemoved() != null) {
-                s_logger.warn("Volume id:" + vol.getId() + " is removed, cannot share on this instance");
+                s_logger.warn("Volume id:" + vol.getId()
+                        + " is removed, cannot share on this instance");
                 // not ok to share
                 return false;
             }
@@ -323,26 +384,15 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         return true;
     }
 
-    @Override
-    public VolumeVO allocateDuplicateVolume(VolumeVO oldVol, Long templateId) {
-        VolumeVO newVol = new VolumeVO(oldVol.getVolumeType(), oldVol.getName(), oldVol.getDataCenterId(), oldVol.getDomainId(), oldVol.getAccountId(), oldVol.getDiskOfferingId(), oldVol.getSize());
-        if (templateId != null) {
-            newVol.setTemplateId(templateId);
-        } else {
-            newVol.setTemplateId(oldVol.getTemplateId());
-        }
-        newVol.setDeviceId(oldVol.getDeviceId());
-        newVol.setInstanceId(oldVol.getInstanceId());
-        newVol.setRecreatable(oldVol.isRecreatable());
-        return _volsDao.persist(newVol);
-    }
-
     private boolean isPoolAvailable(Long poolId) {
         // get list of all pools
         List<StoragePoolVO> pools = _storagePoolDao.listAll();
 
         // if no pools or 1 pool which is in maintenance
-        if (pools == null || pools.size() == 0 || (pools.size() == 1 && pools.get(0).getStatus().equals(StoragePoolStatus.Maintenance))) {
+        if (pools == null
+                || pools.size() == 0
+                || (pools.size() == 1 && pools.get(0).getStatus()
+                        .equals(DataStoreStatus.Maintenance))) {
             return false;
         } else {
             return true;
@@ -350,8 +400,10 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
     }
 
     @Override
-    public List<StoragePoolVO> ListByDataCenterHypervisor(long datacenterId, HypervisorType type) {
-        List<StoragePoolVO> pools = _storagePoolDao.listByDataCenterId(datacenterId);
+    public List<StoragePoolVO> ListByDataCenterHypervisor(
+            long datacenterId, HypervisorType type) {
+        List<StoragePoolVO> pools = _storagePoolDao
+                .listByDataCenterId(datacenterId);
         List<StoragePoolVO> retPools = new ArrayList<StoragePoolVO>();
         for (StoragePoolVO pool : pools) {
             if (pool.getStatus() != StoragePoolStatus.Up) {
@@ -368,21 +420,33 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
 
     @Override
     public boolean isLocalStorageActiveOnHost(Long hostId) {
-        List<StoragePoolHostVO> storagePoolHostRefs = _storagePoolHostDao.listByHostId(hostId);
+        List<StoragePoolHostVO> storagePoolHostRefs = _storagePoolHostDao
+                .listByHostId(hostId);
         for (StoragePoolHostVO storagePoolHostRef : storagePoolHostRefs) {
-            StoragePoolVO storagePool = _storagePoolDao.findById(storagePoolHostRef.getPoolId());
-            if (storagePool.getPoolType() == StoragePoolType.LVM || storagePool.getPoolType() == StoragePoolType.EXT) {
-                SearchBuilder<VolumeVO> volumeSB = _volsDao.createSearchBuilder();
-                volumeSB.and("poolId", volumeSB.entity().getPoolId(), SearchCriteria.Op.EQ);
-                volumeSB.and("removed", volumeSB.entity().getRemoved(), SearchCriteria.Op.NULL);
-
-                SearchBuilder<VMInstanceVO> activeVmSB = _vmInstanceDao.createSearchBuilder();
-                activeVmSB.and("state", activeVmSB.entity().getState(), SearchCriteria.Op.IN);
-                volumeSB.join("activeVmSB", activeVmSB, volumeSB.entity().getInstanceId(), activeVmSB.entity().getId(), JoinBuilder.JoinType.INNER);
+            StoragePoolVO PrimaryDataStoreVO = _storagePoolDao
+                    .findById(storagePoolHostRef.getPoolId());
+            if (PrimaryDataStoreVO.getPoolType() == StoragePoolType.LVM
+                    || PrimaryDataStoreVO.getPoolType() == StoragePoolType.EXT) {
+                SearchBuilder<VolumeVO> volumeSB = _volsDao
+                        .createSearchBuilder();
+                volumeSB.and("poolId", volumeSB.entity().getPoolId(),
+                        SearchCriteria.Op.EQ);
+                volumeSB.and("removed", volumeSB.entity().getRemoved(),
+                        SearchCriteria.Op.NULL);
+
+                SearchBuilder<VMInstanceVO> activeVmSB = _vmInstanceDao
+                        .createSearchBuilder();
+                activeVmSB.and("state", activeVmSB.entity().getState(),
+                        SearchCriteria.Op.IN);
+                volumeSB.join("activeVmSB", activeVmSB, volumeSB.entity()
+                        .getInstanceId(), activeVmSB.entity().getId(),
+                        JoinBuilder.JoinType.INNER);
 
                 SearchCriteria<VolumeVO> volumeSC = volumeSB.create();
-                volumeSC.setParameters("poolId", storagePool.getId());
-                volumeSC.setJoinParameters("activeVmSB", "state", State.Starting, State.Running, State.Stopping, State.Migrating);
+                volumeSC.setParameters("poolId", PrimaryDataStoreVO.getId());
+                volumeSC.setJoinParameters("activeVmSB", "state",
+                        State.Starting, State.Running, State.Stopping,
+                        State.Migrating);
 
                 List<VolumeVO> volumes = _volsDao.search(volumeSC, null);
                 if (volumes.size() > 0) {
@@ -394,26 +458,35 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         return false;
     }
 
-    protected StoragePoolVO findStoragePool(DiskProfile dskCh, final DataCenterVO dc, HostPodVO pod, Long clusterId, Long hostId, VMInstanceVO vm, final Set<StoragePool> avoid) {
+    @Override
+    public StoragePool findStoragePool(DiskProfile dskCh,
+            final DataCenterVO dc, HostPodVO pod, Long clusterId, Long hostId,
+            VMInstanceVO vm, final Set<StoragePool> avoid) {
 
-        VirtualMachineProfile<VMInstanceVO> profile = new VirtualMachineProfileImpl<VMInstanceVO>(vm);
+        VirtualMachineProfile<VMInstanceVO> profile = new VirtualMachineProfileImpl<VMInstanceVO>(
+                vm);
         for (StoragePoolAllocator allocator : _storagePoolAllocators) {
-            final List<StoragePool> poolList = allocator.allocateToPool(dskCh, profile, dc.getId(), pod.getId(), clusterId, hostId, avoid, 1);
+            final List<StoragePool> poolList = allocator.allocateToPool(
+                    dskCh, profile, dc.getId(), pod.getId(), clusterId, hostId,
+                    avoid, 1);
             if (poolList != null && !poolList.isEmpty()) {
-                return (StoragePoolVO) poolList.get(0);
+                return (StoragePool)this.dataStoreMgr.getDataStore(poolList.get(0).getId(), DataStoreRole.Primary);
             }
         }
         return null;
     }
 
     @Override
-    public Answer[] sendToPool(StoragePool pool, Commands cmds) throws StorageUnavailableException {
+    public Answer[] sendToPool(StoragePool pool, Commands cmds)
+            throws StorageUnavailableException {
         return sendToPool(pool, null, null, cmds).second();
     }
 
     @Override
-    public Answer sendToPool(StoragePool pool, long[] hostIdsToTryFirst, Command cmd) throws StorageUnavailableException {
-        Answer[] answers = sendToPool(pool, hostIdsToTryFirst, null, new Commands(cmd)).second();
+    public Answer sendToPool(StoragePool pool, long[] hostIdsToTryFirst,
+            Command cmd) throws StorageUnavailableException {
+        Answer[] answers = sendToPool(pool, hostIdsToTryFirst, null,
+                new Commands(cmd)).second();
         if (answers == null) {
             return null;
         }
@@ -421,7 +494,8 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
     }
 
     @Override
-    public Answer sendToPool(StoragePool pool, Command cmd) throws StorageUnavailableException {
+    public Answer sendToPool(StoragePool pool, Command cmd)
+            throws StorageUnavailableException {
         Answer[] answers = sendToPool(pool, new Commands(cmd));
         if (answers == null) {
             return null;
@@ -429,439 +503,27 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         return answers[0];
     }
 
-    @Override
-    public Answer sendToPool(long poolId, Command cmd) throws StorageUnavailableException {
-        StoragePool pool = _storagePoolDao.findById(poolId);
-        return sendToPool(pool, cmd);
-    }
-
-    @Override
-    public Answer[] sendToPool(long poolId, Commands cmds) throws StorageUnavailableException {
-        StoragePool pool = _storagePoolDao.findById(poolId);
-        return sendToPool(pool, cmds);
-    }
-
-    protected DiskProfile createDiskCharacteristics(VolumeVO volume, VMTemplateVO template, DataCenterVO dc, DiskOfferingVO diskOffering) {
-        if (volume.getVolumeType() == Type.ROOT && Storage.ImageFormat.ISO != template.getFormat()) {
-            SearchCriteria<VMTemplateHostVO> sc = HostTemplateStatesSearch.create();
-            sc.setParameters("id", template.getId());
-            sc.setParameters("state", com.cloud.storage.VMTemplateStorageResourceAssoc.Status.DOWNLOADED);
-            sc.setJoinParameters("host", "dcId", dc.getId());
-
-            List<VMTemplateHostVO> sss = _vmTemplateHostDao.search(sc, null);
-            if (sss.size() == 0) {
-                throw new CloudRuntimeException("Template " + template.getName() + " has not been completely downloaded to zone " + dc.getId());
-            }
-            VMTemplateHostVO ss = sss.get(0);
-
-            return new DiskProfile(volume.getId(), volume.getVolumeType(), volume.getName(), diskOffering.getId(), ss.getSize(), diskOffering.getTagsArray(), diskOffering.getUseLocalStorage(),
-                    diskOffering.isRecreatable(), Storage.ImageFormat.ISO != template.getFormat() ? template.getId() : null);
-        } else {
-            return new DiskProfile(volume.getId(), volume.getVolumeType(), volume.getName(), diskOffering.getId(), diskOffering.getDiskSize(), diskOffering.getTagsArray(),
-                    diskOffering.getUseLocalStorage(), diskOffering.isRecreatable(), null);
-        }
-    }
-
-    @Override
-    public boolean canVmRestartOnAnotherServer(long vmId) {
-        List<VolumeVO> vols = _volsDao.findCreatedByInstance(vmId);
-        for (VolumeVO vol : vols) {
-            if (!vol.isRecreatable() && !vol.getPoolType().isShared()) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    @DB
-    protected Pair<VolumeVO, String> createVolumeFromSnapshot(VolumeVO volume, SnapshotVO snapshot) {
-        VolumeVO createdVolume = null;
-        Long volumeId = volume.getId();
-
-        String volumeFolder = null;
-
-        try {
-            stateTransitTo(volume, Volume.Event.CreateRequested);
-        } catch (NoTransitionException e) {
-            s_logger.debug(e.toString());
-            return null;
-        }
-        // Create the Volume object and save it so that we can return it to the user
-        Account account = _accountDao.findById(volume.getAccountId());
-
-        final HashSet<StoragePool> poolsToAvoid = new HashSet<StoragePool>();
-        StoragePoolVO pool = null;
-        boolean success = false;
-        Set<Long> podsToAvoid = new HashSet<Long>();
-        Pair<HostPodVO, Long> pod = null;
-        String volumeUUID = null;
-        String details = null;
-
-        DiskOfferingVO diskOffering = _diskOfferingDao.findByIdIncludingRemoved(volume.getDiskOfferingId());
-        DataCenterVO dc = _dcDao.findById(volume.getDataCenterId());
-        DiskProfile dskCh = new DiskProfile(volume, diskOffering, snapshot.getHypervisorType());
-
-        int retry = 0;
-        // Determine what pod to store the volume in
-        while ((pod = _resourceMgr.findPod(null, null, dc, account.getId(), podsToAvoid)) != null) {
-            podsToAvoid.add(pod.first().getId());
-            // Determine what storage pool to store the volume in
-            while ((pool = findStoragePool(dskCh, dc, pod.first(), null, null, null, poolsToAvoid)) != null) {
-                poolsToAvoid.add(pool);
-                volumeFolder = pool.getPath();
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("Attempting to create volume from snapshotId: " + snapshot.getId() + " on storage pool " + pool.getName());
-                }
-
-                // Get the newly created VDI from the snapshot.
-                // This will return a null volumePath if it could not be created
-                Pair<String, String> volumeDetails = createVDIFromSnapshot(UserContext.current().getCallerUserId(), snapshot, pool);
-
-                volumeUUID = volumeDetails.first();
-                details = volumeDetails.second();
-
-                if (volumeUUID != null) {
-                    if (s_logger.isDebugEnabled()) {
-                        s_logger.debug("Volume with UUID " + volumeUUID + " was created on storage pool " + pool.getName());
-                    }
-                    success = true;
-                    break; // break out of the "find storage pool" loop
-                } else {
-                    retry++;
-                    if (retry >= 3) {
-                        _volsDao.expunge(volumeId);
-                        String msg = "Unable to create volume from snapshot " + snapshot.getId() + " after retrying 3 times, due to " + details;
-                        s_logger.debug(msg);
-                        throw new CloudRuntimeException(msg);
-
-                    }
-                }
-                s_logger.warn("Unable to create volume on pool " + pool.getName() + ", reason: " + details);
-            }
-
-            if (success) {
-                break; // break out of the "find pod" loop
-            }
-        }
-
-        if (!success) {
-            _volsDao.expunge(volumeId);
-            String msg = "Unable to create volume from snapshot " + snapshot.getId() + " due to " + details;
-            s_logger.debug(msg);
-            throw new CloudRuntimeException(msg);
-
-        }
-
-        createdVolume = _volsDao.findById(volumeId);
-
-        try {
-            if (success) {
-                createdVolume.setPodId(pod.first().getId());
-                createdVolume.setPoolId(pool.getId());
-                createdVolume.setPoolType(pool.getPoolType());
-                createdVolume.setFolder(volumeFolder);
-                createdVolume.setPath(volumeUUID);
-                createdVolume.setDomainId(account.getDomainId());
-                stateTransitTo(createdVolume, Volume.Event.OperationSucceeded);
-            }
-        } catch (NoTransitionException e) {
-            s_logger.debug("Failed to update volume state: " + e.toString());
-            return null;
-        }
-
-        return new Pair<VolumeVO, String>(createdVolume, details);
-    }
-
-    @Override
-    public boolean stateTransitTo(Volume vol, Volume.Event event) throws NoTransitionException {
-        return _volStateMachine.transitTo(vol, event, null, _volsDao);
-    }
-
-    protected VolumeVO createVolumeFromSnapshot(VolumeVO volume, long snapshotId) {
-
-        // By default, assume failure.
-        VolumeVO createdVolume = null;
-        SnapshotVO snapshot = _snapshotDao.findById(snapshotId); // Precondition: snapshot is not null and not removed.
-
-        Pair<VolumeVO, String> volumeDetails = createVolumeFromSnapshot(volume, snapshot);
-        if (volumeDetails != null) {
-            createdVolume = volumeDetails.first();
-            UsageEventUtils.publishUsageEvent(EventTypes.EVENT_VOLUME_CREATE, createdVolume.getAccountId(),
-                    createdVolume.getDataCenterId(), createdVolume.getId(), createdVolume.getName(), createdVolume.getDiskOfferingId(),
-                    null, createdVolume.getSize(), Volume.class.getName(), createdVolume.getUuid());
-        }
-        return createdVolume;
-    }
-
-    protected Pair<String, String> createVDIFromSnapshot(long userId, SnapshotVO snapshot, StoragePoolVO pool) {
-        String vdiUUID = null;
-        Long snapshotId = snapshot.getId();
-        Long volumeId = snapshot.getVolumeId();
-        Long dcId = snapshot.getDataCenterId();
-        String secondaryStoragePoolUrl = _snapMgr.getSecondaryStorageURL(snapshot);
-        long accountId = snapshot.getAccountId();
-
-        String backedUpSnapshotUuid = snapshot.getBackupSnapshotId();
-        snapshot = _snapshotDao.findById(snapshotId);
-        if (snapshot.getVersion().trim().equals("2.1")) {
-            VolumeVO volume = _volsDao.findByIdIncludingRemoved(volumeId);
-            if (volume == null) {
-                throw new CloudRuntimeException("failed to upgrade snapshot " + snapshotId + " due to unable to find orignal volume:" + volumeId + ", try it later ");
-            }
-            if (volume.getTemplateId() == null) {
-                _snapshotDao.updateSnapshotVersion(volumeId, "2.1", "2.2");
-            } else {
-                VMTemplateVO template = _templateDao.findByIdIncludingRemoved(volume.getTemplateId());
-                if (template == null) {
-                    throw new CloudRuntimeException("failed to upgrade snapshot " + snapshotId + " due to unalbe to find orignal template :" + volume.getTemplateId() + ", try it later ");
-                }
-                Long templateId = template.getId();
-                Long tmpltAccountId = template.getAccountId();
-                if (!_snapshotDao.lockInLockTable(snapshotId.toString(), 10)) {
-                    throw new CloudRuntimeException("failed to upgrade snapshot " + snapshotId + " due to this snapshot is being used, try it later ");
-                }
-                UpgradeSnapshotCommand cmd = new UpgradeSnapshotCommand(null, secondaryStoragePoolUrl, dcId, accountId, volumeId, templateId, tmpltAccountId, null, snapshot.getBackupSnapshotId(),
-                        snapshot.getName(), "2.1");
-                Answer answer = null;
-                try {
-                    answer = sendToPool(pool, cmd);
-                } catch (StorageUnavailableException e) {
-                } finally {
-                    _snapshotDao.unlockFromLockTable(snapshotId.toString());
-                }
-                if ((answer != null) && answer.getResult()) {
-                    _snapshotDao.updateSnapshotVersion(volumeId, "2.1", "2.2");
-                } else {
-                    return new Pair<String, String>(null, "Unable to upgrade snapshot from 2.1 to 2.2 for " + snapshot.getId());
-                }
-            }
-        }
-        String basicErrMsg = "Failed to create volume from " + snapshot.getName() + " on pool " + pool;
-        try {
-            if (snapshot.getSwiftId() != null && snapshot.getSwiftId() != 0) {
-                _snapshotMgr.downloadSnapshotsFromSwift(snapshot);
-            } else if (snapshot.getS3Id() != null && snapshot.getS3Id() != 0) {
-                _snapshotMgr.downloadSnapshotsFromS3(snapshot);
-            }
-            CreateVolumeFromSnapshotCommand createVolumeFromSnapshotCommand = new CreateVolumeFromSnapshotCommand(pool, secondaryStoragePoolUrl, dcId, accountId, volumeId,
-                    backedUpSnapshotUuid, snapshot.getName(), _createVolumeFromSnapshotWait);
-            CreateVolumeFromSnapshotAnswer answer;
-            if (!_snapshotDao.lockInLockTable(snapshotId.toString(), 10)) {
-                throw new CloudRuntimeException("failed to create volume from " + snapshotId + " due to this snapshot is being used, try it later ");
-            }
-            answer = (CreateVolumeFromSnapshotAnswer) sendToPool(pool, createVolumeFromSnapshotCommand);
-            if (answer != null && answer.getResult()) {
-                vdiUUID = answer.getVdi();
-            } else {
-                s_logger.error(basicErrMsg + " due to " + ((answer == null) ? "null" : answer.getDetails()));
-                throw new CloudRuntimeException(basicErrMsg);
-            }
-        } catch (StorageUnavailableException e) {
-            s_logger.error(basicErrMsg);
-        } finally {
-            if (snapshot.getSwiftId() != null) {
-                _snapshotMgr.deleteSnapshotsDirForVolume(secondaryStoragePoolUrl, dcId, accountId, volumeId);
-            }
-            _snapshotDao.unlockFromLockTable(snapshotId.toString());
-        }
-        return new Pair<String, String>(vdiUUID, basicErrMsg);
-    }
-
-
-    @Override
-    @DB
-    public VolumeVO copyVolumeFromSecToPrimary(VolumeVO volume, VMInstanceVO vm, VMTemplateVO template, DataCenterVO dc, HostPodVO pod, Long clusterId, ServiceOfferingVO offering, DiskOfferingVO diskOffering,
-            List<StoragePoolVO> avoids, long size, HypervisorType hyperType) throws NoTransitionException {
-
-        final HashSet<StoragePool> avoidPools = new HashSet<StoragePool>(avoids);
-        DiskProfile dskCh = createDiskCharacteristics(volume, template, dc, diskOffering);
-        dskCh.setHyperType(vm.getHypervisorType());
-        // Find a suitable storage to create volume on 
-        StoragePoolVO destPool = findStoragePool(dskCh, dc, pod, clusterId, null, vm, avoidPools);
-
-        // Copy the volume from secondary storage to the destination storage pool    	  	
-        stateTransitTo(volume, Event.CopyRequested);
-        VolumeHostVO volumeHostVO = _volumeHostDao.findByVolumeId(volume.getId());
-        HostVO secStorage = _hostDao.findById(volumeHostVO.getHostId());
-        String secondaryStorageURL = secStorage.getStorageUrl();
-        String[] volumePath = volumeHostVO.getInstallPath().split("/");
-        String volumeUUID = volumePath[volumePath.length - 1].split("\\.")[0];
-
-        CopyVolumeCommand cvCmd = new CopyVolumeCommand(volume.getId(), volumeUUID, destPool, secondaryStorageURL, false, _copyvolumewait);
-        CopyVolumeAnswer cvAnswer;
-        try {
-            cvAnswer = (CopyVolumeAnswer) sendToPool(destPool, cvCmd);
-        } catch (StorageUnavailableException e1) {
-            stateTransitTo(volume, Event.CopyFailed);
-            throw new CloudRuntimeException("Failed to copy the volume from secondary storage to the destination primary storage pool.");
-        }
-
-        if (cvAnswer == null || !cvAnswer.getResult()) {
-            stateTransitTo(volume, Event.CopyFailed);
-            throw new CloudRuntimeException("Failed to copy the volume from secondary storage to the destination primary storage pool.");
-        }        
-        Transaction txn = Transaction.currentTxn();
-        txn.start();        
-        volume.setPath(cvAnswer.getVolumePath());
-        volume.setFolder(destPool.getPath());
-        volume.setPodId(destPool.getPodId());
-        volume.setPoolId(destPool.getId());        
-        volume.setPodId(destPool.getPodId());
-        stateTransitTo(volume, Event.CopySucceeded); 
-        UsageEventUtils.publishUsageEvent(EventTypes.EVENT_VOLUME_CREATE, volume.getAccountId(),
-                volume.getDataCenterId(), volume.getId(), volume.getName(), volume.getDiskOfferingId(),
-                null, volume.getSize(), Volume.class.getName(), volume.getUuid());
-        _volumeHostDao.remove(volumeHostVO.getId());
-        txn.commit();
-        return volume;
-
-    }
-
-    @Override
-    @DB
-    public VolumeVO createVolume(VolumeVO volume, VMInstanceVO vm, VMTemplateVO template, DataCenterVO dc, HostPodVO pod, Long clusterId, ServiceOfferingVO offering, DiskOfferingVO diskOffering,
-            List<StoragePoolVO> avoids, long size, HypervisorType hyperType) {
-        StoragePoolVO pool = null;
-        final HashSet<StoragePool> avoidPools = new HashSet<StoragePool>(avoids);
-
-        try {
-            stateTransitTo(volume, Volume.Event.CreateRequested);
-        } catch (NoTransitionException e) {
-            s_logger.debug("Unable to update volume state: " + e.toString());
-            return null;
-        }
-
-        if (diskOffering != null && diskOffering.isCustomized()) {
-            diskOffering.setDiskSize(size);
-        }
-        DiskProfile dskCh = null;
-        if (volume.getVolumeType() == Type.ROOT && Storage.ImageFormat.ISO != template.getFormat()) {
-            dskCh = createDiskCharacteristics(volume, template, dc, offering);
-        } else {
-            dskCh = createDiskCharacteristics(volume, template, dc, diskOffering);
-        }
-
-        dskCh.setHyperType(hyperType);
-
-        VolumeTO created = null;
-        int retry = _retry;
-        while (--retry >= 0) {
-            created = null;
-
-            long podId = pod.getId();
-            pod = _podDao.findById(podId);
-            if (pod == null) {
-                s_logger.warn("Unable to find pod " + podId + " when create volume " + volume.getName());
-                break;
-            }
-
-            pool = findStoragePool(dskCh, dc, pod, clusterId, vm.getHostId(), vm, avoidPools);
-            if (pool == null) {
-                s_logger.warn("Unable to find storage poll when create volume " + volume.getName());
-                break;
-            }
-
-            avoidPools.add(pool);
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug("Trying to create " + volume + " on " + pool);
-            }
-
-            CreateCommand cmd = null;
-            VMTemplateStoragePoolVO tmpltStoredOn = null;
-
-            for (int i = 0; i < 2; i++) {
-                if (volume.getVolumeType() == Type.ROOT && Storage.ImageFormat.ISO != template.getFormat()) {
-                    if (pool.getPoolType() == StoragePoolType.CLVM) {
-                        //prepareISOForCreate does what we need, which is to tell us where the template is
-                        VMTemplateHostVO tmpltHostOn = _tmpltMgr.prepareISOForCreate(template, pool);
-                        if (tmpltHostOn == null) {
-                            continue;
-                        }
-                        HostVO secondaryStorageHost = _hostDao.findById(tmpltHostOn.getHostId());
-                        String tmpltHostUrl = secondaryStorageHost.getStorageUrl();
-                        String fullTmpltUrl = tmpltHostUrl + "/" + tmpltHostOn.getInstallPath();
-                        cmd = new CreateCommand(dskCh, fullTmpltUrl, new StorageFilerTO(pool));
-                    } else {
-                        tmpltStoredOn = _tmpltMgr.prepareTemplateForCreate(template, pool);
-                        if (tmpltStoredOn == null) {
-                            continue;
-                        }
-                        cmd = new CreateCommand(dskCh, tmpltStoredOn.getLocalDownloadPath(), new StorageFilerTO(pool));
-                    }
-                } else {
-                    if (volume.getVolumeType() == Type.ROOT && Storage.ImageFormat.ISO == template.getFormat()) {
-                        VMTemplateHostVO tmpltHostOn = _tmpltMgr.prepareISOForCreate(template, pool);
-                        if (tmpltHostOn == null) {
-                            throw new CloudRuntimeException("Did not find ISO in secondry storage in zone " + pool.getDataCenterId());
-                        }
-                    }
-                    cmd = new CreateCommand(dskCh, new StorageFilerTO(pool));
-                }
-
-                try {
-                    Answer answer = sendToPool(pool, cmd);
-                    if (answer != null && answer.getResult()) {
-                        created = ((CreateAnswer) answer).getVolume();
-                        break;
-                    }
-
-                    if (tmpltStoredOn != null && answer != null && (answer instanceof CreateAnswer) && ((CreateAnswer) answer).templateReloadRequested()) {
-                        if (!_tmpltMgr.resetTemplateDownloadStateOnPool(tmpltStoredOn.getId())) {
-                            break; // break out of template-redeploy retry loop
-                        }
-                    } else {
-                        break;
-                    }
-                } catch (StorageUnavailableException e) {
-                    s_logger.debug("Storage unavailable for " + pool.getId());
-                    break; // break out of template-redeploy retry loop
-                }
-            }
-
-            if (created != null) {
-                break;
-            }
-
-            s_logger.debug("Retrying the create because it failed on pool " + pool);
-        }
-
-        if (created == null) {
-            return null;
-        } else {
-            volume.setFolder(pool.getPath());
-            volume.setPath(created.getPath());
-            volume.setSize(created.getSize());
-            volume.setPoolType(pool.getPoolType());
-            volume.setPoolId(pool.getId());
-            volume.setPodId(pod.getId());
-            try {
-                stateTransitTo(volume, Volume.Event.OperationSucceeded);
-            } catch (NoTransitionException e) {
-                s_logger.debug("Unable to update volume state: " + e.toString());
-                return null;
-            }
-            return volume;
-        }
-    }
-
-    public Long chooseHostForStoragePool(StoragePoolVO poolVO, List<Long> avoidHosts, boolean sendToVmResidesOn, Long vmId) {
+    public Long chooseHostForStoragePool(StoragePoolVO poolVO,
+            List<Long> avoidHosts, boolean sendToVmResidesOn, Long vmId) {
         if (sendToVmResidesOn) {
             if (vmId != null) {
                 VMInstanceVO vmInstance = _vmInstanceDao.findById(vmId);
                 if (vmInstance != null) {
                     Long hostId = vmInstance.getHostId();
-                    if (hostId != null && !avoidHosts.contains(vmInstance.getHostId())) {
+                    if (hostId != null
+                            && !avoidHosts.contains(vmInstance.getHostId())) {
                         return hostId;
                     }
                 }
             }
             /*
-             * Can't find the vm where host resides on(vm is destroyed? or volume is detached from vm), randomly choose
-             * a host
-             * to send the cmd
+             * Can't find the host the VM resides on (VM destroyed, or the
+             * volume detached from the VM?), so fall through and randomly
+             * choose a host to send the cmd
              */
         }
-        List<StoragePoolHostVO> poolHosts = _poolHostDao.listByHostStatus(poolVO.getId(), Status.Up);
+        List<StoragePoolHostVO> poolHosts = _poolHostDao.listByHostStatus(
+                poolVO.getId(), Status.Up);
         Collections.shuffle(poolHosts);
         if (poolHosts != null && poolHosts.size() > 0) {
             for (StoragePoolHostVO sphvo : poolHosts) {
@@ -876,9 +538,12 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
     @Override
     public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
 
-        Map<String, String> configs = _configDao.getConfiguration("management-server", params);
 
-        String overProvisioningFactorStr = configs.get("storage.overprovisioning.factor");
+        Map<String, String> configs = _configDao.getConfiguration(
+                "management-server", params);
+
+        String overProvisioningFactorStr = configs
+                .get("storage.overprovisioning.factor");
         if (overProvisioningFactorStr != null) {
             _overProvisioningFactor = new BigDecimal(overProvisioningFactorStr);
         }
@@ -886,94 +551,128 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         _retry = NumbersUtil.parseInt(configs.get(Config.StartRetry.key()), 10);
         _pingInterval = NumbersUtil.parseInt(configs.get("ping.interval"), 60);
         _hostRetry = NumbersUtil.parseInt(configs.get("host.retry"), 2);
-        _storagePoolAcquisitionWaitSeconds = NumbersUtil.parseInt(configs.get("pool.acquisition.wait.seconds"), 1800);
-        s_logger.info("pool.acquisition.wait.seconds is configured as " + _storagePoolAcquisitionWaitSeconds + " seconds");
+        _storagePoolAcquisitionWaitSeconds = NumbersUtil.parseInt(
+                configs.get("pool.acquisition.wait.seconds"), 1800);
+        s_logger.info("pool.acquisition.wait.seconds is configured as "
+                + _storagePoolAcquisitionWaitSeconds + " seconds");
 
-        _agentMgr.registerForHostEvents(new StoragePoolMonitor(this, _storagePoolDao), true, false, true);
+        _agentMgr.registerForHostEvents(new StoragePoolMonitor(this,
+                _storagePoolDao), true, false, true);
 
         String storageCleanupEnabled = configs.get("storage.cleanup.enabled");
-        _storageCleanupEnabled = (storageCleanupEnabled == null) ? true : Boolean.parseBoolean(storageCleanupEnabled);
+        _storageCleanupEnabled = (storageCleanupEnabled == null) ? true
+                : Boolean.parseBoolean(storageCleanupEnabled);
 
-        String value = _configDao.getValue(Config.CreateVolumeFromSnapshotWait.toString());
-        _createVolumeFromSnapshotWait = NumbersUtil.parseInt(value, Integer.parseInt(Config.CreateVolumeFromSnapshotWait.getDefaultValue()));
+        String value = _configDao.getValue(Config.CreateVolumeFromSnapshotWait
+                .toString());
+        _createVolumeFromSnapshotWait = NumbersUtil.parseInt(value,
+                Integer.parseInt(Config.CreateVolumeFromSnapshotWait
+                        .getDefaultValue()));
 
         value = _configDao.getValue(Config.CopyVolumeWait.toString());
-        _copyvolumewait = NumbersUtil.parseInt(value, Integer.parseInt(Config.CopyVolumeWait.getDefaultValue()));
+        _copyvolumewait = NumbersUtil.parseInt(value,
+                Integer.parseInt(Config.CopyVolumeWait.getDefaultValue()));
 
         value = _configDao.getValue(Config.RecreateSystemVmEnabled.key());
         _recreateSystemVmEnabled = Boolean.parseBoolean(value);
 
         value = _configDao.getValue(Config.StorageTemplateCleanupEnabled.key());
-        _templateCleanupEnabled = (value == null ? true : Boolean.parseBoolean(value));
+        _templateCleanupEnabled = (value == null ? true : Boolean
+                .parseBoolean(value));
 
         String time = configs.get("storage.cleanup.interval");
         _storageCleanupInterval = NumbersUtil.parseInt(time, 86400);
 
-        String storageUsedThreshold = _configDao.getValue(Config.StorageCapacityDisableThreshold.key());
+        String storageUsedThreshold = _configDao
+                .getValue(Config.StorageCapacityDisableThreshold.key());
         if (storageUsedThreshold != null) {
             _storageUsedThreshold = Double.parseDouble(storageUsedThreshold);
         }
 
-        String storageAllocatedThreshold = _configDao.getValue(Config.StorageAllocatedCapacityDisableThreshold.key());
+        String storageAllocatedThreshold = _configDao
+                .getValue(Config.StorageAllocatedCapacityDisableThreshold.key());
         if (storageAllocatedThreshold != null) {
-            _storageAllocatedThreshold = Double.parseDouble(storageAllocatedThreshold);
+            _storageAllocatedThreshold = Double
+                    .parseDouble(storageAllocatedThreshold);
         }
 
-        String globalStorageOverprovisioningFactor = configs.get("storage.overprovisioning.factor");
-        _storageOverprovisioningFactor = new BigDecimal(NumbersUtil.parseFloat(globalStorageOverprovisioningFactor, 2.0f));
+        String globalStorageOverprovisioningFactor = configs
+                .get("storage.overprovisioning.factor");
+        _storageOverprovisioningFactor = new BigDecimal(NumbersUtil.parseFloat(
+                globalStorageOverprovisioningFactor, 2.0f));
 
-        s_logger.info("Storage cleanup enabled: " + _storageCleanupEnabled + ", interval: " + _storageCleanupInterval + ", template cleanup enabled: " + _templateCleanupEnabled);
+        s_logger.info("Storage cleanup enabled: " + _storageCleanupEnabled
+                + ", interval: " + _storageCleanupInterval
+                + ", template cleanup enabled: " + _templateCleanupEnabled);
 
         String workers = configs.get("expunge.workers");
         int wrks = NumbersUtil.parseInt(workers, 10);
-        _executor = Executors.newScheduledThreadPool(wrks, new NamedThreadFactory("StorageManager-Scavenger"));
+        _executor = Executors.newScheduledThreadPool(wrks,
+                new NamedThreadFactory("StorageManager-Scavenger"));
 
-        _agentMgr.registerForHostEvents(ComponentContext.inject(LocalStoragePoolListener.class), true, false, false);
+        _agentMgr.registerForHostEvents(
+                ComponentContext.inject(LocalStoragePoolListener.class), true,
+                false, false);
 
-        String maxVolumeSizeInGbString = _configDao.getValue("storage.max.volume.size");
-        _maxVolumeSizeInGb = NumbersUtil.parseLong(maxVolumeSizeInGbString, 2000);
+        String maxVolumeSizeInGbString = _configDao
+                .getValue("storage.max.volume.size");
+        _maxVolumeSizeInGb = NumbersUtil.parseLong(maxVolumeSizeInGbString,
+                2000);
 
-        String _customDiskOfferingMinSizeStr = _configDao.getValue(Config.CustomDiskOfferingMinSize.toString());
-        _customDiskOfferingMinSize = NumbersUtil.parseInt(_customDiskOfferingMinSizeStr, Integer.parseInt(Config.CustomDiskOfferingMinSize.getDefaultValue()));
+        String _customDiskOfferingMinSizeStr = _configDao
+                .getValue(Config.CustomDiskOfferingMinSize.toString());
+        _customDiskOfferingMinSize = NumbersUtil.parseInt(
+                _customDiskOfferingMinSizeStr, Integer
+                        .parseInt(Config.CustomDiskOfferingMinSize
+                                .getDefaultValue()));
 
-        String _customDiskOfferingMaxSizeStr = _configDao.getValue(Config.CustomDiskOfferingMaxSize.toString());
-        _customDiskOfferingMaxSize = NumbersUtil.parseInt(_customDiskOfferingMaxSizeStr, Integer.parseInt(Config.CustomDiskOfferingMaxSize.getDefaultValue()));
+        String _customDiskOfferingMaxSizeStr = _configDao
+                .getValue(Config.CustomDiskOfferingMaxSize.toString());
+        _customDiskOfferingMaxSize = NumbersUtil.parseInt(
+                _customDiskOfferingMaxSizeStr, Integer
+                        .parseInt(Config.CustomDiskOfferingMaxSize
+                                .getDefaultValue()));
 
-        HostTemplateStatesSearch = _vmTemplateHostDao.createSearchBuilder();
-        HostTemplateStatesSearch.and("id", HostTemplateStatesSearch.entity().getTemplateId(), SearchCriteria.Op.EQ);
-        HostTemplateStatesSearch.and("state", HostTemplateStatesSearch.entity().getDownloadState(), SearchCriteria.Op.EQ);
-
-        SearchBuilder<HostVO> HostSearch = _hostDao.createSearchBuilder();
-        HostSearch.and("dcId", HostSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
-
-        HostTemplateStatesSearch.join("host", HostSearch, HostSearch.entity().getId(), HostTemplateStatesSearch.entity().getHostId(), JoinBuilder.JoinType.INNER);
-        HostSearch.done();
-        HostTemplateStatesSearch.done();
 
         _serverId = _msServer.getId();
 
-        UpHostsInPoolSearch = _storagePoolHostDao.createSearchBuilder(Long.class);
-        UpHostsInPoolSearch.selectField(UpHostsInPoolSearch.entity().getHostId());
+        UpHostsInPoolSearch = _storagePoolHostDao
+                .createSearchBuilder(Long.class);
+        UpHostsInPoolSearch.selectField(UpHostsInPoolSearch.entity()
+                .getHostId());
         SearchBuilder<HostVO> hostSearch = _hostDao.createSearchBuilder();
         hostSearch.and("status", hostSearch.entity().getStatus(), Op.EQ);
-        hostSearch.and("resourceState", hostSearch.entity().getResourceState(), Op.EQ);
-        UpHostsInPoolSearch.join("hosts", hostSearch, hostSearch.entity().getId(), UpHostsInPoolSearch.entity().getHostId(), JoinType.INNER);
-        UpHostsInPoolSearch.and("pool", UpHostsInPoolSearch.entity().getPoolId(), Op.EQ);
+        hostSearch.and("resourceState", hostSearch.entity().getResourceState(),
+                Op.EQ);
+        UpHostsInPoolSearch.join("hosts", hostSearch, hostSearch.entity()
+                .getId(), UpHostsInPoolSearch.entity().getHostId(),
+                JoinType.INNER);
+        UpHostsInPoolSearch.and("pool", UpHostsInPoolSearch.entity()
+                .getPoolId(), Op.EQ);
         UpHostsInPoolSearch.done();
 
         StoragePoolSearch = _vmInstanceDao.createSearchBuilder();
 
         SearchBuilder<VolumeVO> volumeSearch = _volumeDao.createSearchBuilder();
-        volumeSearch.and("volumeType", volumeSearch.entity().getVolumeType(), SearchCriteria.Op.EQ);
-        volumeSearch.and("poolId", volumeSearch.entity().getPoolId(), SearchCriteria.Op.EQ);
-        StoragePoolSearch.join("vmVolume", volumeSearch, volumeSearch.entity().getInstanceId(), StoragePoolSearch.entity().getId(), JoinBuilder.JoinType.INNER);
+        volumeSearch.and("volumeType", volumeSearch.entity().getVolumeType(),
+                SearchCriteria.Op.EQ);
+        volumeSearch.and("poolId", volumeSearch.entity().getPoolId(),
+                SearchCriteria.Op.EQ);
+        StoragePoolSearch.join("vmVolume", volumeSearch, volumeSearch.entity()
+                .getInstanceId(), StoragePoolSearch.entity().getId(),
+                JoinBuilder.JoinType.INNER);
         StoragePoolSearch.done();
 
         LocalStorageSearch = _storagePoolDao.createSearchBuilder();
-        SearchBuilder<StoragePoolHostVO> storageHostSearch = _storagePoolHostDao.createSearchBuilder();
-        storageHostSearch.and("hostId", storageHostSearch.entity().getHostId(), SearchCriteria.Op.EQ);
-        LocalStorageSearch.join("poolHost", storageHostSearch, storageHostSearch.entity().getPoolId(), LocalStorageSearch.entity().getId(), JoinBuilder.JoinType.INNER);
-        LocalStorageSearch.and("type", LocalStorageSearch.entity().getPoolType(), SearchCriteria.Op.IN);
+        SearchBuilder<StoragePoolHostVO> storageHostSearch = _storagePoolHostDao
+                .createSearchBuilder();
+        storageHostSearch.and("hostId", storageHostSearch.entity().getHostId(),
+                SearchCriteria.Op.EQ);
+        LocalStorageSearch.join("poolHost", storageHostSearch,
+                storageHostSearch.entity().getPoolId(), LocalStorageSearch
+                        .entity().getId(), JoinBuilder.JoinType.INNER);
+        LocalStorageSearch.and("type", LocalStorageSearch.entity()
+                .getPoolType(), SearchCriteria.Op.IN);
         LocalStorageSearch.done();
 
         Volume.State.getStateMachine().registerListener( new VolumeStateListener());
@@ -981,435 +680,199 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         return true;
     }
 
-    public String getRandomVolumeName() {
-        return UUID.randomUUID().toString();
+    
+    @Override
+    public String getStoragePoolTags(long poolId) {
+        return _configMgr.listToCsvTags(_storagePoolDao
+                .searchForStoragePoolDetails(poolId, "true"));
     }
 
     @Override
-    public boolean volumeOnSharedStoragePool(VolumeVO volume) {
-        Long poolId = volume.getPoolId();
-        if (poolId == null) {
-            return false;
+    public boolean start() {
+        if (_storageCleanupEnabled) {
+            Random generator = new Random();
+            int initialDelay = generator.nextInt(_storageCleanupInterval);
+            _executor.scheduleWithFixedDelay(new StorageGarbageCollector(),
+                    initialDelay, _storageCleanupInterval, TimeUnit.SECONDS);
         } else {
-            StoragePoolVO pool = _storagePoolDao.findById(poolId);
-
-            if (pool == null) {
-                return false;
-            } else {
-                return pool.isShared();
-            }
+            s_logger.debug("Storage cleanup is not enabled, so the storage cleanup thread is not being scheduled.");
         }
+
+        return true;
     }
 
     @Override
-    public boolean volumeInactive(VolumeVO volume) {
-        Long vmId = volume.getInstanceId();
-        if (vmId != null) {
-            UserVm vm = _userVmDao.findById(vmId);
-            if (vm == null) {
-                return true;
-            }
-            State state = vm.getState();
-            if (state.equals(State.Stopped) || state.equals(State.Destroyed)) {
-                return true;
-            }
+    public boolean stop() {
+        if (_storageCleanupEnabled) {
+            _executor.shutdown();
         }
-        return false;
-    }
 
+        return true;
+    }
+    
+    @DB
     @Override
-    public String getVmNameOnVolume(VolumeVO volume) {
-        Long vmId = volume.getInstanceId();
-        if (vmId != null) {
-            VMInstanceVO vm = _vmInstanceDao.findById(vmId);
+    public DataStore createLocalStorage(Host host, StoragePoolInfo pInfo) throws ConnectionException {
 
-            if (vm == null) {
-                return null;
-            }
-            return vm.getInstanceName();
+        DataCenterVO dc = _dcDao.findById(host.getDataCenterId());
+        if (dc == null || !dc.isLocalStorageEnabled()) {
+            return null;
         }
-        return null;
-    }
-
-    @Override
-    public Pair<String, String> getAbsoluteIsoPath(long templateId, long dataCenterId) {
-        String isoPath = null;
-
-        List<HostVO> storageHosts = _resourceMgr.listAllHostsInOneZoneByType(Host.Type.SecondaryStorage, dataCenterId);
-        if (storageHosts != null) {
-            for (HostVO storageHost : storageHosts) {
-                List<VMTemplateHostVO> templateHostVOs = _vmTemplateHostDao.listByTemplateHostStatus(templateId, storageHost.getId(), VMTemplateStorageResourceAssoc.Status.DOWNLOADED );
-                if (templateHostVOs != null && !templateHostVOs.isEmpty()) {
-                    VMTemplateHostVO tmpHostVO = templateHostVOs.get(0);
-                    isoPath = storageHost.getStorageUrl() + "/" + tmpHostVO.getInstallPath();
-                    return new Pair<String, String>(isoPath, storageHost.getStorageUrl());
+        DataStore store = null;
+        try {
+            StoragePoolVO pool = _storagePoolDao.findPoolByHostPath(host.getDataCenterId(), host.getPodId(), pInfo.getHost(), pInfo.getHostPath(), pInfo.getUuid());
+            if(pool == null && host.getHypervisorType() == HypervisorType.VMware) {
+                // Perform a run-time upgrade. Versions prior to 2.2.12 had a bug where local datastore info was not saved (host path is empty), which makes it
+                // impossible to distinguish multiple local datastores that may be available on the host. To support smooth migration, retry the lookup
+                // with an empty host path here.
+                if(pInfo.getHostPath().length() > 0) {
+                    pool = _storagePoolDao.findPoolByHostPath(host.getDataCenterId(), host.getPodId(), pInfo.getHost(), "", pInfo.getUuid());
                 }
             }
+            DataStoreProvider provider = this.dataStoreProviderMgr.getDefaultPrimaryDataStoreProvider();
+            DataStoreLifeCycle lifeCycle = provider.getLifeCycle();
+            if (pool == null) {
+                Map<String, Object> params = new HashMap<String, Object>();
+                String name = (host.getName() + " Local Storage");
+                params.put("zoneId", host.getDataCenterId());
+                params.put("clusterId", host.getClusterId());
+                params.put("podId", host.getPodId());
+                params.put("url", pInfo.getPoolType().toString() + "://" + pInfo.getHost() + "/" + pInfo.getHostPath());
+                params.put("name", name);
+                params.put("localStorage", true);
+                params.put("details", pInfo.getDetails());
+                params.put("uuid", pInfo.getUuid());
+                params.put("providerId", provider.getId());
+                
+                store = lifeCycle.initialize(params);
+            } else {
+                store = (DataStore) dataStoreMgr.getDataStore(pool.getId(),
+                        DataStoreRole.Primary);
+            }
+            
+            HostScope scope = new HostScope(host.getId());
+            lifeCycle.attachHost(store, scope, pInfo);
+        } catch (Exception e) {
+            s_logger.warn("Unable to setup the local storage pool for " + host, e);
+            throw new ConnectionException(true, "Unable to setup the local storage pool for " + host, e);
         }
-        s_logger.warn("Unable to find secondary storage in zone id=" + dataCenterId);
-        return null;
+
+        return (DataStore) dataStoreMgr.getDataStore(store.getId(),
+                DataStoreRole.Primary);
     }
 
     @Override
-    public String getSecondaryStorageURL(long zoneId) {
-        // Determine the secondary storage URL
-        HostVO secondaryStorageHost = getSecondaryStorageHost(zoneId);
+    public PrimaryDataStoreInfo createPool(CreateStoragePoolCmd cmd)
+            throws ResourceInUseException, IllegalArgumentException,
+            UnknownHostException, ResourceUnavailableException {
+        // Creates a primary storage pool through the data store provider
+        // identified by cmd.getStorageProviderUuid(), falling back to the
+        // default primary provider when that lookup finds nothing, then
+        // attaches the new store to the requested cluster or zone scope.
+        // Returns the primary data store looked up by the new store's id.
+        // Throws InvalidParameterValueException for an unknown provider,
+        // scope or zone (or a missing cluster/zone id for the chosen scope),
+        // PermissionDeniedException when the zone is disabled and the caller
+        // is not a root admin, and CloudRuntimeException when initializing
+        // or attaching the store fails.
+        String providerUuid = cmd.getStorageProviderUuid();
+        DataStoreProvider storeProvider = dataStoreProviderMgr
+                .getDataStoreProviderByUuid(providerUuid);
 
-        if (secondaryStorageHost == null) {
-            return null;
+        if (storeProvider == null) {
+            // No provider matches the given uuid: fall back to the default
+            // primary provider and only fail if that is unavailable too.
+            storeProvider = dataStoreProviderMgr.getDefaultPrimaryDataStoreProvider();
+            if (storeProvider == null) {
+                throw new InvalidParameterValueException(
+                        "invalid storage provider uuid: " + providerUuid);
+            }
         }
 
-        return secondaryStorageHost.getStorageUrl();
-    }
+        Long clusterId = cmd.getClusterId();
+        Long podId = cmd.getPodId();
+        Long zoneId = cmd.getZoneId();
 
-    @Override
-    public HostVO getSecondaryStorageHost(long zoneId, long tmpltId) {
-        List<HostVO> hosts = _ssvmMgr.listSecondaryStorageHostsInOneZone(zoneId);
-        if (hosts == null || hosts.size() == 0) {
-            return null;
-        }
-        for (HostVO host : hosts) {
-            VMTemplateHostVO tmpltHost = _vmTemplateHostDao.findByHostTemplate(host.getId(), tmpltId);
-            if (tmpltHost != null && !tmpltHost.getDestroyed() && tmpltHost.getDownloadState() == VMTemplateStorageResourceAssoc.Status.DOWNLOADED) {
-                return host;
+        // Scope defaults to CLUSTER when the command does not specify one.
+        ScopeType scopeType = ScopeType.CLUSTER;
+        String scope = cmd.getScope();
+        if (scope != null) {
+            try {
+                // The constants referenced here (CLUSTER, ZONE) are upper
+                // case, so normalise the input to accept any casing.
+                scopeType = ScopeType.valueOf(scope
+                        .toUpperCase(java.util.Locale.ROOT));
+            } catch (IllegalArgumentException e) {
+                throw new InvalidParameterValueException("invalid scope: "
+                        + scope);
             }
         }
-        return null;
-    }
 
-    @Override
-    public VMTemplateHostVO getTemplateHostRef(long zoneId, long tmpltId, boolean readyOnly) {
-        List<HostVO> hosts = _ssvmMgr.listSecondaryStorageHostsInOneZone(zoneId);
-        if (hosts == null || hosts.size() == 0) {
-            return null;
+        if (scopeType == ScopeType.CLUSTER && clusterId == null) {
+            throw new InvalidParameterValueException(
+                    "cluster id can't be null, if scope is cluster");
+        } else if (scopeType == ScopeType.ZONE && zoneId == null) {
+            throw new InvalidParameterValueException(
+                    "zone id can't be null, if scope is zone");
         }
-        VMTemplateHostVO inProgress = null;
-        VMTemplateHostVO other = null;
-        for (HostVO host : hosts) {
-            VMTemplateHostVO tmpltHost = _vmTemplateHostDao.findByHostTemplate(host.getId(), tmpltId);
-            if (tmpltHost != null && !tmpltHost.getDestroyed()) {
-                if (tmpltHost.getDownloadState() == VMTemplateStorageResourceAssoc.Status.DOWNLOADED) {
-                    return tmpltHost;
-                } else if (tmpltHost.getDownloadState() == VMTemplateStorageResourceAssoc.Status.DOWNLOAD_IN_PROGRESS) {
-                    inProgress = tmpltHost;
-                } else {
-                    other = tmpltHost;
+
+        // cmd.getDetails() is treated as a map whose values are themselves
+        // maps; flatten every nested entry into one String -> String map.
+        Map<?, ?> ds = cmd.getDetails();
+        Map<String, String> details = new HashMap<String, String>();
+        if (ds != null) {
+            for (Object group : ds.values()) {
+                Map<?, ?> groupDetails = (Map<?, ?>) group;
+                for (Map.Entry<?, ?> entry : groupDetails.entrySet()) {
+                    details.put((String) entry.getKey(),
+                            (String) entry.getValue());
                 }
             }
         }
-        if (inProgress != null) {
-            return inProgress;
+
+        DataCenterVO zone = _dcDao.findById(zoneId);
+        if (zone == null) {
+            throw new InvalidParameterValueException(
+                    "unable to find zone by id " + zoneId);
         }
-        return other;
-    }
+        // Check if zone is disabled
+        Account account = UserContext.current().getCaller();
+        if (Grouping.AllocationState.Disabled == zone.getAllocationState()
+                && !_accountMgr.isRootAdmin(account.getType())) {
+            throw new PermissionDeniedException(
+                    "Cannot perform this operation, Zone is currently disabled: "
+                            + zoneId);
+        }
+
+        // Parameters handed to the provider's life cycle to build the store.
+        Map<String, Object> params = new HashMap<String, Object>();
+        params.put("zoneId", zone.getId());
+        params.put("clusterId", clusterId);
+        params.put("podId", podId);
+        params.put("url", cmd.getUrl());
+        params.put("tags", cmd.getTags());
+        params.put("name", cmd.getStoragePoolName());
+        params.put("details", details);
+        params.put("providerId", storeProvider.getId());
+
+        DataStoreLifeCycle lifeCycle = storeProvider.getLifeCycle();
+        DataStore store = null;
+        try {
+            store = lifeCycle.initialize(params);
 
-    @Override
-    public HostVO getSecondaryStorageHost(long zoneId) {
-        List<HostVO> hosts = _ssvmMgr.listSecondaryStorageHostsInOneZone(zoneId);
-        if (hosts == null || hosts.size() == 0) {
-            hosts = _ssvmMgr.listLocalSecondaryStorageHostsInOneZone(zoneId);
-            if (hosts.isEmpty()) {
-                return null;
+            if (scopeType == ScopeType.CLUSTER) {
+                ClusterScope clusterScope = new ClusterScope(clusterId, podId,
+                        zoneId);
+                lifeCycle.attachCluster(store, clusterScope);
+            } else if (scopeType == ScopeType.ZONE) {
+                ZoneScope zoneScope = new ZoneScope(zoneId);
+                lifeCycle.attachZone(store, zoneScope);
             }
+        } catch (Exception e) {
+            s_logger.debug("Failed to add data store", e);
+            throw new CloudRuntimeException("Failed to add data store", e);
         }
 
-        int size = hosts.size();
-        Random rn = new Random();
-        int index = rn.nextInt(size);
-        return hosts.get(index);
+        return (PrimaryDataStoreInfo) dataStoreMgr.getDataStore(store.getId(),
+                DataStoreRole.Primary);
     }
 
     @Override
-    public List<HostVO> getSecondaryStorageHosts(long zoneId) {
-        List<HostVO> hosts = _ssvmMgr.listSecondaryStorageHostsInOneZone(zoneId);
-        if (hosts == null || hosts.size() == 0) {
-            hosts = _ssvmMgr.listLocalSecondaryStorageHostsInOneZone(zoneId);
-            if (hosts.isEmpty()) {
-                return new ArrayList<HostVO>();
-            }
-        }
-        return hosts;
-    }
+    public PrimaryDataStoreInfo updateStoragePool(UpdateStoragePoolCmd cmd)
+            throws IllegalArgumentException {
+        // Input validation
+        Long id = cmd.getId();
+        List<String> tags = cmd.getTags();
 
-    @Override
-    public String getStoragePoolTags(long poolId) {
-        return _configMgr.listToCsvTags(_storagePoolDao.searchForStoragePoolDetails(poolId, "true"));
-    }
-
-    @Override
-    public boolean start() {
-        if (_storageCleanupEnabled) {
-            Random generator = new Random();
-            int initialDelay = generator.nextInt(_storageCleanupInterval);
-            _executor.scheduleWithFixedDelay(new StorageGarbageCollector(), initialDelay, _storageCleanupInterval, TimeUnit.SECONDS);
-        } else {
-            s_logger.debug("Storage cleanup is not enabled, so the storage cleanup thread is not being scheduled.");
-        }
-
-        return true;
-    }
-
-    @Override
-    public boolean stop() {
-        if (_storageCleanupEnabled) {
-            _executor.shutdown();
-        }
-
-        return true;
-    }
-
-    protected StorageManagerImpl() {
-        _volStateMachine = Volume.State.getStateMachine();
-    }
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    public StoragePoolVO createPool(CreateStoragePoolCmd cmd) throws ResourceInUseException, IllegalArgumentException, UnknownHostException, ResourceUnavailableException {
-        Long clusterId = cmd.getClusterId();
-        Long podId = cmd.getPodId();
-        Map ds = cmd.getDetails();
-
-        if (clusterId != null && podId == null) {
-            throw new InvalidParameterValueException("Cluster id requires pod id");
-        }
-
-        Map<String, String> details = new HashMap<String, String>();
-        if (ds != null) {
-            Collection detailsCollection = ds.values();
-            Iterator it = detailsCollection.iterator();
-            while (it.hasNext()) {
-                HashMap d = (HashMap) it.next();
-                Iterator it2 = d.entrySet().iterator();
-                while (it2.hasNext()) {
-                    Map.Entry entry = (Map.Entry) it2.next();
-                    details.put((String) entry.getKey(), (String) entry.getValue());
-                }
-            }
-        }
-
-        // verify input parameters
-        Long zoneId = cmd.getZoneId();
-        DataCenterVO zone = _dcDao.findById(cmd.getZoneId());
-        if (zone == null) {
-            throw new InvalidParameterValueException("unable to find zone by id " + zoneId);
-        }
-        // Check if zone is disabled
-        Account account = UserContext.current().getCaller();
-        if (Grouping.AllocationState.Disabled == zone.getAllocationState() && !_accountMgr.isRootAdmin(account.getType())) {
-            throw new PermissionDeniedException("Cannot perform this operation, Zone is currently disabled: " + zoneId);
-        }
-
-        // Check if there is host up in this cluster
-        List<HostVO> allHosts = _resourceMgr.listAllUpAndEnabledHosts(Host.Type.Routing, clusterId, podId, zoneId);
-        if (allHosts.isEmpty()) {
-            throw new ResourceUnavailableException("No host up to associate a storage pool with in cluster " + clusterId, Pod.class, podId);
-        }
-        URI uri = null;
-        try {
-            uri = new URI(UriUtils.encodeURIComponent(cmd.getUrl()));
-            if (uri.getScheme() == null) {
-                throw new InvalidParameterValueException("scheme is null " + cmd.getUrl() + ", add nfs:// as a prefix");
-            } else if (uri.getScheme().equalsIgnoreCase("nfs")) {
-                String uriHost = uri.getHost();
-                String uriPath = uri.getPath();
-                if (uriHost == null || uriPath == null || uriHost.trim().isEmpty() || uriPath.trim().isEmpty()) {
-                    throw new InvalidParameterValueException("host or path is null, should be nfs://hostname/path");
-                }
-            } else if (uri.getScheme().equalsIgnoreCase("sharedMountPoint")) {
-                String uriPath = uri.getPath();
-                if (uriPath == null) {
-                    throw new InvalidParameterValueException("host or path is null, should be sharedmountpoint://localhost/path");
-                }
-            }  else if (uri.getScheme().equalsIgnoreCase("rbd")) {
-                String uriPath = uri.getPath();
-                if (uriPath == null) {
-                    throw new InvalidParameterValueException("host or path is null, should be rbd://hostname/pool");
-                }
-            }
-        } catch (URISyntaxException e) {
-            throw new InvalidParameterValueException(cmd.getUrl() + " is not a valid uri");
-        }
-
-        String tags = cmd.getTags();
-        if (tags != null) {
-            String[] tokens = tags.split(",");
-
-            for (String tag : tokens) {
-                tag = tag.trim();
-                if (tag.length() == 0) {
-                    continue;
-                }
-                details.put(tag, "true");
-            }
-        }
-
-        String scheme = uri.getScheme();
-        String storageHost = uri.getHost();
-        String hostPath = uri.getPath();
-        String userInfo = uri.getUserInfo();
-        int port = uri.getPort();
-        StoragePoolVO pool = null;
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("createPool Params @ scheme - " + scheme + " storageHost - " + storageHost + " hostPath - " + hostPath + " port - " + port);
-        }
-        if (scheme.equalsIgnoreCase("nfs")) {
-            if (port == -1) {
-                port = 2049;
-            }
-            pool = new StoragePoolVO(StoragePoolType.NetworkFilesystem, storageHost, port, hostPath);
-            if (clusterId == null) {
-                throw new IllegalArgumentException("NFS need to have clusters specified for XenServers");
-            }
-        } else if (scheme.equalsIgnoreCase("file")) {
-            if (port == -1) {
-                port = 0;
-            }
-            pool = new StoragePoolVO(StoragePoolType.Filesystem, "localhost", 0, hostPath);
-        } else if (scheme.equalsIgnoreCase("sharedMountPoint")) {
-            pool = new StoragePoolVO(StoragePoolType.SharedMountPoint, storageHost, 0, hostPath);
-        } else if (scheme.equalsIgnoreCase("clvm")) {
-            pool = new StoragePoolVO(StoragePoolType.CLVM, storageHost, 0, hostPath.replaceFirst("/", ""));
-        } else if (scheme.equalsIgnoreCase("rbd")) {
-            if (port == -1) {
-                port = 6789;
-            }
-            pool = new StoragePoolVO(StoragePoolType.RBD, storageHost, port, hostPath.replaceFirst("/", ""), userInfo);
-        } else if (scheme.equalsIgnoreCase("PreSetup")) {
-            pool = new StoragePoolVO(StoragePoolType.PreSetup, storageHost, 0, hostPath);
-        } else if (scheme.equalsIgnoreCase("iscsi")) {
-            String[] tokens = hostPath.split("/");
-            int lun = NumbersUtil.parseInt(tokens[tokens.length - 1], -1);
-            if (port == -1) {
-                port = 3260;
-            }
-            if (lun != -1) {
-                if (clusterId == null) {
-                    throw new IllegalArgumentException("IscsiLUN need to have clusters specified");
-                }
-                hostPath.replaceFirst("/", "");
-                pool = new StoragePoolVO(StoragePoolType.IscsiLUN, storageHost, port, hostPath);
-            } else {
-                for (StoragePoolDiscoverer discoverer : _discoverers) {
-                    Map<StoragePoolVO, Map<String, String>> pools;
-                    try {
-                        pools = discoverer.find(cmd.getZoneId(), podId, uri, details);
-                    } catch (DiscoveryException e) {
-                        throw new IllegalArgumentException("Not enough information for discovery " + uri, e);
-                    }
-                    if (pools != null) {
-                        Map.Entry<StoragePoolVO, Map<String, String>> entry = pools.entrySet().iterator().next();
-                        pool = entry.getKey();
-                        details = entry.getValue();
-                        break;
-                    }
-                }
-            }
-        } else if (scheme.equalsIgnoreCase("iso")) {
-            if (port == -1) {
-                port = 2049;
-            }
-            pool = new StoragePoolVO(StoragePoolType.ISO, storageHost, port, hostPath);
-        } else if (scheme.equalsIgnoreCase("vmfs")) {
-            pool = new StoragePoolVO(StoragePoolType.VMFS, "VMFS datastore: " + hostPath, 0, hostPath);
-        } else if (scheme.equalsIgnoreCase("ocfs2")) {
-            port = 7777;
-            pool = new StoragePoolVO(StoragePoolType.OCFS2, "clustered", port, hostPath);
-        } else {
-            s_logger.warn("Unable to figure out the scheme for URI: " + uri);
-            throw new IllegalArgumentException("Unable to figure out the scheme for URI: " + uri);
-        }
-
-        if (pool == null) {
-            s_logger.warn("Unable to figure out the scheme for URI: " + uri);
-            throw new IllegalArgumentException("Unable to figure out the scheme for URI: " + uri);
-        }
-
-        List<StoragePoolVO> pools = _storagePoolDao.listPoolByHostPath(storageHost, hostPath);
-        if (!pools.isEmpty() && !scheme.equalsIgnoreCase("sharedmountpoint")) {
-            Long oldPodId = pools.get(0).getPodId();
-            throw new ResourceInUseException("Storage pool " + uri + " already in use by another pod (id=" + oldPodId + ")", "StoragePool", uri.toASCIIString());
-        }
-
-        long poolId = _storagePoolDao.getNextInSequence(Long.class, "id");
-        String uuid = null;
-        if (scheme.equalsIgnoreCase("sharedmountpoint") || scheme.equalsIgnoreCase("clvm")) {
-            uuid = UUID.randomUUID().toString();
-        } else if (scheme.equalsIgnoreCase("PreSetup")) {
-            uuid = hostPath.replace("/", "");
-        } else {
-            uuid = UUID.nameUUIDFromBytes(new String(storageHost + hostPath).getBytes()).toString();
-        }
-
-        List<StoragePoolVO> spHandles = _storagePoolDao.findIfDuplicatePoolsExistByUUID(uuid);
-        if ((spHandles != null) && (spHandles.size() > 0)) {
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug("Another active pool with the same uuid already exists");
-            }
-            throw new ResourceInUseException("Another active pool with the same uuid already exists");
-        }
-
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("In createPool Setting poolId - " + poolId + " uuid - " + uuid + " zoneId - " + zoneId + " podId - " + podId + " poolName - " + cmd.getStoragePoolName());
-        }
-
-        pool.setId(poolId);
-        pool.setUuid(uuid);
-        pool.setDataCenterId(cmd.getZoneId());
-        pool.setPodId(podId);
-        pool.setName(cmd.getStoragePoolName());
-        pool.setClusterId(clusterId);
-        pool.setStatus(StoragePoolStatus.Up);
-        pool = _storagePoolDao.persist(pool, details);
-
-        if (pool.getPoolType() == StoragePoolType.OCFS2 && !_ocfs2Mgr.prepareNodes(allHosts, pool)) {
-            s_logger.warn("Can not create storage pool " + pool + " on cluster " + clusterId);
-            _storagePoolDao.expunge(pool.getId());
-            return null;
-        }
-
-        boolean success = false;
-        for (HostVO h : allHosts) {
-            success = createStoragePool(h.getId(), pool);
-            if (success) {
-                break;
-            }
-        }
-        if (!success) {
-            s_logger.warn("Can not create storage pool " + pool + " on cluster " + clusterId);
-            _storagePoolDao.expunge(pool.getId());
-            return null;
-        }
-        s_logger.debug("In createPool Adding the pool to each of the hosts");
-        List<HostVO> poolHosts = new ArrayList<HostVO>();
-        for (HostVO h : allHosts) {
-            try {
-                connectHostToSharedPool(h.getId(), pool);
-                poolHosts.add(h);
-            } catch (Exception e) {
-                s_logger.warn("Unable to establish a connection between " + h + " and " + pool, e);
-            }
-        }
-
-        if (poolHosts.isEmpty()) {
-            s_logger.warn("No host can access storage pool " + pool + " on cluster " + clusterId);
-            _storagePoolDao.expunge(pool.getId());
-            return null;
-        } else {
-            createCapacityEntry(pool);
-        }
-        return pool;
-    }
-
-    @Override
-    public StoragePoolVO updateStoragePool(UpdateStoragePoolCmd cmd) throws IllegalArgumentException {
-        // Input validation
-        Long id = cmd.getId();
-        List<String> tags = cmd.getTags();
-
-        StoragePoolVO pool = _storagePoolDao.findById(id);
-        if (pool == null) {
-            throw new IllegalArgumentException("Unable to find storage pool with ID: " + id);
-        }
+        StoragePoolVO pool = _storagePoolDao.findById(id);
+        if (pool == null) {
+            throw new IllegalArgumentException(
+                    "Unable to find storage pool with ID: " + id);
+        }
 
         if (tags != null) {
             Map<String, String> details = new HashMap<String, String>();
@@ -1423,934 +886,211 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
             _storagePoolDao.updateDetails(id, details);
         }
 
-        return pool;
+        return (PrimaryDataStoreInfo) dataStoreMgr.getDataStore(pool.getId(),
+                DataStoreRole.Primary);
     }
 
     @Override
     @DB
     public boolean deletePool(DeletePoolCmd cmd) {
         Long id = cmd.getId();
-        boolean deleteFlag = false;
         boolean forced = cmd.isForced();
 
-        // verify parameters
         StoragePoolVO sPool = _storagePoolDao.findById(id);
         if (sPool == null) {
             s_logger.warn("Unable to find pool:" + id);
-            throw new InvalidParameterValueException("Unable to find pool by id " + id);
+            throw new InvalidParameterValueException(
+                    "Unable to find pool by id " + id);
         }
-        if(sPool.getStatus() != StoragePoolStatus.Maintenance){
-            s_logger.warn("Unable to delete storage id: " + id +" due to it is not in Maintenance state");
-            throw new InvalidParameterValueException("Unable to delete storage due to it is not in Maintenance state, id: " + id);           
+        if (sPool.getStatus() != StoragePoolStatus.Maintenance) {
+            s_logger.warn("Unable to delete storage id: " + id
+                    + " due to it is not in Maintenance state");
+            throw new InvalidParameterValueException(
+                    "Unable to delete storage due to it is not in Maintenance state, id: "
+                            + id);
         }
-        if (sPool.getPoolType().equals(StoragePoolType.LVM) || sPool.getPoolType().equals(StoragePoolType.EXT)) {
+        if (sPool.isLocal()) {
             s_logger.warn("Unable to delete local storage id:" + id);
-            throw new InvalidParameterValueException("Unable to delete local storage id: " + id);
+            throw new InvalidParameterValueException(
+                    "Unable to delete local storage id: " + id);
         }
 
         Pair<Long, Long> vlms = _volsDao.getCountAndTotalByPool(id);
         if (forced) {
             if (vlms.first() > 0) {
-                Pair<Long, Long> nonDstrdVlms = _volsDao.getNonDestroyedCountAndTotalByPool(id);
+                Pair<Long, Long> nonDstrdVlms = _volsDao
+                        .getNonDestroyedCountAndTotalByPool(id);
                 if (nonDstrdVlms.first() > 0) {
-                    throw new CloudRuntimeException("Cannot delete pool " + sPool.getName() + " as there are associated " +
-                            "non-destroyed vols for this pool");
+                    throw new CloudRuntimeException("Cannot delete pool "
+                            + sPool.getName() + " as there are associated "
+                            + "non-destroyed vols for this pool");
                 }
-                //force expunge non-destroyed volumes
+                // force expunge non-destroyed volumes
                 List<VolumeVO> vols = _volsDao.listVolumesToBeDestroyed();
                 for (VolumeVO vol : vols) {
-                    expungeVolume(vol, true);
+                    AsyncCallFuture<VolumeApiResult> future = this.volService.expungeVolumeAsync(this.volFactory.getVolume(vol.getId()));
+                    try {
+                        future.get();
+                    } catch (InterruptedException e) {
+                        s_logger.debug("expunge volume failed" + vol.getId(), e);
+                    } catch (ExecutionException e) {
+                        s_logger.debug("expunge volume failed" + vol.getId(), e);
+                    }
                 }
             }
         } else {
             // Check if the pool has associated volumes in the volumes table
             // If it does , then you cannot delete the pool
             if (vlms.first() > 0) {
-                throw new CloudRuntimeException("Cannot delete pool " + sPool.getName() + " as there are associated vols" +
-                        " for this pool");
+                throw new CloudRuntimeException("Cannot delete pool "
+                        + sPool.getName() + " as there are associated vols"
+                        + " for this pool");
             }
         }
 
-
         // First get the host_id from storage_pool_host_ref for given pool id
-        StoragePoolVO lock = _storagePoolDao.acquireInLockTable(sPool.getId());
+        StoragePoolVO lock = _storagePoolDao.acquireInLockTable(sPool
+                .getId());
 
         if (lock == null) {
             if (s_logger.isDebugEnabled()) {
-                s_logger.debug("Failed to acquire lock when deleting StoragePool with ID: " + sPool.getId());
+                s_logger.debug("Failed to acquire lock when deleting PrimaryDataStoreVO with ID: "
+                        + sPool.getId());
             }
             return false;
         }
 
-        // mark storage pool as removed (so it can't be used for new volumes creation), release the lock
-        boolean isLockReleased = false;
-        isLockReleased = _storagePoolDao.releaseFromLockTable(lock.getId());
+        _storagePoolDao.releaseFromLockTable(lock.getId());
         s_logger.trace("Released lock for storage pool " + id);
 
-        // for the given pool id, find all records in the storage_pool_host_ref
-        List<StoragePoolHostVO> hostPoolRecords = _storagePoolHostDao.listByPoolId(id);
-        Transaction txn = Transaction.currentTxn();
-        try {
-            // if not records exist, delete the given pool (base case)
-            if (hostPoolRecords.size() == 0) {
-
-                txn.start();
-                sPool.setUuid(null);
-                _storagePoolDao.update(id, sPool);
-                _storagePoolDao.remove(id);
-                deletePoolStats(id);
-                txn.commit();
-
-                deleteFlag = true;
-                return true;
-            } else {
-                // Remove the SR associated with the Xenserver
-                for (StoragePoolHostVO host : hostPoolRecords) {
-                    DeleteStoragePoolCommand deleteCmd = new DeleteStoragePoolCommand(sPool);
-                    final Answer answer = _agentMgr.easySend(host.getHostId(), deleteCmd);
-
-                    if (answer != null && answer.getResult()) {
-                        deleteFlag = true;
-                        break;
-                    }
-                }
-            }
-        } finally {
-            if (deleteFlag) {
-                // now delete the storage_pool_host_ref and storage_pool records
-                txn.start();
-                for (StoragePoolHostVO host : hostPoolRecords) {
-                    _storagePoolHostDao.deleteStoragePoolHostDetails(host.getHostId(), host.getPoolId());
-                }
-                sPool.setUuid(null);
-            

<TRUNCATED>