You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ro...@apache.org on 2018/09/12 09:10:24 UTC

[cloudstack] branch master updated (56f9185 -> c49807f)

This is an automated email from the ASF dual-hosted git repository.

rohit pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/cloudstack.git.


    from 56f9185  Remove 'iam' projects (#2817)
     add 2ab3976  CLOUDSTACK-9473: storage pool capacity check when volume is resized or migrated (#2829)
     new c49807f  Merge remote-tracking branch 'origin/4.11'

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/com/cloud/storage/StorageManager.java     |  2 ++
 .../java/com/cloud/storage/StorageManagerImpl.java | 42 +++++++++++++++++-----
 .../com/cloud/storage/VolumeApiServiceImpl.java    | 10 ++++++
 3 files changed, 46 insertions(+), 8 deletions(-)


[cloudstack] 01/01: Merge remote-tracking branch 'origin/4.11'

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rohit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cloudstack.git

commit c49807f8f48bb6801f09ee1d26ae79c6eb090688
Merge: 56f9185 2ab3976
Author: Rohit Yadav <ro...@shapeblue.com>
AuthorDate: Wed Sep 12 14:17:29 2018 +0530

    Merge remote-tracking branch 'origin/4.11'
    
    Signed-off-by: Rohit Yadav <ro...@shapeblue.com>

 .../java/com/cloud/storage/StorageManager.java     |  2 ++
 .../java/com/cloud/storage/StorageManagerImpl.java | 42 +++++++++++++++++-----
 .../com/cloud/storage/VolumeApiServiceImpl.java    | 10 ++++++
 3 files changed, 46 insertions(+), 8 deletions(-)

diff --cc engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
index 2fd732b,0000000..a8a566a
mode 100644,000000..100644
--- a/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
+++ b/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
@@@ -1,213 -1,0 +1,215 @@@
 +// Licensed to the Apache Software Foundation (ASF) under one
 +// or more contributor license agreements.  See the NOTICE file
 +// distributed with this work for additional information
 +// regarding copyright ownership.  The ASF licenses this file
 +// to you under the Apache License, Version 2.0 (the
 +// "License"); you may not use this file except in compliance
 +// with the License.  You may obtain a copy of the License at
 +//
 +//   http://www.apache.org/licenses/LICENSE-2.0
 +//
 +// Unless required by applicable law or agreed to in writing,
 +// software distributed under the License is distributed on an
 +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +// KIND, either express or implied.  See the License for the
 +// specific language governing permissions and limitations
 +// under the License.
 +package com.cloud.storage;
 +
 +import java.math.BigDecimal;
 +import java.util.List;
 +
 +import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
 +import org.apache.cloudstack.engine.subsystem.api.storage.HypervisorHostListener;
 +import org.apache.cloudstack.framework.config.ConfigKey;
 +import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 +
 +import com.cloud.agent.api.Answer;
 +import com.cloud.agent.api.Command;
 +import com.cloud.agent.api.StoragePoolInfo;
 +import com.cloud.agent.api.to.DataTO;
 +import com.cloud.agent.api.to.DiskTO;
 +import com.cloud.agent.manager.Commands;
 +import com.cloud.capacity.CapacityVO;
 +import com.cloud.exception.ConnectionException;
 +import com.cloud.exception.StorageConflictException;
 +import com.cloud.exception.StorageUnavailableException;
 +import com.cloud.host.Host;
 +import com.cloud.hypervisor.Hypervisor.HypervisorType;
 +import com.cloud.offering.DiskOffering;
 +import com.cloud.offering.ServiceOffering;
 +import com.cloud.storage.Storage.ImageFormat;
 +import com.cloud.utils.Pair;
 +import com.cloud.vm.DiskProfile;
 +import com.cloud.vm.VMInstanceVO;
 +
 +public interface StorageManager extends StorageService {
 +    ConfigKey<Integer> StorageCleanupInterval = new ConfigKey<>(Integer.class,
 +            "storage.cleanup.interval",
 +            "Advanced",
 +            "86400",
 +            "The interval (in seconds) to wait before running the storage cleanup thread.",
 +            false,
 +            ConfigKey.Scope.Global,
 +            null);
 +    ConfigKey<Integer> StorageCleanupDelay = new ConfigKey<>(Integer.class,
 +            "storage.cleanup.delay",
 +            "Advanced",
 +            "86400",
 +            "Determines how long (in seconds) to wait before actually expunging destroyed volumes. The default value = the default value of storage.cleanup.interval.",
 +            false,
 +            ConfigKey.Scope.Global,
 +            null);
 +    ConfigKey<Boolean> StorageCleanupEnabled = new ConfigKey<>(Boolean.class,
 +            "storage.cleanup.enabled",
 +            "Advanced",
 +            "true",
 +            "Enables/disables the storage cleanup thread.",
 +            false,
 +            ConfigKey.Scope.Global,
 +            null);
 +    ConfigKey<Boolean> TemplateCleanupEnabled = new ConfigKey<>(Boolean.class,
 +            "storage.template.cleanup.enabled",
 +            "Storage",
 +            "true",
 +            "Enable/disable template cleanup activity, only take effect when overall storage cleanup is enabled",
 +            false,
 +            ConfigKey.Scope.Global,
 +            null);
 +    ConfigKey<Integer> KvmStorageOfflineMigrationWait = new ConfigKey<>(Integer.class,
 +            "kvm.storage.offline.migration.wait",
 +            "Storage",
 +            "10800",
 +            "Timeout in seconds for offline (non-live) storage migration to complete on KVM",
 +            true,
 +            ConfigKey.Scope.Global,
 +            null);
 +    ConfigKey<Integer> KvmStorageOnlineMigrationWait = new ConfigKey<>(Integer.class,
 +            "kvm.storage.online.migration.wait",
 +            "Storage",
 +            "10800",
 +            "Timeout in seconds for online (live) storage migration to complete on KVM (migrateVirtualMachineWithVolume)",
 +            true,
 +            ConfigKey.Scope.Global,
 +            null);
 +    ConfigKey<Integer> MaxNumberOfManagedClusteredFileSystems = new ConfigKey<>(Integer.class,
 +            "max.number.managed.clustered.file.systems",
 +            "Storage",
 +            "200",
 +            "XenServer and VMware only: Maximum number of managed SRs or datastores per compute cluster",
 +            true,
 +            ConfigKey.Scope.Cluster,
 +            null);
 +
 +    /**
 +     * Returns a comma separated list of tags for the specified storage pool
 +     * @param poolId
 +     * @return comma separated list of tags
 +     */
 +    public String getStoragePoolTags(long poolId);
 +
 +    Answer sendToPool(long poolId, Command cmd) throws StorageUnavailableException;
 +
 +    Answer sendToPool(StoragePool pool, Command cmd) throws StorageUnavailableException;
 +
 +    Answer[] sendToPool(long poolId, Commands cmd) throws StorageUnavailableException;
 +
 +    Answer[] sendToPool(StoragePool pool, Commands cmds) throws StorageUnavailableException;
 +
 +    Pair<Long, Answer[]> sendToPool(StoragePool pool, long[] hostIdsToTryFirst, List<Long> hostIdsToAvoid, Commands cmds) throws StorageUnavailableException;
 +
 +    Pair<Long, Answer> sendToPool(StoragePool pool, long[] hostIdsToTryFirst, List<Long> hostIdsToAvoid, Command cmd) throws StorageUnavailableException;
 +
 +    /**
 +     * Checks if a host has running VMs that are using its local storage pool.
 +     * @return true if local storage is active on the host
 +     */
 +    boolean isLocalStorageActiveOnHost(Long hostId);
 +
 +    /**
 +     * Cleans up storage pools by removing unused templates.
 +     * @param recurring - true if this cleanup is part of a recurring garbage collection thread
 +     */
 +    void cleanupStorage(boolean recurring);
 +
 +    String getPrimaryStorageNameLabel(VolumeVO volume);
 +
 +    void createCapacityEntry(StoragePoolVO storagePool, short capacityType, long allocated);
 +
 +    Answer sendToPool(StoragePool pool, long[] hostIdsToTryFirst, Command cmd) throws StorageUnavailableException;
 +
 +    CapacityVO getSecondaryStorageUsedStats(Long hostId, Long zoneId);
 +
 +    CapacityVO getStoragePoolUsedStats(Long poolId, Long clusterId, Long podId, Long zoneId);
 +
 +    List<StoragePoolVO> ListByDataCenterHypervisor(long datacenterId, HypervisorType type);
 +
 +    List<VMInstanceVO> listByStoragePool(long storagePoolId);
 +
 +    StoragePoolVO findLocalStorageOnHost(long hostId);
 +
 +    Host updateSecondaryStorage(long secStorageId, String newUrl);
 +
 +    void removeStoragePoolFromCluster(long hostId, String iScsiName, StoragePool storagePool);
 +
 +    List<Long> getUpHostsInPool(long poolId);
 +
 +    void cleanupSecondaryStorage(boolean recurring);
 +
 +    HypervisorType getHypervisorTypeFromFormat(ImageFormat format);
 +
 +    boolean storagePoolHasEnoughIops(List<Volume> volume, StoragePool pool);
 +
 +    boolean storagePoolHasEnoughSpace(List<Volume> volume, StoragePool pool);
 +
 +    /**
 +     * This comment is relevant to managed storage only.
 +     *
 +     *  Long clusterId = only used for managed storage
 +     *
 +     *  Some managed storage can be more efficient handling VM templates (via cloning) if it knows the capabilities of the compute cluster it is dealing with.
 +     *  If the compute cluster supports UUID resigning and the storage system can clone a volume from a volume, then this determines how much more space a
 +     *  new root volume (that makes use of a template) will take up on the storage system.
 +     *
 +     *  For example, if a storage system can clone a volume from a volume and the compute cluster supports UUID resigning (relevant for hypervisors like
 +     *  XenServer and ESXi that put virtual disks in clustered file systems), then the storage system will need to determine if it already has a copy of
 +     *  the template or if it will need to create one first before cloning the template to a new volume to be used for the new root disk (assuming the root
 +     *  disk is being deployed from a template). If the template doesn't already exists on the storage system, then you need to take into consideration space
 +     *  required for that template (stored in one volume) and space required for a new volume created from that template volume (for your new root volume).
 +     *
 +     *  If UUID resigning is not available in the compute cluster or the storage system doesn't support cloning a volume from a volume, then for each new
 +     *  root disk that uses a template, CloudStack will have the template be copied down to a newly created volume on the storage system (i.e. no need
 +     *  to take into consideration the possible need to first create a volume on the storage system for a template that will be used for the root disk
 +     *  via cloning).
 +     *
 +     *  Cloning volumes on the back-end instead of copying down a new template for each new volume helps to alleviate load on the hypervisors.
 +     */
 +    boolean storagePoolHasEnoughSpace(List<Volume> volume, StoragePool pool, Long clusterId);
 +
++    boolean storagePoolHasEnoughSpaceForResize(StoragePool pool, long currentSize, long newSiz);
++
 +    boolean registerHostListener(String providerUuid, HypervisorHostListener listener);
 +
 +    void connectHostToSharedPool(long hostId, long poolId) throws StorageUnavailableException, StorageConflictException;
 +
 +    void createCapacityEntry(long poolId);
 +
 +    DataStore createLocalStorage(Host host, StoragePoolInfo poolInfo) throws ConnectionException;
 +
 +    BigDecimal getStorageOverProvisioningFactor(Long dcId);
 +
 +    Long getDiskBytesReadRate(ServiceOffering offering, DiskOffering diskOffering);
 +
 +    Long getDiskBytesWriteRate(ServiceOffering offering, DiskOffering diskOffering);
 +
 +    Long getDiskIopsReadRate(ServiceOffering offering, DiskOffering diskOffering);
 +
 +    Long getDiskIopsWriteRate(ServiceOffering offering, DiskOffering diskOffering);
 +
 +    void cleanupDownloadUrls();
 +
 +    void setDiskProfileThrottling(DiskProfile dskCh, ServiceOffering offering, DiskOffering diskOffering);
 +
 +    DiskTO getDiskWithThrottling(DataTO volTO, Volume.Type volumeType, long deviceId, String path, long offeringId, long diskOfferingId);
 +
 +}
diff --cc server/src/main/java/com/cloud/storage/StorageManagerImpl.java
index fc40981,0000000..92eb2b0
mode 100644,000000..100644
--- a/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
+++ b/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
@@@ -1,2498 -1,0 +1,2524 @@@
 +// Licensed to the Apache Software Foundation (ASF) under one
 +// or more contributor license agreements.  See the NOTICE file
 +// distributed with this work for additional information
 +// regarding copyright ownership.  The ASF licenses this file
 +// to you under the Apache License, Version 2.0 (the
 +// "License"); you may not use this file except in compliance
 +// with the License.  You may obtain a copy of the License at
 +//
 +//   http://www.apache.org/licenses/LICENSE-2.0
 +//
 +// Unless required by applicable law or agreed to in writing,
 +// software distributed under the License is distributed on an
 +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +// KIND, either express or implied.  See the License for the
 +// specific language governing permissions and limitations
 +// under the License.
 +package com.cloud.storage;
 +
 +import java.math.BigDecimal;
 +import java.net.URI;
 +import java.net.URISyntaxException;
 +import java.net.UnknownHostException;
 +import java.sql.PreparedStatement;
 +import java.sql.ResultSet;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Date;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Random;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.TimeUnit;
 +
 +import javax.inject.Inject;
 +
 +import org.apache.cloudstack.api.command.admin.storage.CancelPrimaryStorageMaintenanceCmd;
 +import org.apache.cloudstack.api.command.admin.storage.CreateSecondaryStagingStoreCmd;
 +import org.apache.cloudstack.api.command.admin.storage.CreateStoragePoolCmd;
 +import org.apache.cloudstack.api.command.admin.storage.DeleteImageStoreCmd;
 +import org.apache.cloudstack.api.command.admin.storage.DeletePoolCmd;
 +import org.apache.cloudstack.api.command.admin.storage.DeleteSecondaryStagingStoreCmd;
 +import org.apache.cloudstack.api.command.admin.storage.UpdateStoragePoolCmd;
 +import org.apache.cloudstack.context.CallContext;
 +import org.apache.cloudstack.engine.subsystem.api.storage.ClusterScope;
 +import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
 +import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreDriver;
 +import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreLifeCycle;
 +import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
 +import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreProvider;
 +import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreProviderManager;
 +import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
 +import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector;
 +import org.apache.cloudstack.engine.subsystem.api.storage.HostScope;
 +import org.apache.cloudstack.engine.subsystem.api.storage.HypervisorHostListener;
 +import org.apache.cloudstack.engine.subsystem.api.storage.ImageStoreProvider;
 +import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
 +import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreDriver;
 +import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreInfo;
 +import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreLifeCycle;
 +import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
 +import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
 +import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotService;
 +import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
 +import org.apache.cloudstack.engine.subsystem.api.storage.TemplateInfo;
 +import org.apache.cloudstack.engine.subsystem.api.storage.TemplateService;
 +import org.apache.cloudstack.engine.subsystem.api.storage.TemplateService.TemplateApiResult;
 +import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
 +import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
 +import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService;
 +import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService.VolumeApiResult;
 +import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
 +import org.apache.cloudstack.framework.async.AsyncCallFuture;
 +import org.apache.cloudstack.framework.config.ConfigKey;
 +import org.apache.cloudstack.framework.config.Configurable;
 +import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 +import org.apache.cloudstack.managed.context.ManagedContextRunnable;
 +import org.apache.cloudstack.storage.command.DettachCommand;
 +import org.apache.cloudstack.storage.datastore.db.ImageStoreDao;
 +import org.apache.cloudstack.storage.datastore.db.ImageStoreDetailsDao;
 +import org.apache.cloudstack.storage.datastore.db.ImageStoreVO;
 +import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 +import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
 +import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
 +import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
 +import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 +import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
 +import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
 +import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
 +import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
 +import org.apache.cloudstack.storage.image.datastore.ImageStoreEntity;
 +import org.apache.cloudstack.storage.to.VolumeObjectTO;
 +import org.apache.log4j.Logger;
 +import org.springframework.stereotype.Component;
 +
 +import com.cloud.agent.AgentManager;
 +import com.cloud.agent.api.Answer;
 +import com.cloud.agent.api.Command;
 +import com.cloud.agent.api.DeleteStoragePoolCommand;
 +import com.cloud.agent.api.StoragePoolInfo;
 +import com.cloud.agent.api.to.DataTO;
 +import com.cloud.agent.api.to.DiskTO;
 +import com.cloud.agent.manager.Commands;
 +import com.cloud.api.ApiDBUtils;
 +import com.cloud.api.query.dao.TemplateJoinDao;
 +import com.cloud.api.query.vo.TemplateJoinVO;
 +import com.cloud.capacity.Capacity;
 +import com.cloud.capacity.CapacityManager;
 +import com.cloud.capacity.CapacityState;
 +import com.cloud.capacity.CapacityVO;
 +import com.cloud.capacity.dao.CapacityDao;
 +import com.cloud.cluster.ClusterManagerListener;
 +import com.cloud.cluster.ManagementServerHost;
 +import com.cloud.configuration.Config;
 +import com.cloud.configuration.ConfigurationManager;
 +import com.cloud.configuration.ConfigurationManagerImpl;
 +import com.cloud.configuration.Resource.ResourceType;
 +import com.cloud.dc.ClusterVO;
 +import com.cloud.dc.DataCenterVO;
 +import com.cloud.dc.dao.ClusterDao;
 +import com.cloud.dc.dao.DataCenterDao;
 +import com.cloud.event.ActionEvent;
 +import com.cloud.event.EventTypes;
 +import com.cloud.exception.AgentUnavailableException;
 +import com.cloud.exception.ConnectionException;
 +import com.cloud.exception.DiscoveryException;
 +import com.cloud.exception.InsufficientCapacityException;
 +import com.cloud.exception.InvalidParameterValueException;
 +import com.cloud.exception.OperationTimedoutException;
 +import com.cloud.exception.PermissionDeniedException;
 +import com.cloud.exception.ResourceInUseException;
 +import com.cloud.exception.ResourceUnavailableException;
 +import com.cloud.exception.StorageConflictException;
 +import com.cloud.exception.StorageUnavailableException;
 +import com.cloud.host.Host;
 +import com.cloud.host.HostVO;
 +import com.cloud.host.Status;
 +import com.cloud.host.dao.HostDao;
 +import com.cloud.hypervisor.Hypervisor;
 +import com.cloud.hypervisor.Hypervisor.HypervisorType;
 +import com.cloud.hypervisor.HypervisorGuruManager;
 +import com.cloud.offering.DiskOffering;
 +import com.cloud.offering.ServiceOffering;
 +import com.cloud.org.Grouping;
 +import com.cloud.org.Grouping.AllocationState;
 +import com.cloud.resource.ResourceState;
 +import com.cloud.server.ConfigurationServer;
 +import com.cloud.server.ManagementServer;
 +import com.cloud.server.StatsCollector;
 +import com.cloud.storage.Storage.ImageFormat;
 +import com.cloud.storage.Storage.StoragePoolType;
 +import com.cloud.storage.Volume.Type;
 +import com.cloud.storage.dao.DiskOfferingDao;
 +import com.cloud.storage.dao.SnapshotDao;
 +import com.cloud.storage.dao.StoragePoolHostDao;
 +import com.cloud.storage.dao.StoragePoolTagsDao;
 +import com.cloud.storage.dao.StoragePoolWorkDao;
 +import com.cloud.storage.dao.VMTemplateDao;
 +import com.cloud.storage.dao.VMTemplatePoolDao;
 +import com.cloud.storage.dao.VMTemplateZoneDao;
 +import com.cloud.storage.dao.VolumeDao;
 +import com.cloud.storage.listener.StoragePoolMonitor;
 +import com.cloud.storage.listener.VolumeStateListener;
 +import com.cloud.template.TemplateManager;
 +import com.cloud.template.VirtualMachineTemplate;
 +import com.cloud.user.Account;
 +import com.cloud.user.AccountManager;
 +import com.cloud.user.ResourceLimitService;
 +import com.cloud.user.dao.UserDao;
 +import com.cloud.utils.DateUtil;
 +import com.cloud.utils.NumbersUtil;
 +import com.cloud.utils.Pair;
 +import com.cloud.utils.StringUtils;
 +import com.cloud.utils.UriUtils;
 +import com.cloud.utils.component.ComponentContext;
 +import com.cloud.utils.component.ManagerBase;
 +import com.cloud.utils.concurrency.NamedThreadFactory;
 +import com.cloud.utils.db.DB;
 +import com.cloud.utils.db.EntityManager;
 +import com.cloud.utils.db.GenericSearchBuilder;
 +import com.cloud.utils.db.GlobalLock;
 +import com.cloud.utils.db.JoinBuilder;
 +import com.cloud.utils.db.JoinBuilder.JoinType;
 +import com.cloud.utils.db.SearchBuilder;
 +import com.cloud.utils.db.SearchCriteria;
 +import com.cloud.utils.db.SearchCriteria.Op;
 +import com.cloud.utils.db.Transaction;
 +import com.cloud.utils.db.TransactionCallbackNoReturn;
 +import com.cloud.utils.db.TransactionLegacy;
 +import com.cloud.utils.db.TransactionStatus;
 +import com.cloud.utils.exception.CloudRuntimeException;
 +import com.cloud.vm.DiskProfile;
 +import com.cloud.vm.VMInstanceVO;
 +import com.cloud.vm.VirtualMachine.State;
 +import com.cloud.vm.dao.VMInstanceDao;
 +
 +@Component
 +public class StorageManagerImpl extends ManagerBase implements StorageManager, ClusterManagerListener, Configurable {
 +    private static final Logger s_logger = Logger.getLogger(StorageManagerImpl.class);
 +
 +    protected String _name;
 +    @Inject
 +    protected AgentManager _agentMgr;
 +    @Inject
 +    protected TemplateManager _tmpltMgr;
 +    @Inject
 +    protected AccountManager _accountMgr;
 +    @Inject
 +    protected ConfigurationManager _configMgr;
 +    @Inject
 +    protected VolumeDao _volsDao;
 +    @Inject
 +    private VolumeDataStoreDao _volumeDataStoreDao;
 +    @Inject
 +    protected HostDao _hostDao;
 +    @Inject
 +    protected SnapshotDao _snapshotDao;
 +    @Inject
 +    protected StoragePoolHostDao _storagePoolHostDao;
 +    @Inject
 +    protected VMTemplatePoolDao _vmTemplatePoolDao = null;
 +    @Inject
 +    protected VMTemplateZoneDao _vmTemplateZoneDao;
 +    @Inject
 +    protected VMTemplateDao _vmTemplateDao = null;
 +    @Inject
 +    protected VMInstanceDao _vmInstanceDao;
 +    @Inject
 +    protected PrimaryDataStoreDao _storagePoolDao = null;
 +    @Inject
 +    protected StoragePoolDetailsDao _storagePoolDetailsDao;
 +    @Inject
 +    protected ImageStoreDao _imageStoreDao = null;
 +    @Inject
 +    protected ImageStoreDetailsDao _imageStoreDetailsDao = null;
 +    @Inject
 +    protected SnapshotDataStoreDao _snapshotStoreDao = null;
 +    @Inject
 +    protected TemplateDataStoreDao _templateStoreDao = null;
 +    @Inject
 +    protected TemplateJoinDao _templateViewDao = null;
 +    @Inject
 +    protected VolumeDataStoreDao _volumeStoreDao = null;
 +    @Inject
 +    protected CapacityDao _capacityDao;
 +    @Inject
 +    protected CapacityManager _capacityMgr;
 +    @Inject
 +    protected DataCenterDao _dcDao = null;
 +    @Inject
 +    protected VMTemplateDao _templateDao;
 +    @Inject
 +    protected UserDao _userDao;
 +    @Inject
 +    protected ClusterDao _clusterDao;
 +    @Inject
 +    protected StoragePoolWorkDao _storagePoolWorkDao;
 +    @Inject
 +    protected HypervisorGuruManager _hvGuruMgr;
 +    @Inject
 +    protected VolumeDao _volumeDao;
 +    @Inject
 +    ConfigurationDao _configDao;
 +    @Inject
 +    ManagementServer _msServer;
 +    @Inject
 +    VolumeService volService;
 +    @Inject
 +    VolumeDataFactory volFactory;
 +    @Inject
 +    TemplateDataFactory tmplFactory;
 +    @Inject
 +    SnapshotDataFactory snapshotFactory;
 +    @Inject
 +    ConfigurationServer _configServer;
 +    @Inject
 +    DataStoreManager _dataStoreMgr;
 +    @Inject
 +    DataStoreProviderManager _dataStoreProviderMgr;
 +    @Inject
 +    private TemplateService _imageSrv;
 +    @Inject
 +    EndPointSelector _epSelector;
 +    @Inject
 +    private DiskOfferingDao _diskOfferingDao;
 +    @Inject
 +    ResourceLimitService _resourceLimitMgr;
 +    @Inject
 +    EntityManager _entityMgr;
 +    @Inject
 +    SnapshotService _snapshotService;
 +    @Inject
 +    StoragePoolTagsDao _storagePoolTagsDao;
 +
 +    protected List<StoragePoolDiscoverer> _discoverers;
 +
 +    public List<StoragePoolDiscoverer> getDiscoverers() {
 +        return _discoverers;
 +    }
 +
 +    public void setDiscoverers(List<StoragePoolDiscoverer> discoverers) {
 +        _discoverers = discoverers;
 +    }
 +
 +    protected SearchBuilder<VMTemplateHostVO> HostTemplateStatesSearch;
 +    protected GenericSearchBuilder<StoragePoolHostVO, Long> UpHostsInPoolSearch;
 +    protected SearchBuilder<VMInstanceVO> StoragePoolSearch;
 +    protected SearchBuilder<StoragePoolVO> LocalStorageSearch;
 +
 +    ScheduledExecutorService _executor = null;
 +    int _storagePoolAcquisitionWaitSeconds = 1800; // 30 minutes
 +    int _downloadUrlCleanupInterval;
 +    int _downloadUrlExpirationInterval;
 +    // protected BigDecimal _overProvisioningFactor = new BigDecimal(1);
 +    private long _serverId;
 +
 +    private final Map<String, HypervisorHostListener> hostListeners = new HashMap<String, HypervisorHostListener>();
 +
 +    public boolean share(VMInstanceVO vm, List<VolumeVO> vols, HostVO host, boolean cancelPreviousShare) throws StorageUnavailableException {
 +
 +        // if pool is in maintenance and it is the ONLY pool available; reject
 +        List<VolumeVO> rootVolForGivenVm = _volsDao.findByInstanceAndType(vm.getId(), Type.ROOT);
 +        if (rootVolForGivenVm != null && rootVolForGivenVm.size() > 0) {
 +            boolean isPoolAvailable = isPoolAvailable(rootVolForGivenVm.get(0).getPoolId());
 +            if (!isPoolAvailable) {
 +                throw new StorageUnavailableException("Can not share " + vm, rootVolForGivenVm.get(0).getPoolId());
 +            }
 +        }
 +
 +        // this check is done for maintenance mode for primary storage
 +        // if any one of the volume is unusable, we return false
 +        // if we return false, the allocator will try to switch to another PS if
 +        // available
 +        for (VolumeVO vol : vols) {
 +            if (vol.getRemoved() != null) {
 +                s_logger.warn("Volume id:" + vol.getId() + " is removed, cannot share on this instance");
 +                // not ok to share
 +                return false;
 +            }
 +        }
 +        // ok to share
 +        return true;
 +    }
 +
 +    private boolean isPoolAvailable(Long poolId) {
 +        // get list of all pools
 +        List<StoragePoolVO> pools = _storagePoolDao.listAll();
 +
 +        // if no pools or 1 pool which is in maintenance
 +        if (pools == null || pools.size() == 0 || (pools.size() == 1 && pools.get(0).getStatus().equals(StoragePoolStatus.Maintenance))) {
 +            return false;
 +        } else {
 +            return true;
 +        }
 +    }
 +
 +    @Override
 +    public List<StoragePoolVO> ListByDataCenterHypervisor(long datacenterId, HypervisorType type) {
 +        List<StoragePoolVO> pools = _storagePoolDao.listByDataCenterId(datacenterId);
 +        List<StoragePoolVO> retPools = new ArrayList<StoragePoolVO>();
 +        for (StoragePoolVO pool : pools) {
 +            if (pool.getStatus() != StoragePoolStatus.Up) {
 +                continue;
 +            }
 +            if (pool.getScope() == ScopeType.ZONE) {
 +                if (pool.getHypervisor() != null && pool.getHypervisor() == type) {
 +                    retPools.add(pool);
 +                }
 +            } else {
 +                ClusterVO cluster = _clusterDao.findById(pool.getClusterId());
 +                if (type == cluster.getHypervisorType()) {
 +                    retPools.add(pool);
 +                }
 +            }
 +        }
 +        Collections.shuffle(retPools);
 +        return retPools;
 +    }
 +
 +    @Override
 +    public boolean isLocalStorageActiveOnHost(Long hostId) {
 +        List<StoragePoolHostVO> storagePoolHostRefs = _storagePoolHostDao.listByHostId(hostId);
 +        for (StoragePoolHostVO storagePoolHostRef : storagePoolHostRefs) {
 +            StoragePoolVO PrimaryDataStoreVO = _storagePoolDao.findById(storagePoolHostRef.getPoolId());
 +            if (PrimaryDataStoreVO.getPoolType() == StoragePoolType.LVM || PrimaryDataStoreVO.getPoolType() == StoragePoolType.EXT) {
 +                SearchBuilder<VolumeVO> volumeSB = _volsDao.createSearchBuilder();
 +                volumeSB.and("poolId", volumeSB.entity().getPoolId(), SearchCriteria.Op.EQ);
 +                volumeSB.and("removed", volumeSB.entity().getRemoved(), SearchCriteria.Op.NULL);
 +                volumeSB.and("state", volumeSB.entity().getState(), SearchCriteria.Op.NIN);
 +
 +                SearchBuilder<VMInstanceVO> activeVmSB = _vmInstanceDao.createSearchBuilder();
 +                activeVmSB.and("state", activeVmSB.entity().getState(), SearchCriteria.Op.IN);
 +                volumeSB.join("activeVmSB", activeVmSB, volumeSB.entity().getInstanceId(), activeVmSB.entity().getId(), JoinBuilder.JoinType.INNER);
 +
 +                SearchCriteria<VolumeVO> volumeSC = volumeSB.create();
 +                volumeSC.setParameters("poolId", PrimaryDataStoreVO.getId());
 +                volumeSC.setParameters("state", Volume.State.Expunging, Volume.State.Destroy);
 +                volumeSC.setJoinParameters("activeVmSB", "state", State.Starting, State.Running, State.Stopping, State.Migrating);
 +
 +                List<VolumeVO> volumes = _volsDao.search(volumeSC, null);
 +                if (volumes.size() > 0) {
 +                    return true;
 +                }
 +            }
 +        }
 +
 +        return false;
 +    }
 +
 +    @Override
 +    public Answer[] sendToPool(StoragePool pool, Commands cmds) throws StorageUnavailableException {
 +        return sendToPool(pool, null, null, cmds).second();
 +    }
 +
 +    @Override
 +    public Answer sendToPool(StoragePool pool, long[] hostIdsToTryFirst, Command cmd) throws StorageUnavailableException {
 +        Answer[] answers = sendToPool(pool, hostIdsToTryFirst, null, new Commands(cmd)).second();
 +        if (answers == null) {
 +            return null;
 +        }
 +        return answers[0];
 +    }
 +
 +    @Override
 +    public Answer sendToPool(StoragePool pool, Command cmd) throws StorageUnavailableException {
 +        Answer[] answers = sendToPool(pool, new Commands(cmd));
 +        if (answers == null) {
 +            return null;
 +        }
 +        return answers[0];
 +    }
 +
 +    public Long chooseHostForStoragePool(StoragePoolVO poolVO, List<Long> avoidHosts, boolean sendToVmResidesOn, Long vmId) {
 +        if (sendToVmResidesOn) {
 +            if (vmId != null) {
 +                VMInstanceVO vmInstance = _vmInstanceDao.findById(vmId);
 +                if (vmInstance != null) {
 +                    Long hostId = vmInstance.getHostId();
 +                    if (hostId != null && !avoidHosts.contains(vmInstance.getHostId())) {
 +                        return hostId;
 +                    }
 +                }
 +            }
 +            /*
 +             * Can't find the vm where host resides on(vm is destroyed? or
 +             * volume is detached from vm), randomly choose a host to send the
 +             * cmd
 +             */
 +        }
 +        List<StoragePoolHostVO> poolHosts = _storagePoolHostDao.listByHostStatus(poolVO.getId(), Status.Up);
 +        Collections.shuffle(poolHosts);
 +        if (poolHosts != null && poolHosts.size() > 0) {
 +            for (StoragePoolHostVO sphvo : poolHosts) {
 +                if (!avoidHosts.contains(sphvo.getHostId())) {
 +                    return sphvo.getHostId();
 +                }
 +            }
 +        }
 +        return null;
 +    }
 +
 +    @Override
 +    public boolean configure(String name, Map<String, Object> params) {
 +        Map<String, String> configs = _configDao.getConfiguration("management-server", params);
 +
 +        _storagePoolAcquisitionWaitSeconds = NumbersUtil.parseInt(configs.get("pool.acquisition.wait.seconds"), 1800);
 +        s_logger.info("pool.acquisition.wait.seconds is configured as " + _storagePoolAcquisitionWaitSeconds + " seconds");
 +
 +        _agentMgr.registerForHostEvents(new StoragePoolMonitor(this, _storagePoolDao, _dataStoreProviderMgr), true, false, true);
 +
 +        s_logger.info("Storage cleanup enabled: " + StorageCleanupEnabled.value() + ", interval: " + StorageCleanupInterval.value() + ", delay: " + StorageCleanupDelay.value()
 +        + ", template cleanup enabled: " + TemplateCleanupEnabled.value());
 +
 +        String cleanupInterval = configs.get("extract.url.cleanup.interval");
 +        _downloadUrlCleanupInterval = NumbersUtil.parseInt(cleanupInterval, 7200);
 +
 +        String urlExpirationInterval = configs.get("extract.url.expiration.interval");
 +        _downloadUrlExpirationInterval = NumbersUtil.parseInt(urlExpirationInterval, 14400);
 +
 +        String workers = configs.get("expunge.workers");
 +        int wrks = NumbersUtil.parseInt(workers, 10);
 +        _executor = Executors.newScheduledThreadPool(wrks, new NamedThreadFactory("StorageManager-Scavenger"));
 +
 +        _agentMgr.registerForHostEvents(ComponentContext.inject(LocalStoragePoolListener.class), true, false, false);
 +
 +        _serverId = _msServer.getId();
 +
 +        UpHostsInPoolSearch = _storagePoolHostDao.createSearchBuilder(Long.class);
 +        UpHostsInPoolSearch.selectFields(UpHostsInPoolSearch.entity().getHostId());
 +        SearchBuilder<HostVO> hostSearch = _hostDao.createSearchBuilder();
 +        hostSearch.and("status", hostSearch.entity().getStatus(), Op.EQ);
 +        hostSearch.and("resourceState", hostSearch.entity().getResourceState(), Op.EQ);
 +        UpHostsInPoolSearch.join("hosts", hostSearch, hostSearch.entity().getId(), UpHostsInPoolSearch.entity().getHostId(), JoinType.INNER);
 +        UpHostsInPoolSearch.and("pool", UpHostsInPoolSearch.entity().getPoolId(), Op.EQ);
 +        UpHostsInPoolSearch.done();
 +
 +        StoragePoolSearch = _vmInstanceDao.createSearchBuilder();
 +
 +        SearchBuilder<VolumeVO> volumeSearch = _volumeDao.createSearchBuilder();
 +        volumeSearch.and("volumeType", volumeSearch.entity().getVolumeType(), SearchCriteria.Op.EQ);
 +        volumeSearch.and("poolId", volumeSearch.entity().getPoolId(), SearchCriteria.Op.EQ);
 +        volumeSearch.and("state", volumeSearch.entity().getState(), SearchCriteria.Op.EQ);
 +        StoragePoolSearch.join("vmVolume", volumeSearch, volumeSearch.entity().getInstanceId(), StoragePoolSearch.entity().getId(), JoinBuilder.JoinType.INNER);
 +        StoragePoolSearch.done();
 +
 +        LocalStorageSearch = _storagePoolDao.createSearchBuilder();
 +        SearchBuilder<StoragePoolHostVO> storageHostSearch = _storagePoolHostDao.createSearchBuilder();
 +        storageHostSearch.and("hostId", storageHostSearch.entity().getHostId(), SearchCriteria.Op.EQ);
 +        LocalStorageSearch.join("poolHost", storageHostSearch, storageHostSearch.entity().getPoolId(), LocalStorageSearch.entity().getId(), JoinBuilder.JoinType.INNER);
 +        LocalStorageSearch.and("type", LocalStorageSearch.entity().getPoolType(), SearchCriteria.Op.IN);
 +        LocalStorageSearch.done();
 +
 +        Volume.State.getStateMachine().registerListener(new VolumeStateListener(_configDao, _vmInstanceDao));
 +
 +        return true;
 +    }
 +
 +    @Override
 +    public String getStoragePoolTags(long poolId) {
 +        return com.cloud.utils.StringUtils.listToCsvTags(_storagePoolDao.searchForStoragePoolTags(poolId));
 +    }
 +
 +    @Override
 +    public boolean start() {
 +        if (StorageCleanupEnabled.value()) {
 +            Random generator = new Random();
 +            int initialDelay = generator.nextInt(StorageCleanupInterval.value());
 +            _executor.scheduleWithFixedDelay(new StorageGarbageCollector(), initialDelay, StorageCleanupInterval.value(), TimeUnit.SECONDS);
 +        } else {
 +            s_logger.debug("Storage cleanup is not enabled, so the storage cleanup thread is not being scheduled.");
 +        }
 +
 +        _executor.scheduleWithFixedDelay(new DownloadURLGarbageCollector(), _downloadUrlCleanupInterval, _downloadUrlCleanupInterval, TimeUnit.SECONDS);
 +
 +        return true;
 +    }
 +
 +    @Override
 +    public boolean stop() {
 +        if (StorageCleanupEnabled.value()) {
 +            _executor.shutdown();
 +        }
 +        return true;
 +    }
 +
 +    @DB
 +    @Override
 +    public DataStore createLocalStorage(Host host, StoragePoolInfo pInfo) throws ConnectionException {
 +        DataCenterVO dc = _dcDao.findById(host.getDataCenterId());
 +        if (dc == null) {
 +            return null;
 +        }
 +        boolean useLocalStorageForSystemVM = false;
 +        Boolean isLocal = ConfigurationManagerImpl.SystemVMUseLocalStorage.valueIn(dc.getId());
 +        if (isLocal != null) {
 +            useLocalStorageForSystemVM = isLocal.booleanValue();
 +        }
 +        if (!(dc.isLocalStorageEnabled() || useLocalStorageForSystemVM)) {
 +            return null;
 +        }
 +        DataStore store;
 +        try {
 +            String hostAddress = pInfo.getHost();
 +            if (host.getHypervisorType() == Hypervisor.HypervisorType.VMware) {
 +                hostAddress = "VMFS datastore: " + pInfo.getHostPath();
 +            }
 +            StoragePoolVO pool = _storagePoolDao.findPoolByHostPath(host.getDataCenterId(), host.getPodId(), hostAddress, pInfo.getHostPath(), pInfo.getUuid());
 +            if (pool == null && host.getHypervisorType() == HypervisorType.VMware) {
 +                // perform run-time upgrade. In versions prior to 2.2.12, there
 +                // is a bug that we don't save local datastore info (host path
 +                // is empty), this will cause us
 +                // not able to distinguish multiple local datastores that may be
 +                // available on the host, to support smooth migration, we
 +                // need to perform runtime upgrade here
 +                if (pInfo.getHostPath().length() > 0) {
 +                    pool = _storagePoolDao.findPoolByHostPath(host.getDataCenterId(), host.getPodId(), hostAddress, "", pInfo.getUuid());
 +                }
 +            }
 +            if (pool == null) {
 +                //the path can be different, but if they have the same uuid, assume they are the same storage
 +                pool = _storagePoolDao.findPoolByHostPath(host.getDataCenterId(), host.getPodId(), hostAddress, null, pInfo.getUuid());
 +                if (pool != null) {
 +                    s_logger.debug("Found a storage pool: " + pInfo.getUuid() + ", but with different hostpath " + pInfo.getHostPath() + ", still treat it as the same pool");
 +                }
 +            }
 +
 +            DataStoreProvider provider = _dataStoreProviderMgr.getDefaultPrimaryDataStoreProvider();
 +            DataStoreLifeCycle lifeCycle = provider.getDataStoreLifeCycle();
 +            if (pool == null) {
 +                Map<String, Object> params = new HashMap<String, Object>();
 +                String name = createLocalStoragePoolName(host, pInfo);
 +                params.put("zoneId", host.getDataCenterId());
 +                params.put("clusterId", host.getClusterId());
 +                params.put("podId", host.getPodId());
 +                params.put("url", pInfo.getPoolType().toString() + "://" + pInfo.getHost() + "/" + pInfo.getHostPath());
 +                params.put("name", name);
 +                params.put("localStorage", true);
 +                params.put("details", pInfo.getDetails());
 +                params.put("uuid", pInfo.getUuid());
 +                params.put("providerName", provider.getName());
 +
 +                store = lifeCycle.initialize(params);
 +            } else {
 +                store = _dataStoreMgr.getDataStore(pool.getId(), DataStoreRole.Primary);
 +            }
 +
 +            pool = _storagePoolDao.findById(store.getId());
 +            if (pool.getStatus() != StoragePoolStatus.Maintenance && pool.getStatus() != StoragePoolStatus.Removed) {
 +                HostScope scope = new HostScope(host.getId(), host.getClusterId(), host.getDataCenterId());
 +                lifeCycle.attachHost(store, scope, pInfo);
 +            }
 +
 +        } catch (Exception e) {
 +            s_logger.warn("Unable to setup the local storage pool for " + host, e);
 +            throw new ConnectionException(true, "Unable to setup the local storage pool for " + host, e);
 +        }
 +
 +        return _dataStoreMgr.getDataStore(store.getId(), DataStoreRole.Primary);
 +    }
 +
 +    /**
 +     * Creates the local storage pool name.
 +     * The name will follow the pattern: <hostname>-local-<firstBlockOfUuid>
 +     */
 +    protected String createLocalStoragePoolName(Host host, StoragePoolInfo storagePoolInformation) {
 +        return String.format("%s-%s-%s", org.apache.commons.lang3.StringUtils.trim(host.getName()), "local", storagePoolInformation.getUuid().split("-")[0]);
 +    }
 +
 +    @Override
 +    public PrimaryDataStoreInfo createPool(CreateStoragePoolCmd cmd) throws ResourceInUseException, IllegalArgumentException, UnknownHostException, ResourceUnavailableException {
 +        String providerName = cmd.getStorageProviderName();
 +        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(providerName);
 +
 +        if (storeProvider == null) {
 +            storeProvider = _dataStoreProviderMgr.getDefaultPrimaryDataStoreProvider();
 +            if (storeProvider == null) {
 +                throw new InvalidParameterValueException("can't find storage provider: " + providerName);
 +            }
 +        }
 +
 +        Long clusterId = cmd.getClusterId();
 +        Long podId = cmd.getPodId();
 +        Long zoneId = cmd.getZoneId();
 +
 +        ScopeType scopeType = ScopeType.CLUSTER;
 +        String scope = cmd.getScope();
 +        if (scope != null) {
 +            try {
 +                scopeType = Enum.valueOf(ScopeType.class, scope.toUpperCase());
 +            } catch (Exception e) {
 +                throw new InvalidParameterValueException("invalid scope for pool " + scope);
 +            }
 +        }
 +
 +        if (scopeType == ScopeType.CLUSTER && clusterId == null) {
 +            throw new InvalidParameterValueException("cluster id can't be null, if scope is cluster");
 +        } else if (scopeType == ScopeType.ZONE && zoneId == null) {
 +            throw new InvalidParameterValueException("zone id can't be null, if scope is zone");
 +        }
 +
 +        HypervisorType hypervisorType = HypervisorType.KVM;
 +        if (scopeType == ScopeType.ZONE) {
 +            // ignore passed clusterId and podId
 +            clusterId = null;
 +            podId = null;
 +            String hypervisor = cmd.getHypervisor();
 +            if (hypervisor != null) {
 +                try {
 +                    hypervisorType = HypervisorType.getType(hypervisor);
 +                } catch (Exception e) {
 +                    throw new InvalidParameterValueException("invalid hypervisor type " + hypervisor);
 +                }
 +            } else {
 +                throw new InvalidParameterValueException("Missing parameter hypervisor. Hypervisor type is required to create zone wide primary storage.");
 +            }
 +            if (hypervisorType != HypervisorType.KVM && hypervisorType != HypervisorType.VMware && hypervisorType != HypervisorType.Hyperv && hypervisorType != HypervisorType.LXC
 +                    && hypervisorType != HypervisorType.Any) {
 +                throw new InvalidParameterValueException("zone wide storage pool is not supported for hypervisor type " + hypervisor);
 +            }
 +        }
 +
 +        Map<String, String> details = extractApiParamAsMap(cmd.getDetails());
 +        DataCenterVO zone = _dcDao.findById(cmd.getZoneId());
 +        if (zone == null) {
 +            throw new InvalidParameterValueException("unable to find zone by id " + zoneId);
 +        }
 +        // Check if zone is disabled
 +        Account account = CallContext.current().getCallingAccount();
 +        if (Grouping.AllocationState.Disabled == zone.getAllocationState() && !_accountMgr.isRootAdmin(account.getId())) {
 +            throw new PermissionDeniedException("Cannot perform this operation, Zone is currently disabled: " + zoneId);
 +        }
 +
 +        Map<String, Object> params = new HashMap<String, Object>();
 +        params.put("zoneId", zone.getId());
 +        params.put("clusterId", clusterId);
 +        params.put("podId", podId);
 +        params.put("url", cmd.getUrl());
 +        params.put("tags", cmd.getTags());
 +        params.put("name", cmd.getStoragePoolName());
 +        params.put("details", details);
 +        params.put("providerName", storeProvider.getName());
 +        params.put("managed", cmd.isManaged());
 +        params.put("capacityBytes", cmd.getCapacityBytes());
 +        params.put("capacityIops", cmd.getCapacityIops());
 +
 +        DataStoreLifeCycle lifeCycle = storeProvider.getDataStoreLifeCycle();
 +        DataStore store = null;
 +        try {
 +            store = lifeCycle.initialize(params);
 +            if (scopeType == ScopeType.CLUSTER) {
 +                ClusterScope clusterScope = new ClusterScope(clusterId, podId, zoneId);
 +                lifeCycle.attachCluster(store, clusterScope);
 +            } else if (scopeType == ScopeType.ZONE) {
 +                ZoneScope zoneScope = new ZoneScope(zoneId);
 +                lifeCycle.attachZone(store, zoneScope, hypervisorType);
 +            }
 +        } catch (Exception e) {
 +            s_logger.debug("Failed to add data store: " + e.getMessage(), e);
 +            try {
 +                // clean up the db, just absorb the exception thrown in deletion with error logged, so that user can get error for adding data store
 +                // not deleting data store.
 +                if (store != null) {
 +                    lifeCycle.deleteDataStore(store);
 +                }
 +            } catch (Exception ex) {
 +                s_logger.debug("Failed to clean up storage pool: " + ex.getMessage());
 +            }
 +            throw new CloudRuntimeException("Failed to add data store: " + e.getMessage(), e);
 +        }
 +
 +        return (PrimaryDataStoreInfo)_dataStoreMgr.getDataStore(store.getId(), DataStoreRole.Primary);
 +    }
 +
 +    private Map<String, String> extractApiParamAsMap(Map ds) {
 +        Map<String, String> details = new HashMap<String, String>();
 +        if (ds != null) {
 +            Collection detailsCollection = ds.values();
 +            Iterator it = detailsCollection.iterator();
 +            while (it.hasNext()) {
 +                HashMap d = (HashMap)it.next();
 +                Iterator it2 = d.entrySet().iterator();
 +                while (it2.hasNext()) {
 +                    Map.Entry entry = (Map.Entry)it2.next();
 +                    details.put((String)entry.getKey(), (String)entry.getValue());
 +                }
 +            }
 +        }
 +        return details;
 +    }
 +
 +    @ActionEvent(eventType = EventTypes.EVENT_DISABLE_PRIMARY_STORAGE, eventDescription = "disable storage pool")
 +    private void disablePrimaryStoragePool(StoragePoolVO primaryStorage) {
 +        if (!primaryStorage.getStatus().equals(StoragePoolStatus.Up)) {
 +            throw new InvalidParameterValueException("Primary storage with id " + primaryStorage.getId() + " cannot be disabled. Storage pool state : " + primaryStorage.getStatus().toString());
 +        }
 +
 +        DataStoreProvider provider = _dataStoreProviderMgr.getDataStoreProvider(primaryStorage.getStorageProviderName());
 +        DataStoreLifeCycle dataStoreLifeCycle = provider.getDataStoreLifeCycle();
 +        DataStore store = _dataStoreMgr.getDataStore(primaryStorage.getId(), DataStoreRole.Primary);
 +        ((PrimaryDataStoreLifeCycle)dataStoreLifeCycle).disableStoragePool(store);
 +    }
 +
 +    @ActionEvent(eventType = EventTypes.EVENT_ENABLE_PRIMARY_STORAGE, eventDescription = "enable storage pool")
 +    private void enablePrimaryStoragePool(StoragePoolVO primaryStorage) {
 +        if (!primaryStorage.getStatus().equals(StoragePoolStatus.Disabled)) {
 +            throw new InvalidParameterValueException("Primary storage with id " + primaryStorage.getId() + " cannot be enabled. Storage pool state : " + primaryStorage.getStatus().toString());
 +        }
 +
 +        DataStoreProvider provider = _dataStoreProviderMgr.getDataStoreProvider(primaryStorage.getStorageProviderName());
 +        DataStoreLifeCycle dataStoreLifeCycle = provider.getDataStoreLifeCycle();
 +        DataStore store = _dataStoreMgr.getDataStore(primaryStorage.getId(), DataStoreRole.Primary);
 +        ((PrimaryDataStoreLifeCycle)dataStoreLifeCycle).enableStoragePool(store);
 +    }
 +
 +    @Override
 +    public PrimaryDataStoreInfo updateStoragePool(UpdateStoragePoolCmd cmd) throws IllegalArgumentException {
 +        // Input validation
 +        Long id = cmd.getId();
 +
 +        StoragePoolVO pool = _storagePoolDao.findById(id);
 +        if (pool == null) {
 +            throw new IllegalArgumentException("Unable to find storage pool with ID: " + id);
 +        }
 +
 +        final List<String> storagePoolTags = cmd.getTags();
 +        if (storagePoolTags != null) {
 +            if (s_logger.isDebugEnabled()) {
 +                s_logger.debug("Updating Storage Pool Tags to :" + storagePoolTags);
 +            }
 +            _storagePoolTagsDao.persist(pool.getId(), storagePoolTags);
 +        }
 +
 +        Long updatedCapacityBytes = null;
 +        Long capacityBytes = cmd.getCapacityBytes();
 +
 +        if (capacityBytes != null) {
 +            if (capacityBytes != pool.getCapacityBytes()) {
 +                updatedCapacityBytes = capacityBytes;
 +            }
 +        }
 +
 +        Long updatedCapacityIops = null;
 +        Long capacityIops = cmd.getCapacityIops();
 +
 +        if (capacityIops != null) {
 +            if (!capacityIops.equals(pool.getCapacityIops())) {
 +                updatedCapacityIops = capacityIops;
 +            }
 +        }
 +
 +        if (updatedCapacityBytes != null || updatedCapacityIops != null) {
 +            StoragePoolVO storagePool = _storagePoolDao.findById(id);
 +            DataStoreProvider dataStoreProvider = _dataStoreProviderMgr.getDataStoreProvider(storagePool.getStorageProviderName());
 +            DataStoreLifeCycle dataStoreLifeCycle = dataStoreProvider.getDataStoreLifeCycle();
 +
 +            if (dataStoreLifeCycle instanceof PrimaryDataStoreLifeCycle) {
 +                Map<String, String> details = new HashMap<String, String>();
 +
 +                details.put(PrimaryDataStoreLifeCycle.CAPACITY_BYTES, updatedCapacityBytes != null ? String.valueOf(updatedCapacityBytes) : null);
 +                details.put(PrimaryDataStoreLifeCycle.CAPACITY_IOPS, updatedCapacityIops != null ? String.valueOf(updatedCapacityIops) : null);
 +
 +                ((PrimaryDataStoreLifeCycle)dataStoreLifeCycle).updateStoragePool(storagePool, details);
 +            }
 +        }
 +
 +        Boolean enabled = cmd.getEnabled();
 +        if (enabled != null) {
 +            if (enabled) {
 +                enablePrimaryStoragePool(pool);
 +            } else {
 +                disablePrimaryStoragePool(pool);
 +            }
 +        }
 +
 +        if (updatedCapacityBytes != null) {
 +            _storagePoolDao.updateCapacityBytes(id, capacityBytes);
 +        }
 +
 +        if (updatedCapacityIops != null) {
 +            _storagePoolDao.updateCapacityIops(id, capacityIops);
 +        }
 +
 +        return (PrimaryDataStoreInfo)_dataStoreMgr.getDataStore(pool.getId(), DataStoreRole.Primary);
 +    }
 +
 +    @Override
 +    public void removeStoragePoolFromCluster(long hostId, String iScsiName, StoragePool storagePool) {
 +        final Map<String, String> details = new HashMap<>();
 +
 +        details.put(DeleteStoragePoolCommand.DATASTORE_NAME, iScsiName);
 +        details.put(DeleteStoragePoolCommand.IQN, iScsiName);
 +        details.put(DeleteStoragePoolCommand.STORAGE_HOST, storagePool.getHostAddress());
 +        details.put(DeleteStoragePoolCommand.STORAGE_PORT, String.valueOf(storagePool.getPort()));
 +
 +        final DeleteStoragePoolCommand cmd = new DeleteStoragePoolCommand();
 +
 +        cmd.setDetails(details);
 +        cmd.setRemoveDatastore(true);
 +
 +        final Answer answer = _agentMgr.easySend(hostId, cmd);
 +
 +        if (answer == null || !answer.getResult()) {
 +            String errMsg = "Error interacting with host (related to DeleteStoragePoolCommand)" + (StringUtils.isNotBlank(answer.getDetails()) ? ": " + answer.getDetails() : "");
 +
 +            s_logger.error(errMsg);
 +
 +            throw new CloudRuntimeException(errMsg);
 +        }
 +    }
 +
 +    @Override
 +    @DB
 +    public boolean deletePool(DeletePoolCmd cmd) {
 +        Long id = cmd.getId();
 +        boolean forced = cmd.isForced();
 +
 +        StoragePoolVO sPool = _storagePoolDao.findById(id);
 +        if (sPool == null) {
 +            s_logger.warn("Unable to find pool:" + id);
 +            throw new InvalidParameterValueException("Unable to find pool by id " + id);
 +        }
 +        if (sPool.getStatus() != StoragePoolStatus.Maintenance) {
 +            s_logger.warn("Unable to delete storage id: " + id + " due to it is not in Maintenance state");
 +            throw new InvalidParameterValueException("Unable to delete storage due to it is not in Maintenance state, id: " + id);
 +        }
 +        Pair<Long, Long> vlms = _volsDao.getCountAndTotalByPool(id);
 +        if (forced) {
 +            if (vlms.first() > 0) {
 +                Pair<Long, Long> nonDstrdVlms = _volsDao.getNonDestroyedCountAndTotalByPool(id);
 +                if (nonDstrdVlms.first() > 0) {
 +                    throw new CloudRuntimeException("Cannot delete pool " + sPool.getName() + " as there are associated " + "non-destroyed vols for this pool");
 +                }
 +                // force expunge non-destroyed volumes
 +                List<VolumeVO> vols = _volsDao.listVolumesToBeDestroyed();
 +                for (VolumeVO vol : vols) {
 +                    AsyncCallFuture<VolumeApiResult> future = volService.expungeVolumeAsync(volFactory.getVolume(vol.getId()));
 +                    try {
 +                        future.get();
 +                    } catch (InterruptedException e) {
 +                        s_logger.debug("expunge volume failed:" + vol.getId(), e);
 +                    } catch (ExecutionException e) {
 +                        s_logger.debug("expunge volume failed:" + vol.getId(), e);
 +                    }
 +                }
 +            }
 +        } else {
 +            // Check if the pool has associated volumes in the volumes table
 +            // If it does , then you cannot delete the pool
 +            if (vlms.first() > 0) {
 +                throw new CloudRuntimeException("Cannot delete pool " + sPool.getName() + " as there are associated volumes for this pool");
 +            }
 +        }
 +
 +        // First get the host_id from storage_pool_host_ref for given pool id
 +        StoragePoolVO lock = _storagePoolDao.acquireInLockTable(sPool.getId());
 +
 +        if (lock == null) {
 +            if (s_logger.isDebugEnabled()) {
 +                s_logger.debug("Failed to acquire lock when deleting PrimaryDataStoreVO with ID: " + sPool.getId());
 +            }
 +            return false;
 +        }
 +
 +        _storagePoolDao.releaseFromLockTable(lock.getId());
 +        s_logger.trace("Released lock for storage pool " + id);
 +
 +        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(sPool.getStorageProviderName());
 +        DataStoreLifeCycle lifeCycle = storeProvider.getDataStoreLifeCycle();
 +        DataStore store = _dataStoreMgr.getDataStore(sPool.getId(), DataStoreRole.Primary);
 +        return lifeCycle.deleteDataStore(store);
 +    }
 +
 +    @Override
 +    public void connectHostToSharedPool(long hostId, long poolId) throws StorageUnavailableException, StorageConflictException {
 +        StoragePool pool = (StoragePool)_dataStoreMgr.getDataStore(poolId, DataStoreRole.Primary);
 +        assert (pool.isShared()) : "Now, did you actually read the name of this method?";
 +        s_logger.debug("Adding pool " + pool.getName() + " to  host " + hostId);
 +
 +        DataStoreProvider provider = _dataStoreProviderMgr.getDataStoreProvider(pool.getStorageProviderName());
 +        HypervisorHostListener listener = hostListeners.get(provider.getName());
 +        listener.hostConnect(hostId, pool.getId());
 +    }
 +
 +    @Override
 +    public BigDecimal getStorageOverProvisioningFactor(Long poolId) {
 +        return new BigDecimal(CapacityManager.StorageOverprovisioningFactor.valueIn(poolId));
 +    }
 +
 +    @Override
 +    public void createCapacityEntry(StoragePoolVO storagePool, short capacityType, long allocated) {
 +        SearchCriteria<CapacityVO> capacitySC = _capacityDao.createSearchCriteria();
 +        capacitySC.addAnd("hostOrPoolId", SearchCriteria.Op.EQ, storagePool.getId());
 +        capacitySC.addAnd("dataCenterId", SearchCriteria.Op.EQ, storagePool.getDataCenterId());
 +        capacitySC.addAnd("capacityType", SearchCriteria.Op.EQ, capacityType);
 +
 +        List<CapacityVO> capacities = _capacityDao.search(capacitySC, null);
 +
 +        long totalOverProvCapacity;
 +        if (storagePool.getPoolType().supportsOverProvisioning()) {
 +            // All this is for the inaccuracy of floats for big number multiplication.
 +            BigDecimal overProvFactor = getStorageOverProvisioningFactor(storagePool.getId());
 +            totalOverProvCapacity = overProvFactor.multiply(new BigDecimal(storagePool.getCapacityBytes())).longValue();
 +            s_logger.debug("Found storage pool " + storagePool.getName() + " of type " + storagePool.getPoolType().toString() + " with overprovisioning factor " + overProvFactor.toString());
 +            s_logger.debug("Total over provisioned capacity calculated is " + overProvFactor + " * " + storagePool.getCapacityBytes());
 +        } else {
 +            s_logger.debug("Found storage pool " + storagePool.getName() + " of type " + storagePool.getPoolType().toString());
 +            totalOverProvCapacity = storagePool.getCapacityBytes();
 +        }
 +
 +        s_logger.debug("Total over provisioned capacity of the pool " + storagePool.getName() + " id: " + storagePool.getId() + " is " + totalOverProvCapacity);
 +        CapacityState capacityState = CapacityState.Enabled;
 +        if (storagePool.getScope() == ScopeType.ZONE) {
 +            DataCenterVO dc = ApiDBUtils.findZoneById(storagePool.getDataCenterId());
 +            AllocationState allocationState = dc.getAllocationState();
 +            capacityState = (allocationState == AllocationState.Disabled) ? CapacityState.Disabled : CapacityState.Enabled;
 +        } else {
 +            if (storagePool.getClusterId() != null) {
 +                ClusterVO cluster = ApiDBUtils.findClusterById(storagePool.getClusterId());
 +                if (cluster != null) {
 +                    AllocationState allocationState = _configMgr.findClusterAllocationState(cluster);
 +                    capacityState = (allocationState == AllocationState.Disabled) ? CapacityState.Disabled : CapacityState.Enabled;
 +                }
 +            }
 +        }
 +
 +        if (storagePool.getScope() == ScopeType.HOST) {
 +            List<StoragePoolHostVO> stoargePoolHostVO = _storagePoolHostDao.listByPoolId(storagePool.getId());
 +
 +            if (stoargePoolHostVO != null && !stoargePoolHostVO.isEmpty()) {
 +                HostVO host = _hostDao.findById(stoargePoolHostVO.get(0).getHostId());
 +
 +                if (host != null) {
 +                    capacityState = (host.getResourceState() == ResourceState.Disabled) ? CapacityState.Disabled : CapacityState.Enabled;
 +                }
 +            }
 +        }
 +
 +        if (capacities.size() == 0) {
 +            CapacityVO capacity = new CapacityVO(storagePool.getId(), storagePool.getDataCenterId(), storagePool.getPodId(), storagePool.getClusterId(), allocated, totalOverProvCapacity,
 +                    capacityType);
 +            capacity.setCapacityState(capacityState);
 +            _capacityDao.persist(capacity);
 +        } else {
 +            CapacityVO capacity = capacities.get(0);
 +            if (capacity.getTotalCapacity() != totalOverProvCapacity || allocated != capacity.getUsedCapacity() || capacity.getCapacityState() != capacityState) {
 +                capacity.setTotalCapacity(totalOverProvCapacity);
 +                capacity.setUsedCapacity(allocated);
 +                capacity.setCapacityState(capacityState);
 +                _capacityDao.update(capacity.getId(), capacity);
 +            }
 +        }
 +        s_logger.debug("Successfully set Capacity - " + totalOverProvCapacity + " for capacity type - " + capacityType + " , DataCenterId - " + storagePool.getDataCenterId() + ", HostOrPoolId - "
 +                + storagePool.getId() + ", PodId " + storagePool.getPodId());
 +    }
 +
 +    @Override
 +    public List<Long> getUpHostsInPool(long poolId) {
 +        SearchCriteria<Long> sc = UpHostsInPoolSearch.create();
 +        sc.setParameters("pool", poolId);
 +        sc.setJoinParameters("hosts", "status", Status.Up);
 +        sc.setJoinParameters("hosts", "resourceState", ResourceState.Enabled);
 +        return _storagePoolHostDao.customSearch(sc, null);
 +    }
 +
 +    @Override
 +    public Pair<Long, Answer[]> sendToPool(StoragePool pool, long[] hostIdsToTryFirst, List<Long> hostIdsToAvoid, Commands cmds) throws StorageUnavailableException {
 +        List<Long> hostIds = getUpHostsInPool(pool.getId());
 +        Collections.shuffle(hostIds);
 +        if (hostIdsToTryFirst != null) {
 +            for (int i = hostIdsToTryFirst.length - 1; i >= 0; i--) {
 +                if (hostIds.remove(hostIdsToTryFirst[i])) {
 +                    hostIds.add(0, hostIdsToTryFirst[i]);
 +                }
 +            }
 +        }
 +
 +        if (hostIdsToAvoid != null) {
 +            hostIds.removeAll(hostIdsToAvoid);
 +        }
 +        if (hostIds == null || hostIds.isEmpty()) {
 +            throw new StorageUnavailableException("Unable to send command to the pool " + pool.getId() + " due to there is no enabled hosts up in this cluster", pool.getId());
 +        }
 +        for (Long hostId : hostIds) {
 +            try {
 +                List<Answer> answers = new ArrayList<Answer>();
 +                Command[] cmdArray = cmds.toCommands();
 +                for (Command cmd : cmdArray) {
 +                    long targetHostId = _hvGuruMgr.getGuruProcessedCommandTargetHost(hostId, cmd);
 +                    answers.add(_agentMgr.send(targetHostId, cmd));
 +                }
 +                return new Pair<Long, Answer[]>(hostId, answers.toArray(new Answer[answers.size()]));
 +            } catch (AgentUnavailableException e) {
 +                s_logger.debug("Unable to send storage pool command to " + pool + " via " + hostId, e);
 +            } catch (OperationTimedoutException e) {
 +                s_logger.debug("Unable to send storage pool command to " + pool + " via " + hostId, e);
 +            }
 +        }
 +
 +        throw new StorageUnavailableException("Unable to send command to the pool ", pool.getId());
 +    }
 +
 +    @Override
 +    public Pair<Long, Answer> sendToPool(StoragePool pool, long[] hostIdsToTryFirst, List<Long> hostIdsToAvoid, Command cmd) throws StorageUnavailableException {
 +        Commands cmds = new Commands(cmd);
 +        Pair<Long, Answer[]> result = sendToPool(pool, hostIdsToTryFirst, hostIdsToAvoid, cmds);
 +        return new Pair<Long, Answer>(result.first(), result.second()[0]);
 +    }
 +
 +    @Override
 +    public void cleanupStorage(boolean recurring) {
 +        GlobalLock scanLock = GlobalLock.getInternLock("storagemgr.cleanup");
 +
 +        try {
 +            if (scanLock.lock(3)) {
 +                try {
 +                    // Cleanup primary storage pools
 +                    if (TemplateCleanupEnabled.value()) {
 +                        List<StoragePoolVO> storagePools = _storagePoolDao.listAll();
 +                        for (StoragePoolVO pool : storagePools) {
 +                            try {
 +
 +                                List<VMTemplateStoragePoolVO> unusedTemplatesInPool = _tmpltMgr.getUnusedTemplatesInPool(pool);
 +                                s_logger.debug("Storage pool garbage collector found " + unusedTemplatesInPool.size() + " templates to clean up in storage pool: " + pool.getName());
 +                                for (VMTemplateStoragePoolVO templatePoolVO : unusedTemplatesInPool) {
 +                                    if (templatePoolVO.getDownloadState() != VMTemplateStorageResourceAssoc.Status.DOWNLOADED) {
 +                                        s_logger.debug("Storage pool garbage collector is skipping template with ID: " + templatePoolVO.getTemplateId() + " on pool " + templatePoolVO.getPoolId()
 +                                        + " because it is not completely downloaded.");
 +                                        continue;
 +                                    }
 +
 +                                    if (!templatePoolVO.getMarkedForGC()) {
 +                                        templatePoolVO.setMarkedForGC(true);
 +                                        _vmTemplatePoolDao.update(templatePoolVO.getId(), templatePoolVO);
 +                                        s_logger.debug("Storage pool garbage collector has marked template with ID: " + templatePoolVO.getTemplateId() + " on pool " + templatePoolVO.getPoolId()
 +                                        + " for garbage collection.");
 +                                        continue;
 +                                    }
 +
 +                                    _tmpltMgr.evictTemplateFromStoragePool(templatePoolVO);
 +                                }
 +                            } catch (Exception e) {
 +                                s_logger.warn("Problem cleaning up primary storage pool " + pool, e);
 +                            }
 +                        }
 +                    }
 +
 +                    //destroy snapshots in destroying state in snapshot_store_ref
 +                    List<SnapshotDataStoreVO> ssSnapshots = _snapshotStoreDao.listByState(ObjectInDataStoreStateMachine.State.Destroying);
 +                    for (SnapshotDataStoreVO ssSnapshotVO : ssSnapshots) {
 +                        try {
 +                            _snapshotService.deleteSnapshot(snapshotFactory.getSnapshot(ssSnapshotVO.getSnapshotId(), DataStoreRole.Image));
 +                        } catch (Exception e) {
 +                            s_logger.debug("Failed to delete snapshot: " + ssSnapshotVO.getId() + " from storage");
 +                        }
 +                    }
 +                    cleanupSecondaryStorage(recurring);
 +
 +                    List<VolumeVO> vols = _volsDao.listVolumesToBeDestroyed(new Date(System.currentTimeMillis() - ((long)StorageCleanupDelay.value() << 10)));
 +                    for (VolumeVO vol : vols) {
 +                        try {
 +                            // If this fails, just log a warning. It's ideal if we clean up the host-side clustered file
 +                            // system, but not necessary.
 +                            handleManagedStorage(vol);
 +                        } catch (Exception e) {
 +                            s_logger.warn("Unable to destroy host-side clustered file system " + vol.getUuid(), e);
 +                        }
 +
 +                        try {
 +                            VolumeInfo volumeInfo = volFactory.getVolume(vol.getId());
 +                            if (volumeInfo != null) {
 +                                volService.expungeVolumeAsync(volumeInfo);
 +                            } else {
 +                                s_logger.debug("Volume " + vol.getUuid() + " is already destroyed");
 +                            }
 +                        } catch (Exception e) {
 +                            s_logger.warn("Unable to destroy volume " + vol.getUuid(), e);
 +                        }
 +                    }
 +
 +                    // remove snapshots in Error state
 +                    List<SnapshotVO> snapshots = _snapshotDao.listAllByStatus(Snapshot.State.Error);
 +                    for (SnapshotVO snapshotVO : snapshots) {
 +                        try {
 +                            List<SnapshotDataStoreVO> storeRefs = _snapshotStoreDao.findBySnapshotId(snapshotVO.getId());
 +                            for (SnapshotDataStoreVO ref : storeRefs) {
 +                                _snapshotStoreDao.expunge(ref.getId());
 +                            }
 +                            _snapshotDao.expunge(snapshotVO.getId());
 +                        } catch (Exception e) {
 +                            s_logger.warn("Unable to destroy snapshot " + snapshotVO.getUuid(), e);
 +                        }
 +                    }
 +
 +                    // destroy uploaded volumes in abandoned/error state
 +                    List<VolumeDataStoreVO> volumeDataStores = _volumeDataStoreDao.listByVolumeState(Volume.State.UploadError, Volume.State.UploadAbandoned);
 +                    for (VolumeDataStoreVO volumeDataStore : volumeDataStores) {
 +                        VolumeVO volume = _volumeDao.findById(volumeDataStore.getVolumeId());
 +                        if (volume == null) {
 +                            s_logger.warn("Uploaded volume with id " + volumeDataStore.getVolumeId() + " not found, so cannot be destroyed");
 +                            continue;
 +                        }
 +                        try {
 +                            DataStore dataStore = _dataStoreMgr.getDataStore(volumeDataStore.getDataStoreId(), DataStoreRole.Image);
 +                            EndPoint ep = _epSelector.select(dataStore, volumeDataStore.getExtractUrl());
 +                            if (ep == null) {
 +                                s_logger.warn("There is no secondary storage VM for image store " + dataStore.getName() + ", cannot destroy uploaded volume " + volume.getUuid());
 +                                continue;
 +                            }
 +                            Host host = _hostDao.findById(ep.getId());
 +                            if (host != null && host.getManagementServerId() != null) {
 +                                if (_serverId == host.getManagementServerId().longValue()) {
 +                                    volService.destroyVolume(volume.getId());
 +                                    // decrement volume resource count
 +                                    _resourceLimitMgr.decrementResourceCount(volume.getAccountId(), ResourceType.volume, volume.isDisplayVolume());
 +                                    // expunge volume from secondary if volume is on image store
 +                                    VolumeInfo volOnSecondary = volFactory.getVolume(volume.getId(), DataStoreRole.Image);
 +                                    if (volOnSecondary != null) {
 +                                        s_logger.info("Expunging volume " + volume.getUuid() + " uploaded using HTTP POST from secondary data store");
 +                                        AsyncCallFuture<VolumeApiResult> future = volService.expungeVolumeAsync(volOnSecondary);
 +                                        VolumeApiResult result = future.get();
 +                                        if (!result.isSuccess()) {
 +                                            s_logger.warn("Failed to expunge volume " + volume.getUuid() + " from the image store " + dataStore.getName() + " due to: " + result.getResult());
 +                                        }
 +                                    }
 +                                }
 +                            }
 +                        } catch (Throwable th) {
 +                            s_logger.warn("Unable to destroy uploaded volume " + volume.getUuid() + ". Error details: " + th.getMessage());
 +                        }
 +                    }
 +
 +                    // destroy uploaded templates in abandoned/error state
 +                    List<TemplateDataStoreVO> templateDataStores = _templateStoreDao.listByTemplateState(VirtualMachineTemplate.State.UploadError, VirtualMachineTemplate.State.UploadAbandoned);
 +                    for (TemplateDataStoreVO templateDataStore : templateDataStores) {
 +                        VMTemplateVO template = _templateDao.findById(templateDataStore.getTemplateId());
 +                        if (template == null) {
 +                            s_logger.warn("Uploaded template with id " + templateDataStore.getTemplateId() + " not found, so cannot be destroyed");
 +                            continue;
 +                        }
 +                        try {
 +                            DataStore dataStore = _dataStoreMgr.getDataStore(templateDataStore.getDataStoreId(), DataStoreRole.Image);
 +                            EndPoint ep = _epSelector.select(dataStore, templateDataStore.getExtractUrl());
 +                            if (ep == null) {
 +                                s_logger.warn("There is no secondary storage VM for image store " + dataStore.getName() + ", cannot destroy uploaded template " + template.getUuid());
 +                                continue;
 +                            }
 +                            Host host = _hostDao.findById(ep.getId());
 +                            if (host != null && host.getManagementServerId() != null) {
 +                                if (_serverId == host.getManagementServerId().longValue()) {
 +                                    AsyncCallFuture<TemplateApiResult> future = _imageSrv.deleteTemplateAsync(tmplFactory.getTemplate(template.getId(), dataStore));
 +                                    TemplateApiResult result = future.get();
 +                                    if (!result.isSuccess()) {
 +                                        s_logger.warn("Failed to delete template " + template.getUuid() + " from the image store " + dataStore.getName() + " due to: " + result.getResult());
 +                                        continue;
 +                                    }
 +                                    // remove from template_zone_ref
 +                                    List<VMTemplateZoneVO> templateZones = _vmTemplateZoneDao.listByZoneTemplate(((ImageStoreEntity)dataStore).getDataCenterId(), template.getId());
 +                                    if (templateZones != null) {
 +                                        for (VMTemplateZoneVO templateZone : templateZones) {
 +                                            _vmTemplateZoneDao.remove(templateZone.getId());
 +                                        }
 +                                    }
 +                                    // mark all the occurrences of this template in the given store as destroyed
 +                                    _templateStoreDao.removeByTemplateStore(template.getId(), dataStore.getId());
 +                                    // find all eligible image stores for this template
 +                                    List<DataStore> imageStores = _tmpltMgr.getImageStoreByTemplate(template.getId(), null);
 +                                    if (imageStores == null || imageStores.size() == 0) {
 +                                        template.setState(VirtualMachineTemplate.State.Inactive);
 +                                        _templateDao.update(template.getId(), template);
 +
 +                                        // decrement template resource count
 +                                        _resourceLimitMgr.decrementResourceCount(template.getAccountId(), ResourceType.template);
 +                                    }
 +                                }
 +                            }
 +                        } catch (Throwable th) {
 +                            s_logger.warn("Unable to destroy uploaded template " + template.getUuid() + ". Error details: " + th.getMessage());
 +                        }
 +                    }
 +                } finally {
 +                    scanLock.unlock();
 +                }
 +            }
 +        } finally {
 +            scanLock.releaseRef();
 +        }
 +    }
 +
 +    /**
 +     * This method only applies for managed storage.
 +     *
 +     * For XenServer and vSphere, see if we need to remove an SR or a datastore, then remove the underlying volume
 +     * from any applicable access control list (before other code attempts to delete the volume that supports it).
 +     *
 +     * For KVM, just tell the underlying storage plug-in to remove the volume from any applicable access control list
 +     * (before other code attempts to delete the volume that supports it).
 +     */
 +    private void handleManagedStorage(Volume volume) {
 +        Long instanceId = volume.getInstanceId();
 +
 +        if (instanceId != null) {
 +            StoragePoolVO storagePool = _storagePoolDao.findById(volume.getPoolId());
 +
 +            if (storagePool != null && storagePool.isManaged()) {
 +                VMInstanceVO vmInstanceVO = _vmInstanceDao.findById(instanceId);
 +
 +                Long lastHostId = vmInstanceVO.getLastHostId();
 +
 +                if (lastHostId != null) {
 +                    HostVO host = _hostDao.findById(lastHostId);
 +                    ClusterVO cluster = _clusterDao.findById(host.getClusterId());
 +                    VolumeInfo volumeInfo = volFactory.getVolume(volume.getId());
 +
 +                    if (cluster.getHypervisorType() == HypervisorType.KVM) {
 +                        volService.revokeAccess(volumeInfo, host, volumeInfo.getDataStore());
 +                    } else {
 +                        DataTO volTO = volFactory.getVolume(volume.getId()).getTO();
 +                        DiskTO disk = new DiskTO(volTO, volume.getDeviceId(), volume.getPath(), volume.getVolumeType());
 +
 +                        DettachCommand cmd = new DettachCommand(disk, null);
 +
 +                        cmd.setManaged(true);
 +
 +                        cmd.setStorageHost(storagePool.getHostAddress());
 +                        cmd.setStoragePort(storagePool.getPort());
 +
 +                        cmd.set_iScsiName(volume.get_iScsiName());
 +
 +                        Answer answer = _agentMgr.easySend(lastHostId, cmd);
 +
 +                        if (answer != null && answer.getResult()) {
 +                            volService.revokeAccess(volumeInfo, host, volumeInfo.getDataStore());
 +                        } else {
 +                            s_logger.warn("Unable to remove host-side clustered file system for the following volume: " + volume.getUuid());
 +                        }
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    @DB
 +    List<Long> findAllVolumeIdInSnapshotTable(Long storeId) {
 +        String sql = "SELECT volume_id from snapshots, snapshot_store_ref WHERE snapshots.id = snapshot_store_ref.snapshot_id and store_id=? GROUP BY volume_id";
 +        List<Long> list = new ArrayList<Long>();
 +        try {
 +            TransactionLegacy txn = TransactionLegacy.currentTxn();
 +            ResultSet rs = null;
 +            PreparedStatement pstmt = null;
 +            pstmt = txn.prepareAutoCloseStatement(sql);
 +            pstmt.setLong(1, storeId);
 +            rs = pstmt.executeQuery();
 +            while (rs.next()) {
 +                list.add(rs.getLong(1));
 +            }
 +            return list;
 +        } catch (Exception e) {
 +            s_logger.debug("failed to get all volumes who has snapshots in secondary storage " + storeId + " due to " + e.getMessage());
 +            return null;
 +        }
 +
 +    }
 +
 +    List<String> findAllSnapshotForVolume(Long volumeId) {
 +        String sql = "SELECT backup_snap_id FROM snapshots WHERE volume_id=? and backup_snap_id is not NULL";
 +        try {
 +            TransactionLegacy txn = TransactionLegacy.currentTxn();
 +            ResultSet rs = null;
 +            PreparedStatement pstmt = null;
 +            pstmt = txn.prepareAutoCloseStatement(sql);
 +            pstmt.setLong(1, volumeId);
 +            rs = pstmt.executeQuery();
 +            List<String> list = new ArrayList<String>();
 +            while (rs.next()) {
 +                list.add(rs.getString(1));
 +            }
 +            return list;
 +        } catch (Exception e) {
 +            s_logger.debug("failed to get all snapshots for a volume " + volumeId + " due to " + e.getMessage());
 +            return null;
 +        }
 +    }
 +
 +    @Override
 +    @DB
 +    public void cleanupSecondaryStorage(boolean recurring) {
 +        // NOTE that object_store refactor will immediately delete the object from secondary storage when deleteTemplate etc api is issued.
 +        // so here we don't need to issue DeleteCommand to resource anymore, only need to remove db entry.
 +        try {
 +            // Cleanup templates in template_store_ref
 +            List<DataStore> imageStores = _dataStoreMgr.getImageStoresByScope(new ZoneScope(null));
 +            for (DataStore store : imageStores) {
 +                try {
 +                    long storeId = store.getId();
 +                    List<TemplateDataStoreVO> destroyedTemplateStoreVOs = _templateStoreDao.listDestroyed(storeId);
 +                    s_logger.debug("Secondary storage garbage collector found " + destroyedTemplateStoreVOs.size() + " templates to cleanup on template_store_ref for store: " + store.getName());
 +                    for (TemplateDataStoreVO destroyedTemplateStoreVO : destroyedTemplateStoreVOs) {
 +                        if (s_logger.isDebugEnabled()) {
 +                            s_logger.debug("Deleting template store DB entry: " + destroyedTemplateStoreVO);
 +                        }
 +                        _templateStoreDao.remove(destroyedTemplateStoreVO.getId());
 +                    }
 +                } catch (Exception e) {
 +                    s_logger.warn("problem cleaning up templates in template_store_ref for store: " + store.getName(), e);
 +                }
 +            }
 +
 +            // CleanUp snapshots on snapshot_store_ref
 +            for (DataStore store : imageStores) {
 +                try {
 +                    List<SnapshotDataStoreVO> destroyedSnapshotStoreVOs = _snapshotStoreDao.listDestroyed(store.getId());
 +                    s_logger.debug("Secondary storage garbage collector found " + destroyedSnapshotStoreVOs.size() + " snapshots to cleanup on snapshot_store_ref for store: " + store.getName());
 +                    for (SnapshotDataStoreVO destroyedSnapshotStoreVO : destroyedSnapshotStoreVOs) {
 +                        // check if this snapshot has child
 +                        SnapshotInfo snap = snapshotFactory.getSnapshot(destroyedSnapshotStoreVO.getSnapshotId(), store);
 +                        if (snap.getChild() != null) {
 +                            s_logger.debug("Skip snapshot on store: " + destroyedSnapshotStoreVO + " , because it has child");
 +                            continue;
 +                        }
 +
 +                        if (s_logger.isDebugEnabled()) {
 +                            s_logger.debug("Deleting snapshot store DB entry: " + destroyedSnapshotStoreVO);
 +                        }
 +
 +                        _snapshotDao.remove(destroyedSnapshotStoreVO.getSnapshotId());
 +                        SnapshotDataStoreVO snapshotOnPrimary = _snapshotStoreDao.findBySnapshot(destroyedSnapshotStoreVO.getSnapshotId(), DataStoreRole.Primary);
 +                        if (snapshotOnPrimary != null) {
 +                            _snapshotStoreDao.remove(snapshotOnPrimary.getId());
 +                        }
 +                        _snapshotStoreDao.remove(destroyedSnapshotStoreVO.getId());
 +                    }
 +
 +                } catch (Exception e2) {
 +                    s_logger.warn("problem cleaning up snapshots in snapshot_store_ref for store: " + store.getName(), e2);
 +                }
 +
 +            }
 +
 +            // CleanUp volumes on volume_store_ref
 +            for (DataStore store : imageStores) {
 +                try {
 +                    List<VolumeDataStoreVO> destroyedStoreVOs = _volumeStoreDao.listDestroyed(store.getId());
 +                    s_logger.debug("Secondary storage garbage collector found " + destroyedStoreVOs.size() + " volumes to cleanup on volume_store_ref for store: " + store.getName());
 +                    for (VolumeDataStoreVO destroyedStoreVO : destroyedStoreVOs) {
 +                        if (s_logger.isDebugEnabled()) {
 +                            s_logger.debug("Deleting volume store DB entry: " + destroyedStoreVO);
 +                        }
 +                        _volumeStoreDao.remove(destroyedStoreVO.getId());
 +                    }
 +
 +                } catch (Exception e2) {
 +                    s_logger.warn("problem cleaning up volumes in volume_store_ref for store: " + store.getName(), e2);
 +                }
 +            }
 +        } catch (Exception e3) {
 +            s_logger.warn("problem cleaning up secondary storage DB entries. ", e3);
 +        }
 +    }
 +
 +    @Override
 +    public String getPrimaryStorageNameLabel(VolumeVO volume) {
 +        Long poolId = volume.getPoolId();
 +
 +        // poolId is null only if volume is destroyed, which has been checked
 +        // before.
 +        assert poolId != null;
 +        StoragePoolVO PrimaryDataStoreVO = _storagePoolDao.findById(poolId);
 +        assert PrimaryDataStoreVO != null;
 +        return PrimaryDataStoreVO.getUuid();
 +    }
 +
 +    @Override
 +    @DB
 +    public PrimaryDataStoreInfo preparePrimaryStorageForMaintenance(Long primaryStorageId) throws ResourceUnavailableException, InsufficientCapacityException {
 +        StoragePoolVO primaryStorage = null;
 +        primaryStorage = _storagePoolDao.findById(primaryStorageId);
 +
 +        if (primaryStorage == null) {
 +            String msg = "Unable to obtain lock on the storage pool record in preparePrimaryStorageForMaintenance()";
 +            s_logger.error(msg);
 +            throw new InvalidParameterValueException(msg);
 +        }
 +
 +        if (!primaryStorage.getStatus().equals(StoragePoolStatus.Up) && !primaryStorage.getStatus().equals(StoragePoolStatus.ErrorInMaintenance)) {
 +            throw new InvalidParameterValueException("Primary storage with id " + primaryStorageId + " is not ready for migration, as the status is:" + primaryStorage.getStatus().toString());
 +        }
 +
 +        DataStoreProvider provider = _dataStoreProviderMgr.getDataStoreProvider(primaryStorage.getStorageProviderName());
 +        DataStoreLifeCycle lifeCycle = provider.getDataStoreLifeCycle();
 +        DataStore store = _dataStoreMgr.getDataStore(primaryStorage.getId(), DataStoreRole.Primary);
 +        lifeCycle.maintain(store);
 +
 +        return (PrimaryDataStoreInfo)_dataStoreMgr.getDataStore(primaryStorage.getId(), DataStoreRole.Primary);
 +    }
 +
 +    @Override
 +    @DB
 +    public PrimaryDataStoreInfo cancelPrimaryStorageForMaintenance(CancelPrimaryStorageMaintenanceCmd cmd) throws ResourceUnavailableException {
 +        Long primaryStorageId = cmd.getId();
 +        StoragePoolVO primaryStorage = null;
 +
 +        primaryStorage = _storagePoolDao.findById(primaryStorageId);
 +
 +        if (primaryStorage == null) {
 +            String msg = "Unable to obtain lock on the storage pool in cancelPrimaryStorageForMaintenance()";
 +            s_logger.error(msg);
 +            throw new InvalidParameterValueException(msg);
 +        }
 +
 +        if (primaryStorage.getStatus().equals(StoragePoolStatus.Up) || primaryStorage.getStatus().equals(StoragePoolStatus.PrepareForMaintenance)) {
 +            throw new StorageUnavailableException("Primary storage with id " + primaryStorageId + " is not ready to complete migration, as the status is:" + primaryStorage.getStatus().toString(),
 +                    primaryStorageId);
 +        }
 +
 +        DataStoreProvider provider = _dataStoreProviderMgr.getDataStoreProvider(primaryStorage.getStorageProviderName());
 +        DataStoreLifeCycle lifeCycle = provider.getDataStoreLifeCycle();
 +        DataStore store = _dataStoreMgr.getDataStore(primaryStorage.getId(), DataStoreRole.Primary);
 +        lifeCycle.cancelMaintain(store);
 +
 +        return (PrimaryDataStoreInfo)_dataStoreMgr.getDataStore(primaryStorage.getId(), DataStoreRole.Primary);
 +    }
 +
 +    protected class StorageGarbageCollector extends ManagedContextRunnable {
 +
 +        public StorageGarbageCollector() {
 +        }
 +
 +        @Override
 +        protected void runInContext() {
 +            try {
 +                s_logger.trace("Storage Garbage Collection Thread is running.");
 +
 +                cleanupStorage(true);
 +
 +            } catch (Exception e) {
 +                s_logger.error("Caught the following Exception", e);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
 +    }
 +
 +    @Override
 +    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
 +        for (ManagementServerHost vo : nodeList) {
 +            if (vo.getMsid() == _serverId) {
 +                s_logger.info("Cleaning up storage maintenance jobs associated with Management server: " + vo.getMsid());
 +                List<Long> poolIds = _storagePoolWorkDao.searchForPoolIdsForPendingWorkJobs(vo.getMsid());
 +                if (poolIds.size() > 0) {
 +                    for (Long poolId : poolIds) {
 +                        StoragePoolVO pool = _storagePoolDao.findById(poolId);
 +                        // check if pool is in an inconsistent state
 +                        if (pool != null && (pool.getStatus().equals(StoragePoolStatus.ErrorInMaintenance) || pool.getStatus().equals(StoragePoolStatus.PrepareForMaintenance)
 +                                || pool.getStatus().equals(StoragePoolStatus.CancelMaintenance))) {
 +                            _storagePoolWorkDao.removePendingJobsOnMsRestart(vo.getMsid(), poolId);
 +                            pool.setStatus(StoragePoolStatus.ErrorInMaintenance);
 +                            _storagePoolDao.update(poolId, pool);
 +                        }
 +
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void onManagementNodeIsolated() {
 +    }
 +
 +    @Override
 +    public CapacityVO getSecondaryStorageUsedStats(Long hostId, Long zoneId) {
 +        SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
 +        if (zoneId != null) {
 +            sc.addAnd("dataCenterId", SearchCriteria.Op.EQ, zoneId);
 +        }
 +
 +        List<Long> hosts = new ArrayList<Long>();
 +        if (hostId != null) {
 +            hosts.add(hostId);
 +        } else {
 +            List<DataStore> stores = _dataStoreMgr.getImageStoresByScope(new ZoneScope(zoneId));
 +            if (stores != null) {
 +                for (DataStore store : stores) {
 +                    hosts.add(store.getId());
 +                }
 +            }
 +        }
 +
 +        CapacityVO capacity = new CapacityVO(hostId, zoneId, null, null, 0, 0, Capacity.CAPACITY_TYPE_SECONDARY_STORAGE);
 +        for (Long id : hosts) {
 +            StorageStats stats = ApiDBUtils.getSecondaryStorageStatistics(id);
 +            if (stats == null) {
 +                continue;
 +            }
 +            capacity.setUsedCapacity(stats.getByteUsed() + capacity.getUsedCapacity());
 +            capacity.setTotalCapacity(stats.getCapacityBytes() + capacity.getTotalCapacity());
 +        }
 +
 +        return capacity;
 +    }
 +
 +    @Override
 +    public CapacityVO getStoragePoolUsedStats(Long poolId, Long clusterId, Long podId, Long zoneId) {
 +        SearchCriteria<StoragePoolVO> sc = _storagePoolDao.createSearchCriteria();
 +        List<StoragePoolVO> pools = new ArrayList<StoragePoolVO>();
 +
 +        if (zoneId != null) {
 +            sc.addAnd("dataCenterId", SearchCriteria.Op.EQ, zoneId);
 +        }
 +
 +        if (podId != null) {
 +            sc.addAnd("podId", SearchCriteria.Op.EQ, podId);
 +        }
 +
 +        if (clusterId != null) {
 +            sc.addAnd("clusterId", SearchCriteria.Op.EQ, clusterId);
 +        }
 +
 +        if (poolId != null) {
 +            sc.addAnd("hostOrPoolId", SearchCriteria.Op.EQ, poolId);
 +        }
 +        if (poolId != null) {
 +            pools.add(_storagePoolDao.findById(poolId));
 +        } else {
 +            pools = _storagePoolDao.search(sc, null);
 +        }
 +
 +        CapacityVO capacity = new CapacityVO(poolId, zoneId, podId, clusterId, 0, 0, Capacity.CAPACITY_TYPE_STORAGE);
 +        for (StoragePoolVO PrimaryDataStoreVO : pools) {
 +            StorageStats stats = ApiDBUtils.getStoragePoolStatistics(PrimaryDataStoreVO.getId());
 +            if (stats == null) {
 +                continue;
 +            }
 +            capacity.setUsedCapacity(stats.getByteUsed() + capacity.getUsedCapacity());
 +            capacity.setTotalCapacity(stats.getCapacityBytes() + capacity.getTotalCapacity());
 +        }
 +        return capacity;
 +    }
 +
 +    @Override
 +    public PrimaryDataStoreInfo getStoragePool(long id) {
 +        return (PrimaryDataStoreInfo)_dataStoreMgr.getDataStore(id, DataStoreRole.Primary);
 +    }
 +
 +    @Override
 +    @DB
 +    public List<VMInstanceVO> listByStoragePool(long storagePoolId) {
 +        SearchCriteria<VMInstanceVO> sc = StoragePoolSearch.create();
 +        sc.setJoinParameters("vmVolume", "volumeType", Volume.Type.ROOT);
 +        sc.setJoinParameters("vmVolume", "poolId", storagePoolId);
 +        sc.setJoinParameters("vmVolume", "state", Volume.State.Ready);
 +        return _vmInstanceDao.search(sc, null);
 +    }
 +
 +    @Override
 +    @DB
 +    public StoragePoolVO findLocalStorageOnHost(long hostId) {
 +        SearchCriteria<StoragePoolVO> sc = LocalStorageSearch.create();
 +        sc.setParameters("type", new Object[] {StoragePoolType.Filesystem, StoragePoolType.LVM});
 +        sc.setJoinParameters("poolHost", "hostId", hostId);
 +        List<StoragePoolVO> storagePools = _storagePoolDao.search(sc, null);
 +        if (!storagePools.isEmpty()) {
 +            return storagePools.get(0);
 +        } else {
 +            return null;
 +        }
 +    }
 +
 +    @Override
 +    public Host updateSecondaryStorage(long secStorageId, String newUrl) {
 +        HostVO secHost = _hostDao.findById(secStorageId);
 +        if (secHost == null) {
 +            throw new InvalidParameterValueException("Can not find out the secondary storage id: " + secStorageId);
 +        }
 +
 +        if (secHost.getType() != Host.Type.SecondaryStorage) {
 +            throw new InvalidParameterValueException("host: " + secStorageId + " is not a secondary storage");
 +        }
 +
 +        URI uri = null;
 +        try {
 +            uri = new URI(UriUtils.encodeURIComponent(newUrl));
 +            if (uri.getScheme() == null) {
 +                throw new InvalidParameterValueException("uri.scheme is null " + newUrl + ", add nfs:// (or cifs://) as a prefix");
 +            } else if (uri.getScheme().equalsIgnoreCase("nfs")) {
 +                if (uri.getHost() == null || uri.getHost().equalsIgnoreCase("") || uri.getPath() == null || uri.getPath().equalsIgnoreCase("")) {
 +                    throw new InvalidParameterValueException("Your host and/or path is wrong.  Make sure it's of the format nfs://hostname/path");
 +                }
 +            } else if (uri.getScheme().equalsIgnoreCase("cifs")) {
 +                // Don't validate against a URI encoded URI.
 +                URI cifsUri = new URI(newUrl);
 +                String warnMsg = UriUtils.getCifsUriParametersProblems(cifsUri);
 +                if (warnMsg != null) {
 +                    throw new InvalidParameterValueException(warnMsg);
 +                }
 +            }
 +        } catch (URISyntaxException e) {
 +            throw new InvalidParameterValueException(newUrl + " is not a valid uri");
 +        }
 +
 +        String oldUrl = secHost.getStorageUrl();
 +
 +        URI oldUri = null;
 +        try {
 +            oldUri = new URI(UriUtils.encodeURIComponent(oldUrl));
 +            if (!oldUri.getScheme().equalsIgnoreCase(uri.getScheme())) {
 +                throw new InvalidParameterValueException("can not change old scheme:" + oldUri.getScheme() + " to " + uri.getScheme());
 +            }
 +        } catch (URISyntaxException e) {
 +            s_logger.debug("Failed to get uri from " + oldUrl);
 +        }
 +
 +        secHost.setStorageUrl(newUrl);
 +        secHost.setGuid(newUrl);
 +        secHost.setName(newUrl);
 +        _hostDao.update(secHost.getId(), secHost);
 +        return secHost;
 +    }
 +
 +    @Override
 +    public HypervisorType getHypervisorTypeFromFormat(ImageFormat format) {
 +
 +        if (format == null) {
 +            return HypervisorType.None;
 +        }
 +
 +        if (format == ImageFormat.VHD) {
 +            return HypervisorType.XenServer;
 +        } else if (format == ImageFormat.OVA) {
 +            return HypervisorType.VMware;
 +        } else if (format == ImageFormat.QCOW2) {
 +            return HypervisorType.KVM;
 +        } else if (format == ImageFormat.RAW) {
 +            return HypervisorType.Ovm;
 +        } else if (format == ImageFormat.VHDX) {
 +            return HypervisorType.Hyperv;
 +        } else {
 +            return HypervisorType.None;
 +        }
 +    }
 +
 +    private boolean checkUsagedSpace(StoragePool pool) {
 +        // Managed storage does not currently deal with accounting for physically used space (only provisioned space). Just return true if "pool" is managed.
 +        if (pool.isManaged()) {
 +            return true;
 +        }
 +
 +        StatsCollector sc = StatsCollector.getInstance();
 +        double storageUsedThreshold = CapacityManager.StorageCapacityDisableThreshold.valueIn(pool.getDataCenterId());
 +        if (sc != null) {
 +            long totalSize = pool.getCapacityBytes();
 +            StorageStats stats = sc.getStoragePoolStats(pool.getId());
 +            if (stats == null) {
 +                stats = sc.getStorageStats(pool.getId());
 +            }
 +            if (stats != null) {
 +                double usedPercentage = ((double)stats.getByteUsed() / (double)totalSize);
 +                if (s_logger.isDebugEnabled()) {
 +                    s_logger.debug("Checking pool " + pool.getId() + " for storage, totalSize: " + pool.getCapacityBytes() + ", usedBytes: " + stats.getByteUsed() + ", usedPct: " + usedPercentage
 +                            + ", disable threshold: " + storageUsedThreshold);
 +                }
 +                if (usedPercentage >= storageUsedThreshold) {
 +                    if (s_logger.isDebugEnabled()) {
 +                        s_logger.debug("Insufficient space on pool: " + pool.getId() + " since its usage percentage: " + usedPercentage + " has crossed the pool.storage.capacity.disablethreshold: "
 +                                + storageUsedThreshold);
 +                    }
 +                    return false;
 +                }
 +            }
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    @Override
 +    public boolean storagePoolHasEnoughIops(List<Volume> requestedVolumes, StoragePool pool) {
 +        if (requestedVolumes == null || requestedVolumes.isEmpty() || pool == null) {
 +            return false;
 +        }
 +
 +        // Only IOPS-guaranteed primary storage like SolidFire is using/setting IOPS.
 +        // This check returns true for storage that does not specify IOPS.
 +        if (pool.getCapacityIops() == null) {
 +            s_logger.info("Storage pool " + pool.getName() + " (" + pool.getId() + ") does not supply IOPS capacity, assuming enough capacity");
 +
 +            return true;
 +        }
 +
 +        StoragePoolVO storagePoolVo = _storagePoolDao.findById(pool.getId());
 +        long currentIops = _capacityMgr.getUsedIops(storagePoolVo);
 +
 +        long requestedIops = 0;
 +
 +        for (Volume requestedVolume : requestedVolumes) {
 +            Long minIops = requestedVolume.getMinIops();
 +
 +            if (minIops != null && minIops > 0) {
 +                requestedIops += minIops;
 +            }
 +        }
 +
 +        long futureIops = currentIops + requestedIops;
 +
 +        return futureIops <= pool.getCapacityIops();
 +    }
 +
 +    @Override
 +    public boolean storagePoolHasEnoughSpace(List<Volume> volumes, StoragePool pool) {
 +        return storagePoolHasEnoughSpace(volumes, pool, null);
 +    }
 +
 +    @Override
 +    public boolean storagePoolHasEnoughSpace(List<Volume> volumes, StoragePool pool, Long clusterId) {
 +        if (volumes == null || volumes.isEmpty()) {
 +            return false;
 +        }
 +
 +        if (!checkUsagedSpace(pool)) {
 +            return false;
 +        }
 +
 +        // allocated space includes templates
 +        if (s_logger.isDebugEnabled()) {
 +            s_logger.debug("Destination pool id: " + pool.getId());
 +        }
- 
-         StoragePoolVO poolVO = _storagePoolDao.findById(pool.getId());
++        // allocated space includes templates
++        final StoragePoolVO poolVO = _storagePoolDao.findById(pool.getId());
 +        long allocatedSizeWithTemplate = _capacityMgr.getAllocatedPoolCapacity(poolVO, null);
 +        long totalAskingSize = 0;
 +
 +        for (Volume volume : volumes) {
 +            // refreshing the volume from the DB to get latest hv_ss_reserve (hypervisor snapshot reserve) field
 +            // I could have just assigned this to "volume", but decided to make a new variable for it so that it
 +            // might be clearer that this "volume" in "volumes" still might have an old value for hv_ss_reverse.
 +            VolumeVO volumeVO = _volumeDao.findById(volume.getId());
 +
 +            if (volumeVO.getHypervisorSnapshotReserve() == null) {
 +                // update the volume's hv_ss_reserve (hypervisor snapshot reserve) from a disk offering (used for managed storage)
 +                volService.updateHypervisorSnapshotReserveForVolume(getDiskOfferingVO(volumeVO), volumeVO.getId(), getHypervisorType(volumeVO));
 +
 +                // hv_ss_reserve field might have been updated; refresh from DB to make use of it in getDataObjectSizeIncludingHypervisorSnapshotReserve
 +                volumeVO = _volumeDao.findById(volume.getId());
 +            }
 +
 +            // this if statement should resolve to true at most once per execution of the for loop its contained within (for a root disk that is
 +            // to leverage a template)
 +            if (volume.getTemplateId() != null) {
 +                VMTemplateVO tmpl = _templateDao.findByIdIncludingRemoved(volume.getTemplateId());
 +
 +                if (tmpl != null && !ImageFormat.ISO.equals(tmpl.getFormat())) {
 +                    allocatedSizeWithTemplate = _capacityMgr.getAllocatedPoolCapacity(poolVO, tmpl);
 +                }
 +            }
 +
 +            if (s_logger.isDebugEnabled()) {
 +                s_logger.debug("Pool ID for the volume with ID " + volumeVO.getId() + " is " + volumeVO.getPoolId());
 +            }
 +
 +            // A ready-state volume is already allocated in a pool, so the asking size is zero for it.
 +            // In case the volume is moving across pools or is not ready yet, the asking size has to be computed.
 +            if ((volumeVO.getState() != Volume.State.Ready) || (volumeVO.getPoolId() != pool.getId())) {
 +                totalAskingSize += getDataObjectSizeIncludingHypervisorSnapshotReserve(volumeVO, poolVO);
 +
 +                totalAskingSize += getAskingSizeForTemplateBasedOnClusterAndStoragePool(volumeVO.getTemplateId(), clusterId, poolVO);
 +            }
 +        }
 +
++        return checkPoolforSpace(pool, allocatedSizeWithTemplate, totalAskingSize);
++    }
++
++    @Override
++    public boolean storagePoolHasEnoughSpaceForResize(StoragePool pool, long currentSize, long newSiz) {
++        if (!checkUsagedSpace(pool)) {
++            return false;
++        }
++        if (s_logger.isDebugEnabled()) {
++            s_logger.debug("Destination pool id: " + pool.getId());
++        }
++        long totalAskingSize = newSiz - currentSize;
++
++        if (totalAskingSize <= 0) {
++            return true;
++        } else {
++            final StoragePoolVO poolVO = _storagePoolDao.findById(pool.getId());
++            final long allocatedSizeWithTemplate = _capacityMgr.getAllocatedPoolCapacity(poolVO, null);
++            return checkPoolforSpace(pool, allocatedSizeWithTemplate, totalAskingSize);
++        }
++    }
++
++    private boolean checkPoolforSpace(StoragePool pool, long allocatedSizeWithTemplate, long totalAskingSize) {
++        // allocated space includes templates
++        StoragePoolVO poolVO = _storagePoolDao.findById(pool.getId());
++
 +        long totalOverProvCapacity;
 +
 +        if (pool.getPoolType().supportsOverProvisioning()) {
 +            BigDecimal overProvFactor = getStorageOverProvisioningFactor(pool.getId());
 +
 +            totalOverProvCapacity = overProvFactor.multiply(new BigDecimal(pool.getCapacityBytes())).longValue();
 +
 +            s_logger.debug("Found storage pool " + poolVO.getName() + " of type " + pool.getPoolType().toString() + " with over-provisioning factor " + overProvFactor.toString());
 +            s_logger.debug("Total over-provisioned capacity calculated is " + overProvFactor + " * " + pool.getCapacityBytes());
 +        } else {
 +            totalOverProvCapacity = pool.getCapacityBytes();
 +
 +            s_logger.debug("Found storage pool " + poolVO.getName() + " of type " + pool.getPoolType().toString());
 +        }
 +
 +        s_logger.debug("Total capacity of the pool " + poolVO.getName() + " with ID " + pool.getId() + " is " + totalOverProvCapacity);
 +
 +        double storageAllocatedThreshold = CapacityManager.StorageAllocatedCapacityDisableThreshold.valueIn(pool.getDataCenterId());
 +
 +        if (s_logger.isDebugEnabled()) {
-             s_logger.debug("Checking pool with ID " + pool.getId() + " for volume allocation " + volumes.toString() + ", maxSize: " + totalOverProvCapacity + ", totalAllocatedSize: "
-                     + allocatedSizeWithTemplate + ", askingSize: " + totalAskingSize + ", allocated disable threshold: " + storageAllocatedThreshold);
++            s_logger.debug("Checking pool: " + pool.getId() + " for storage allocation , maxSize : " + totalOverProvCapacity + ", totalAllocatedSize : " + allocatedSizeWithTemplate
++                    + ", askingSize : " + totalAskingSize + ", allocated disable threshold: " + storageAllocatedThreshold);
 +        }
 +
 +        double usedPercentage = (allocatedSizeWithTemplate + totalAskingSize) / (double)(totalOverProvCapacity);
 +
 +        if (usedPercentage > storageAllocatedThreshold) {
 +            if (s_logger.isDebugEnabled()) {
-                 s_logger.debug("Insufficient un-allocated capacity on the pool with ID " + pool.getId() + " for volume allocation: " + volumes.toString() + " since its allocated percentage "
-                         + usedPercentage + " has crossed the allocated pool.storage.allocated.capacity.disablethreshold " + storageAllocatedThreshold + ", skipping this pool");
++                s_logger.debug("Insufficient un-allocated capacity on: " + pool.getId() + " for storage allocation since its allocated percentage: " + usedPercentage
++                        + " has crossed the allocated pool.storage.allocated.capacity.disablethreshold: " + storageAllocatedThreshold + ", skipping this pool");
 +            }
 +
 +            return false;
 +        }
 +
 +        if (totalOverProvCapacity < (allocatedSizeWithTemplate + totalAskingSize)) {
 +            if (s_logger.isDebugEnabled()) {
-                 s_logger.debug("Insufficient un-allocated capacity on the pool with ID " + pool.getId() + " for volume allocation: " + volumes.toString() + "; not enough storage, maxSize: "
-                         + totalOverProvCapacity + ", totalAllocatedSize: " + allocatedSizeWithTemplate + ", askingSize: " + totalAskingSize);
++                s_logger.debug("Insufficient un-allocated capacity on: " + pool.getId() + " for storage allocation, not enough storage, maxSize : " + totalOverProvCapacity
++                        + ", totalAllocatedSize : " + allocatedSizeWithTemplate + ", askingSize : " + totalAskingSize);
 +            }
 +
 +            return false;
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Storage plug-ins for managed storage can be designed in such a way as to store a template on the primary storage once and
 +     * make use of it via storage-side cloning.
 +     *
 +     * This method determines how many more bytes it will need for the template (if the template is already stored on the primary storage,
 +     * then the answer is 0).
 +     */
 +    private long getAskingSizeForTemplateBasedOnClusterAndStoragePool(Long templateId, Long clusterId, StoragePoolVO storagePoolVO) {
 +        if (templateId == null || clusterId == null || storagePoolVO == null || !storagePoolVO.isManaged()) {
 +            return 0;
 +        }
 +
 +        VMTemplateVO tmpl = _templateDao.findByIdIncludingRemoved(templateId);
 +
 +        if (tmpl == null || ImageFormat.ISO.equals(tmpl.getFormat())) {
 +            return 0;
 +        }
 +
 +        HypervisorType hypervisorType = tmpl.getHypervisorType();
 +
 +        // The getSupportsResigning method is applicable for XenServer as a UUID-resigning patch may or may not be installed on those hypervisor hosts.
 +        if (_clusterDao.getSupportsResigning(clusterId) || HypervisorType.VMware.equals(hypervisorType) || HypervisorType.KVM.equals(hypervisorType)) {
 +            return getBytesRequiredForTemplate(tmpl, storagePoolVO);
 +        }
 +
 +        return 0;
 +    }
 +
 +    private long getDataObjectSizeIncludingHypervisorSnapshotReserve(Volume volume, StoragePool pool) {
 +        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(pool.getStorageProviderName());
 +        DataStoreDriver storeDriver = storeProvider.getDataStoreDriver();
 +
 +        if (storeDriver instanceof PrimaryDataStoreDriver) {
 +            PrimaryDataStoreDriver primaryStoreDriver = (PrimaryDataStoreDriver)storeDriver;
 +
 +            VolumeInfo volumeInfo = volFactory.getVolume(volume.getId());
 +
 +            return primaryStoreDriver.getDataObjectSizeIncludingHypervisorSnapshotReserve(volumeInfo, pool);
 +        }
 +
 +        return volume.getSize();
 +    }
 +
 +    private DiskOfferingVO getDiskOfferingVO(Volume volume) {
 +        Long diskOfferingId = volume.getDiskOfferingId();
 +
 +        return _diskOfferingDao.findById(diskOfferingId);
 +    }
 +
 +    private HypervisorType getHypervisorType(Volume volume) {
 +        Long instanceId = volume.getInstanceId();
 +
 +        VMInstanceVO vmInstance = _vmInstanceDao.findById(instanceId);
 +
 +        if (vmInstance != null) {
 +            return vmInstance.getHypervisorType();
 +        }
 +
 +        return null;
 +    }
 +
 +    private long getBytesRequiredForTemplate(VMTemplateVO tmpl, StoragePool pool) {
 +        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(pool.getStorageProviderName());
 +        DataStoreDriver storeDriver = storeProvider.getDataStoreDriver();
 +
 +        if (storeDriver instanceof PrimaryDataStoreDriver) {
 +            PrimaryDataStoreDriver primaryStoreDriver = (PrimaryDataStoreDriver)storeDriver;
 +
 +            TemplateInfo templateInfo = tmplFactory.getReadyTemplateOnImageStore(tmpl.getId(), pool.getDataCenterId());
 +
 +            return primaryStoreDriver.getBytesRequiredForTemplate(templateInfo, pool);
 +        }
 +
 +        return tmpl.getSize();
 +    }
 +
 +    @Override
 +    public void createCapacityEntry(long poolId) {
 +        StoragePoolVO storage = _storagePoolDao.findById(poolId);
 +        createCapacityEntry(storage, Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED, 0);
 +    }
 +
 +    @Override
 +    public synchronized boolean registerHostListener(String providerName, HypervisorHostListener listener) {
 +        hostListeners.put(providerName, listener);
 +        return true;
 +    }
 +
 +    @Override
 +    public Answer sendToPool(long poolId, Command cmd) throws StorageUnavailableException {
 +        return null;
 +    }
 +
 +    @Override
 +    public Answer[] sendToPool(long poolId, Commands cmd) throws StorageUnavailableException {
 +        return null;
 +    }
 +
 +    @Override
 +    public String getName() {
 +        return null;
 +    }
 +
 +    @Override
 +    public ImageStore discoverImageStore(String name, String url, String providerName, Long zoneId, Map details) throws IllegalArgumentException, DiscoveryException, InvalidParameterValueException {
 +        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(providerName);
 +
 +        if (storeProvider == null) {
 +            storeProvider = _dataStoreProviderMgr.getDefaultImageDataStoreProvider();
 +            if (storeProvider == null) {
 +                throw new InvalidParameterValueException("can't find image store provider: " + providerName);
 +            }
 +            providerName = storeProvider.getName(); // ignored passed provider name and use default image store provider name
 +        }
 +
 +        ScopeType scopeType = ScopeType.ZONE;
 +        if (zoneId == null) {
 +            scopeType = ScopeType.REGION;
 +        }
 +
 +        if (name == null) {
 +            name = url;
 +        }
 +
 +        ImageStoreVO imageStore = _imageStoreDao.findByName(name);
 +        if (imageStore != null) {
 +            throw new InvalidParameterValueException("The image store with name " + name + " already exists, try creating with another name");
 +        }
 +
 +        // check if scope is supported by store provider
 +        if (!((ImageStoreProvider)storeProvider).isScopeSupported(scopeType)) {
 +            throw new InvalidParameterValueException("Image store provider " + providerName + " does not support scope " + scopeType);
 +        }
 +
 +        // check if we have already image stores from other different providers,
 +        // we currently are not supporting image stores from different
 +        // providers co-existing
 +        List<ImageStoreVO> imageStores = _imageStoreDao.listImageStores();
 +        for (ImageStoreVO store : imageStores) {
 +            if (!store.getProviderName().equalsIgnoreCase(providerName)) {
 +                throw new InvalidParameterValueException("You can only add new image stores from the same provider " + store.getProviderName() + " already added");
 +            }
 +        }
 +
 +        if (zoneId != null) {
 +            // Check if the zone exists in the system
 +            DataCenterVO zone = _dcDao.findById(zoneId);
 +            if (zone == null) {
 +                throw new InvalidParameterValueException("Can't find zone by id " + zoneId);
 +            }
 +
 +            Account account = CallContext.current().getCallingAccount();
 +            if (Grouping.AllocationState.Disabled == zone.getAllocationState() && !_accountMgr.isRootAdmin(account.getId())) {
 +                PermissionDeniedException ex = new PermissionDeniedException("Cannot perform this operation, Zone with specified id is currently disabled");
 +                ex.addProxyObject(zone.getUuid(), "dcId");
 +                throw ex;
 +            }
 +        }
 +
 +        Map<String, Object> params = new HashMap<>();
 +        params.put("zoneId", zoneId);
 +        params.put("url", url);
 +        params.put("name", name);
 +        params.put("details", details);
 +        params.put("scope", scopeType);
 +        params.put("providerName", storeProvider.getName());
 +        params.put("role", DataStoreRole.Image);
 +
 +        DataStoreLifeCycle lifeCycle = storeProvider.getDataStoreLifeCycle();
 +
 +        DataStore store;
 +        try {
 +            store = lifeCycle.initialize(params);
 +        } catch (Exception e) {
 +            if (s_logger.isDebugEnabled()) {
 +                s_logger.debug("Failed to add data store: " + e.getMessage(), e);
 +            }
 +            throw new CloudRuntimeException("Failed to add data store: " + e.getMessage(), e);
 +        }
 +
 +        if (((ImageStoreProvider)storeProvider).needDownloadSysTemplate()) {
 +            // trigger system vm template download
 +            _imageSrv.downloadBootstrapSysTemplate(store);
 +        } else {
 +            // populate template_store_ref table
 +            _imageSrv.addSystemVMTemplatesToSecondary(store);
 +        }
 +
 +        // associate builtin template with zones associated with this image store
 +        associateCrosszoneTemplatesToZone(zoneId);
 +
 +        // duplicate cache store records to region wide storage
 +        if (scopeType == ScopeType.REGION) {
 +            duplicateCacheStoreRecordsToRegionStore(store.getId());
 +        }
 +
 +        return (ImageStore)_dataStoreMgr.getDataStore(store.getId(), DataStoreRole.Image);
 +    }
 +
 +    @Override
 +    public ImageStore migrateToObjectStore(String name, String url, String providerName, Map<String, String> details) throws DiscoveryException, InvalidParameterValueException {
 +        // check if current cloud is ready to migrate, we only support cloud with only NFS secondary storages
 +        List<ImageStoreVO> imgStores = _imageStoreDao.listImageStores();
 +        List<ImageStoreVO> nfsStores = new ArrayList<ImageStoreVO>();
 +        if (imgStores != null && imgStores.size() > 0) {
 +            for (ImageStoreVO store : imgStores) {
 +                if (!store.getProviderName().equals(DataStoreProvider.NFS_IMAGE)) {
 +                    throw new InvalidParameterValueException("We only support migrate NFS secondary storage to use object store!");
 +                } else {
 +                    nfsStores.add(store);
 +                }
 +            }
 +        }
 +        // convert all NFS secondary storage to staging store
 +        if (nfsStores != null && nfsStores.size() > 0) {
 +            for (ImageStoreVO store : nfsStores) {
 +                long storeId = store.getId();
 +
 +                _accountMgr.checkAccessAndSpecifyAuthority(CallContext.current().getCallingAccount(), store.getDataCenterId());
 +
 +                DataStoreProvider provider = _dataStoreProviderMgr.getDataStoreProvider(store.getProviderName());
 +                DataStoreLifeCycle lifeCycle = provider.getDataStoreLifeCycle();
 +                DataStore secStore = _dataStoreMgr.getDataStore(storeId, DataStoreRole.Image);
 +                lifeCycle.migrateToObjectStore(secStore);
 +                // update store_role in template_store_ref and snapshot_store_ref to ImageCache
 +                _templateStoreDao.updateStoreRoleToCachce(storeId);
 +                _snapshotStoreDao.updateStoreRoleToCache(storeId);
 +            }
 +        }
 +        // add object store
 +        return discoverImageStore(name, url, providerName, null, details);
 +    }
 +
 +    private void duplicateCacheStoreRecordsToRegionStore(long storeId) {
 +        _templateStoreDao.duplicateCacheRecordsOnRegionStore(storeId);
 +        _snapshotStoreDao.duplicateCacheRecordsOnRegionStore(storeId);
 +        _volumeStoreDao.duplicateCacheRecordsOnRegionStore(storeId);
 +    }
 +
 +    private void associateCrosszoneTemplatesToZone(Long zoneId) {
 +        VMTemplateZoneVO tmpltZone;
 +
 +        List<VMTemplateVO> allTemplates = _vmTemplateDao.listAll();
 +        List<Long> dcIds = new ArrayList<Long>();
 +        if (zoneId != null) {
 +            dcIds.add(zoneId);
 +        } else {
 +            List<DataCenterVO> dcs = _dcDao.listAll();
 +            if (dcs != null) {
 +                for (DataCenterVO dc : dcs) {
 +                    dcIds.add(dc.getId());
 +                }
 +            }
 +        }
 +
 +        for (VMTemplateVO vt : allTemplates) {
 +            if (vt.isCrossZones()) {
 +                for (Long dcId : dcIds) {
 +                    tmpltZone = _vmTemplateZoneDao.findByZoneTemplate(dcId, vt.getId());
 +                    if (tmpltZone == null) {
 +                        VMTemplateZoneVO vmTemplateZone = new VMTemplateZoneVO(dcId, vt.getId(), new Date());
 +                        _vmTemplateZoneDao.persist(vmTemplateZone);
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public boolean deleteImageStore(DeleteImageStoreCmd cmd) {
 +        final long storeId = cmd.getId();
 +        // Verify that image store exists
 +        ImageStoreVO store = _imageStoreDao.findById(storeId);
 +        if (store == null) {
 +            throw new InvalidParameterValueException("Image store with id " + storeId + " doesn't exist");
 +        }
 +        _accountMgr.checkAccessAndSpecifyAuthority(CallContext.current().getCallingAccount(), store.getDataCenterId());
 +
 +        // Verify that there are no live snapshot, template, volume on the image
 +        // store to be deleted
 +        List<SnapshotDataStoreVO> snapshots = _snapshotStoreDao.listByStoreId(storeId, DataStoreRole.Image);
 +        if (snapshots != null && snapshots.size() > 0) {
 +            throw new InvalidParameterValueException("Cannot delete image store with active snapshots backup!");
 +        }
 +        List<VolumeDataStoreVO> volumes = _volumeStoreDao.listByStoreId(storeId);
 +        if (volumes != null && volumes.size() > 0) {
 +            throw new InvalidParameterValueException("Cannot delete image store with active volumes backup!");
 +        }
 +
 +        // search if there are user templates stored on this image store, excluding system, builtin templates
 +        List<TemplateJoinVO> templates = _templateViewDao.listActiveTemplates(storeId);
 +        if (templates != null && templates.size() > 0) {
 +            throw new InvalidParameterValueException("Cannot delete image store with active templates backup!");
 +        }
 +
 +        // ready to delete
 +        Transaction.execute(new TransactionCallbackNoReturn() {
 +            @Override
 +            public void doInTransactionWithoutResult(TransactionStatus status) {
 +                // first delete from image_store_details table, we need to do that since
 +                // we are not actually deleting record from main
 +                // image_data_store table, so delete cascade will not work
 +                _imageStoreDetailsDao.deleteDetails(storeId);
 +                _snapshotStoreDao.deletePrimaryRecordsForStore(storeId, DataStoreRole.Image);
 +                _volumeStoreDao.deletePrimaryRecordsForStore(storeId);
 +                _templateStoreDao.deletePrimaryRecordsForStore(storeId);
 +                _imageStoreDao.remove(storeId);
 +            }
 +        });
 +
 +        return true;
 +    }
 +
 +    @Override
 +    public ImageStore createSecondaryStagingStore(CreateSecondaryStagingStoreCmd cmd) {
 +        String providerName = cmd.getProviderName();
 +        DataStoreProvider storeProvider = _dataStoreProviderMgr.getDataStoreProvider(providerName);
 +
 +        if (storeProvider == null) {
 +            storeProvider = _dataStoreProviderMgr.getDefaultCacheDataStoreProvider();
 +            if (storeProvider == null) {
 +                throw new InvalidParameterValueException("can't find cache store provider: " + providerName);
 +            }
 +        }
 +
 +        Long dcId = cmd.getZoneId();
 +
 +        ScopeType scopeType = null;
 +        String scope = cmd.getScope();
 +        if (scope != null) {
 +            try {
 +                scopeType = Enum.valueOf(ScopeType.class, scope.toUpperCase());
 +
 +            } catch (Exception e) {
 +                throw new InvalidParameterValueException("invalid scope for cache store " + scope);
 +            }
 +
 +            if (scopeType != ScopeType.ZONE) {
 +                throw new InvalidParameterValueException("Only zone wide cache storage is supported");
 +            }
 +        }
 +
 +        if (scopeType == ScopeType.ZONE && dcId == null) {
 +            throw new InvalidParameterValueException("zone id can't be null, if scope is zone");
 +        }
 +
 +        // Check if the zone exists in the system
 +        DataCenterVO zone = _dcDao.findById(dcId);
 +        if (zone == null) {
 +            throw new InvalidParameterValueException("Can't find zone by id " + dcId);
 +        }
 +
 +        Account account = CallContext.current().getCallingAccount();
 +        if (Grouping.AllocationState.Disabled == zone.getAllocationState() && !_accountMgr.isRootAdmin(account.getId())) {
 +            PermissionDeniedException ex = new PermissionDeniedException("Cannot perform this operation, Zone with specified id is currently disabled");
 +            ex.addProxyObject(zone.getUuid(), "dcId");
 +            throw ex;
 +        }
 +
 +        Map<String, Object> params = new HashMap<String, Object>();
 +        params.put("zoneId", dcId);
 +        params.put("url", cmd.getUrl());
 +        params.put("name", cmd.getUrl());
 +        params.put("details", cmd.getDetails());
 +        params.put("scope", scopeType);
 +        params.put("providerName", storeProvider.getName());
 +        params.put("role", DataStoreRole.ImageCache);
 +
 +        DataStoreLifeCycle lifeCycle = storeProvider.getDataStoreLifeCycle();
 +        DataStore store = null;
 +        try {
 +            store = lifeCycle.initialize(params);
 +        } catch (Exception e) {
 +            s_logger.debug("Failed to add data store: " + e.getMessage(), e);
 +            throw new CloudRuntimeException("Failed to add data store: " + e.getMessage(), e);
 +        }
 +
 +        return (ImageStore)_dataStoreMgr.getDataStore(store.getId(), DataStoreRole.ImageCache);
 +    }
 +
 +    @Override
 +    public boolean deleteSecondaryStagingStore(DeleteSecondaryStagingStoreCmd cmd) {
 +        final long storeId = cmd.getId();
 +        // Verify that cache store exists
 +        ImageStoreVO store = _imageStoreDao.findById(storeId);
 +        if (store == null) {
 +            throw new InvalidParameterValueException("Cache store with id " + storeId + " doesn't exist");
 +        }
 +        _accountMgr.checkAccessAndSpecifyAuthority(CallContext.current().getCallingAccount(), store.getDataCenterId());
 +
 +        // Verify that there are no live snapshot, template, volume on the cache
 +        // store that is currently referenced
 +        List<SnapshotDataStoreVO> snapshots = _snapshotStoreDao.listActiveOnCache(storeId);
 +        if (snapshots != null && snapshots.size() > 0) {
 +            throw new InvalidParameterValueException("Cannot delete cache store with staging snapshots currently in use!");
 +        }
 +        List<VolumeDataStoreVO> volumes = _volumeStoreDao.listActiveOnCache(storeId);
 +        if (volumes != null && volumes.size() > 0) {
 +            throw new InvalidParameterValueException("Cannot delete cache store with staging volumes currently in use!");
 +        }
 +
 +        List<TemplateDataStoreVO> templates = _templateStoreDao.listActiveOnCache(storeId);
 +        if (templates != null && templates.size() > 0) {
 +            throw new InvalidParameterValueException("Cannot delete cache store with staging templates currently in use!");
 +        }
 +
 +        // ready to delete
 +        Transaction.execute(new TransactionCallbackNoReturn() {
 +            @Override
 +            public void doInTransactionWithoutResult(TransactionStatus status) {
 +                // first delete from image_store_details table, we need to do that since
 +                // we are not actually deleting record from main
 +                // image_data_store table, so delete cascade will not work
 +                _imageStoreDetailsDao.deleteDetails(storeId);
 +                _snapshotStoreDao.deletePrimaryRecordsForStore(storeId, DataStoreRole.ImageCache);
 +                _volumeStoreDao.deletePrimaryRecordsForStore(storeId);
 +                _templateStoreDao.deletePrimaryRecordsForStore(storeId);
 +                _imageStoreDao.remove(storeId);
 +            }
 +        });
 +
 +        return true;
 +    }
 +
 +    protected class DownloadURLGarbageCollector implements Runnable {
 +
 +        public DownloadURLGarbageCollector() {
 +        }
 +
 +        @Override
 +        public void run() {
 +            try {
 +                s_logger.trace("Download URL Garbage Collection Thread is running.");
 +
 +                cleanupDownloadUrls();
 +
 +            } catch (Exception e) {
 +                s_logger.error("Caught the following Exception", e);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void cleanupDownloadUrls() {
 +
 +        // Cleanup expired volume URLs
 +        List<VolumeDataStoreVO> volumesOnImageStoreList = _volumeStoreDao.listVolumeDownloadUrls();
 +        HashSet<Long> expiredVolumeIds = new HashSet<Long>();
 +        HashSet<Long> activeVolumeIds = new HashSet<Long>();
 +        for (VolumeDataStoreVO volumeOnImageStore : volumesOnImageStoreList) {
 +
 +            long volumeId = volumeOnImageStore.getVolumeId();
 +            try {
 +                long downloadUrlCurrentAgeInSecs = DateUtil.getTimeDifference(DateUtil.now(), volumeOnImageStore.getExtractUrlCreated());
 +                if (downloadUrlCurrentAgeInSecs < _downloadUrlExpirationInterval) {  // URL hasnt expired yet
 +                    activeVolumeIds.add(volumeId);
 +                    continue;
 +                }
 +                expiredVolumeIds.add(volumeId);
 +                s_logger.debug("Removing download url " + volumeOnImageStore.getExtractUrl() + " for volume id " + volumeId);
 +
 +                // Remove it from image store
 +                ImageStoreEntity secStore = (ImageStoreEntity)_dataStoreMgr.getDataStore(volumeOnImageStore.getDataStoreId(), DataStoreRole.Image);
 +                secStore.deleteExtractUrl(volumeOnImageStore.getInstallPath(), volumeOnImageStore.getExtractUrl(), Upload.Type.VOLUME);
 +
 +                // Now expunge it from DB since this entry was created only for download purpose
 +                _volumeStoreDao.expunge(volumeOnImageStore.getId());
 +            } catch (Throwable th) {
 +                s_logger.warn("Caught exception while deleting download url " + volumeOnImageStore.getExtractUrl() + " for volume id " + volumeOnImageStore.getVolumeId(), th);
 +            }
 +        }
 +        for (Long volumeId : expiredVolumeIds) {
 +            if (activeVolumeIds.contains(volumeId)) {
 +                continue;
 +            }
 +            Volume volume = _volumeDao.findById(volumeId);
 +            if (volume != null && volume.getState() == Volume.State.Expunged) {
 +                _volumeDao.remove(volumeId);
 +            }
 +        }
 +
 +        // Cleanup expired template URLs
 +        List<TemplateDataStoreVO> templatesOnImageStoreList = _templateStoreDao.listTemplateDownloadUrls();
 +        for (TemplateDataStoreVO templateOnImageStore : templatesOnImageStoreList) {
 +
 +            try {
 +                long downloadUrlCurrentAgeInSecs = DateUtil.getTimeDifference(DateUtil.now(), templateOnImageStore.getExtractUrlCreated());
 +                if (downloadUrlCurrentAgeInSecs < _downloadUrlExpirationInterval) {  // URL hasnt expired yet
 +                    continue;
 +                }
 +
 +                s_logger.debug("Removing download url " + templateOnImageStore.getExtractUrl() + " for template id " + templateOnImageStore.getTemplateId());
 +
 +                // Remove it from image store
 +                ImageStoreEntity secStore = (ImageStoreEntity)_dataStoreMgr.getDataStore(templateOnImageStore.getDataStoreId(), DataStoreRole.Image);
 +                secStore.deleteExtractUrl(templateOnImageStore.getInstallPath(), templateOnImageStore.getExtractUrl(), Upload.Type.TEMPLATE);
 +
 +                // Now remove download details from DB.
 +                templateOnImageStore.setExtractUrl(null);
 +                templateOnImageStore.setExtractUrlCreated(null);
 +                _templateStoreDao.update(templateOnImageStore.getId(), templateOnImageStore);
 +            } catch (Throwable th) {
 +                s_logger.warn("caught exception while deleting download url " + templateOnImageStore.getExtractUrl() + " for template id " + templateOnImageStore.getTemplateId(), th);
 +            }
 +        }
 +    }
 +
 +    // get bytesReadRate from service_offering, disk_offering and vm.disk.throttling.bytes_read_rate
 +    @Override
 +    public Long getDiskBytesReadRate(final ServiceOffering offering, final DiskOffering diskOffering) {
 +        if ((offering != null) && (offering.getBytesReadRate() != null) && (offering.getBytesReadRate() > 0)) {
 +            return offering.getBytesReadRate();
 +        } else if ((diskOffering != null) && (diskOffering.getBytesReadRate() != null) && (diskOffering.getBytesReadRate() > 0)) {
 +            return diskOffering.getBytesReadRate();
 +        } else {
 +            Long bytesReadRate = Long.parseLong(_configDao.getValue(Config.VmDiskThrottlingBytesReadRate.key()));
 +            if ((bytesReadRate > 0) && ((offering == null) || (!offering.getSystemUse()))) {
 +                return bytesReadRate;
 +            }
 +        }
 +        return 0L;
 +    }
 +
 +    // get bytesWriteRate from service_offering, disk_offering and vm.disk.throttling.bytes_write_rate
 +    @Override
 +    public Long getDiskBytesWriteRate(final ServiceOffering offering, final DiskOffering diskOffering) {
 +        if ((offering != null) && (offering.getBytesWriteRate() != null) && (offering.getBytesWriteRate() > 0)) {
 +            return offering.getBytesWriteRate();
 +        } else if ((diskOffering != null) && (diskOffering.getBytesWriteRate() != null) && (diskOffering.getBytesWriteRate() > 0)) {
 +            return diskOffering.getBytesWriteRate();
 +        } else {
 +            Long bytesWriteRate = Long.parseLong(_configDao.getValue(Config.VmDiskThrottlingBytesWriteRate.key()));
 +            if ((bytesWriteRate > 0) && ((offering == null) || (!offering.getSystemUse()))) {
 +                return bytesWriteRate;
 +            }
 +        }
 +        return 0L;
 +    }
 +
 +    // get iopsReadRate from service_offering, disk_offering and vm.disk.throttling.iops_read_rate
 +    @Override
 +    public Long getDiskIopsReadRate(final ServiceOffering offering, final DiskOffering diskOffering) {
 +        if ((offering != null) && (offering.getIopsReadRate() != null) && (offering.getIopsReadRate() > 0)) {
 +            return offering.getIopsReadRate();
 +        } else if ((diskOffering != null) && (diskOffering.getIopsReadRate() != null) && (diskOffering.getIopsReadRate() > 0)) {
 +            return diskOffering.getIopsReadRate();
 +        } else {
 +            Long iopsReadRate = Long.parseLong(_configDao.getValue(Config.VmDiskThrottlingIopsReadRate.key()));
 +            if ((iopsReadRate > 0) && ((offering == null) || (!offering.getSystemUse()))) {
 +                return iopsReadRate;
 +            }
 +        }
 +        return 0L;
 +    }
 +
 +    // get iopsWriteRate from service_offering, disk_offering and vm.disk.throttling.iops_write_rate
 +    @Override
 +    public Long getDiskIopsWriteRate(final ServiceOffering offering, final DiskOffering diskOffering) {
 +        if ((offering != null) && (offering.getIopsWriteRate() != null) && (offering.getIopsWriteRate() > 0)) {
 +            return offering.getIopsWriteRate();
 +        } else if ((diskOffering != null) && (diskOffering.getIopsWriteRate() != null) && (diskOffering.getIopsWriteRate() > 0)) {
 +            return diskOffering.getIopsWriteRate();
 +        } else {
 +            Long iopsWriteRate = Long.parseLong(_configDao.getValue(Config.VmDiskThrottlingIopsWriteRate.key()));
 +            if ((iopsWriteRate > 0) && ((offering == null) || (!offering.getSystemUse()))) {
 +                return iopsWriteRate;
 +            }
 +        }
 +        return 0L;
 +    }
 +
 +    @Override
 +    public String getConfigComponentName() {
 +        return StorageManager.class.getSimpleName();
 +    }
 +
 +    @Override
 +    public ConfigKey<?>[] getConfigKeys() {
 +        return new ConfigKey<?>[] { StorageCleanupInterval, StorageCleanupDelay, StorageCleanupEnabled, TemplateCleanupEnabled,
 +                KvmStorageOfflineMigrationWait, KvmStorageOnlineMigrationWait, MaxNumberOfManagedClusteredFileSystems };
 +    }
 +
 +    @Override
 +    public void setDiskProfileThrottling(DiskProfile dskCh, final ServiceOffering offering, final DiskOffering diskOffering) {
 +        dskCh.setBytesReadRate(getDiskBytesReadRate(offering, diskOffering));
 +        dskCh.setBytesWriteRate(getDiskBytesWriteRate(offering, diskOffering));
 +        dskCh.setIopsReadRate(getDiskIopsReadRate(offering, diskOffering));
 +        dskCh.setIopsWriteRate(getDiskIopsWriteRate(offering, diskOffering));
 +    }
 +
 +    @Override
 +    public DiskTO getDiskWithThrottling(final DataTO volTO, final Volume.Type volumeType, final long deviceId, final String path, final long offeringId, final long diskOfferingId) {
 +        DiskTO disk = null;
 +        if (volTO != null && volTO instanceof VolumeObjectTO) {
 +            VolumeObjectTO volumeTO = (VolumeObjectTO)volTO;
 +            ServiceOffering offering = _entityMgr.findById(ServiceOffering.class, offeringId);
 +            DiskOffering diskOffering = _entityMgr.findById(DiskOffering.class, diskOfferingId);
 +            if (volumeType == Volume.Type.ROOT) {
 +                setVolumeObjectTOThrottling(volumeTO, offering, diskOffering);
 +            } else {
 +                setVolumeObjectTOThrottling(volumeTO, null, diskOffering);
 +            }
 +            disk = new DiskTO(volumeTO, deviceId, path, volumeType);
 +        } else {
 +            disk = new DiskTO(volTO, deviceId, path, volumeType);
 +        }
 +        return disk;
 +    }
 +
 +    private void setVolumeObjectTOThrottling(VolumeObjectTO volumeTO, final ServiceOffering offering, final DiskOffering diskOffering) {
 +        volumeTO.setBytesReadRate(getDiskBytesReadRate(offering, diskOffering));
 +        volumeTO.setBytesWriteRate(getDiskBytesWriteRate(offering, diskOffering));
 +        volumeTO.setIopsReadRate(getDiskIopsReadRate(offering, diskOffering));
 +        volumeTO.setIopsWriteRate(getDiskIopsWriteRate(offering, diskOffering));
 +    }
 +
 +}
diff --cc server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java
index 3160dd3,0000000..b0c0b7c
mode 100644,000000..100644
--- a/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java
+++ b/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java
@@@ -1,3340 -1,0 +1,3350 @@@
 +// Licensed to the Apache Software Foundation (ASF) under one
 +// or more contributor license agreements.  See the NOTICE file
 +// distributed with this work for additional information
 +// regarding copyright ownership.  The ASF licenses this file
 +// to you under the Apache License, Version 2.0 (the
 +// "License"); you may not use this file except in compliance
 +// with the License.  You may obtain a copy of the License at
 +//
 +//   http://www.apache.org/licenses/LICENSE-2.0
 +//
 +// Unless required by applicable law or agreed to in writing,
 +// software distributed under the License is distributed on an
 +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +// KIND, either express or implied.  See the License for the
 +// specific language governing permissions and limitations
 +// under the License.
 +package com.cloud.storage;
 +
 +import java.net.MalformedURLException;
 +import java.net.URL;
 +import java.util.ArrayList;
 +import java.util.Arrays;
++import java.util.Collections;
 +import java.util.Date;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.UUID;
 +import java.util.concurrent.ExecutionException;
 +
 +import javax.inject.Inject;
 +
 +import org.apache.cloudstack.api.command.user.volume.AttachVolumeCmd;
 +import org.apache.cloudstack.api.command.user.volume.CreateVolumeCmd;
 +import org.apache.cloudstack.api.command.user.volume.DetachVolumeCmd;
 +import org.apache.cloudstack.api.command.user.volume.ExtractVolumeCmd;
 +import org.apache.cloudstack.api.command.user.volume.GetUploadParamsForVolumeCmd;
 +import org.apache.cloudstack.api.command.user.volume.MigrateVolumeCmd;
 +import org.apache.cloudstack.api.command.user.volume.ResizeVolumeCmd;
 +import org.apache.cloudstack.api.command.user.volume.UploadVolumeCmd;
 +import org.apache.cloudstack.api.response.GetUploadParamsResponse;
 +import org.apache.cloudstack.context.CallContext;
 +import org.apache.cloudstack.engine.orchestration.service.VolumeOrchestrationService;
 +import org.apache.cloudstack.engine.subsystem.api.storage.ChapInfo;
 +import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
 +import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
 +import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
 +import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
 +import org.apache.cloudstack.engine.subsystem.api.storage.HostScope;
 +import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreInfo;
 +import org.apache.cloudstack.engine.subsystem.api.storage.Scope;
 +import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator;
 +import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
 +import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
 +import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService;
 +import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService.VolumeApiResult;
 +import org.apache.cloudstack.framework.async.AsyncCallFuture;
 +import org.apache.cloudstack.framework.config.ConfigKey;
 +import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 +import org.apache.cloudstack.framework.jobs.AsyncJob;
 +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
 +import org.apache.cloudstack.framework.jobs.AsyncJobManager;
 +import org.apache.cloudstack.framework.jobs.Outcome;
 +import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
 +import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
 +import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
 +import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
 +import org.apache.cloudstack.jobs.JobInfo;
 +import org.apache.cloudstack.storage.command.AttachAnswer;
 +import org.apache.cloudstack.storage.command.AttachCommand;
 +import org.apache.cloudstack.storage.command.DettachCommand;
 +import org.apache.cloudstack.storage.command.TemplateOrVolumePostUploadCommand;
 +import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 +import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailVO;
 +import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
 +import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 +import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
 +import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
 +import org.apache.cloudstack.storage.image.datastore.ImageStoreEntity;
 +import org.apache.cloudstack.utils.identity.ManagementServerNode;
 +import org.apache.cloudstack.utils.imagestore.ImageStoreUtil;
 +import org.apache.cloudstack.utils.volume.VirtualMachineDiskInfo;
 +import org.apache.commons.collections.CollectionUtils;
 +import org.apache.log4j.Logger;
 +import org.joda.time.DateTime;
 +import org.joda.time.DateTimeZone;
 +
 +import com.cloud.agent.AgentManager;
 +import com.cloud.agent.api.Answer;
 +import com.cloud.agent.api.ModifyTargetsCommand;
 +import com.cloud.agent.api.to.DataTO;
 +import com.cloud.agent.api.to.DiskTO;
 +import com.cloud.api.ApiDBUtils;
 +import com.cloud.configuration.Config;
 +import com.cloud.configuration.ConfigurationManager;
 +import com.cloud.configuration.Resource.ResourceType;
 +import com.cloud.dc.ClusterDetailsDao;
 +import com.cloud.dc.DataCenter;
 +import com.cloud.dc.DataCenterVO;
 +import com.cloud.dc.dao.DataCenterDao;
 +import com.cloud.domain.Domain;
 +import com.cloud.event.ActionEvent;
 +import com.cloud.event.EventTypes;
 +import com.cloud.event.UsageEventUtils;
 +import com.cloud.exception.ConcurrentOperationException;
 +import com.cloud.exception.InvalidParameterValueException;
 +import com.cloud.exception.PermissionDeniedException;
 +import com.cloud.exception.ResourceAllocationException;
 +import com.cloud.exception.StorageUnavailableException;
 +import com.cloud.gpu.GPU;
 +import com.cloud.host.HostVO;
 +import com.cloud.host.dao.HostDao;
 +import com.cloud.hypervisor.Hypervisor.HypervisorType;
 +import com.cloud.hypervisor.HypervisorCapabilitiesVO;
 +import com.cloud.hypervisor.dao.HypervisorCapabilitiesDao;
 +import com.cloud.org.Grouping;
 +import com.cloud.serializer.GsonHelper;
 +import com.cloud.service.dao.ServiceOfferingDetailsDao;
 +import com.cloud.storage.Storage.ImageFormat;
 +import com.cloud.storage.dao.DiskOfferingDao;
 +import com.cloud.storage.dao.SnapshotDao;
 +import com.cloud.storage.dao.VMTemplateDao;
 +import com.cloud.storage.dao.VolumeDao;
 +import com.cloud.storage.snapshot.SnapshotApiService;
 +import com.cloud.storage.snapshot.SnapshotManager;
 +import com.cloud.template.TemplateManager;
 +import com.cloud.user.Account;
 +import com.cloud.user.AccountManager;
 +import com.cloud.user.ResourceLimitService;
 +import com.cloud.user.User;
 +import com.cloud.user.VmDiskStatisticsVO;
 +import com.cloud.user.dao.AccountDao;
 +import com.cloud.user.dao.VmDiskStatisticsDao;
 +import com.cloud.utils.DateUtil;
 +import com.cloud.utils.EncryptionUtil;
 +import com.cloud.utils.EnumUtils;
 +import com.cloud.utils.NumbersUtil;
 +import com.cloud.utils.Pair;
 +import com.cloud.utils.Predicate;
 +import com.cloud.utils.ReflectionUse;
 +import com.cloud.utils.StringUtils;
 +import com.cloud.utils.UriUtils;
 +import com.cloud.utils.component.ManagerBase;
 +import com.cloud.utils.db.DB;
 +import com.cloud.utils.db.EntityManager;
 +import com.cloud.utils.db.Transaction;
 +import com.cloud.utils.db.TransactionCallback;
 +import com.cloud.utils.db.TransactionCallbackWithException;
 +import com.cloud.utils.db.TransactionStatus;
 +import com.cloud.utils.db.UUIDManager;
 +import com.cloud.utils.exception.CloudRuntimeException;
 +import com.cloud.utils.fsm.NoTransitionException;
 +import com.cloud.utils.fsm.StateMachine2;
 +import com.cloud.vm.UserVmManager;
 +import com.cloud.vm.UserVmService;
 +import com.cloud.vm.UserVmVO;
 +import com.cloud.vm.VMInstanceVO;
 +import com.cloud.vm.VirtualMachine;
 +import com.cloud.vm.VirtualMachine.State;
 +import com.cloud.vm.VmDetailConstants;
 +import com.cloud.vm.VmWork;
 +import com.cloud.vm.VmWorkAttachVolume;
 +import com.cloud.vm.VmWorkConstants;
 +import com.cloud.vm.VmWorkDetachVolume;
 +import com.cloud.vm.VmWorkExtractVolume;
 +import com.cloud.vm.VmWorkJobHandler;
 +import com.cloud.vm.VmWorkJobHandlerProxy;
 +import com.cloud.vm.VmWorkMigrateVolume;
 +import com.cloud.vm.VmWorkResizeVolume;
 +import com.cloud.vm.VmWorkSerializer;
 +import com.cloud.vm.VmWorkTakeVolumeSnapshot;
 +import com.cloud.vm.dao.UserVmDao;
 +import com.cloud.vm.dao.VMInstanceDao;
 +import com.cloud.vm.snapshot.VMSnapshotVO;
 +import com.cloud.vm.snapshot.dao.VMSnapshotDao;
 +import com.google.common.base.Strings;
 +import com.google.gson.Gson;
 +import com.google.gson.GsonBuilder;
 +import com.google.gson.JsonParseException;
 +
 +public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiService, VmWorkJobHandler {
 +    private final static Logger s_logger = Logger.getLogger(VolumeApiServiceImpl.class);
 +    public static final String VM_WORK_JOB_HANDLER = VolumeApiServiceImpl.class.getSimpleName();
 +
 +    @Inject
 +    private UserVmManager _userVmMgr;
 +    @Inject
 +    private VolumeOrchestrationService _volumeMgr;
 +    @Inject
 +    private EntityManager _entityMgr;
 +    @Inject
 +    private AgentManager _agentMgr;
 +    @Inject
 +    private TemplateManager _tmpltMgr;
 +    @Inject
 +    private SnapshotManager _snapshotMgr;
 +    @Inject
 +    private AccountManager _accountMgr;
 +    @Inject
 +    private ConfigurationManager _configMgr;
 +    @Inject
 +    private VolumeDao _volsDao;
 +    @Inject
 +    private HostDao _hostDao;
 +    @Inject
 +    private SnapshotDao _snapshotDao;
 +    @Inject
 +    private ServiceOfferingDetailsDao _serviceOfferingDetailsDao;
 +    @Inject
 +    private UserVmDao _userVmDao;
 +    @Inject
 +    private UserVmService _userVmService;
 +    @Inject
 +    private VolumeDataStoreDao _volumeStoreDao;
 +    @Inject
 +    private VMInstanceDao _vmInstanceDao;
 +    @Inject
 +    private PrimaryDataStoreDao _storagePoolDao;
 +    @Inject
 +    private DiskOfferingDao _diskOfferingDao;
 +    @Inject
 +    private AccountDao _accountDao;
 +    @Inject
 +    private DataCenterDao _dcDao;
 +    @Inject
 +    private VMTemplateDao _templateDao;
 +    @Inject
 +    private ResourceLimitService _resourceLimitMgr;
 +    @Inject
 +    private VmDiskStatisticsDao _vmDiskStatsDao;
 +    @Inject
 +    private VMSnapshotDao _vmSnapshotDao;
 +    @Inject
 +    private ConfigurationDao _configDao;
 +    @Inject
 +    private DataStoreManager dataStoreMgr;
 +    @Inject
 +    private VolumeService volService;
 +    @Inject
 +    private VolumeDataFactory volFactory;
 +    @Inject
 +    private SnapshotApiService snapshotMgr;
 +    @Inject
 +    private UUIDManager _uuidMgr;
 +    @Inject
 +    private HypervisorCapabilitiesDao _hypervisorCapabilitiesDao;
 +    @Inject
 +    private AsyncJobManager _jobMgr;
 +    @Inject
 +    private VmWorkJobDao _workJobDao;
 +    @Inject
 +    private ClusterDetailsDao _clusterDetailsDao;
 +    @Inject
 +    private StorageManager storageMgr;
 +    @Inject
 +    private StoragePoolDetailsDao storagePoolDetailsDao;
 +    @Inject
 +    private StorageUtil storageUtil;
 +
 +    protected Gson _gson;
 +
 +    private List<StoragePoolAllocator> _storagePoolAllocators;
 +
 +    VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
 +
 +    static final ConfigKey<Long> VmJobCheckInterval = new ConfigKey<Long>("Advanced", Long.class, "vm.job.check.interval", "3000", "Interval in milliseconds to check if the job is complete", false);
 +
 +    static final ConfigKey<Boolean> VolumeUrlCheck = new ConfigKey<Boolean>("Advanced", Boolean.class, "volume.url.check", "true",
 +            "Check the url for a volume before downloading it from the management server. Set to false when you managment has no internet access.", true);
 +
 +    private long _maxVolumeSizeInGb;
 +    private final StateMachine2<Volume.State, Volume.Event, Volume> _volStateMachine;
 +
 +    protected VolumeApiServiceImpl() {
 +        _volStateMachine = Volume.State.getStateMachine();
 +        _gson = GsonHelper.getGsonLogger();
 +    }
 +
 +    /*
 +     * Upload the volume to secondary storage.
 +     */
 +    @Override
 +    @DB
 +    @ActionEvent(eventType = EventTypes.EVENT_VOLUME_UPLOAD, eventDescription = "uploading volume", async = true)
 +    public VolumeVO uploadVolume(UploadVolumeCmd cmd) throws ResourceAllocationException {
 +        Account caller = CallContext.current().getCallingAccount();
 +        long ownerId = cmd.getEntityOwnerId();
 +        Account owner = _entityMgr.findById(Account.class, ownerId);
 +        Long zoneId = cmd.getZoneId();
 +        String volumeName = cmd.getVolumeName();
 +        String url = cmd.getUrl();
 +        String format = cmd.getFormat();
 +        Long diskOfferingId = cmd.getDiskOfferingId();
 +        String imageStoreUuid = cmd.getImageStoreUuid();
 +        DataStore store = _tmpltMgr.getImageStore(imageStoreUuid, zoneId);
 +
 +        validateVolume(caller, ownerId, zoneId, volumeName, url, format, diskOfferingId);
 +
 +        VolumeVO volume = persistVolume(owner, zoneId, volumeName, url, cmd.getFormat(), diskOfferingId, Volume.State.Allocated);
 +
 +        VolumeInfo vol = volFactory.getVolume(volume.getId());
 +
 +        RegisterVolumePayload payload = new RegisterVolumePayload(cmd.getUrl(), cmd.getChecksum(), cmd.getFormat());
 +        vol.addPayload(payload);
 +
 +        volService.registerVolume(vol, store);
 +        return volume;
 +    }
 +
 +    @Override
 +    @ActionEvent(eventType = EventTypes.EVENT_VOLUME_UPLOAD, eventDescription = "uploading volume for post upload", async = true)
 +    public GetUploadParamsResponse uploadVolume(final GetUploadParamsForVolumeCmd cmd) throws ResourceAllocationException, MalformedURLException {
 +        Account caller = CallContext.current().getCallingAccount();
 +        long ownerId = cmd.getEntityOwnerId();
 +        final Account owner = _entityMgr.findById(Account.class, ownerId);
 +        final Long zoneId = cmd.getZoneId();
 +        final String volumeName = cmd.getName();
 +        String format = cmd.getFormat();
 +        final Long diskOfferingId = cmd.getDiskOfferingId();
 +        String imageStoreUuid = cmd.getImageStoreUuid();
 +        final DataStore store = _tmpltMgr.getImageStore(imageStoreUuid, zoneId);
 +
 +        validateVolume(caller, ownerId, zoneId, volumeName, null, format, diskOfferingId);
 +
 +        return Transaction.execute(new TransactionCallbackWithException<GetUploadParamsResponse, MalformedURLException>() {
 +            @Override
 +            public GetUploadParamsResponse doInTransaction(TransactionStatus status) throws MalformedURLException {
 +
 +                VolumeVO volume = persistVolume(owner, zoneId, volumeName, null, cmd.getFormat(), diskOfferingId, Volume.State.NotUploaded);
 +
 +                VolumeInfo vol = volFactory.getVolume(volume.getId());
 +
 +                RegisterVolumePayload payload = new RegisterVolumePayload(null, cmd.getChecksum(), cmd.getFormat());
 +                vol.addPayload(payload);
 +
 +                Pair<EndPoint, DataObject> pair = volService.registerVolumeForPostUpload(vol, store);
 +                EndPoint ep = pair.first();
 +                DataObject dataObject = pair.second();
 +
 +                GetUploadParamsResponse response = new GetUploadParamsResponse();
 +
 +                String ssvmUrlDomain = _configDao.getValue(Config.SecStorageSecureCopyCert.key());
 +
 +                String url = ImageStoreUtil.generatePostUploadUrl(ssvmUrlDomain, ep.getPublicAddr(), vol.getUuid());
 +                response.setPostURL(new URL(url));
 +
 +                // set the post url, this is used in the monitoring thread to determine the SSVM
 +                VolumeDataStoreVO volumeStore = _volumeStoreDao.findByVolume(vol.getId());
 +                assert (volumeStore != null) : "sincle volume is registered, volumestore cannot be null at this stage";
 +                volumeStore.setExtractUrl(url);
 +                _volumeStoreDao.persist(volumeStore);
 +
 +                response.setId(UUID.fromString(vol.getUuid()));
 +
 +                int timeout = ImageStoreUploadMonitorImpl.getUploadOperationTimeout();
 +                DateTime currentDateTime = new DateTime(DateTimeZone.UTC);
 +                String expires = currentDateTime.plusMinutes(timeout).toString();
 +                response.setTimeout(expires);
 +
 +                String key = _configDao.getValue(Config.SSVMPSK.key());
 +                /*
 +                 * encoded metadata using the post upload config key
 +                 */
 +                TemplateOrVolumePostUploadCommand command = new TemplateOrVolumePostUploadCommand(vol.getId(), vol.getUuid(), volumeStore.getInstallPath(), cmd.getChecksum(), vol.getType().toString(),
 +                        vol.getName(), vol.getFormat().toString(), dataObject.getDataStore().getUri(), dataObject.getDataStore().getRole().toString());
 +                command.setLocalPath(volumeStore.getLocalDownloadPath());
 +                //using the existing max upload size configuration
 +                command.setProcessTimeout(NumbersUtil.parseLong(_configDao.getValue("vmware.package.ova.timeout"), 3600));
 +                command.setMaxUploadSize(_configDao.getValue(Config.MaxUploadVolumeSize.key()));
 +                command.setDefaultMaxAccountSecondaryStorage(_configDao.getValue(Config.DefaultMaxAccountSecondaryStorage.key()));
 +                command.setAccountId(vol.getAccountId());
 +                Gson gson = new GsonBuilder().create();
 +                String metadata = EncryptionUtil.encodeData(gson.toJson(command), key);
 +                response.setMetadata(metadata);
 +
 +                /*
 +                 * signature calculated on the url, expiry, metadata.
 +                 */
 +                response.setSignature(EncryptionUtil.generateSignature(metadata + url + expires, key));
 +                return response;
 +            }
 +        });
 +    }
 +
 +    private boolean validateVolume(Account caller, long ownerId, Long zoneId, String volumeName, String url, String format, Long diskOfferingId) throws ResourceAllocationException {
 +
 +        // permission check
 +        Account volumeOwner = _accountMgr.getActiveAccountById(ownerId);
 +        _accountMgr.checkAccess(caller, null, true, volumeOwner);
 +
 +        // Check that the resource limit for volumes won't be exceeded
 +        _resourceLimitMgr.checkResourceLimit(volumeOwner, ResourceType.volume);
 +
 +        // Verify that zone exists
 +        DataCenterVO zone = _dcDao.findById(zoneId);
 +        if (zone == null) {
 +            throw new InvalidParameterValueException("Unable to find zone by id " + zoneId);
 +        }
 +
 +        // Check if zone is disabled
 +        if (Grouping.AllocationState.Disabled == zone.getAllocationState() && !_accountMgr.isRootAdmin(caller.getId())) {
 +            throw new PermissionDeniedException("Cannot perform this operation, Zone is currently disabled: " + zoneId);
 +        }
 +
 +        //validating the url only when url is not null. url can be null incase of form based post upload
 +        if (url != null) {
 +            if (url.toLowerCase().contains("file://")) {
 +                throw new InvalidParameterValueException("File:// type urls are currently unsupported");
 +            }
 +            UriUtils.validateUrl(format, url);
 +            if (VolumeUrlCheck.value()) { // global setting that can be set when their MS does not have internet access
 +                s_logger.debug("Checking url: " + url);
 +                UriUtils.checkUrlExistence(url);
 +            }
 +            // Check that the resource limit for secondary storage won't be exceeded
 +            _resourceLimitMgr.checkResourceLimit(_accountMgr.getAccount(ownerId), ResourceType.secondary_storage, UriUtils.getRemoteSize(url));
 +        } else {
 +            _resourceLimitMgr.checkResourceLimit(_accountMgr.getAccount(ownerId), ResourceType.secondary_storage);
 +        }
 +
 +        try {
 +            ImageFormat.valueOf(format.toUpperCase());
 +        } catch (IllegalArgumentException e) {
 +            s_logger.debug("ImageFormat IllegalArgumentException: " + e.getMessage());
 +            throw new IllegalArgumentException("Image format: " + format + " is incorrect. Supported formats are " + EnumUtils.listValues(ImageFormat.values()));
 +        }
 +
 +        // Check that the the disk offering specified is valid
 +        if (diskOfferingId != null) {
 +            DiskOfferingVO diskOffering = _diskOfferingDao.findById(diskOfferingId);
 +            if ((diskOffering == null) || diskOffering.getRemoved() != null || !DiskOfferingVO.Type.Disk.equals(diskOffering.getType())) {
 +                throw new InvalidParameterValueException("Please specify a valid disk offering.");
 +            }
 +            if (!diskOffering.isCustomized()) {
 +                throw new InvalidParameterValueException("Please specify a custom sized disk offering.");
 +            }
 +
 +            if (diskOffering.getDomainId() == null) {
 +                // do nothing as offering is public
 +            } else {
 +                _configMgr.checkDiskOfferingAccess(volumeOwner, diskOffering);
 +            }
 +        }
 +
 +        return false;
 +    }
 +
 +    public String getRandomVolumeName() {
 +        return UUID.randomUUID().toString();
 +    }
 +
 +    @DB
 +    protected VolumeVO persistVolume(final Account owner, final Long zoneId, final String volumeName, final String url, final String format, final Long diskOfferingId, final Volume.State state) {
 +        return Transaction.execute(new TransactionCallback<VolumeVO>() {
 +            @Override
 +            public VolumeVO doInTransaction(TransactionStatus status) {
 +                VolumeVO volume = new VolumeVO(volumeName, zoneId, -1, -1, -1, new Long(-1), null, null, Storage.ProvisioningType.THIN, 0, Volume.Type.DATADISK);
 +                volume.setPoolId(null);
 +                volume.setDataCenterId(zoneId);
 +                volume.setPodId(null);
 +                volume.setState(state); // initialize the state
 +                // to prevent a null pointer deref I put the system account id here when no owner is given.
 +                // TODO Decide if this is valid or whether  throwing a CloudRuntimeException is more appropriate
 +                volume.setAccountId((owner == null) ? Account.ACCOUNT_ID_SYSTEM : owner.getAccountId());
 +                volume.setDomainId((owner == null) ? Domain.ROOT_DOMAIN : owner.getDomainId());
 +
 +                if (diskOfferingId == null) {
 +                    DiskOfferingVO diskOfferingVO = _diskOfferingDao.findByUniqueName("Cloud.com-Custom");
 +                    if (diskOfferingVO != null) {
 +                        long defaultDiskOfferingId = diskOfferingVO.getId();
 +                        volume.setDiskOfferingId(defaultDiskOfferingId);
 +                    }
 +                } else {
 +                    volume.setDiskOfferingId(diskOfferingId);
 +
 +                    DiskOfferingVO diskOfferingVO = _diskOfferingDao.findById(diskOfferingId);
 +
 +                    Boolean isCustomizedIops = diskOfferingVO != null && diskOfferingVO.isCustomizedIops() != null ? diskOfferingVO.isCustomizedIops() : false;
 +
 +                    if (isCustomizedIops == null || !isCustomizedIops) {
 +                        volume.setMinIops(diskOfferingVO.getMinIops());
 +                        volume.setMaxIops(diskOfferingVO.getMaxIops());
 +                    }
 +                }
 +
 +                // volume.setSize(size);
 +                volume.setInstanceId(null);
 +                volume.setUpdated(new Date());
 +                volume.setDomainId((owner == null) ? Domain.ROOT_DOMAIN : owner.getDomainId());
 +                volume.setFormat(ImageFormat.valueOf(format));
 +                volume = _volsDao.persist(volume);
 +                CallContext.current().setEventDetails("Volume Id: " + volume.getUuid());
 +
 +                // Increment resource count during allocation; if actual creation fails,
 +                // decrement it
 +                _resourceLimitMgr.incrementResourceCount(volume.getAccountId(), ResourceType.volume);
 +                //url can be null incase of postupload
 +                if (url != null) {
 +                    _resourceLimitMgr.incrementResourceCount(volume.getAccountId(), ResourceType.secondary_storage, UriUtils.getRemoteSize(url));
 +                }
 +
 +                return volume;
 +            }
 +        });
 +    }
 +
 +    /**
 +     * Retrieves the volume name from CreateVolumeCmd object.
 +     *
 +     * If the retrieved volume name is null, empty or blank, then A random name
 +     * will be generated using getRandomVolumeName method.
 +     *
 +     * @param cmd
 +     * @return Either the retrieved name or a random name.
 +     */
 +    public String getVolumeNameFromCommand(CreateVolumeCmd cmd) {
 +        String userSpecifiedName = cmd.getVolumeName();
 +
 +        if (org.apache.commons.lang.StringUtils.isBlank(userSpecifiedName)) {
 +            userSpecifiedName = getRandomVolumeName();
 +        }
 +
 +        return userSpecifiedName;
 +    }
 +
 +    /*
 +     * Just allocate a volume in the database, don't send the createvolume cmd
 +     * to hypervisor. The volume will be finally created only when it's attached
 +     * to a VM.
 +     */
 +    @Override
 +    @DB
 +    @ActionEvent(eventType = EventTypes.EVENT_VOLUME_CREATE, eventDescription = "creating volume", create = true)
 +    public VolumeVO allocVolume(CreateVolumeCmd cmd) throws ResourceAllocationException {
 +        Account caller = CallContext.current().getCallingAccount();
 +
 +        long ownerId = cmd.getEntityOwnerId();
 +        Account owner = _accountMgr.getActiveAccountById(ownerId);
 +        Boolean displayVolume = cmd.getDisplayVolume();
 +
 +        // permission check
 +        _accountMgr.checkAccess(caller, null, true, _accountMgr.getActiveAccountById(ownerId));
 +
 +        if (displayVolume == null) {
 +            displayVolume = true;
 +        } else {
 +            if (!_accountMgr.isRootAdmin(caller.getId())) {
 +                throw new PermissionDeniedException("Cannot update parameter displayvolume, only admin permitted ");
 +            }
 +        }
 +
 +        // Check that the resource limit for volumes won't be exceeded
 +        _resourceLimitMgr.checkResourceLimit(owner, ResourceType.volume, displayVolume);
 +
 +        Long zoneId = cmd.getZoneId();
 +        Long diskOfferingId = null;
 +        DiskOfferingVO diskOffering = null;
 +        Storage.ProvisioningType provisioningType;
 +        Long size = null;
 +        Long minIops = null;
 +        Long maxIops = null;
 +        // Volume VO used for extracting the source template id
 +        VolumeVO parentVolume = null;
 +
 +        // validate input parameters before creating the volume
 +        if ((cmd.getSnapshotId() == null && cmd.getDiskOfferingId() == null) || (cmd.getSnapshotId() != null && cmd.getDiskOfferingId() != null)) {
 +            throw new InvalidParameterValueException("Either disk Offering Id or snapshot Id must be passed whilst creating volume");
 +        }
 +
 +        if (cmd.getSnapshotId() == null) {// create a new volume
 +
 +            diskOfferingId = cmd.getDiskOfferingId();
 +            size = cmd.getSize();
 +            Long sizeInGB = size;
 +            if (size != null) {
 +                if (size > 0) {
 +                    size = size * 1024 * 1024 * 1024; // user specify size in GB
 +                } else {
 +                    throw new InvalidParameterValueException("Disk size must be larger than 0");
 +                }
 +            }
 +
 +            // Check that the the disk offering is specified
 +            diskOffering = _diskOfferingDao.findById(diskOfferingId);
 +            if ((diskOffering == null) || diskOffering.getRemoved() != null || !DiskOfferingVO.Type.Disk.equals(diskOffering.getType())) {
 +                throw new InvalidParameterValueException("Please specify a valid disk offering.");
 +            }
 +
 +            if (diskOffering.isCustomized()) {
 +                if (size == null) {
 +                    throw new InvalidParameterValueException("This disk offering requires a custom size specified");
 +                }
 +                Long customDiskOfferingMaxSize = VolumeOrchestrationService.CustomDiskOfferingMaxSize.value();
 +                Long customDiskOfferingMinSize = VolumeOrchestrationService.CustomDiskOfferingMinSize.value();
 +
 +                if ((sizeInGB < customDiskOfferingMinSize) || (sizeInGB > customDiskOfferingMaxSize)) {
 +                    throw new InvalidParameterValueException("Volume size: " + sizeInGB + "GB is out of allowed range. Max: " + customDiskOfferingMaxSize + " Min:" + customDiskOfferingMinSize);
 +                }
 +            }
 +
 +            if (!diskOffering.isCustomized() && size != null) {
 +                throw new InvalidParameterValueException("This disk offering does not allow custom size");
 +            }
 +
 +            if (diskOffering.getDomainId() == null) {
 +                // do nothing as offering is public
 +            } else {
 +                _configMgr.checkDiskOfferingAccess(caller, diskOffering);
 +            }
 +
 +            if (diskOffering.getDiskSize() > 0) {
 +                size = diskOffering.getDiskSize();
 +            }
 +
 +            Boolean isCustomizedIops = diskOffering.isCustomizedIops();
 +
 +            if (isCustomizedIops != null) {
 +                if (isCustomizedIops) {
 +                    minIops = cmd.getMinIops();
 +                    maxIops = cmd.getMaxIops();
 +
 +                    if (minIops == null && maxIops == null) {
 +                        minIops = 0L;
 +                        maxIops = 0L;
 +                    } else {
 +                        if (minIops == null || minIops <= 0) {
 +                            throw new InvalidParameterValueException("The min IOPS must be greater than 0.");
 +                        }
 +
 +                        if (maxIops == null) {
 +                            maxIops = 0L;
 +                        }
 +
 +                        if (minIops > maxIops) {
 +                            throw new InvalidParameterValueException("The min IOPS must be less than or equal to the max IOPS.");
 +                        }
 +                    }
 +                } else {
 +                    minIops = diskOffering.getMinIops();
 +                    maxIops = diskOffering.getMaxIops();
 +                }
 +            }
 +
 +            provisioningType = diskOffering.getProvisioningType();
 +
 +            if (!validateVolumeSizeRange(size)) {// convert size from mb to gb
 +                // for validation
 +                throw new InvalidParameterValueException("Invalid size for custom volume creation: " + size + " ,max volume size is:" + _maxVolumeSizeInGb);
 +            }
 +        } else { // create volume from snapshot
 +            Long snapshotId = cmd.getSnapshotId();
 +            SnapshotVO snapshotCheck = _snapshotDao.findById(snapshotId);
 +            if (snapshotCheck == null) {
 +                throw new InvalidParameterValueException("unable to find a snapshot with id " + snapshotId);
 +            }
 +
 +            if (snapshotCheck.getState() != Snapshot.State.BackedUp) {
 +                throw new InvalidParameterValueException("Snapshot id=" + snapshotId + " is not in " + Snapshot.State.BackedUp + " state yet and can't be used for volume creation");
 +            }
 +            parentVolume = _volsDao.findByIdIncludingRemoved(snapshotCheck.getVolumeId());
 +
 +            diskOfferingId = snapshotCheck.getDiskOfferingId();
 +            diskOffering = _diskOfferingDao.findById(diskOfferingId);
 +            if (zoneId == null) {
 +                // if zoneId is not provided, we default to create volume in the same zone as the snapshot zone.
 +                zoneId = snapshotCheck.getDataCenterId();
 +            }
 +            size = snapshotCheck.getSize(); // ; disk offering is used for tags
 +            // purposes
 +
 +            minIops = snapshotCheck.getMinIops();
 +            maxIops = snapshotCheck.getMaxIops();
 +
 +            provisioningType = diskOffering.getProvisioningType();
 +            // check snapshot permissions
 +            _accountMgr.checkAccess(caller, null, true, snapshotCheck);
 +
 +            // one step operation - create volume in VM's cluster and attach it
 +            // to the VM
 +            Long vmId = cmd.getVirtualMachineId();
 +            if (vmId != null) {
 +                // Check that the virtual machine ID is valid and it's a user vm
 +                UserVmVO vm = _userVmDao.findById(vmId);
 +                if (vm == null || vm.getType() != VirtualMachine.Type.User) {
 +                    throw new InvalidParameterValueException("Please specify a valid User VM.");
 +                }
 +
 +                // Check that the VM is in the correct state
 +                if (vm.getState() != State.Running && vm.getState() != State.Stopped) {
 +                    throw new InvalidParameterValueException("Please specify a VM that is either running or stopped.");
 +                }
 +
 +                // permission check
 +                _accountMgr.checkAccess(caller, null, false, vm);
 +            }
 +
 +        }
 +
 +        // Check that the resource limit for primary storage won't be exceeded
 +        _resourceLimitMgr.checkResourceLimit(owner, ResourceType.primary_storage, displayVolume, new Long(size));
 +
 +        // Verify that zone exists
 +        DataCenterVO zone = _dcDao.findById(zoneId);
 +        if (zone == null) {
 +            throw new InvalidParameterValueException("Unable to find zone by id " + zoneId);
 +        }
 +
 +        // Check if zone is disabled
 +        if (Grouping.AllocationState.Disabled == zone.getAllocationState() && !_accountMgr.isRootAdmin(caller.getId())) {
 +            throw new PermissionDeniedException("Cannot perform this operation, Zone is currently disabled: " + zoneId);
 +        }
 +
 +        // If local storage is disabled then creation of volume with local disk
 +        // offering not allowed
 +        if (!zone.isLocalStorageEnabled() && diskOffering.getUseLocalStorage()) {
 +            throw new InvalidParameterValueException("Zone is not configured to use local storage but volume's disk offering " + diskOffering.getName() + " uses it");
 +        }
 +
 +        String userSpecifiedName = getVolumeNameFromCommand(cmd);
 +
 +        VolumeVO volume = commitVolume(cmd, caller, owner, displayVolume, zoneId, diskOfferingId, provisioningType, size, minIops, maxIops, parentVolume, userSpecifiedName,
 +                _uuidMgr.generateUuid(Volume.class, cmd.getCustomId()));
 +
 +        return volume;
 +    }
 +
 +    private VolumeVO commitVolume(final CreateVolumeCmd cmd, final Account caller, final Account owner, final Boolean displayVolume, final Long zoneId, final Long diskOfferingId,
 +            final Storage.ProvisioningType provisioningType, final Long size, final Long minIops, final Long maxIops, final VolumeVO parentVolume, final String userSpecifiedName, final String uuid) {
 +        return Transaction.execute(new TransactionCallback<VolumeVO>() {
 +            @Override
 +            public VolumeVO doInTransaction(TransactionStatus status) {
 +                VolumeVO volume = new VolumeVO(userSpecifiedName, -1, -1, -1, -1, new Long(-1), null, null, provisioningType, 0, Volume.Type.DATADISK);
 +                volume.setPoolId(null);
 +                volume.setUuid(uuid);
 +                volume.setDataCenterId(zoneId);
 +                volume.setPodId(null);
 +                volume.setAccountId(owner.getId());
 +                volume.setDomainId(owner.getDomainId());
 +                volume.setDiskOfferingId(diskOfferingId);
 +                volume.setSize(size);
 +                volume.setMinIops(minIops);
 +                volume.setMaxIops(maxIops);
 +                volume.setInstanceId(null);
 +                volume.setUpdated(new Date());
 +                volume.setDisplayVolume(displayVolume);
 +                if (parentVolume != null) {
 +                    volume.setTemplateId(parentVolume.getTemplateId());
 +                    volume.setFormat(parentVolume.getFormat());
 +                } else {
 +                    volume.setTemplateId(null);
 +                }
 +
 +                volume = _volsDao.persist(volume);
 +                if (cmd.getSnapshotId() == null && displayVolume) {
 +                    // for volume created from snapshot, create usage event after volume creation
 +                    UsageEventUtils.publishUsageEvent(EventTypes.EVENT_VOLUME_CREATE, volume.getAccountId(), volume.getDataCenterId(), volume.getId(), volume.getName(), diskOfferingId, null, size,
 +                            Volume.class.getName(), volume.getUuid(), displayVolume);
 +                }
 +
 +                CallContext.current().setEventDetails("Volume Id: " + volume.getUuid());
 +
 +                // Increment resource count during allocation; if actual creation fails,
 +                // decrement it
 +                _resourceLimitMgr.incrementResourceCount(volume.getAccountId(), ResourceType.volume, displayVolume);
 +                _resourceLimitMgr.incrementResourceCount(volume.getAccountId(), ResourceType.primary_storage, displayVolume, new Long(volume.getSize()));
 +                return volume;
 +            }
 +        });
 +    }
 +
 +    public boolean validateVolumeSizeRange(long size) {
 +        if (size < 0 || (size > 0 && size < (1024 * 1024 * 1024))) {
 +            throw new InvalidParameterValueException("Please specify a size of at least 1 GB.");
 +        } else if (size > (_maxVolumeSizeInGb * 1024 * 1024 * 1024)) {
 +            throw new InvalidParameterValueException("Requested volume size is " + size + ", but the maximum size allowed is " + _maxVolumeSizeInGb + " GB.");
 +        }
 +
 +        return true;
 +    }
 +
 +    @Override
 +    @DB
 +    @ActionEvent(eventType = EventTypes.EVENT_VOLUME_CREATE, eventDescription = "creating volume", async = true)
 +    public VolumeVO createVolume(CreateVolumeCmd cmd) {
 +        VolumeVO volume = _volsDao.findById(cmd.getEntityId());
 +        boolean created = true;
 +
 +        try {
 +            if (cmd.getSnapshotId() != null) {
 +                volume = createVolumeFromSnapshot(volume, cmd.getSnapshotId(), cmd.getVirtualMachineId());
 +                if (volume.getState() != Volume.State.Ready) {
 +                    created = false;
 +                }
 +
 +                // if VM Id is provided, attach the volume to the VM
 +                if (cmd.getVirtualMachineId() != null) {
 +                    try {
 +                        attachVolumeToVM(cmd.getVirtualMachineId(), volume.getId(), volume.getDeviceId());
 +                    } catch (Exception ex) {
 +                        StringBuilder message = new StringBuilder("Volume: ");
 +                        message.append(volume.getUuid());
 +                        message.append(" created successfully, but failed to attach the newly created volume to VM: ");
 +                        message.append(cmd.getVirtualMachineId());
 +                        message.append(" due to error: ");
 +                        message.append(ex.getMessage());
 +                        if (s_logger.isDebugEnabled()) {
 +                            s_logger.debug(message, ex);
 +                        }
 +                        throw new CloudRuntimeException(message.toString());
 +                    }
 +                }
 +            }
 +            return volume;
 +        } catch (Exception e) {
 +            created = false;
 +            VolumeInfo vol = volFactory.getVolume(cmd.getEntityId());
 +            vol.stateTransit(Volume.Event.DestroyRequested);
 +            throw new CloudRuntimeException("Failed to create volume: " + volume.getId(), e);
 +        } finally {
 +            if (!created) {
 +                s_logger.trace("Decrementing volume resource count for account id=" + volume.getAccountId() + " as volume failed to create on the backend");
 +                _resourceLimitMgr.decrementResourceCount(volume.getAccountId(), ResourceType.volume, cmd.getDisplayVolume());
 +                _resourceLimitMgr.decrementResourceCount(volume.getAccountId(), ResourceType.primary_storage, cmd.getDisplayVolume(), new Long(volume.getSize()));
 +            }
 +        }
 +    }
 +
 +    protected VolumeVO createVolumeFromSnapshot(VolumeVO volume, long snapshotId, Long vmId) throws StorageUnavailableException {
 +        VolumeInfo createdVolume = null;
 +        SnapshotVO snapshot = _snapshotDao.findById(snapshotId);
 +        snapshot.getVolumeId();
 +
 +        UserVmVO vm = null;
 +        if (vmId != null) {
 +            vm = _userVmDao.findById(vmId);
 +        }
 +
 +        // sync old snapshots to region store if necessary
 +
 +        createdVolume = _volumeMgr.createVolumeFromSnapshot(volume, snapshot, vm);
 +        VolumeVO volumeVo = _volsDao.findById(createdVolume.getId());
 +        UsageEventUtils.publishUsageEvent(EventTypes.EVENT_VOLUME_CREATE, createdVolume.getAccountId(), createdVolume.getDataCenterId(), createdVolume.getId(), createdVolume.getName(),
 +                createdVolume.getDiskOfferingId(), null, createdVolume.getSize(), Volume.class.getName(), createdVolume.getUuid(), volumeVo.isDisplayVolume());
 +
 +        return volumeVo;
 +    }
 +
 +    @Override
 +    @DB
 +    @ActionEvent(eventType = EventTypes.EVENT_VOLUME_RESIZE, eventDescription = "resizing volume", async = true)
 +    public VolumeVO resizeVolume(ResizeVolumeCmd cmd) throws ResourceAllocationException {
 +        Long newSize;
 +        Long newMinIops;
 +        Long newMaxIops;
 +        Integer newHypervisorSnapshotReserve;
 +        boolean shrinkOk = cmd.getShrinkOk();
 +
 +        VolumeVO volume = _volsDao.findById(cmd.getEntityId());
 +        if (volume == null) {
 +            throw new InvalidParameterValueException("No such volume");
 +        }
 +
 +        // checking if there are any ongoing snapshots on the volume which is to be resized
 +        List<SnapshotVO> ongoingSnapshots = _snapshotDao.listByStatus(cmd.getId(), Snapshot.State.Creating, Snapshot.State.CreatedOnPrimary, Snapshot.State.BackingUp);
 +        if (ongoingSnapshots.size() > 0) {
 +            throw new CloudRuntimeException("There is/are unbacked up snapshot(s) on this volume, resize volume is not permitted, please try again later.");
 +        }
 +
 +        /* Does the caller have authority to act on this volume? */
 +        _accountMgr.checkAccess(CallContext.current().getCallingAccount(), null, true, volume);
 +
 +        DiskOfferingVO diskOffering = _diskOfferingDao.findById(volume.getDiskOfferingId());
 +        DiskOfferingVO newDiskOffering = null;
 +
 +        if (cmd.getNewDiskOfferingId() != null && volume.getDiskOfferingId() != cmd.getNewDiskOfferingId()) {
 +            newDiskOffering = _diskOfferingDao.findById(cmd.getNewDiskOfferingId());
 +        }
 +
 +        /* Only works for KVM/XenServer/VMware (or "Any") for now, and volumes with 'None' since they're just allocated in DB */
 +
 +        HypervisorType hypervisorType = _volsDao.getHypervisorType(volume.getId());
 +
 +        if (hypervisorType != HypervisorType.KVM && hypervisorType != HypervisorType.XenServer && hypervisorType != HypervisorType.VMware && hypervisorType != HypervisorType.Any
 +                && hypervisorType != HypervisorType.None) {
 +            throw new InvalidParameterValueException("Hypervisor " + hypervisorType + " does not support  rootdisksize override");
 +        }
 +
 +        if (volume.getState() != Volume.State.Ready && volume.getState() != Volume.State.Allocated) {
 +            throw new InvalidParameterValueException("Volume should be in ready or allocated state before attempting a resize. Volume " + volume.getUuid() + " is in state " + volume.getState() + ".");
 +        }
 +
 +        // if we are to use the existing disk offering
 +        if (newDiskOffering == null) {
 +            newSize = cmd.getSize();
 +            newHypervisorSnapshotReserve = volume.getHypervisorSnapshotReserve();
 +
 +            // if the caller is looking to change the size of the volume
 +            if (newSize != null) {
 +                if (!diskOffering.isCustomized() && !volume.getVolumeType().equals(Volume.Type.ROOT)) {
 +                    throw new InvalidParameterValueException("To change a volume's size without providing a new disk offering, its current disk offering must be "
 +                            + "customizable or it must be a root volume (if providing a disk offering, make sure it is different from the current disk offering).");
 +                }
 +
 +                // convert from bytes to GiB
 +                newSize = newSize << 30;
 +            } else {
 +                // no parameter provided; just use the original size of the volume
 +                newSize = volume.getSize();
 +            }
 +
 +            newMinIops = cmd.getMinIops();
 +
 +            if (newMinIops != null) {
 +                if (!volume.getVolumeType().equals(Volume.Type.ROOT) && (diskOffering.isCustomizedIops() == null || !diskOffering.isCustomizedIops())) {
 +                    throw new InvalidParameterValueException("The current disk offering does not support customization of the 'Min IOPS' parameter.");
 +                }
 +            } else {
 +                // no parameter provided; just use the original min IOPS of the volume
 +                newMinIops = volume.getMinIops();
 +            }
 +
 +            newMaxIops = cmd.getMaxIops();
 +
 +            if (newMaxIops != null) {
 +                if (!volume.getVolumeType().equals(Volume.Type.ROOT) && (diskOffering.isCustomizedIops() == null || !diskOffering.isCustomizedIops())) {
 +                    throw new InvalidParameterValueException("The current disk offering does not support customization of the 'Max IOPS' parameter.");
 +                }
 +            } else {
 +                // no parameter provided; just use the original max IOPS of the volume
 +                newMaxIops = volume.getMaxIops();
 +            }
 +
 +            validateIops(newMinIops, newMaxIops);
 +        } else {
 +            if (newDiskOffering.getRemoved() != null) {
 +                throw new InvalidParameterValueException("Requested disk offering has been removed.");
 +            }
 +
 +            if (!DiskOfferingVO.Type.Disk.equals(newDiskOffering.getType())) {
 +                throw new InvalidParameterValueException("Requested disk offering type is invalid.");
 +            }
 +
 +            if (diskOffering.getTags() != null) {
 +                if (!StringUtils.areTagsEqual(diskOffering.getTags(), newDiskOffering.getTags())) {
 +                    throw new InvalidParameterValueException("The tags on the new and old disk offerings must match.");
 +                }
 +            } else if (newDiskOffering.getTags() != null) {
 +                throw new InvalidParameterValueException("There are no tags on the current disk offering. The new disk offering needs to have no tags, as well.");
 +            }
 +
 +            if (newDiskOffering.getDomainId() != null) {
 +                // not a public offering; check access
 +                _configMgr.checkDiskOfferingAccess(CallContext.current().getCallingAccount(), newDiskOffering);
 +            }
 +
 +            if (newDiskOffering.isCustomized()) {
 +                newSize = cmd.getSize();
 +
 +                if (newSize == null) {
 +                    throw new InvalidParameterValueException("The new disk offering requires that a size be specified.");
 +                }
 +
 +                // convert from GiB to bytes
 +                newSize = newSize << 30;
 +            } else {
 +                if (cmd.getSize() != null) {
 +                    throw new InvalidParameterValueException("You cannnot pass in a custom disk size to a non-custom disk offering.");
 +                }
 +
 +                newSize = newDiskOffering.getDiskSize();
 +            }
 +
 +            if (!volume.getSize().equals(newSize) && !volume.getVolumeType().equals(Volume.Type.DATADISK)) {
 +                throw new InvalidParameterValueException("Only data volumes can be resized via a new disk offering.");
 +            }
 +
 +            if (newDiskOffering.isCustomizedIops() != null && newDiskOffering.isCustomizedIops()) {
 +                newMinIops = cmd.getMinIops() != null ? cmd.getMinIops() : volume.getMinIops();
 +                newMaxIops = cmd.getMaxIops() != null ? cmd.getMaxIops() : volume.getMaxIops();
 +
 +                validateIops(newMinIops, newMaxIops);
 +            } else {
 +                newMinIops = newDiskOffering.getMinIops();
 +                newMaxIops = newDiskOffering.getMaxIops();
 +            }
 +
 +            // if the hypervisor snapshot reserve value is null, it must remain null (currently only KVM uses null and null is all KVM uses for a value here)
 +            newHypervisorSnapshotReserve = volume.getHypervisorSnapshotReserve() != null ? newDiskOffering.getHypervisorSnapshotReserve() : null;
 +        }
 +
 +        long currentSize = volume.getSize();
 +
 +        // if the caller is looking to change the size of the volume
 +        if (currentSize != newSize) {
 +            if (volume.getInstanceId() != null) {
 +                // Check that VM to which this volume is attached does not have VM snapshots
 +                if (_vmSnapshotDao.findByVm(volume.getInstanceId()).size() > 0) {
 +                    throw new InvalidParameterValueException("A volume that is attached to a VM with any VM snapshots cannot be resized.");
 +                }
 +            }
 +
 +            if (!validateVolumeSizeRange(newSize)) {
 +                throw new InvalidParameterValueException("Requested size out of range");
 +            }
 +
 +            Long storagePoolId = volume.getPoolId();
 +
 +            if (storagePoolId != null) {
 +                StoragePoolVO storagePoolVO = _storagePoolDao.findById(storagePoolId);
 +
 +                if (storagePoolVO.isManaged()) {
 +                    Long instanceId = volume.getInstanceId();
 +
 +                    if (instanceId != null) {
 +                        VMInstanceVO vmInstanceVO = _vmInstanceDao.findById(instanceId);
 +
 +                        if (vmInstanceVO.getHypervisorType() == HypervisorType.KVM && vmInstanceVO.getState() != State.Stopped) {
 +                            throw new CloudRuntimeException("This kind of KVM disk cannot be resized while it is connected to a VM that's not in the Stopped state.");
 +                        }
 +                    }
 +                }
 +            }
 +
 +            /*
 +             * Let's make certain they (think they) know what they're doing if they
 +             * want to shrink by forcing them to provide the shrinkok parameter.
 +             * This will be checked again at the hypervisor level where we can see
 +             * the actual disk size.
 +             */
 +            if (currentSize > newSize && !shrinkOk) {
 +                throw new InvalidParameterValueException("Going from existing size of " + currentSize + " to size of " + newSize + " would shrink the volume."
 +                        + "Need to sign off by supplying the shrinkok parameter with value of true.");
 +            }
 +
 +            if (newSize > currentSize) {
 +                /* Check resource limit for this account on primary storage resource */
 +                _resourceLimitMgr.checkResourceLimit(_accountMgr.getAccount(volume.getAccountId()), ResourceType.primary_storage, volume.isDisplayVolume(),
 +                        new Long(newSize - currentSize).longValue());
 +            }
 +        }
 +
 +        // Note: The storage plug-in in question should perform validation on the IOPS to check if a sufficient number of IOPS is available to perform
 +        // the requested change
 +
 +        /* If this volume has never been beyond allocated state, short circuit everything and simply update the database. */
 +        if (volume.getState() == Volume.State.Allocated) {
 +            s_logger.debug("Volume is in the allocated state, but has never been created. Simply updating database with new size and IOPS.");
 +
 +            volume.setSize(newSize);
 +            volume.setMinIops(newMinIops);
 +            volume.setMaxIops(newMaxIops);
 +            volume.setHypervisorSnapshotReserve(newHypervisorSnapshotReserve);
 +
 +            if (newDiskOffering != null) {
 +                volume.setDiskOfferingId(cmd.getNewDiskOfferingId());
 +            }
 +
 +            _volsDao.update(volume.getId(), volume);
 +
 +            return volume;
 +        }
 +
 +        UserVmVO userVm = _userVmDao.findById(volume.getInstanceId());
 +
 +        if (userVm != null) {
 +            if (volume.getVolumeType().equals(Volume.Type.ROOT) && userVm.getPowerState() != VirtualMachine.PowerState.PowerOff && hypervisorType == HypervisorType.VMware) {
 +                s_logger.error(" For ROOT volume resize VM should be in Power Off state.");
 +                throw new InvalidParameterValueException("VM current state is : " + userVm.getPowerState() + ". But VM should be in " + VirtualMachine.PowerState.PowerOff + " state.");
 +            }
 +            // serialize VM operation
 +            AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
 +
 +            if (jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
 +                // avoid re-entrance
 +
 +                VmWorkJobVO placeHolder = null;
 +
 +                placeHolder = createPlaceHolderWork(userVm.getId());
 +
 +                try {
 +                    return orchestrateResizeVolume(volume.getId(), currentSize, newSize, newMinIops, newMaxIops, newHypervisorSnapshotReserve,
 +                            newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
 +                } finally {
 +                    _workJobDao.expunge(placeHolder.getId());
 +                }
 +            } else {
 +                Outcome<Volume> outcome = resizeVolumeThroughJobQueue(userVm.getId(), volume.getId(), currentSize, newSize, newMinIops, newMaxIops, newHypervisorSnapshotReserve,
 +                        newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
 +
 +                try {
 +                    outcome.get();
 +                } catch (InterruptedException e) {
 +                    throw new RuntimeException("Operation was interrupted", e);
 +                } catch (java.util.concurrent.ExecutionException e) {
 +                    throw new RuntimeException("Execution exception", e);
 +                }
 +
 +                Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
 +
 +                if (jobResult != null) {
 +                    if (jobResult instanceof ConcurrentOperationException) {
 +                        throw (ConcurrentOperationException)jobResult;
 +                    } else if (jobResult instanceof ResourceAllocationException) {
 +                        throw (ResourceAllocationException)jobResult;
 +                    } else if (jobResult instanceof RuntimeException) {
 +                        throw (RuntimeException)jobResult;
 +                    } else if (jobResult instanceof Throwable) {
 +                        throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
 +                    } else if (jobResult instanceof Long) {
 +                        return _volsDao.findById((Long)jobResult);
 +                    }
 +                }
 +
 +                return volume;
 +            }
 +        }
 +
 +        return orchestrateResizeVolume(volume.getId(), currentSize, newSize, newMinIops, newMaxIops, newHypervisorSnapshotReserve, newDiskOffering != null ? cmd.getNewDiskOfferingId() : null,
 +                shrinkOk);
 +    }
 +
 +    private void validateIops(Long minIops, Long maxIops) {
 +        if ((minIops == null && maxIops != null) || (minIops != null && maxIops == null)) {
 +            throw new InvalidParameterValueException("Either 'miniops' and 'maxiops' must both be provided or neither must be provided.");
 +        }
 +
 +        if (minIops != null && maxIops != null) {
 +            if (minIops > maxIops) {
 +                throw new InvalidParameterValueException("The 'miniops' parameter must be less than or equal to the 'maxiops' parameter.");
 +            }
 +        }
 +    }
 +
 +    private VolumeVO orchestrateResizeVolume(long volumeId, long currentSize, long newSize, Long newMinIops, Long newMaxIops, Integer newHypervisorSnapshotReserve, Long newDiskOfferingId,
 +            boolean shrinkOk) {
 +        VolumeVO volume = _volsDao.findById(volumeId);
 +        UserVmVO userVm = _userVmDao.findById(volume.getInstanceId());
 +        StoragePoolVO storagePool = _storagePoolDao.findById(volume.getPoolId());
 +        boolean isManaged = storagePool.isManaged();
++
++        if (!storageMgr.storagePoolHasEnoughSpaceForResize(storagePool, currentSize, newSize)) {
++            throw new CloudRuntimeException("Storage pool " + storagePool.getName() + " does not have enough space to resize volume " + volume.getName());
++        }
++
 +        /*
 +         * get a list of hosts to send the commands to, try the system the
 +         * associated vm is running on first, then the last known place it ran.
 +         * If not attached to a userVm, we pass 'none' and resizevolume.sh is ok
 +         * with that since it only needs the vm name to live resize
 +         */
 +        long[] hosts = null;
 +        String instanceName = "none";
 +        if (userVm != null) {
 +            instanceName = userVm.getInstanceName();
 +            if (userVm.getHostId() != null) {
 +                hosts = new long[] {userVm.getHostId()};
 +            } else if (userVm.getLastHostId() != null) {
 +                hosts = new long[] {userVm.getLastHostId()};
 +            }
 +
 +            final String errorMsg = "The VM must be stopped or the disk detached in order to resize with the XenServer Hypervisor.";
 +
 +            if (storagePool.isManaged() && storagePool.getHypervisor() == HypervisorType.Any && hosts != null && hosts.length > 0) {
 +                HostVO host = _hostDao.findById(hosts[0]);
 +
 +                if (currentSize != newSize && host.getHypervisorType() == HypervisorType.XenServer && !userVm.getState().equals(State.Stopped)) {
 +                    throw new InvalidParameterValueException(errorMsg);
 +                }
 +            }
 +
 +            /* Xen only works offline, SR does not support VDI.resizeOnline */
 +            if (currentSize != newSize && _volsDao.getHypervisorType(volume.getId()) == HypervisorType.XenServer && !userVm.getState().equals(State.Stopped)) {
 +                throw new InvalidParameterValueException(errorMsg);
 +            }
 +        }
 +
 +        ResizeVolumePayload payload = new ResizeVolumePayload(newSize, newMinIops, newMaxIops, newHypervisorSnapshotReserve, shrinkOk, instanceName, hosts, isManaged);
 +
 +        try {
 +            VolumeInfo vol = volFactory.getVolume(volume.getId());
 +            vol.addPayload(payload);
 +
 +            // this call to resize has a different impact depending on whether the
 +            // underlying primary storage is managed or not
 +            // if managed, this is the chance for the plug-in to change the size and/or IOPS values
 +            // if not managed, this is the chance for the plug-in to talk to the hypervisor layer
 +            // to change the size of the disk
 +            AsyncCallFuture<VolumeApiResult> future = volService.resize(vol);
 +            VolumeApiResult result = future.get();
 +
 +            if (result.isFailed()) {
 +                s_logger.warn("Failed to resize the volume " + volume);
 +                String details = "";
 +                if (result.getResult() != null && !result.getResult().isEmpty()) {
 +                    details = result.getResult();
 +                }
 +                throw new CloudRuntimeException(details);
 +            }
 +
 +            // managed storage is designed in such a way that the storage plug-in does not
 +            // talk to the hypervisor layer; as such, if the storage is managed and the
 +            // current and new sizes are different, then CloudStack (i.e. not a storage plug-in)
 +            // needs to tell the hypervisor to resize the disk
 +            if (storagePool.isManaged() && currentSize != newSize) {
 +                if (hosts != null && hosts.length > 0) {
 +                    HostVO hostVO = _hostDao.findById(hosts[0]);
 +
 +                    if (hostVO.getHypervisorType() != HypervisorType.KVM) {
 +                        volService.resizeVolumeOnHypervisor(volumeId, newSize, hosts[0], instanceName);
 +                    }
 +                }
 +
 +                volume.setSize(newSize);
 +
 +                _volsDao.update(volume.getId(), volume);
 +            }
 +
 +            volume = _volsDao.findById(volume.getId());
 +
 +            if (newDiskOfferingId != null) {
 +                volume.setDiskOfferingId(newDiskOfferingId);
 +            }
 +
 +            if (currentSize != newSize) {
 +                volume.setSize(newSize);
 +            }
 +
 +            _volsDao.update(volume.getId(), volume);
 +
 +            /* Update resource count for the account on primary storage resource */
 +            if (!shrinkOk) {
 +                _resourceLimitMgr.incrementResourceCount(volume.getAccountId(), ResourceType.primary_storage, volume.isDisplayVolume(), new Long(newSize - currentSize));
 +            } else {
 +                _resourceLimitMgr.decrementResourceCount(volume.getAccountId(), ResourceType.primary_storage, volume.isDisplayVolume(), new Long(currentSize - newSize));
 +            }
 +            return volume;
 +        } catch (InterruptedException e) {
 +            s_logger.warn("failed get resize volume result", e);
 +            throw new CloudRuntimeException(e.getMessage());
 +        } catch (ExecutionException e) {
 +            s_logger.warn("failed get resize volume result", e);
 +            throw new CloudRuntimeException(e.getMessage());
 +        } catch (Exception e) {
 +            s_logger.warn("failed get resize volume result", e);
 +            throw new CloudRuntimeException(e.getMessage());
 +        }
 +    }
 +
 +    @DB
 +    @Override
 +    @ActionEvent(eventType = EventTypes.EVENT_VOLUME_DELETE, eventDescription = "deleting volume")
 +    /**
 +     * Executes the removal of the volume. If the volume is only allocated we do not try to remove it from primary and secondary storage.
 +     * Otherwise, after the removal in the database, we will try to remove the volume from both primary and secondary storage.
 +     */
 +    public boolean deleteVolume(long volumeId, Account caller) throws ConcurrentOperationException {
 +        VolumeVO volume = retrieveAndValidateVolume(volumeId, caller);
 +        try {
 +            destroyVolumeIfPossible(volume);
 +            // Mark volume as removed if volume has not been created on primary or secondary
 +            if (volume.getState() == Volume.State.Allocated) {
 +                _volsDao.remove(volumeId);
 +                stateTransitTo(volume, Volume.Event.DestroyRequested);
 +                return true;
 +            }
 +            expungeVolumesInPrimaryStorageIfNeeded(volume);
 +            expungeVolumesInSecondaryStorageIfNeeded(volume);
 +            cleanVolumesCache(volume);
 +            return true;
 +        } catch (InterruptedException | ExecutionException | NoTransitionException e) {
 +            s_logger.warn("Failed to expunge volume: " + volume.getUuid(), e);
 +            return false;
 +        }
 +    }
 +
 +    /**
 +     *  Retrieves and validates the volume for the {@link #deleteVolume(long, Account)} method. The following validation are executed.
 +     *  <ul>
 +     *      <li> if no volume is found in the database, we throw an {@link InvalidParameterValueException};
 +     *      <li> if there are snapshots operation on the volume we cannot delete it. Therefore, an {@link InvalidParameterValueException} is thrown;
 +     *      <li> if the volume is still attached to a VM we throw an {@link InvalidParameterValueException};
 +     *      <li> if volume state is in {@link Volume.State#UploadOp}, we check the {@link VolumeDataStoreVO}. Then, if the {@link VolumeDataStoreVO} for the given volume has download status of {@link VMTemplateStorageResourceAssoc.Status#DOWNLOAD_IN_PROGRESS}, an exception is throw;
 +     *      <li> if the volume state is in {@link Volume.State#NotUploaded} or if the state is {@link Volume.State#UploadInProgress}, an {@link InvalidParameterValueException} is thrown;
 +     *      <li> we also check if the user has access to the given volume using {@link AccountManager#checkAccess(Account, org.apache.cloudstack.acl.SecurityChecker.AccessType, boolean, String)}.
 +     *  </ul>
 +     *
 +     *  After all validations we return the volume object.
 +     */
 +    protected VolumeVO retrieveAndValidateVolume(long volumeId, Account caller) {
 +        VolumeVO volume = _volsDao.findById(volumeId);
 +        if (volume == null) {
 +            throw new InvalidParameterValueException("Unable to find volume with ID: " + volumeId);
 +        }
 +        if (!_snapshotMgr.canOperateOnVolume(volume)) {
 +            throw new InvalidParameterValueException("There are snapshot operations in progress on the volume, unable to delete it");
 +        }
 +        if (volume.getInstanceId() != null) {
 +            throw new InvalidParameterValueException("Please specify a volume that is not attached to any VM.");
 +        }
 +        if (volume.getState() == Volume.State.UploadOp) {
 +            VolumeDataStoreVO volumeStore = _volumeStoreDao.findByVolume(volume.getId());
 +            if (volumeStore.getDownloadState() == VMTemplateStorageResourceAssoc.Status.DOWNLOAD_IN_PROGRESS) {
 +                throw new InvalidParameterValueException("Please specify a volume that is not uploading");
 +            }
 +        }
 +        if (volume.getState() == Volume.State.NotUploaded || volume.getState() == Volume.State.UploadInProgress) {
 +            throw new InvalidParameterValueException("The volume is either getting uploaded or it may be initiated shortly, please wait for it to be completed");
 +        }
 +        _accountMgr.checkAccess(caller, null, true, volume);
 +        return volume;
 +    }
 +
 +    /**
 +     * Destroy the volume if possible and then decrement the following resource types.
 +     * <ul>
 +     *  <li> {@link ResourceType#volume};
 +     *  <li> {@link ResourceType#primary_storage}
 +     * </ul>
 +     *
 +     * A volume can be destroyed if it is not in any of the following states.
 +     * <ul>
 +     *  <li> {@value Volume.State#Destroy};
 +     *  <li> {@value Volume.State#Expunging};
 +     *  <li> {@value Volume.State#Expunged}.
 +     * </ul>
 +     *
 +     * The volume is destroyed via {@link VolumeService#destroyVolume(long)} method.
 +     */
 +    protected void destroyVolumeIfPossible(VolumeVO volume) {
 +        if (volume.getState() != Volume.State.Destroy && volume.getState() != Volume.State.Expunging && volume.getState() != Volume.State.Expunged) {
 +            volService.destroyVolume(volume.getId());
 +
 +            // Decrement the resource count for volumes and primary storage belonging user VM's only
 +            _resourceLimitMgr.decrementResourceCount(volume.getAccountId(), ResourceType.volume, volume.isDisplayVolume());
 +            _resourceLimitMgr.decrementResourceCount(volume.getAccountId(), ResourceType.primary_storage, volume.isDisplayVolume(), volume.getSize());
 +        }
 +    }
 +
 +    /**
 +     * We will check if the given volume is in the primary storage. If it is, we will execute an asynchronous call to delete it there.
 +     * If the volume is not in the primary storage, we do nothing here.
 +     */
 +    protected void expungeVolumesInPrimaryStorageIfNeeded(VolumeVO volume) throws InterruptedException, ExecutionException {
 +        VolumeInfo volOnPrimary = volFactory.getVolume(volume.getId(), DataStoreRole.Primary);
 +        if (volOnPrimary != null) {
 +            s_logger.info("Expunging volume " + volume.getId() + " from primary data store");
 +            AsyncCallFuture<VolumeApiResult> future = volService.expungeVolumeAsync(volOnPrimary);
 +            future.get();
 +        }
 +    }
 +
 +    /**
 +     * We will check if the given volume is in the secondary storage. If the volume is not in the primary storage, we do nothing here.
 +     * If it is, we will execute an asynchronous call to delete it there. Then, we decrement the {@link ResourceType#secondary_storage} for the account that owns the volume.
 +     */
 +    protected void expungeVolumesInSecondaryStorageIfNeeded(VolumeVO volume) throws InterruptedException, ExecutionException {
 +        VolumeInfo volOnSecondary = volFactory.getVolume(volume.getId(), DataStoreRole.Image);
 +        if (volOnSecondary != null) {
 +            s_logger.info("Expunging volume " + volume.getId() + " from secondary data store");
 +            AsyncCallFuture<VolumeApiResult> future2 = volService.expungeVolumeAsync(volOnSecondary);
 +            future2.get();
 +
 +            _resourceLimitMgr.decrementResourceCount(volOnSecondary.getAccountId(), ResourceType.secondary_storage, volOnSecondary.getSize());
 +        }
 +    }
 +
 +    /**
 +     * Clean volumes cache entries (if they exist).
 +     */
 +    protected void cleanVolumesCache(VolumeVO volume) {
 +        List<VolumeInfo> cacheVols = volFactory.listVolumeOnCache(volume.getId());
 +        if (CollectionUtils.isEmpty(cacheVols)) {
 +            return;
 +        }
 +        for (VolumeInfo volOnCache : cacheVols) {
 +            s_logger.info("Delete volume from image cache store: " + volOnCache.getDataStore().getName());
 +            volOnCache.delete();
 +        }
 +    }
 +
 +    protected boolean stateTransitTo(Volume vol, Volume.Event event) throws NoTransitionException {
 +        return _volStateMachine.transitTo(vol, event, null, _volsDao);
 +    }
 +
 +    @Override
 +    @ActionEvent(eventType = EventTypes.EVENT_VOLUME_ATTACH, eventDescription = "attaching volume", async = true)
 +    public Volume attachVolumeToVM(AttachVolumeCmd command) {
 +        return attachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
 +    }
 +
 +    private Volume orchestrateAttachVolumeToVM(Long vmId, Long volumeId, Long deviceId) {
 +        VolumeInfo volumeToAttach = volFactory.getVolume(volumeId);
 +
 +        if (volumeToAttach.isAttachedVM()) {
 +            throw new CloudRuntimeException("This volume is already attached to a VM.");
 +        }
 +
 +        UserVmVO vm = _userVmDao.findById(vmId);
 +        VolumeVO exstingVolumeOfVm = null;
 +        List<VolumeVO> rootVolumesOfVm = _volsDao.findByInstanceAndType(vmId, Volume.Type.ROOT);
 +        if (rootVolumesOfVm.size() > 1) {
 +            throw new CloudRuntimeException("The VM " + vm.getHostName() + " has more than one ROOT volume and is in an invalid state.");
 +        } else {
 +            if (!rootVolumesOfVm.isEmpty()) {
 +                exstingVolumeOfVm = rootVolumesOfVm.get(0);
 +            } else {
 +                // locate data volume of the vm
 +                List<VolumeVO> diskVolumesOfVm = _volsDao.findByInstanceAndType(vmId, Volume.Type.DATADISK);
 +                for (VolumeVO diskVolume : diskVolumesOfVm) {
 +                    if (diskVolume.getState() != Volume.State.Allocated) {
 +                        exstingVolumeOfVm = diskVolume;
 +                        break;
 +                    }
 +                }
 +            }
 +        }
 +
 +        HypervisorType rootDiskHyperType = vm.getHypervisorType();
 +        HypervisorType volumeToAttachHyperType = _volsDao.getHypervisorType(volumeToAttach.getId());
 +
 +        VolumeInfo newVolumeOnPrimaryStorage = volumeToAttach;
 +
 +        //don't create volume on primary storage if its being attached to the vm which Root's volume hasn't been created yet
 +        StoragePoolVO destPrimaryStorage = null;
 +        if (exstingVolumeOfVm != null && !exstingVolumeOfVm.getState().equals(Volume.State.Allocated)) {
 +            destPrimaryStorage = _storagePoolDao.findById(exstingVolumeOfVm.getPoolId());
 +        }
 +
 +        boolean volumeOnSecondary = volumeToAttach.getState() == Volume.State.Uploaded;
 +
 +        if (destPrimaryStorage != null && (volumeToAttach.getState() == Volume.State.Allocated || volumeOnSecondary)) {
 +            try {
 +                newVolumeOnPrimaryStorage = _volumeMgr.createVolumeOnPrimaryStorage(vm, volumeToAttach, rootDiskHyperType, destPrimaryStorage);
 +            } catch (NoTransitionException e) {
 +                s_logger.debug("Failed to create volume on primary storage", e);
 +                throw new CloudRuntimeException("Failed to create volume on primary storage", e);
 +            }
 +        }
 +
 +        // reload the volume from db
 +        newVolumeOnPrimaryStorage = volFactory.getVolume(newVolumeOnPrimaryStorage.getId());
 +        boolean moveVolumeNeeded = needMoveVolume(exstingVolumeOfVm, newVolumeOnPrimaryStorage);
 +
 +        if (moveVolumeNeeded) {
 +            PrimaryDataStoreInfo primaryStore = (PrimaryDataStoreInfo)newVolumeOnPrimaryStorage.getDataStore();
 +            if (primaryStore.isLocal()) {
 +                throw new CloudRuntimeException(
 +                        "Failed to attach local data volume " + volumeToAttach.getName() + " to VM " + vm.getDisplayName() + " as migration of local data volume is not allowed");
 +            }
 +            StoragePoolVO vmRootVolumePool = _storagePoolDao.findById(exstingVolumeOfVm.getPoolId());
 +
 +            try {
 +                newVolumeOnPrimaryStorage = _volumeMgr.moveVolume(newVolumeOnPrimaryStorage, vmRootVolumePool.getDataCenterId(), vmRootVolumePool.getPodId(), vmRootVolumePool.getClusterId(),
 +                        volumeToAttachHyperType);
 +            } catch (ConcurrentOperationException e) {
 +                s_logger.debug("move volume failed", e);
 +                throw new CloudRuntimeException("move volume failed", e);
 +            } catch (StorageUnavailableException e) {
 +                s_logger.debug("move volume failed", e);
 +                throw new CloudRuntimeException("move volume failed", e);
 +            }
 +        }
 +        VolumeVO newVol = _volsDao.findById(newVolumeOnPrimaryStorage.getId());
 +        // Getting the fresh vm object in case of volume migration to check the current state of VM
 +        if (moveVolumeNeeded || volumeOnSecondary) {
 +            vm = _userVmDao.findById(vmId);
 +            if (vm == null) {
 +                throw new InvalidParameterValueException("VM not found.");
 +            }
 +        }
 +        newVol = sendAttachVolumeCommand(vm, newVol, deviceId);
 +        return newVol;
 +    }
 +
 +    public Volume attachVolumeToVM(Long vmId, Long volumeId, Long deviceId) {
 +        Account caller = CallContext.current().getCallingAccount();
 +
 +        // Check that the volume ID is valid
 +        VolumeInfo volumeToAttach = volFactory.getVolume(volumeId);
 +        // Check that the volume is a data volume
 +        if (volumeToAttach == null || !(volumeToAttach.getVolumeType() == Volume.Type.DATADISK || volumeToAttach.getVolumeType() == Volume.Type.ROOT)) {
 +            throw new InvalidParameterValueException("Please specify a volume with the valid type: " + Volume.Type.ROOT.toString() + " or " + Volume.Type.DATADISK.toString());
 +        }
 +
 +        // Check that the volume is not currently attached to any VM
 +        if (volumeToAttach.getInstanceId() != null) {
 +            throw new InvalidParameterValueException("Please specify a volume that is not attached to any VM.");
 +        }
 +
 +        // Check that the volume is not destroyed
 +        if (volumeToAttach.getState() == Volume.State.Destroy) {
 +            throw new InvalidParameterValueException("Please specify a volume that is not destroyed.");
 +        }
 +
 +        // Check that the virtual machine ID is valid and it's a user vm
 +        UserVmVO vm = _userVmDao.findById(vmId);
 +        if (vm == null || vm.getType() != VirtualMachine.Type.User) {
 +            throw new InvalidParameterValueException("Please specify a valid User VM.");
 +        }
 +
 +        // Check that the VM is in the correct state
 +        if (vm.getState() != State.Running && vm.getState() != State.Stopped) {
 +            throw new InvalidParameterValueException("Please specify a VM that is either running or stopped.");
 +        }
 +
 +        // Check that the VM and the volume are in the same zone
 +        if (vm.getDataCenterId() != volumeToAttach.getDataCenterId()) {
 +            throw new InvalidParameterValueException("Please specify a VM that is in the same zone as the volume.");
 +        }
 +
 +        // Check that the device ID is valid
 +        if (deviceId != null) {
 +            // validate ROOT volume type
 +            if (deviceId.longValue() == 0) {
 +                validateRootVolumeDetachAttach(_volsDao.findById(volumeToAttach.getId()), vm);
 +                // vm shouldn't have any volume with deviceId 0
 +                if (!_volsDao.findByInstanceAndDeviceId(vm.getId(), 0).isEmpty()) {
 +                    throw new InvalidParameterValueException("Vm already has root volume attached to it");
 +                }
 +                // volume can't be in Uploaded state
 +                if (volumeToAttach.getState() == Volume.State.Uploaded) {
 +                    throw new InvalidParameterValueException("No support for Root volume attach in state " + Volume.State.Uploaded);
 +                }
 +            }
 +        }
 +
 +        // Check that the number of data volumes attached to VM is less than
 +        // that supported by hypervisor
 +        if (deviceId == null || deviceId.longValue() != 0) {
 +            List<VolumeVO> existingDataVolumes = _volsDao.findByInstanceAndType(vmId, Volume.Type.DATADISK);
 +            int maxAttachableDataVolumesSupported = getMaxDataVolumesSupported(vm);
 +            if (existingDataVolumes.size() >= maxAttachableDataVolumesSupported) {
 +                throw new InvalidParameterValueException(
 +                        "The specified VM already has the maximum number of data disks (" + maxAttachableDataVolumesSupported + ") attached. Please specify another VM.");
 +            }
 +        }
 +
 +        // If local storage is disabled then attaching a volume with local disk
 +        // offering not allowed
 +        DataCenterVO dataCenter = _dcDao.findById(volumeToAttach.getDataCenterId());
 +        if (!dataCenter.isLocalStorageEnabled()) {
 +            DiskOfferingVO diskOffering = _diskOfferingDao.findById(volumeToAttach.getDiskOfferingId());
 +            if (diskOffering.getUseLocalStorage()) {
 +                throw new InvalidParameterValueException("Zone is not configured to use local storage but volume's disk offering " + diskOffering.getName() + " uses it");
 +            }
 +        }
 +
 +        // if target VM has associated VM snapshots
 +        List<VMSnapshotVO> vmSnapshots = _vmSnapshotDao.findByVm(vmId);
 +        if (vmSnapshots.size() > 0) {
 +            throw new InvalidParameterValueException("Unable to attach volume, please specify a VM that does not have VM snapshots");
 +        }
 +
 +        // permission check
 +        _accountMgr.checkAccess(caller, null, true, volumeToAttach, vm);
 +
 +        if (!(Volume.State.Allocated.equals(volumeToAttach.getState()) || Volume.State.Ready.equals(volumeToAttach.getState()) || Volume.State.Uploaded.equals(volumeToAttach.getState()))) {
 +            throw new InvalidParameterValueException("Volume state must be in Allocated, Ready or in Uploaded state");
 +        }
 +
 +        Account owner = _accountDao.findById(volumeToAttach.getAccountId());
 +
 +        if (!(volumeToAttach.getState() == Volume.State.Allocated || volumeToAttach.getState() == Volume.State.Ready)) {
 +            try {
 +                _resourceLimitMgr.checkResourceLimit(owner, ResourceType.primary_storage, volumeToAttach.getSize());
 +            } catch (ResourceAllocationException e) {
 +                s_logger.error("primary storage resource limit check failed", e);
 +                throw new InvalidParameterValueException(e.getMessage());
 +            }
 +        }
 +
 +        HypervisorType rootDiskHyperType = vm.getHypervisorType();
 +        HypervisorType volumeToAttachHyperType = _volsDao.getHypervisorType(volumeToAttach.getId());
 +
 +        StoragePoolVO volumeToAttachStoragePool = _storagePoolDao.findById(volumeToAttach.getPoolId());
 +
 +        // managed storage can be used for different types of hypervisors
 +        // only perform this check if the volume's storage pool is not null and not managed
 +        if (volumeToAttachStoragePool != null && !volumeToAttachStoragePool.isManaged()) {
 +            if (volumeToAttachHyperType != HypervisorType.None && rootDiskHyperType != volumeToAttachHyperType) {
 +                throw new InvalidParameterValueException("Can't attach a volume created by: " + volumeToAttachHyperType + " to a " + rootDiskHyperType + " vm");
 +            }
 +        }
 +
 +        AsyncJobExecutionContext asyncExecutionContext = AsyncJobExecutionContext.getCurrentExecutionContext();
 +
 +        if (asyncExecutionContext != null) {
 +            AsyncJob job = asyncExecutionContext.getJob();
 +
 +            if (s_logger.isInfoEnabled()) {
 +                s_logger.info("Trying to attaching volume " + volumeId + " to vm instance:" + vm.getId() + ", update async job-" + job.getId() + " progress status");
 +            }
 +
 +            _jobMgr.updateAsyncJobAttachment(job.getId(), "Volume", volumeId);
 +        }
 +
 +        AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
 +        if (jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
 +            // avoid re-entrance
 +
 +            VmWorkJobVO placeHolder = null;
 +            placeHolder = createPlaceHolderWork(vmId);
 +            try {
 +                return orchestrateAttachVolumeToVM(vmId, volumeId, deviceId);
 +            } finally {
 +                _workJobDao.expunge(placeHolder.getId());
 +            }
 +
 +        } else {
 +            Outcome<Volume> outcome = attachVolumeToVmThroughJobQueue(vmId, volumeId, deviceId);
 +
 +            Volume vol = null;
 +            try {
 +                outcome.get();
 +            } catch (InterruptedException e) {
 +                throw new RuntimeException("Operation is interrupted", e);
 +            } catch (java.util.concurrent.ExecutionException e) {
 +                throw new RuntimeException("Execution excetion", e);
 +            }
 +
 +            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
 +            if (jobResult != null) {
 +                if (jobResult instanceof ConcurrentOperationException) {
 +                    throw (ConcurrentOperationException)jobResult;
 +                } else if (jobResult instanceof InvalidParameterValueException) {
 +                    throw (InvalidParameterValueException)jobResult;
 +                } else if (jobResult instanceof RuntimeException) {
 +                    throw (RuntimeException)jobResult;
 +                } else if (jobResult instanceof Throwable) {
 +                    throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
 +                } else if (jobResult instanceof Long) {
 +                    vol = _volsDao.findById((Long)jobResult);
 +                }
 +            }
 +            return vol;
 +        }
 +    }
 +
 +    @Override
 +    @ActionEvent(eventType = EventTypes.EVENT_VOLUME_UPDATE, eventDescription = "updating volume", async = true)
 +    public Volume updateVolume(long volumeId, String path, String state, Long storageId, Boolean displayVolume, String customId, long entityOwnerId, String chainInfo) {
 +
 +        VolumeVO volume = _volsDao.findById(volumeId);
 +
 +        if (volume == null) {
 +            throw new InvalidParameterValueException("The volume id doesn't exist");
 +        }
 +
 +        if (path != null) {
 +            volume.setPath(path);
 +        }
 +
 +        if (chainInfo != null) {
 +            volume.setChainInfo(chainInfo);
 +        }
 +
 +        if (state != null) {
 +            try {
 +                Volume.State volumeState = Volume.State.valueOf(state);
 +                volume.setState(volumeState);
 +            } catch (IllegalArgumentException ex) {
 +                throw new InvalidParameterValueException("Invalid volume state specified");
 +            }
 +        }
 +
 +        if (storageId != null) {
 +            StoragePool pool = _storagePoolDao.findById(storageId);
 +            if (pool.getDataCenterId() != volume.getDataCenterId()) {
 +                throw new InvalidParameterValueException("Invalid storageId specified; refers to the pool outside of the volume's zone");
 +            }
 +            volume.setPoolId(pool.getId());
 +        }
 +
 +        if (customId != null) {
 +            volume.setUuid(customId);
 +        }
 +
 +        updateDisplay(volume, displayVolume);
 +
 +        _volsDao.update(volumeId, volume);
 +
 +        return volume;
 +    }
 +
 +    @Override
 +    public void updateDisplay(Volume volume, Boolean displayVolume) {
 +        // 1. Resource limit changes
 +        updateResourceCount(volume, displayVolume);
 +
 +        // 2. generate usage event if not in destroyed state
 +        saveUsageEvent(volume, displayVolume);
 +
 +        // 3. Set the flag
 +        if (displayVolume != null && displayVolume != volume.isDisplayVolume()) {
 +            // FIXME - Confused - typecast for now.
 +            ((VolumeVO)volume).setDisplayVolume(displayVolume);
 +            _volsDao.update(volume.getId(), (VolumeVO)volume);
 +        }
 +
 +    }
 +
 +    private void updateResourceCount(Volume volume, Boolean displayVolume) {
 +        // Update only when the flag has changed.
 +        if (displayVolume != null && displayVolume != volume.isDisplayVolume()) {
 +            _resourceLimitMgr.changeResourceCount(volume.getAccountId(), ResourceType.volume, displayVolume);
 +            _resourceLimitMgr.changeResourceCount(volume.getAccountId(), ResourceType.primary_storage, displayVolume, new Long(volume.getSize()));
 +        }
 +    }
 +
 +    private void saveUsageEvent(Volume volume, Boolean displayVolume) {
 +
 +        // Update only when the flag has changed  &&  only when volume in a non-destroyed state.
 +        if ((displayVolume != null && displayVolume != volume.isDisplayVolume()) && !isVolumeDestroyed(volume)) {
 +            if (displayVolume) {
 +                // flag turned 1 equivalent to freshly created volume
 +                UsageEventUtils.publishUsageEvent(EventTypes.EVENT_VOLUME_CREATE, volume.getAccountId(), volume.getDataCenterId(), volume.getId(), volume.getName(), volume.getDiskOfferingId(),
 +                        volume.getTemplateId(), volume.getSize(), Volume.class.getName(), volume.getUuid());
 +            } else {
 +                // flag turned 0 equivalent to deleting a volume
 +                UsageEventUtils.publishUsageEvent(EventTypes.EVENT_VOLUME_DELETE, volume.getAccountId(), volume.getDataCenterId(), volume.getId(), volume.getName(), Volume.class.getName(),
 +                        volume.getUuid());
 +            }
 +        }
 +    }
 +
 +    private boolean isVolumeDestroyed(Volume volume) {
 +        if (volume.getState() == Volume.State.Destroy || volume.getState() == Volume.State.Expunging && volume.getState() == Volume.State.Expunged) {
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    @Override
 +    @ActionEvent(eventType = EventTypes.EVENT_VOLUME_DETACH, eventDescription = "detaching volume", async = true)
 +    public Volume detachVolumeFromVM(DetachVolumeCmd cmmd) {
 +        Account caller = CallContext.current().getCallingAccount();
 +        if ((cmmd.getId() == null && cmmd.getDeviceId() == null && cmmd.getVirtualMachineId() == null) || (cmmd.getId() != null && (cmmd.getDeviceId() != null || cmmd.getVirtualMachineId() != null))
 +                || (cmmd.getId() == null && (cmmd.getDeviceId() == null || cmmd.getVirtualMachineId() == null))) {
 +            throw new InvalidParameterValueException("Please provide either a volume id, or a tuple(device id, instance id)");
 +        }
 +
 +        Long volumeId = cmmd.getId();
 +        VolumeVO volume = null;
 +
 +        if (volumeId != null) {
 +            volume = _volsDao.findById(volumeId);
 +        } else {
 +            volume = _volsDao.findByInstanceAndDeviceId(cmmd.getVirtualMachineId(), cmmd.getDeviceId()).get(0);
 +        }
 +
 +        // Check that the volume ID is valid
 +        if (volume == null) {
 +            throw new InvalidParameterValueException("Unable to find volume with ID: " + volumeId);
 +        }
 +
 +        Long vmId = null;
 +
 +        if (cmmd.getVirtualMachineId() == null) {
 +            vmId = volume.getInstanceId();
 +        } else {
 +            vmId = cmmd.getVirtualMachineId();
 +        }
 +
 +        // Permissions check
 +        _accountMgr.checkAccess(caller, null, true, volume);
 +
 +        // Check that the volume is currently attached to a VM
 +        if (vmId == null) {
 +            throw new InvalidParameterValueException("The specified volume is not attached to a VM.");
 +        }
 +
 +        // Check that the VM is in the correct state
 +        UserVmVO vm = _userVmDao.findById(vmId);
 +        if (vm.getState() != State.Running && vm.getState() != State.Stopped && vm.getState() != State.Destroyed) {
 +            throw new InvalidParameterValueException("Please specify a VM that is either running or stopped.");
 +        }
 +
 +        // Check that the volume is a data/root volume
 +        if (!(volume.getVolumeType() == Volume.Type.ROOT || volume.getVolumeType() == Volume.Type.DATADISK)) {
 +            throw new InvalidParameterValueException("Please specify volume of type " + Volume.Type.DATADISK.toString() + " or " + Volume.Type.ROOT.toString());
 +        }
 +
 +        // Root volume detach is allowed for following hypervisors: Xen/KVM/VmWare
 +        if (volume.getVolumeType() == Volume.Type.ROOT) {
 +            validateRootVolumeDetachAttach(volume, vm);
 +        }
 +
 +        // Don't allow detach if target VM has associated VM snapshots
 +        List<VMSnapshotVO> vmSnapshots = _vmSnapshotDao.findByVm(vmId);
 +        if (vmSnapshots.size() > 0) {
 +            throw new InvalidParameterValueException("Unable to detach volume, please specify a VM that does not have VM snapshots");
 +        }
 +
 +        AsyncJobExecutionContext asyncExecutionContext = AsyncJobExecutionContext.getCurrentExecutionContext();
 +        if (asyncExecutionContext != null) {
 +            AsyncJob job = asyncExecutionContext.getJob();
 +
 +            if (s_logger.isInfoEnabled()) {
 +                s_logger.info("Trying to attaching volume " + volumeId + "to vm instance:" + vm.getId() + ", update async job-" + job.getId() + " progress status");
 +            }
 +
 +            _jobMgr.updateAsyncJobAttachment(job.getId(), "Volume", volumeId);
 +        }
 +
 +        AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
 +        if (jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
 +            // avoid re-entrance
 +            VmWorkJobVO placeHolder = null;
 +            placeHolder = createPlaceHolderWork(vmId);
 +            try {
 +                return orchestrateDetachVolumeFromVM(vmId, volumeId);
 +            } finally {
 +                _workJobDao.expunge(placeHolder.getId());
 +            }
 +        } else {
 +            Outcome<Volume> outcome = detachVolumeFromVmThroughJobQueue(vmId, volumeId);
 +
 +            Volume vol = null;
 +            try {
 +                outcome.get();
 +            } catch (InterruptedException e) {
 +                throw new RuntimeException("Operation is interrupted", e);
 +            } catch (java.util.concurrent.ExecutionException e) {
 +                throw new RuntimeException("Execution excetion", e);
 +            }
 +
 +            Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
 +            if (jobResult != null) {
 +                if (jobResult instanceof ConcurrentOperationException) {
 +                    throw (ConcurrentOperationException)jobResult;
 +                } else if (jobResult instanceof RuntimeException) {
 +                    throw (RuntimeException)jobResult;
 +                } else if (jobResult instanceof Throwable) {
 +                    throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
 +                } else if (jobResult instanceof Long) {
 +                    vol = _volsDao.findById((Long)jobResult);
 +                }
 +            }
 +            return vol;
 +        }
 +    }
 +
 +    private void validateRootVolumeDetachAttach(VolumeVO volume, UserVmVO vm) {
 +        if (!(vm.getHypervisorType() == HypervisorType.XenServer || vm.getHypervisorType() == HypervisorType.VMware || vm.getHypervisorType() == HypervisorType.KVM
 +                || vm.getHypervisorType() == HypervisorType.Simulator)) {
 +            throw new InvalidParameterValueException("Root volume detach is not supported for hypervisor type " + vm.getHypervisorType());
 +        }
 +        if (!(vm.getState() == State.Stopped) || (vm.getState() == State.Destroyed)) {
 +            throw new InvalidParameterValueException("Root volume detach can happen only when vm is in states: " + State.Stopped.toString() + " or " + State.Destroyed.toString());
 +        }
 +
 +        if (volume.getPoolId() != null) {
 +            StoragePoolVO pool = _storagePoolDao.findById(volume.getPoolId());
 +            if (pool.isManaged()) {
 +                throw new InvalidParameterValueException("Root volume detach is not supported for Managed DataStores");
 +            }
 +        }
 +    }
 +
 +    private Volume orchestrateDetachVolumeFromVM(long vmId, long volumeId) {
 +
 +        Volume volume = _volsDao.findById(volumeId);
 +        VMInstanceVO vm = _vmInstanceDao.findById(vmId);
 +
 +        String errorMsg = "Failed to detach volume " + volume.getName() + " from VM " + vm.getHostName();
 +        boolean sendCommand = vm.getState() == State.Running;
 +
 +        Long hostId = vm.getHostId();
 +
 +        if (hostId == null) {
 +            hostId = vm.getLastHostId();
 +
 +            HostVO host = _hostDao.findById(hostId);
 +
 +            if (host != null && host.getHypervisorType() == HypervisorType.VMware) {
 +                sendCommand = true;
 +            }
 +        }
 +
 +        HostVO host = null;
 +        StoragePoolVO volumePool = _storagePoolDao.findByIdIncludingRemoved(volume.getPoolId());
 +
 +        if (hostId != null) {
 +            host = _hostDao.findById(hostId);
 +
 +            if (host != null && host.getHypervisorType() == HypervisorType.XenServer && volumePool != null && volumePool.isManaged()) {
 +                sendCommand = true;
 +            }
 +        }
 +
 +        Answer answer = null;
 +
 +        if (sendCommand) {
 +            // collect vm disk statistics before detach a volume
 +            UserVmVO userVm = _userVmDao.findById(vmId);
 +            if (userVm != null && userVm.getType() == VirtualMachine.Type.User) {
 +                _userVmService.collectVmDiskStatistics(userVm);
 +            }
 +
 +            DataTO volTO = volFactory.getVolume(volume.getId()).getTO();
 +            DiskTO disk = new DiskTO(volTO, volume.getDeviceId(), volume.getPath(), volume.getVolumeType());
 +
 +            DettachCommand cmd = new DettachCommand(disk, vm.getInstanceName());
 +
 +            cmd.setManaged(volumePool.isManaged());
 +
 +            cmd.setStorageHost(volumePool.getHostAddress());
 +            cmd.setStoragePort(volumePool.getPort());
 +
 +            cmd.set_iScsiName(volume.get_iScsiName());
 +
 +            try {
 +                answer = _agentMgr.send(hostId, cmd);
 +            } catch (Exception e) {
 +                throw new CloudRuntimeException(errorMsg + " due to: " + e.getMessage());
 +            }
 +        }
 +
 +        if (!sendCommand || (answer != null && answer.getResult())) {
 +            // Mark the volume as detached
 +            _volsDao.detachVolume(volume.getId());
 +
 +            // volume.getPoolId() should be null if the VM we are detaching the disk from has never been started before
 +            DataStore dataStore = volume.getPoolId() != null ? dataStoreMgr.getDataStore(volume.getPoolId(), DataStoreRole.Primary) : null;
 +
 +            volService.revokeAccess(volFactory.getVolume(volume.getId()), host, dataStore);
 +
 +            handleTargetsForVMware(hostId, volumePool.getHostAddress(), volumePool.getPort(), volume.get_iScsiName());
 +
 +            return _volsDao.findById(volumeId);
 +        } else {
 +
 +            if (answer != null) {
 +                String details = answer.getDetails();
 +                if (details != null && !details.isEmpty()) {
 +                    errorMsg += "; " + details;
 +                }
 +            }
 +
 +            throw new CloudRuntimeException(errorMsg);
 +        }
 +    }
 +
 +    public void updateMissingRootDiskController(final VMInstanceVO vm, final String rootVolChainInfo) {
 +        if (vm == null || !VirtualMachine.Type.User.equals(vm.getType()) || Strings.isNullOrEmpty(rootVolChainInfo)) {
 +            return;
 +        }
 +        String rootDiskController = null;
 +        try {
 +            final VirtualMachineDiskInfo infoInChain = _gson.fromJson(rootVolChainInfo, VirtualMachineDiskInfo.class);
 +            if (infoInChain != null) {
 +                rootDiskController = infoInChain.getControllerFromDeviceBusName();
 +            }
 +            final UserVmVO userVmVo = _userVmDao.findById(vm.getId());
 +            if ((rootDiskController != null) && (!rootDiskController.isEmpty())) {
 +                _userVmDao.loadDetails(userVmVo);
 +                _userVmMgr.persistDeviceBusInfo(userVmVo, rootDiskController);
 +            }
 +        } catch (JsonParseException e) {
 +            s_logger.debug("Error parsing chain info json: " + e.getMessage());
 +        }
 +    }
 +
 +    private void handleTargetsForVMware(long hostId, String storageAddress, int storagePort, String iScsiName) {
 +        HostVO host = _hostDao.findById(hostId);
 +
 +        if (host.getHypervisorType() == HypervisorType.VMware) {
 +            ModifyTargetsCommand cmd = new ModifyTargetsCommand();
 +
 +            List<Map<String, String>> targets = new ArrayList<>();
 +
 +            Map<String, String> target = new HashMap<>();
 +
 +            target.put(ModifyTargetsCommand.STORAGE_HOST, storageAddress);
 +            target.put(ModifyTargetsCommand.STORAGE_PORT, String.valueOf(storagePort));
 +            target.put(ModifyTargetsCommand.IQN, iScsiName);
 +
 +            targets.add(target);
 +
 +            cmd.setTargets(targets);
 +            cmd.setApplyToAllHostsInCluster(true);
 +            cmd.setAdd(false);
 +            cmd.setTargetTypeToRemove(ModifyTargetsCommand.TargetTypeToRemove.DYNAMIC);
 +
 +            sendModifyTargetsCommand(cmd, hostId);
 +        }
 +    }
 +
 +    private void sendModifyTargetsCommand(ModifyTargetsCommand cmd, long hostId) {
 +        Answer answer = _agentMgr.easySend(hostId, cmd);
 +
 +        if (answer == null) {
 +            String msg = "Unable to get an answer to the modify targets command";
 +
 +            s_logger.warn(msg);
 +        } else if (!answer.getResult()) {
 +            String msg = "Unable to modify target on the following host: " + hostId;
 +
 +            s_logger.warn(msg);
 +        }
 +    }
 +
 +    @DB
 +    @Override
 +    @ActionEvent(eventType = EventTypes.EVENT_VOLUME_MIGRATE, eventDescription = "migrating volume", async = true)
 +    public Volume migrateVolume(MigrateVolumeCmd cmd) {
 +        Long volumeId = cmd.getVolumeId();
 +        Long storagePoolId = cmd.getStoragePoolId();
 +
 +        VolumeVO vol = _volsDao.findById(volumeId);
 +        if (vol == null) {
 +            throw new InvalidParameterValueException("Failed to find the volume id: " + volumeId);
 +        }
 +
 +        if (vol.getState() != Volume.State.Ready) {
 +            throw new InvalidParameterValueException("Volume must be in ready state");
 +        }
 +
 +        boolean liveMigrateVolume = false;
 +        Long instanceId = vol.getInstanceId();
 +        Long srcClusterId = null;
 +        VMInstanceVO vm = null;
 +        if (instanceId != null) {
 +            vm = _vmInstanceDao.findById(instanceId);
 +        }
 +
 +        // Check that Vm to which this volume is attached does not have VM Snapshots
 +        if (vm != null && _vmSnapshotDao.findByVm(vm.getId()).size() > 0) {
 +            throw new InvalidParameterValueException("Volume cannot be migrated, please remove all VM snapshots for VM to which this volume is attached");
 +        }
 +
 +        if (vm != null && vm.getState() == State.Running) {
 +            // Check if the VM is GPU enabled.
 +            if (_serviceOfferingDetailsDao.findDetail(vm.getServiceOfferingId(), GPU.Keys.pciDevice.toString()) != null) {
 +                throw new InvalidParameterValueException("Live Migration of GPU enabled VM is not supported");
 +            }
 +            // Check if the underlying hypervisor supports storage motion.
 +            Long hostId = vm.getHostId();
 +            if (hostId != null) {
 +                HostVO host = _hostDao.findById(hostId);
 +                HypervisorCapabilitiesVO capabilities = null;
 +                if (host != null) {
 +                    capabilities = _hypervisorCapabilitiesDao.findByHypervisorTypeAndVersion(host.getHypervisorType(), host.getHypervisorVersion());
 +                    srcClusterId = host.getClusterId();
 +                }
 +
 +                if (capabilities != null) {
 +                    liveMigrateVolume = capabilities.isStorageMotionSupported();
 +                }
 +            }
 +
 +            // If vm is running, and hypervisor doesn't support live migration, then return error
 +            if (!liveMigrateVolume) {
 +                throw new InvalidParameterValueException("Volume needs to be detached from VM");
 +            }
 +        }
 +
 +        if (liveMigrateVolume && !cmd.isLiveMigrate()) {
 +            throw new InvalidParameterValueException("The volume " + vol + "is attached to a vm and for migrating it " + "the parameter livemigrate should be specified");
 +        }
 +
 +        StoragePool destPool = (StoragePool)dataStoreMgr.getDataStore(storagePoolId, DataStoreRole.Primary);
 +        if (destPool == null) {
 +            throw new InvalidParameterValueException("Failed to find the destination storage pool: " + storagePoolId);
 +        } else if (destPool.isInMaintenance()) {
 +            throw new InvalidParameterValueException("Cannot migrate volume " + vol + "to the destination storage pool " + destPool.getName() + " as the storage pool is in maintenance mode.");
 +        }
 +
++        if (!storageMgr.storagePoolHasEnoughSpace(Collections.singletonList(vol), destPool)) {
++            throw new CloudRuntimeException("Storage pool " + destPool.getName() + " does not have enough space to migrate volume " + vol.getName());
++        }
++
 +        if (liveMigrateVolume && destPool.getClusterId() != null && srcClusterId != null) {
 +            if (!srcClusterId.equals(destPool.getClusterId())) {
 +                throw new InvalidParameterValueException("Cannot migrate a volume of a virtual machine to a storage pool in a different cluster");
 +            }
 +        }
 +        // In case of VMware, if ROOT volume is being cold-migrated, then ensure destination storage pool is in the same Datacenter as the VM.
 +        if (vm != null && vm.getHypervisorType().equals(HypervisorType.VMware)) {
 +            if (!liveMigrateVolume && vol.volumeType.equals(Volume.Type.ROOT)) {
 +                Long hostId = vm.getHostId() != null ? vm.getHostId() : vm.getLastHostId();
 +                HostVO host = _hostDao.findById(hostId);
 +                if (host != null) {
 +                    srcClusterId = host.getClusterId();
 +                }
 +                if (srcClusterId != null && destPool.getClusterId() != null && !srcClusterId.equals(destPool.getClusterId())) {
 +                    String srcDcName = _clusterDetailsDao.getVmwareDcName(srcClusterId);
 +                    String destDcName = _clusterDetailsDao.getVmwareDcName(destPool.getClusterId());
 +                    if (srcDcName != null && destDcName != null && !srcDcName.equals(destDcName)) {
 +                        throw new InvalidParameterValueException("Cannot migrate ROOT volume of a stopped VM to a storage pool in a different VMware datacenter");
 +                    }
 +                }
 +                updateMissingRootDiskController(vm, vol.getChainInfo());
 +            }
 +        }
 +        DiskOfferingVO newDiskOffering = retrieveAndValidateNewDiskOffering(cmd);
 +        validateConditionsToReplaceDiskOfferingOfVolume(vol, newDiskOffering, destPool);
 +        if (vm != null) {
 +            // serialize VM operation
 +            AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
 +            if (jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
 +                // avoid re-entrance
 +
 +                VmWorkJobVO placeHolder = null;
 +                placeHolder = createPlaceHolderWork(vm.getId());
 +                try {
 +                    return orchestrateMigrateVolume(vol, destPool, liveMigrateVolume, newDiskOffering);
 +                } finally {
 +                    _workJobDao.expunge(placeHolder.getId());
 +                }
 +
 +            } else {
 +                Outcome<Volume> outcome = migrateVolumeThroughJobQueue(vm, vol, destPool, liveMigrateVolume, newDiskOffering);
 +
 +                try {
 +                    outcome.get();
 +                } catch (InterruptedException e) {
 +                    throw new RuntimeException("Operation is interrupted", e);
 +                } catch (java.util.concurrent.ExecutionException e) {
 +                    throw new RuntimeException("Execution excetion", e);
 +                }
 +
 +                Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
 +                if (jobResult != null) {
 +                    if (jobResult instanceof ConcurrentOperationException) {
 +                        throw (ConcurrentOperationException)jobResult;
 +                    } else if (jobResult instanceof RuntimeException) {
 +                        throw (RuntimeException)jobResult;
 +                    } else if (jobResult instanceof Throwable) {
 +                        throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
 +                    }
 +                }
 +
 +                // retrieve the migrated new volume from job result
 +                if (jobResult != null && jobResult instanceof Long) {
 +                    return _entityMgr.findById(VolumeVO.class, ((Long)jobResult));
 +                }
 +                return null;
 +            }
 +        }
 +
 +        return orchestrateMigrateVolume(vol, destPool, liveMigrateVolume, newDiskOffering);
 +    }
 +
 +    /**
 +     * Retrieves the new disk offering UUID that might be sent to replace the current one in the volume being migrated.
 +     * If no disk offering UUID is provided we return null. Otherwise, we perform the following checks.
 +     * <ul>
 +     *  <li>Is the disk offering UUID entered valid? If not, an  {@link InvalidParameterValueException} is thrown;
 +     *  <li>If the disk offering was already removed, we thrown an {@link InvalidParameterValueException} is thrown;
 +     *  <li>We then check if the user executing the operation has access to the given disk offering.
 +     * </ul>
 +     *
 +     * If all checks pass, we move forward returning the disk offering object.
 +     */
 +    private DiskOfferingVO retrieveAndValidateNewDiskOffering(MigrateVolumeCmd cmd) {
 +        String newDiskOfferingUuid = cmd.getNewDiskOfferingUuid();
 +        if (org.apache.commons.lang.StringUtils.isBlank(newDiskOfferingUuid)) {
 +            return null;
 +        }
 +        DiskOfferingVO newDiskOffering = _diskOfferingDao.findByUuid(newDiskOfferingUuid);
 +        if (newDiskOffering == null) {
 +            throw new InvalidParameterValueException(String.format("The disk offering informed is not valid [id=%s].", newDiskOfferingUuid));
 +        }
 +        if (newDiskOffering.getRemoved() != null) {
 +            throw new InvalidParameterValueException(String.format("We cannot assign a removed disk offering [id=%s] to a volume. ", newDiskOffering.getUuid()));
 +        }
 +        Account caller = CallContext.current().getCallingAccount();
 +        _accountMgr.checkAccess(caller, newDiskOffering);
 +        return newDiskOffering;
 +    }
 +
 +    /**
 +     * Performs the validations required for replacing the disk offering while migrating the volume of storage. If no new disk offering is provided, we do not execute any validation.
 +     * If a disk offering is informed, we then proceed with the following checks.
 +     * <ul>
 +     *  <li>We check if the given volume is of ROOT type. We cannot change the disk offering of a ROOT volume. Therefore, we thrown an {@link InvalidParameterValueException};
 +     *  <li>We the disk is being migrated to shared storage and the new disk offering is for local storage (or vice versa), we throw an {@link InvalidParameterValueException}. Bear in mind that we are validating only the new disk offering. If none is provided we can override the current disk offering. This means, placing a volume with shared disk offering in local storage and vice versa;
 +     *  <li>We then proceed checking the target storage pool supports the new disk offering {@link #doesTargetStorageSupportNewDiskOffering(StoragePool, DiskOfferingVO)}.
 +     * </ul>
 +     *
 +     * If all of the above validations pass, we check if the size of the new disk offering is different from the volume. If it is, we log a warning message.
 +     */
 +    protected void validateConditionsToReplaceDiskOfferingOfVolume(VolumeVO volume, DiskOfferingVO newDiskOffering, StoragePool destPool) {
 +        if (newDiskOffering == null) {
 +            return;
 +        }
 +        if ((destPool.isShared() && newDiskOffering.getUseLocalStorage()) || destPool.isLocal() && newDiskOffering.isShared()) {
 +            throw new InvalidParameterValueException("You cannot move the volume to a shared storage and assing a disk offering for local storage and vice versa.");
 +        }
 +        if (!doesTargetStorageSupportNewDiskOffering(destPool, newDiskOffering)) {
 +            throw new InvalidParameterValueException(String.format("Target Storage [id=%s] tags [%s] does not match new disk offering [id=%s] tags [%s].", destPool.getUuid(),
 +                    getStoragePoolTags(destPool), newDiskOffering.getUuid(), newDiskOffering.getTags()));
 +        }
 +        if (volume.getSize() != newDiskOffering.getDiskSize()) {
 +            DiskOfferingVO oldDiskOffering = this._diskOfferingDao.findById(volume.getDiskOfferingId());
 +            s_logger.warn(String.format(
 +                    "You are migrating a volume [id=%s] and changing the disk offering[from id=%s to id=%s] to reflect this migration. However, the sizes of the volume and the new disk offering are different.",
 +                    volume.getUuid(), oldDiskOffering.getUuid(), newDiskOffering.getUuid()));
 +        }
 +        s_logger.info(String.format("Changing disk offering to [uuid=%s] while migrating volume [uuid=%s, name=%s].", newDiskOffering.getUuid(), volume.getUuid(), volume.getName()));
 +    }
 +
 +    /**
 +     *  Checks if the target storage supports the new disk offering.
 +     *  This validation is consistent with the mechanism used to select a storage pool to deploy a volume when a virtual machine is deployed or when a new data disk is allocated.
 +     *
 +     *  The scenarios when this method returns true or false is presented in the following table.
 +     *
 +     *   <table border="1">
 +     *      <tr>
 +     *          <th>#</th><th>Disk offering tags</th><th>Storage tags</th><th>Does the storage support the disk offering?</th>
 +     *      </tr>
 +     *      <body>
 +     *      <tr>
 +     *          <td>1</td><td>A,B</td><td>A</td><td>NO</td>
 +     *      </tr>
 +     *      <tr>
 +     *          <td>2</td><td>A,B,C</td><td>A,B,C,D,X</td><td>YES</td>
 +     *      </tr>
 +     *      <tr>
 +     *          <td>3</td><td>A,B,C</td><td>X,Y,Z</td><td>NO</td>
 +     *      </tr>
 +     *      <tr>
 +     *          <td>4</td><td>null</td><td>A,S,D</td><td>YES</td>
 +     *      </tr>
 +     *      <tr>
 +     *          <td>5</td><td>A</td><td>null</td><td>NO</td>
 +     *      </tr>
 +     *      <tr>
 +     *          <td>6</td><td>null</td><td>null</td><td>YES</td>
 +     *      </tr>
 +     *      </body>
 +     *   </table>
 +     */
 +    protected boolean doesTargetStorageSupportNewDiskOffering(StoragePool destPool, DiskOfferingVO newDiskOffering) {
 +        String newDiskOfferingTags = newDiskOffering.getTags();
 +        return doesTargetStorageSupportDiskOffering(destPool, newDiskOfferingTags);
 +    }
 +
 +    @Override
 +    public boolean doesTargetStorageSupportDiskOffering(StoragePool destPool, String diskOfferingTags) {
 +        if (org.apache.commons.lang.StringUtils.isBlank(diskOfferingTags)) {
 +            return true;
 +        }
 +        String storagePoolTags = getStoragePoolTags(destPool);
 +        if (org.apache.commons.lang.StringUtils.isBlank(storagePoolTags)) {
 +            return false;
 +        }
 +        String[] storageTagsAsStringArray = org.apache.commons.lang.StringUtils.split(storagePoolTags, ",");
 +        String[] newDiskOfferingTagsAsStringArray = org.apache.commons.lang.StringUtils.split(diskOfferingTags, ",");
 +
 +        return CollectionUtils.isSubCollection(Arrays.asList(newDiskOfferingTagsAsStringArray), Arrays.asList(storageTagsAsStringArray));
 +    }
 +
 +    /**
 +     *  Retrieves the storage pool tags as a {@link String}. If the storage pool does not have tags we return a null value.
 +     */
 +    protected String getStoragePoolTags(StoragePool destPool) {
 +        List<StoragePoolDetailVO> storagePoolDetails = storagePoolDetailsDao.listDetails(destPool.getId());
 +        if (CollectionUtils.isEmpty(storagePoolDetails)) {
 +            return null;
 +        }
 +        String storageTags = "";
 +        for (StoragePoolDetailVO storagePoolDetailVO : storagePoolDetails) {
 +            storageTags = storageTags + storagePoolDetailVO.getName() + ",";
 +        }
 +        return storageTags.substring(0, storageTags.length() - 1);
 +    }
 +
 +    private Volume orchestrateMigrateVolume(VolumeVO volume, StoragePool destPool, boolean liveMigrateVolume, DiskOfferingVO newDiskOffering) {
 +        Volume newVol = null;
 +        try {
 +            if (liveMigrateVolume) {
 +                newVol = liveMigrateVolume(volume, destPool);
 +            } else {
 +                newVol = _volumeMgr.migrateVolume(volume, destPool);
 +            }
 +            if (newDiskOffering != null) {
 +                _volsDao.updateDiskOffering(newVol.getId(), newDiskOffering.getId());
 +            }
 +        } catch (StorageUnavailableException e) {
 +            s_logger.debug("Failed to migrate volume", e);
 +            throw new CloudRuntimeException(e.getMessage());
 +        } catch (Exception e) {
 +            s_logger.debug("Failed to migrate volume", e);
 +            throw new CloudRuntimeException(e.getMessage());
 +        }
 +        return newVol;
 +    }
 +
 +    @DB
 +    protected Volume liveMigrateVolume(Volume volume, StoragePool destPool) throws StorageUnavailableException {
 +        VolumeInfo vol = volFactory.getVolume(volume.getId());
 +
 +        DataStore dataStoreTarget = dataStoreMgr.getDataStore(destPool.getId(), DataStoreRole.Primary);
 +        AsyncCallFuture<VolumeApiResult> future = volService.migrateVolume(vol, dataStoreTarget);
 +        try {
 +            VolumeApiResult result = future.get();
 +            if (result.isFailed()) {
 +                s_logger.debug("migrate volume failed:" + result.getResult());
 +                throw new StorageUnavailableException("Migrate volume failed: " + result.getResult(), destPool.getId());
 +            }
 +            return result.getVolume();
 +        } catch (InterruptedException e) {
 +            s_logger.debug("migrate volume failed", e);
 +            throw new CloudRuntimeException(e.getMessage());
 +        } catch (ExecutionException e) {
 +            s_logger.debug("migrate volume failed", e);
 +            throw new CloudRuntimeException(e.getMessage());
 +        }
 +    }
 +
 +    @Override
 +    @ActionEvent(eventType = EventTypes.EVENT_SNAPSHOT_CREATE, eventDescription = "taking snapshot", async = true)
 +    public Snapshot takeSnapshot(Long volumeId, Long policyId, Long snapshotId, Account account, boolean quiescevm, Snapshot.LocationType locationType, boolean asyncBackup)
 +            throws ResourceAllocationException {
 +        VolumeInfo volume = volFactory.getVolume(volumeId);
 +        if (volume == null) {
 +            throw new InvalidParameterValueException("Creating snapshot failed due to volume:" + volumeId + " doesn't exist");
 +        }
 +
 +        if (volume.getState() != Volume.State.Ready) {
 +            throw new InvalidParameterValueException("VolumeId: " + volumeId + " is not in " + Volume.State.Ready + " state but " + volume.getState() + ". Cannot take snapshot.");
 +        }
 +
 +        StoragePoolVO storagePoolVO = _storagePoolDao.findById(volume.getPoolId());
 +
 +        if (storagePoolVO.isManaged() && locationType == null) {
 +            locationType = Snapshot.LocationType.PRIMARY;
 +        }
 +
 +        VMInstanceVO vm = null;
 +        if (volume.getInstanceId() != null) {
 +            vm = _vmInstanceDao.findById(volume.getInstanceId());
 +        }
 +
 +        if (vm != null) {
 +            // serialize VM operation
 +            AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
 +            if (jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
 +                // avoid re-entrance
 +
 +                VmWorkJobVO placeHolder = null;
 +                placeHolder = createPlaceHolderWork(vm.getId());
 +                try {
 +                    return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm, locationType, asyncBackup);
 +                } finally {
 +                    _workJobDao.expunge(placeHolder.getId());
 +                }
 +
 +            } else {
 +                Outcome<Snapshot> outcome = takeVolumeSnapshotThroughJobQueue(vm.getId(), volumeId, policyId, snapshotId, account.getId(), quiescevm, locationType, asyncBackup);
 +
 +                try {
 +                    outcome.get();
 +                } catch (InterruptedException e) {
 +                    throw new RuntimeException("Operation is interrupted", e);
 +                } catch (java.util.concurrent.ExecutionException e) {
 +                    throw new RuntimeException("Execution excetion", e);
 +                }
 +
 +                Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
 +                if (jobResult != null) {
 +                    if (jobResult instanceof ConcurrentOperationException) {
 +                        throw (ConcurrentOperationException)jobResult;
 +                    } else if (jobResult instanceof ResourceAllocationException) {
 +                        throw (ResourceAllocationException)jobResult;
 +                    } else if (jobResult instanceof Throwable) {
 +                        throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
 +                    }
 +                }
 +
 +                return _snapshotDao.findById(snapshotId);
 +            }
 +        } else {
 +            CreateSnapshotPayload payload = new CreateSnapshotPayload();
 +            payload.setSnapshotId(snapshotId);
 +            payload.setSnapshotPolicyId(policyId);
 +            payload.setAccount(account);
 +            payload.setQuiescevm(quiescevm);
 +            payload.setAsyncBackup(asyncBackup);
 +            volume.addPayload(payload);
 +            return volService.takeSnapshot(volume);
 +        }
 +    }
 +
 +    private Snapshot orchestrateTakeVolumeSnapshot(Long volumeId, Long policyId, Long snapshotId, Account account, boolean quiescevm, Snapshot.LocationType locationType, boolean asyncBackup)
 +            throws ResourceAllocationException {
 +
 +        VolumeInfo volume = volFactory.getVolume(volumeId);
 +
 +        if (volume == null) {
 +            throw new InvalidParameterValueException("Creating snapshot failed due to volume:" + volumeId + " doesn't exist");
 +        }
 +
 +        if (volume.getState() != Volume.State.Ready) {
 +            throw new InvalidParameterValueException("VolumeId: " + volumeId + " is not in " + Volume.State.Ready + " state but " + volume.getState() + ". Cannot take snapshot.");
 +        }
 +
 +        CreateSnapshotPayload payload = new CreateSnapshotPayload();
 +
 +        payload.setSnapshotId(snapshotId);
 +        payload.setSnapshotPolicyId(policyId);
 +        payload.setAccount(account);
 +        payload.setQuiescevm(quiescevm);
 +        payload.setLocationType(locationType);
 +        payload.setAsyncBackup(asyncBackup);
 +        volume.addPayload(payload);
 +
 +        return volService.takeSnapshot(volume);
 +    }
 +
 +    @Override
 +    @ActionEvent(eventType = EventTypes.EVENT_SNAPSHOT_CREATE, eventDescription = "allocating snapshot", create = true)
 +    public Snapshot allocSnapshot(Long volumeId, Long policyId, String snapshotName, Snapshot.LocationType locationType) throws ResourceAllocationException {
 +        Account caller = CallContext.current().getCallingAccount();
 +
 +        VolumeInfo volume = volFactory.getVolume(volumeId);
 +        if (volume == null) {
 +            throw new InvalidParameterValueException("Creating snapshot failed due to volume:" + volumeId + " doesn't exist");
 +        }
 +        DataCenter zone = _dcDao.findById(volume.getDataCenterId());
 +        if (zone == null) {
 +            throw new InvalidParameterValueException("Can't find zone by id " + volume.getDataCenterId());
 +        }
 +
 +        if (Grouping.AllocationState.Disabled == zone.getAllocationState() && !_accountMgr.isRootAdmin(caller.getId())) {
 +            throw new PermissionDeniedException("Cannot perform this operation, Zone is currently disabled: " + zone.getName());
 +        }
 +
 +        if (volume.getState() != Volume.State.Ready) {
 +            throw new InvalidParameterValueException("VolumeId: " + volumeId + " is not in " + Volume.State.Ready + " state but " + volume.getState() + ". Cannot take snapshot.");
 +        }
 +
 +        if (ImageFormat.DIR.equals(volume.getFormat())) {
 +            throw new InvalidParameterValueException("Snapshot not supported for volume:" + volumeId);
 +        }
 +
 +        if (volume.getTemplateId() != null) {
 +            VMTemplateVO template = _templateDao.findById(volume.getTemplateId());
 +            if (template != null && template.getTemplateType() == Storage.TemplateType.SYSTEM) {
 +                throw new InvalidParameterValueException("VolumeId: " + volumeId + " is for System VM , Creating snapshot against System VM volumes is not supported");
 +            }
 +        }
 +
 +        StoragePoolVO storagePoolVO = _storagePoolDao.findById(volume.getPoolId());
 +
 +        if (!storagePoolVO.isManaged() && locationType != null) {
 +            throw new InvalidParameterValueException("VolumeId: " + volumeId + " LocationType is supported only for managed storage");
 +        }
 +
 +        if (storagePoolVO.isManaged() && locationType == null) {
 +            locationType = Snapshot.LocationType.PRIMARY;
 +        }
 +
 +        StoragePool storagePool = (StoragePool)volume.getDataStore();
 +        if (storagePool == null) {
 +            throw new InvalidParameterValueException("VolumeId: " + volumeId + " please attach this volume to a VM before create snapshot for it");
 +        }
 +
 +        return snapshotMgr.allocSnapshot(volumeId, policyId, snapshotName, locationType);
 +    }
 +
 +    @Override
 +    public Snapshot allocSnapshotForVm(Long vmId, Long volumeId, String snapshotName) throws ResourceAllocationException {
 +        Account caller = CallContext.current().getCallingAccount();
 +        VMInstanceVO vm = _vmInstanceDao.findById(vmId);
 +        if (vm == null) {
 +            throw new InvalidParameterValueException("Creating snapshot failed due to vm:" + vmId + " doesn't exist");
 +        }
 +        _accountMgr.checkAccess(caller, null, true, vm);
 +
 +        VolumeInfo volume = volFactory.getVolume(volumeId);
 +        if (volume == null) {
 +            throw new InvalidParameterValueException("Creating snapshot failed due to volume:" + volumeId + " doesn't exist");
 +        }
 +        _accountMgr.checkAccess(caller, null, true, volume);
 +        VirtualMachine attachVM = volume.getAttachedVM();
 +        if (attachVM == null || attachVM.getId() != vm.getId()) {
 +            throw new InvalidParameterValueException("Creating snapshot failed due to volume:" + volumeId + " doesn't attach to vm :" + vm);
 +        }
 +
 +        DataCenter zone = _dcDao.findById(volume.getDataCenterId());
 +        if (zone == null) {
 +            throw new InvalidParameterValueException("Can't find zone by id " + volume.getDataCenterId());
 +        }
 +
 +        if (Grouping.AllocationState.Disabled == zone.getAllocationState() && !_accountMgr.isRootAdmin(caller.getId())) {
 +            throw new PermissionDeniedException("Cannot perform this operation, Zone is currently disabled: " + zone.getName());
 +        }
 +
 +        if (volume.getState() != Volume.State.Ready) {
 +            throw new InvalidParameterValueException("VolumeId: " + volumeId + " is not in " + Volume.State.Ready + " state but " + volume.getState() + ". Cannot take snapshot.");
 +        }
 +
 +        if (volume.getTemplateId() != null) {
 +            VMTemplateVO template = _templateDao.findById(volume.getTemplateId());
 +            if (template != null && template.getTemplateType() == Storage.TemplateType.SYSTEM) {
 +                throw new InvalidParameterValueException("VolumeId: " + volumeId + " is for System VM , Creating snapshot against System VM volumes is not supported");
 +            }
 +        }
 +
 +        StoragePool storagePool = (StoragePool)volume.getDataStore();
 +        if (storagePool == null) {
 +            throw new InvalidParameterValueException("VolumeId: " + volumeId + " please attach this volume to a VM before create snapshot for it");
 +        }
 +
 +        return snapshotMgr.allocSnapshot(volumeId, Snapshot.MANUAL_POLICY_ID, snapshotName, null);
 +    }
 +
 +    @Override
 +    @ActionEvent(eventType = EventTypes.EVENT_VOLUME_EXTRACT, eventDescription = "extracting volume", async = true)
 +    public String extractVolume(ExtractVolumeCmd cmd) {
 +        Long volumeId = cmd.getId();
 +        Long zoneId = cmd.getZoneId();
 +        String mode = cmd.getMode();
 +        Account account = CallContext.current().getCallingAccount();
 +
 +        if (!_accountMgr.isRootAdmin(account.getId()) && ApiDBUtils.isExtractionDisabled()) {
 +            throw new PermissionDeniedException("Extraction has been disabled by admin");
 +        }
 +
 +        VolumeVO volume = _volsDao.findById(volumeId);
 +        if (volume == null) {
 +            InvalidParameterValueException ex = new InvalidParameterValueException("Unable to find volume with specified volumeId");
 +            ex.addProxyObject(volumeId.toString(), "volumeId");
 +            throw ex;
 +        }
 +
 +        // perform permission check
 +        _accountMgr.checkAccess(account, null, true, volume);
 +
 +        if (_dcDao.findById(zoneId) == null) {
 +            throw new InvalidParameterValueException("Please specify a valid zone.");
 +        }
 +        if (volume.getPoolId() == null) {
 +            throw new InvalidParameterValueException("The volume doesn't belong to a storage pool so can't extract it");
 +        }
 +        // Extract activity only for detached volumes or for volumes whose
 +        // instance is stopped
 +        if (volume.getInstanceId() != null && ApiDBUtils.findVMInstanceById(volume.getInstanceId()).getState() != State.Stopped) {
 +            s_logger.debug("Invalid state of the volume with ID: " + volumeId + ". It should be either detached or the VM should be in stopped state.");
 +            PermissionDeniedException ex = new PermissionDeniedException("Invalid state of the volume with specified ID. It should be either detached or the VM should be in stopped state.");
 +            ex.addProxyObject(volume.getUuid(), "volumeId");
 +            throw ex;
 +        }
 +
 +        if (volume.getVolumeType() != Volume.Type.DATADISK) {
 +            // Datadisk dont have any template dependence.
 +
 +            VMTemplateVO template = ApiDBUtils.findTemplateById(volume.getTemplateId());
 +            if (template != null) { // For ISO based volumes template = null and
 +                // we allow extraction of all ISO based
 +                // volumes
 +                boolean isExtractable = template.isExtractable() && template.getTemplateType() != Storage.TemplateType.SYSTEM;
 +                if (!isExtractable && account != null && !_accountMgr.isRootAdmin(account.getId())) {
 +                    // Global admins are always allowed to extract
 +                    PermissionDeniedException ex = new PermissionDeniedException("The volume with specified volumeId is not allowed to be extracted");
 +                    ex.addProxyObject(volume.getUuid(), "volumeId");
 +                    throw ex;
 +                }
 +            }
 +        }
 +
 +        if (mode == null || (!mode.equals(Upload.Mode.FTP_UPLOAD.toString()) && !mode.equals(Upload.Mode.HTTP_DOWNLOAD.toString()))) {
 +            throw new InvalidParameterValueException("Please specify a valid extract Mode ");
 +        }
 +
 +        // Check if the url already exists
 +        VolumeDataStoreVO volumeStoreRef = _volumeStoreDao.findByVolume(volumeId);
 +        if (volumeStoreRef != null && volumeStoreRef.getExtractUrl() != null) {
 +            return volumeStoreRef.getExtractUrl();
 +        }
 +
 +        VMInstanceVO vm = null;
 +        if (volume.getInstanceId() != null) {
 +            vm = _vmInstanceDao.findById(volume.getInstanceId());
 +        }
 +
 +        if (vm != null) {
 +            // serialize VM operation
 +            AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
 +            if (jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
 +                // avoid re-entrance
 +
 +                VmWorkJobVO placeHolder = null;
 +                placeHolder = createPlaceHolderWork(vm.getId());
 +                try {
 +                    return orchestrateExtractVolume(volume.getId(), zoneId);
 +                } finally {
 +                    _workJobDao.expunge(placeHolder.getId());
 +                }
 +
 +            } else {
 +                Outcome<String> outcome = extractVolumeThroughJobQueue(vm.getId(), volume.getId(), zoneId);
 +
 +                try {
 +                    outcome.get();
 +                } catch (InterruptedException e) {
 +                    throw new RuntimeException("Operation is interrupted", e);
 +                } catch (java.util.concurrent.ExecutionException e) {
 +                    throw new RuntimeException("Execution excetion", e);
 +                }
 +
 +                Object jobResult = _jobMgr.unmarshallResultObject(outcome.getJob());
 +                if (jobResult != null) {
 +                    if (jobResult instanceof ConcurrentOperationException) {
 +                        throw (ConcurrentOperationException)jobResult;
 +                    } else if (jobResult instanceof RuntimeException) {
 +                        throw (RuntimeException)jobResult;
 +                    } else if (jobResult instanceof Throwable) {
 +                        throw new RuntimeException("Unexpected exception", (Throwable)jobResult);
 +                    }
 +                }
 +
 +                // retrieve the entity url from job result
 +                if (jobResult != null && jobResult instanceof String) {
 +                    return (String)jobResult;
 +                }
 +                return null;
 +            }
 +        }
 +
 +        return orchestrateExtractVolume(volume.getId(), zoneId);
 +    }
 +
 +    private String orchestrateExtractVolume(long volumeId, long zoneId) {
 +        // get latest volume state to make sure that it is not updated by other parallel operations
 +        VolumeVO volume = _volsDao.findById(volumeId);
 +        if (volume == null || volume.getState() != Volume.State.Ready) {
 +            throw new InvalidParameterValueException("Volume to be extracted has been removed or not in right state!");
 +        }
 +        // perform extraction
 +        ImageStoreEntity secStore = (ImageStoreEntity)dataStoreMgr.getImageStore(zoneId);
 +        String value = _configDao.getValue(Config.CopyVolumeWait.toString());
 +        NumbersUtil.parseInt(value, Integer.parseInt(Config.CopyVolumeWait.getDefaultValue()));
 +
 +        // Copy volume from primary to secondary storage
 +        VolumeInfo srcVol = volFactory.getVolume(volumeId);
 +        AsyncCallFuture<VolumeApiResult> cvAnswer = volService.copyVolume(srcVol, secStore);
 +        // Check if you got a valid answer.
 +        VolumeApiResult cvResult = null;
 +        try {
 +            cvResult = cvAnswer.get();
 +        } catch (InterruptedException e1) {
 +            s_logger.debug("failed copy volume", e1);
 +            throw new CloudRuntimeException("Failed to copy volume", e1);
 +        } catch (ExecutionException e1) {
 +            s_logger.debug("failed copy volume", e1);
 +            throw new CloudRuntimeException("Failed to copy volume", e1);
 +        }
 +        if (cvResult == null || cvResult.isFailed()) {
 +            String errorString = "Failed to copy the volume from the source primary storage pool to secondary storage.";
 +            throw new CloudRuntimeException(errorString);
 +        }
 +
 +        VolumeInfo vol = cvResult.getVolume();
 +
 +        String extractUrl = secStore.createEntityExtractUrl(vol.getPath(), vol.getFormat(), vol);
 +        VolumeDataStoreVO volumeStoreRef = _volumeStoreDao.findByVolume(volumeId);
 +
 +        volumeStoreRef.setExtractUrl(extractUrl);
 +        volumeStoreRef.setExtractUrlCreated(DateUtil.now());
 +        volumeStoreRef.setDownloadState(VMTemplateStorageResourceAssoc.Status.DOWNLOADED);
 +        volumeStoreRef.setDownloadPercent(100);
 +        volumeStoreRef.setZoneId(zoneId);
 +
 +        _volumeStoreDao.update(volumeStoreRef.getId(), volumeStoreRef);
 +
 +        return extractUrl;
 +    }
 +
 +    @Override
 +    public boolean isDisplayResourceEnabled(Long id) {
 +        Volume volume = _volsDao.findById(id);
 +        if (volume == null) {
 +            return true; // bad id given, default to true
 +        }
 +        return volume.isDisplayVolume();
 +    }
 +
 +    private boolean needMoveVolume(VolumeVO existingVolume, VolumeInfo newVolume) {
 +        if (existingVolume == null || existingVolume.getPoolId() == null || newVolume.getPoolId() == null) {
 +            return false;
 +        }
 +
 +        DataStore storeForExistingVol = dataStoreMgr.getPrimaryDataStore(existingVolume.getPoolId());
 +        DataStore storeForNewVol = dataStoreMgr.getPrimaryDataStore(newVolume.getPoolId());
 +
 +        Scope storeForExistingStoreScope = storeForExistingVol.getScope();
 +        if (storeForExistingStoreScope == null) {
 +            throw new CloudRuntimeException("Can't get scope of data store: " + storeForExistingVol.getId());
 +        }
 +
 +        Scope storeForNewStoreScope = storeForNewVol.getScope();
 +        if (storeForNewStoreScope == null) {
 +            throw new CloudRuntimeException("Can't get scope of data store: " + storeForNewVol.getId());
 +        }
 +
 +        if (storeForNewStoreScope.getScopeType() == ScopeType.ZONE) {
 +            return false;
 +        }
 +
 +        if (storeForExistingStoreScope.getScopeType() != storeForNewStoreScope.getScopeType()) {
 +            if (storeForNewStoreScope.getScopeType() == ScopeType.CLUSTER) {
 +                Long vmClusterId = null;
 +                if (storeForExistingStoreScope.getScopeType() == ScopeType.HOST) {
 +                    HostScope hs = (HostScope)storeForExistingStoreScope;
 +                    vmClusterId = hs.getClusterId();
 +                } else if (storeForExistingStoreScope.getScopeType() == ScopeType.ZONE) {
 +                    Long hostId = _vmInstanceDao.findById(existingVolume.getInstanceId()).getHostId();
 +                    if (hostId != null) {
 +                        HostVO host = _hostDao.findById(hostId);
 +                        vmClusterId = host.getClusterId();
 +                    }
 +                }
 +                if (storeForNewStoreScope.getScopeId().equals(vmClusterId)) {
 +                    return false;
 +                } else {
 +                    return true;
 +                }
 +            } else if (storeForNewStoreScope.getScopeType() == ScopeType.HOST
 +                    && (storeForExistingStoreScope.getScopeType() == ScopeType.CLUSTER || storeForExistingStoreScope.getScopeType() == ScopeType.ZONE)) {
 +                Long hostId = _vmInstanceDao.findById(existingVolume.getInstanceId()).getHostId();
 +                if (storeForNewStoreScope.getScopeId().equals(hostId)) {
 +                    return false;
 +                }
 +            }
 +            throw new InvalidParameterValueException("Can't move volume between scope: " + storeForNewStoreScope.getScopeType() + " and " + storeForExistingStoreScope.getScopeType());
 +        }
 +
 +        return !storeForExistingStoreScope.isSameScope(storeForNewStoreScope);
 +    }
 +
 +    private synchronized void checkAndSetAttaching(Long volumeId, Long hostId) {
 +        VolumeInfo volumeToAttach = volFactory.getVolume(volumeId);
 +
 +        if (volumeToAttach.isAttachedVM()) {
 +            throw new CloudRuntimeException("volume: " + volumeToAttach.getName() + " is already attached to a VM: " + volumeToAttach.getAttachedVmName());
 +        }
 +        if (volumeToAttach.getState().equals(Volume.State.Ready)) {
 +            volumeToAttach.stateTransit(Volume.Event.AttachRequested);
 +        } else {
 +            String error = null;
 +            if (hostId == null) {
 +                error = "Please try attach operation after starting VM once";
 +            } else {
 +                error = "Volume: " + volumeToAttach.getName() + " is in " + volumeToAttach.getState() + ". It should be in Ready state";
 +            }
 +            s_logger.error(error);
 +            throw new CloudRuntimeException(error);
 +        }
 +    }
 +
 +    private void verifyManagedStorage(Long storagePoolId, Long hostId) {
 +        if (storagePoolId == null || hostId == null) {
 +            return;
 +        }
 +
 +        StoragePoolVO storagePoolVO = _storagePoolDao.findById(storagePoolId);
 +
 +        if (storagePoolVO == null || !storagePoolVO.isManaged()) {
 +            return;
 +        }
 +
 +        HostVO hostVO = _hostDao.findById(hostId);
 +
 +        if (hostVO == null) {
 +            return;
 +        }
 +
 +        if (!storageUtil.managedStoragePoolCanScale(storagePoolVO, hostVO.getClusterId(), hostVO.getId())) {
 +            throw new CloudRuntimeException("Insufficient number of available " + getNameOfClusteredFileSystem(hostVO));
 +        }
 +    }
 +
 +    private String getNameOfClusteredFileSystem(HostVO hostVO) {
 +        HypervisorType hypervisorType = hostVO.getHypervisorType();
 +
 +        if (HypervisorType.XenServer.equals(hypervisorType)) {
 +            return "SRs";
 +        }
 +
 +        if (HypervisorType.VMware.equals(hypervisorType)) {
 +            return "datastores";
 +        }
 +
 +        return "clustered file systems";
 +    }
 +
 +    private VolumeVO sendAttachVolumeCommand(UserVmVO vm, VolumeVO volumeToAttach, Long deviceId) {
 +        String errorMsg = "Failed to attach volume " + volumeToAttach.getName() + " to VM " + vm.getHostName();
 +        boolean sendCommand = vm.getState() == State.Running;
 +        AttachAnswer answer = null;
 +        Long hostId = vm.getHostId();
 +
 +        if (hostId == null) {
 +            hostId = vm.getLastHostId();
 +
 +            HostVO host = _hostDao.findById(hostId);
 +
 +            if (host != null && host.getHypervisorType() == HypervisorType.VMware) {
 +                sendCommand = true;
 +            }
 +        }
 +
 +        HostVO host = null;
 +        StoragePoolVO volumeToAttachStoragePool = _storagePoolDao.findById(volumeToAttach.getPoolId());
 +
 +        if (hostId != null) {
 +            host = _hostDao.findById(hostId);
 +
 +            if (host != null && host.getHypervisorType() == HypervisorType.XenServer && volumeToAttachStoragePool != null && volumeToAttachStoragePool.isManaged()) {
 +                sendCommand = true;
 +            }
 +        }
 +
 +        verifyManagedStorage(volumeToAttachStoragePool.getId(), hostId);
 +
 +        // volumeToAttachStoragePool should be null if the VM we are attaching the disk to has never been started before
 +        DataStore dataStore = volumeToAttachStoragePool != null ? dataStoreMgr.getDataStore(volumeToAttachStoragePool.getId(), DataStoreRole.Primary) : null;
 +
 +        checkAndSetAttaching(volumeToAttach.getId(), hostId);
 +
 +        boolean attached = false;
 +        try {
 +            // if we don't have a host, the VM we are attaching the disk to has never been started before
 +            if (host != null) {
 +                try {
 +                    volService.grantAccess(volFactory.getVolume(volumeToAttach.getId()), host, dataStore);
 +                } catch (Exception e) {
 +                    volService.revokeAccess(volFactory.getVolume(volumeToAttach.getId()), host, dataStore);
 +
 +                    throw new CloudRuntimeException(e.getMessage());
 +                }
 +            }
 +
 +            if (sendCommand) {
 +                if (host != null && host.getHypervisorType() == HypervisorType.KVM && volumeToAttachStoragePool.isManaged() && volumeToAttach.getPath() == null) {
 +                    volumeToAttach.setPath(volumeToAttach.get_iScsiName());
 +
 +                    _volsDao.update(volumeToAttach.getId(), volumeToAttach);
 +                }
 +
 +                DataTO volTO = volFactory.getVolume(volumeToAttach.getId()).getTO();
 +
 +                deviceId = getDeviceId(vm, deviceId);
 +
 +                DiskTO disk = storageMgr.getDiskWithThrottling(volTO, volumeToAttach.getVolumeType(), deviceId, volumeToAttach.getPath(), vm.getServiceOfferingId(),
 +                        volumeToAttach.getDiskOfferingId());
 +
 +                AttachCommand cmd = new AttachCommand(disk, vm.getInstanceName());
 +
 +                ChapInfo chapInfo = volService.getChapInfo(volFactory.getVolume(volumeToAttach.getId()), dataStore);
 +
 +                Map<String, String> details = new HashMap<String, String>();
 +
 +                disk.setDetails(details);
 +
 +                details.put(DiskTO.MANAGED, String.valueOf(volumeToAttachStoragePool.isManaged()));
 +                details.put(DiskTO.STORAGE_HOST, volumeToAttachStoragePool.getHostAddress());
 +                details.put(DiskTO.STORAGE_PORT, String.valueOf(volumeToAttachStoragePool.getPort()));
 +                details.put(DiskTO.VOLUME_SIZE, String.valueOf(volumeToAttach.getSize()));
 +                details.put(DiskTO.IQN, volumeToAttach.get_iScsiName());
 +                details.put(DiskTO.MOUNT_POINT, volumeToAttach.get_iScsiName());
 +                details.put(DiskTO.PROTOCOL_TYPE, (volumeToAttach.getPoolType() != null) ? volumeToAttach.getPoolType().toString() : null);
 +
 +                if (chapInfo != null) {
 +                    details.put(DiskTO.CHAP_INITIATOR_USERNAME, chapInfo.getInitiatorUsername());
 +                    details.put(DiskTO.CHAP_INITIATOR_SECRET, chapInfo.getInitiatorSecret());
 +                    details.put(DiskTO.CHAP_TARGET_USERNAME, chapInfo.getTargetUsername());
 +                    details.put(DiskTO.CHAP_TARGET_SECRET, chapInfo.getTargetSecret());
 +                }
 +                _userVmDao.loadDetails(vm);
 +                Map<String, String> controllerInfo = new HashMap<String, String>();
 +                controllerInfo.put(VmDetailConstants.ROOT_DISK_CONTROLLER, vm.getDetail(VmDetailConstants.ROOT_DISK_CONTROLLER));
 +                controllerInfo.put(VmDetailConstants.DATA_DISK_CONTROLLER, vm.getDetail(VmDetailConstants.DATA_DISK_CONTROLLER));
 +                cmd.setControllerInfo(controllerInfo);
 +                s_logger.debug("Attach volume id:" + volumeToAttach.getId() + " on VM id:" + vm.getId() + " has controller info:" + controllerInfo);
 +
 +                try {
 +                    answer = (AttachAnswer)_agentMgr.send(hostId, cmd);
 +                } catch (Exception e) {
 +                    if (host != null) {
 +                        volService.revokeAccess(volFactory.getVolume(volumeToAttach.getId()), host, dataStore);
 +                    }
 +                    throw new CloudRuntimeException(errorMsg + " due to: " + e.getMessage());
 +                }
 +            }
 +
 +            if (!sendCommand || (answer != null && answer.getResult())) {
 +                // Mark the volume as attached
 +                if (sendCommand) {
 +                    DiskTO disk = answer.getDisk();
 +
 +                    _volsDao.attachVolume(volumeToAttach.getId(), vm.getId(), disk.getDiskSeq());
 +
 +                    volumeToAttach = _volsDao.findById(volumeToAttach.getId());
 +
 +                    if (volumeToAttachStoragePool.isManaged() && volumeToAttach.getPath() == null) {
 +                        volumeToAttach.setPath(answer.getDisk().getPath());
 +
 +                        _volsDao.update(volumeToAttach.getId(), volumeToAttach);
 +                    }
 +                } else {
 +                    deviceId = getDeviceId(vm, deviceId);
 +
 +                    _volsDao.attachVolume(volumeToAttach.getId(), vm.getId(), deviceId);
 +
 +                    volumeToAttach = _volsDao.findById(volumeToAttach.getId());
 +
 +                    if (vm.getHypervisorType() == HypervisorType.KVM && volumeToAttachStoragePool.isManaged() && volumeToAttach.getPath() == null) {
 +                        volumeToAttach.setPath(volumeToAttach.get_iScsiName());
 +
 +                        _volsDao.update(volumeToAttach.getId(), volumeToAttach);
 +                    }
 +                }
 +
 +                // insert record for disk I/O statistics
 +                VmDiskStatisticsVO diskstats = _vmDiskStatsDao.findBy(vm.getAccountId(), vm.getDataCenterId(), vm.getId(), volumeToAttach.getId());
 +                if (diskstats == null) {
 +                    diskstats = new VmDiskStatisticsVO(vm.getAccountId(), vm.getDataCenterId(), vm.getId(), volumeToAttach.getId());
 +                    _vmDiskStatsDao.persist(diskstats);
 +                }
 +
 +                attached = true;
 +            } else {
 +                if (answer != null) {
 +                    String details = answer.getDetails();
 +                    if (details != null && !details.isEmpty()) {
 +                        errorMsg += "; " + details;
 +                    }
 +                }
 +                if (host != null) {
 +                    volService.revokeAccess(volFactory.getVolume(volumeToAttach.getId()), host, dataStore);
 +                }
 +                throw new CloudRuntimeException(errorMsg);
 +            }
 +        } finally {
 +            Volume.Event ev = Volume.Event.OperationFailed;
 +            VolumeInfo volInfo = volFactory.getVolume(volumeToAttach.getId());
 +            if (attached) {
 +                ev = Volume.Event.OperationSucceeded;
 +                s_logger.debug("Volume: " + volInfo.getName() + " successfully attached to VM: " + volInfo.getAttachedVmName());
 +            } else {
 +                s_logger.debug("Volume: " + volInfo.getName() + " failed to attach to VM: " + volInfo.getAttachedVmName());
 +            }
 +            volInfo.stateTransit(ev);
 +        }
 +        return _volsDao.findById(volumeToAttach.getId());
 +    }
 +
 +    private int getMaxDataVolumesSupported(UserVmVO vm) {
 +        Long hostId = vm.getHostId();
 +        if (hostId == null) {
 +            hostId = vm.getLastHostId();
 +        }
 +        HostVO host = _hostDao.findById(hostId);
 +        Integer maxDataVolumesSupported = null;
 +        if (host != null) {
 +            _hostDao.loadDetails(host);
 +            maxDataVolumesSupported = _hypervisorCapabilitiesDao.getMaxDataVolumesLimit(host.getHypervisorType(), host.getDetail("product_version"));
 +        }
 +        if (maxDataVolumesSupported == null || maxDataVolumesSupported.intValue() <= 0) {
 +            maxDataVolumesSupported = 6; // 6 data disks by default if nothing
 +            // is specified in
 +            // 'hypervisor_capabilities' table
 +        }
 +
 +        return maxDataVolumesSupported.intValue();
 +    }
 +
 +    private Long getDeviceId(UserVmVO vm, Long deviceId) {
 +        // allocate deviceId
 +        int maxDevices = getMaxDataVolumesSupported(vm) + 2; // add 2 to consider devices root volume and cdrom
 +        int maxDeviceId = maxDevices - 1;
 +        List<VolumeVO> vols = _volsDao.findByInstance(vm.getId());
 +        if (deviceId != null) {
 +            if (deviceId.longValue() < 0 || deviceId.longValue() > maxDeviceId || deviceId.longValue() == 3) {
 +                throw new RuntimeException("deviceId should be 0,1,2,4-" + maxDeviceId);
 +            }
 +            for (VolumeVO vol : vols) {
 +                if (vol.getDeviceId().equals(deviceId)) {
 +                    throw new RuntimeException("deviceId " + deviceId + " is used by vm " + vm.getId());
 +                }
 +            }
 +        } else {
 +            // allocate deviceId here
 +            List<String> devIds = new ArrayList<String>();
 +            for (int i = 1; i <= maxDeviceId; i++) {
 +                devIds.add(String.valueOf(i));
 +            }
 +            devIds.remove("3");
 +            for (VolumeVO vol : vols) {
 +                devIds.remove(vol.getDeviceId().toString().trim());
 +            }
 +            if (devIds.isEmpty()) {
 +                throw new RuntimeException("All device Ids are used by vm " + vm.getId());
 +            }
 +            deviceId = Long.parseLong(devIds.iterator().next());
 +        }
 +
 +        return deviceId;
 +    }
 +
 +    @Override
 +    public boolean configure(String name, Map<String, Object> params) {
 +        String maxVolumeSizeInGbString = _configDao.getValue(Config.MaxVolumeSize.toString());
 +        _maxVolumeSizeInGb = NumbersUtil.parseLong(maxVolumeSizeInGbString, 2000);
 +        return true;
 +    }
 +
 +    public List<StoragePoolAllocator> getStoragePoolAllocators() {
 +        return _storagePoolAllocators;
 +    }
 +
 +    @Inject
 +    public void setStoragePoolAllocators(List<StoragePoolAllocator> storagePoolAllocators) {
 +        _storagePoolAllocators = storagePoolAllocators;
 +    }
 +
 +    public class VmJobVolumeUrlOutcome extends OutcomeImpl<String> {
 +        public VmJobVolumeUrlOutcome(final AsyncJob job) {
 +            super(String.class, job, VmJobCheckInterval.value(), new Predicate() {
 +                @Override
 +                public boolean checkCondition() {
 +                    AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
 +                    assert (jobVo != null);
 +                    if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS) {
 +                        return true;
 +                    }
 +
 +                    return false;
 +                }
 +            }, AsyncJob.Topics.JOB_STATE);
 +        }
 +    }
 +
 +    public class VmJobVolumeOutcome extends OutcomeImpl<Volume> {
 +        private long _volumeId;
 +
 +        public VmJobVolumeOutcome(final AsyncJob job, final long volumeId) {
 +            super(Volume.class, job, VmJobCheckInterval.value(), new Predicate() {
 +                @Override
 +                public boolean checkCondition() {
 +                    AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
 +                    assert (jobVo != null);
 +                    if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS) {
 +                        return true;
 +                    }
 +
 +                    return false;
 +                }
 +            }, AsyncJob.Topics.JOB_STATE);
 +            _volumeId = volumeId;
 +        }
 +
 +        @Override
 +        protected Volume retrieve() {
 +            return _volsDao.findById(_volumeId);
 +        }
 +    }
 +
 +    public class VmJobSnapshotOutcome extends OutcomeImpl<Snapshot> {
 +        private long _snapshotId;
 +
 +        public VmJobSnapshotOutcome(final AsyncJob job, final long snapshotId) {
 +            super(Snapshot.class, job, VmJobCheckInterval.value(), new Predicate() {
 +                @Override
 +                public boolean checkCondition() {
 +                    AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class, job.getId());
 +                    assert (jobVo != null);
 +                    if (jobVo == null || jobVo.getStatus() != JobInfo.Status.IN_PROGRESS) {
 +                        return true;
 +                    }
 +
 +                    return false;
 +                }
 +            }, AsyncJob.Topics.JOB_STATE);
 +            _snapshotId = snapshotId;
 +        }
 +
 +        @Override
 +        protected Snapshot retrieve() {
 +            return _snapshotDao.findById(_snapshotId);
 +        }
 +    }
 +
 +    public Outcome<Volume> attachVolumeToVmThroughJobQueue(final Long vmId, final Long volumeId, final Long deviceId) {
 +
 +        final CallContext context = CallContext.current();
 +        final User callingUser = context.getCallingUser();
 +        final Account callingAccount = context.getCallingAccount();
 +
 +        final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
 +
 +        VmWorkJobVO workJob = new VmWorkJobVO(context.getContextId());
 +
 +        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
 +        workJob.setCmd(VmWorkAttachVolume.class.getName());
 +
 +        workJob.setAccountId(callingAccount.getId());
 +        workJob.setUserId(callingUser.getId());
 +        workJob.setStep(VmWorkJobVO.Step.Starting);
 +        workJob.setVmType(VirtualMachine.Type.Instance);
 +        workJob.setVmInstanceId(vm.getId());
 +        workJob.setRelated(AsyncJobExecutionContext.getOriginJobId());
 +
 +        // save work context info (there are some duplications)
 +        VmWorkAttachVolume workInfo = new VmWorkAttachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, deviceId);
 +        workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
 +
 +        _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
 +
 +        AsyncJobVO jobVo = _jobMgr.getAsyncJob(workJob.getId());
 +        s_logger.debug("New job " + workJob.getId() + ", result field: " + jobVo.getResult());
 +
 +        AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId());
 +
 +        return new VmJobVolumeOutcome(workJob, volumeId);
 +    }
 +
 +    public Outcome<Volume> detachVolumeFromVmThroughJobQueue(final Long vmId, final Long volumeId) {
 +
 +        final CallContext context = CallContext.current();
 +        final User callingUser = context.getCallingUser();
 +        final Account callingAccount = context.getCallingAccount();
 +
 +        final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
 +
 +        VmWorkJobVO workJob = new VmWorkJobVO(context.getContextId());
 +
 +        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
 +        workJob.setCmd(VmWorkDetachVolume.class.getName());
 +
 +        workJob.setAccountId(callingAccount.getId());
 +        workJob.setUserId(callingUser.getId());
 +        workJob.setStep(VmWorkJobVO.Step.Starting);
 +        workJob.setVmType(VirtualMachine.Type.Instance);
 +        workJob.setVmInstanceId(vm.getId());
 +        workJob.setRelated(AsyncJobExecutionContext.getOriginJobId());
 +
 +        // save work context info (there are some duplications)
 +        VmWorkDetachVolume workInfo = new VmWorkDetachVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId);
 +        workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
 +
 +        _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
 +
 +        AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId());
 +
 +        return new VmJobVolumeOutcome(workJob, volumeId);
 +    }
 +
 +    public Outcome<Volume> resizeVolumeThroughJobQueue(final Long vmId, final long volumeId, final long currentSize, final long newSize, final Long newMinIops, final Long newMaxIops,
 +            final Integer newHypervisorSnapshotReserve, final Long newServiceOfferingId, final boolean shrinkOk) {
 +        final CallContext context = CallContext.current();
 +        final User callingUser = context.getCallingUser();
 +        final Account callingAccount = context.getCallingAccount();
 +
 +        final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
 +
 +        VmWorkJobVO workJob = new VmWorkJobVO(context.getContextId());
 +
 +        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
 +        workJob.setCmd(VmWorkResizeVolume.class.getName());
 +
 +        workJob.setAccountId(callingAccount.getId());
 +        workJob.setUserId(callingUser.getId());
 +        workJob.setStep(VmWorkJobVO.Step.Starting);
 +        workJob.setVmType(VirtualMachine.Type.Instance);
 +        workJob.setVmInstanceId(vm.getId());
 +        workJob.setRelated(AsyncJobExecutionContext.getOriginJobId());
 +
 +        // save work context info (there are some duplications)
 +        VmWorkResizeVolume workInfo = new VmWorkResizeVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, currentSize, newSize,
 +                newMinIops, newMaxIops, newHypervisorSnapshotReserve, newServiceOfferingId, shrinkOk);
 +        workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
 +
 +        _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
 +
 +        AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId());
 +
 +        return new VmJobVolumeOutcome(workJob, volumeId);
 +    }
 +
 +    public Outcome<String> extractVolumeThroughJobQueue(final Long vmId, final long volumeId, final long zoneId) {
 +
 +        final CallContext context = CallContext.current();
 +        final User callingUser = context.getCallingUser();
 +        final Account callingAccount = context.getCallingAccount();
 +
 +        final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
 +
 +        VmWorkJobVO workJob = new VmWorkJobVO(context.getContextId());
 +
 +        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
 +        workJob.setCmd(VmWorkExtractVolume.class.getName());
 +
 +        workJob.setAccountId(callingAccount.getId());
 +        workJob.setUserId(callingUser.getId());
 +        workJob.setStep(VmWorkJobVO.Step.Starting);
 +        workJob.setVmType(VirtualMachine.Type.Instance);
 +        workJob.setVmInstanceId(vm.getId());
 +        workJob.setRelated(AsyncJobExecutionContext.getOriginJobId());
 +
 +        // save work context info (there are some duplications)
 +        VmWorkExtractVolume workInfo = new VmWorkExtractVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, zoneId);
 +        workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
 +
 +        _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
 +
 +        AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId());
 +
 +        return new VmJobVolumeUrlOutcome(workJob);
 +    }
 +
 +    private Outcome<Volume> migrateVolumeThroughJobQueue(VMInstanceVO vm, VolumeVO vol, StoragePool destPool, boolean liveMigrateVolume, DiskOfferingVO newDiskOffering) {
 +        CallContext context = CallContext.current();
 +        User callingUser = context.getCallingUser();
 +        Account callingAccount = context.getCallingAccount();
 +
 +        VmWorkJobVO workJob = new VmWorkJobVO(context.getContextId());
 +
 +        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
 +        workJob.setCmd(VmWorkMigrateVolume.class.getName());
 +
 +        workJob.setAccountId(callingAccount.getId());
 +        workJob.setUserId(callingUser.getId());
 +        workJob.setStep(VmWorkJobVO.Step.Starting);
 +        workJob.setVmType(VirtualMachine.Type.Instance);
 +        workJob.setVmInstanceId(vm.getId());
 +        workJob.setRelated(AsyncJobExecutionContext.getOriginJobId());
 +
 +        Long newDiskOfferingId = newDiskOffering != null ? newDiskOffering.getId() : null;
 +
 +        // save work context info (there are some duplications)
 +        VmWorkMigrateVolume workInfo = new VmWorkMigrateVolume(callingUser.getId(), callingAccount.getId(), vm.getId(), VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, vol.getId(), destPool.getId(),
 +                liveMigrateVolume, newDiskOfferingId);
 +        workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
 +
 +        _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
 +
 +        AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId());
 +
 +        return new VmJobVolumeOutcome(workJob, vol.getId());
 +    }
 +
 +    public Outcome<Snapshot> takeVolumeSnapshotThroughJobQueue(final Long vmId, final Long volumeId, final Long policyId, final Long snapshotId, final Long accountId, final boolean quiesceVm,
 +            final Snapshot.LocationType locationType, final boolean asyncBackup) {
 +
 +        final CallContext context = CallContext.current();
 +        final User callingUser = context.getCallingUser();
 +        final Account callingAccount = context.getCallingAccount();
 +
 +        final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
 +
 +        VmWorkJobVO workJob = new VmWorkJobVO(context.getContextId());
 +
 +        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
 +        workJob.setCmd(VmWorkTakeVolumeSnapshot.class.getName());
 +
 +        workJob.setAccountId(callingAccount.getId());
 +        workJob.setUserId(callingUser.getId());
 +        workJob.setStep(VmWorkJobVO.Step.Starting);
 +        workJob.setVmType(VirtualMachine.Type.Instance);
 +        workJob.setVmInstanceId(vm.getId());
 +        workJob.setRelated(AsyncJobExecutionContext.getOriginJobId());
 +
 +        // save work context info (there are some duplications)
 +        VmWorkTakeVolumeSnapshot workInfo = new VmWorkTakeVolumeSnapshot(callingUser.getId(), accountId != null ? accountId : callingAccount.getId(), vm.getId(),
 +                VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, policyId, snapshotId, quiesceVm, locationType, asyncBackup);
 +        workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
 +
 +        _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
 +
 +        AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId());
 +
 +        return new VmJobSnapshotOutcome(workJob, snapshotId);
 +    }
 +
 +    @ReflectionUse
 +    private Pair<JobInfo.Status, String> orchestrateExtractVolume(VmWorkExtractVolume work) throws Exception {
 +        String volUrl = orchestrateExtractVolume(work.getVolumeId(), work.getZoneId());
 +        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, _jobMgr.marshallResultObject(volUrl));
 +    }
 +
 +    @ReflectionUse
 +    private Pair<JobInfo.Status, String> orchestrateAttachVolumeToVM(VmWorkAttachVolume work) throws Exception {
 +        Volume vol = orchestrateAttachVolumeToVM(work.getVmId(), work.getVolumeId(), work.getDeviceId());
 +
 +        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, _jobMgr.marshallResultObject(new Long(vol.getId())));
 +    }
 +
 +    @ReflectionUse
 +    private Pair<JobInfo.Status, String> orchestrateDetachVolumeFromVM(VmWorkDetachVolume work) throws Exception {
 +        Volume vol = orchestrateDetachVolumeFromVM(work.getVmId(), work.getVolumeId());
 +        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, _jobMgr.marshallResultObject(new Long(vol.getId())));
 +    }
 +
 +    @ReflectionUse
 +    private Pair<JobInfo.Status, String> orchestrateResizeVolume(VmWorkResizeVolume work) throws Exception {
 +        Volume vol = orchestrateResizeVolume(work.getVolumeId(), work.getCurrentSize(), work.getNewSize(), work.getNewMinIops(), work.getNewMaxIops(), work.getNewHypervisorSnapshotReserve(),
 +                work.getNewServiceOfferingId(), work.isShrinkOk());
 +        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, _jobMgr.marshallResultObject(new Long(vol.getId())));
 +    }
 +
 +    @ReflectionUse
 +    private Pair<JobInfo.Status, String> orchestrateMigrateVolume(VmWorkMigrateVolume work) throws Exception {
 +        VolumeVO volume = _volsDao.findById(work.getVolumeId());
 +        StoragePoolVO targetStoragePool = _storagePoolDao.findById(work.getDestPoolId());
 +        DiskOfferingVO newDiskOffering = _diskOfferingDao.findById(work.getNewDiskOfferingId());
 +
 +        Volume newVol = orchestrateMigrateVolume(volume, targetStoragePool, work.isLiveMigrate(), newDiskOffering);
 +
 +        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, _jobMgr.marshallResultObject(newVol.getId()));
 +    }
 +
 +    @ReflectionUse
 +    private Pair<JobInfo.Status, String> orchestrateTakeVolumeSnapshot(VmWorkTakeVolumeSnapshot work) throws Exception {
 +        Account account = _accountDao.findById(work.getAccountId());
 +        orchestrateTakeVolumeSnapshot(work.getVolumeId(), work.getPolicyId(), work.getSnapshotId(), account, work.isQuiesceVm(), work.getLocationType(), work.isAsyncBackup());
 +        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, _jobMgr.marshallResultObject(work.getSnapshotId()));
 +    }
 +
 +    @Override
 +    public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
 +        return _jobHandlerProxy.handleVmWorkJob(work);
 +    }
 +
 +    private VmWorkJobVO createPlaceHolderWork(long instanceId) {
 +        VmWorkJobVO workJob = new VmWorkJobVO("");
 +
 +        workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
 +        workJob.setCmd("");
 +        workJob.setCmdInfo("");
 +
 +        workJob.setAccountId(0);
 +        workJob.setUserId(0);
 +        workJob.setStep(VmWorkJobVO.Step.Starting);
 +        workJob.setVmType(VirtualMachine.Type.Instance);
 +        workJob.setVmInstanceId(instanceId);
 +        workJob.setInitMsid(ManagementServerNode.getManagementServerId());
 +
 +        _workJobDao.persist(workJob);
 +
 +        return workJob;
 +    }
 +
 +}