You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by nv...@apache.org on 2021/07/17 01:38:11 UTC

[cloudstack] branch main updated: Added disk provisioning type support for VMWare (#4640)

This is an automated email from the ASF dual-hosted git repository.

nvazquez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/main by this push:
     new 96c9c5a  Added disk provisioning type support for VMWare (#4640)
96c9c5a is described below

commit 96c9c5a5e2251174da7da881ab1d0f8e1f323aac
Author: Spaceman1984 <49...@users.noreply.github.com>
AuthorDate: Sat Jul 17 03:37:42 2021 +0200

    Added disk provisioning type support for VMWare (#4640)
    
    * Added disk provisioning type support for VMWare
    
    * Review changes
    
    * Fixed unit test
    
    * Review changes
    
    * Added missing licenses
    
    * Review changes
    
    * Update StoragePoolInfo.java
    
    Removed white space
    
    * Review change - Getting disk provisioning strictness setting using the zone id and not the pool id
    
    * Delete __init__.py
    
    * Merge fix
    
    * Fixed failing test
    
    * Added comment about parameters
    
    * Added error log when update fails
    
    * Added exception when using API
    
    * Ordering storage pool selection to prefer thick disk capable pools if available
    
    * Removed unused parameter
    
    * Reordering changes
    
    * Returning storage pool details after update
    
    * Removed multiple pool update, updated marvin test, removed duplicate enum
    
    * Removed comment
    
    * Removed unused import
    
    * Removed for loop
    
    * Added missing return statements for failed checks
    
    * Class name change
    
    * Null pointer
    
    * Added more info when a deployment fails
    
    * Null pointer
    
    * Update api/src/main/java/org/apache/cloudstack/api/BaseListCmd.java
    
    Co-authored-by: dahn <da...@gmail.com>
    
    * Small bug fix on API response and added missing bracket
    
    * Removed datastore cluster code
    
    * Removed unused imports, added missing signature
    
    * Removed duplicate config key
    
    * Revert "Added more info when a deployment fails"
    
    This reverts commit 2486db78dca8e034d8ad2386174dfb11004ce654.
    
    Co-authored-by: dahn <da...@gmail.com>
---
 api/src/main/java/com/cloud/storage/Storage.java   |  10 ++
 .../java/com/cloud/storage/StorageService.java     |   4 +-
 .../org/apache/cloudstack/api/BaseListCmd.java     |   2 +-
 .../command/admin/storage/ListStoragePoolsCmd.java |   5 +-
 .../storage/UpdateStorageCapabilitiesCmd.java      |  86 ++++++++++++
 .../api/GetStoragePoolCapabilitiesAnswer.java      |  47 +++++++
 .../api/GetStoragePoolCapabilitiesCommand.java     |  37 +++++
 .../cloudstack/storage/to/PrimaryDataStoreTO.java  |   9 ++
 .../api/storage/StoragePoolAllocator.java          |   2 +-
 .../java/com/cloud/storage/StorageManager.java     |   5 +-
 .../engine/orchestration/VolumeOrchestrator.java   |   2 +-
 .../storage/datastore/db/StoragePoolDetailVO.java  |   4 +
 .../storage/motion/AncientDataMotionStrategy.java  |  25 ++--
 .../motion/AncientDataMotionStrategyTest.java      |   9 +-
 .../allocator/AbstractStoragePoolAllocator.java    |  59 +++++++-
 .../datastore/provider/DefaultHostListener.java    |  11 +-
 .../hypervisor/vmware/resource/VmwareResource.java |  66 +++++++++
 .../storage/resource/VmwareStorageProcessor.java   |  47 ++++---
 .../VmwareStorageSubsystemCommandHandler.java      |   4 +
 .../vmware/resource/VmwareResourceTest.java        |   2 +-
 .../CloudStackPrimaryDataStoreLifeCycleImpl.java   |   2 +
 .../java/com/cloud/api/query/QueryManagerImpl.java |  16 ++-
 .../com/cloud/server/ManagementServerImpl.java     |   4 +-
 .../java/com/cloud/storage/StorageManagerImpl.java |  81 ++++++++++-
 test/integration/smoke/test_disk_offerings.py      |   3 +-
 .../smoke/test_disk_provisioning_types.py          | 149 +++++++++++++++++++++
 tools/apidoc/gen_toc.py                            |   2 +-
 tools/marvin/marvin/lib/base.py                    |   6 +
 .../hypervisor/vmware/mo/VirtualMachineMO.java     |  33 ++++-
 .../vmware/mo/VirtualStorageObjectManagerMO.java   |  16 ++-
 30 files changed, 694 insertions(+), 54 deletions(-)

diff --git a/api/src/main/java/com/cloud/storage/Storage.java b/api/src/main/java/com/cloud/storage/Storage.java
index 362cc2c..4079359 100644
--- a/api/src/main/java/com/cloud/storage/Storage.java
+++ b/api/src/main/java/com/cloud/storage/Storage.java
@@ -76,6 +76,16 @@ public class Storage {
 
     }
 
+    public static enum Capability {
+        HARDWARE_ACCELERATION("HARDWARE_ACCELERATION");
+
+        private final String capability;
+
+        private Capability(String capability) {
+            this.capability = capability;
+        }
+    }
+
     public static enum ProvisioningType {
         THIN("thin"),
         SPARSE("sparse"),
diff --git a/api/src/main/java/com/cloud/storage/StorageService.java b/api/src/main/java/com/cloud/storage/StorageService.java
index 4b18739..bb086ad 100644
--- a/api/src/main/java/com/cloud/storage/StorageService.java
+++ b/api/src/main/java/com/cloud/storage/StorageService.java
@@ -26,8 +26,8 @@ import org.apache.cloudstack.api.command.admin.storage.CreateStoragePoolCmd;
 import org.apache.cloudstack.api.command.admin.storage.DeleteImageStoreCmd;
 import org.apache.cloudstack.api.command.admin.storage.DeletePoolCmd;
 import org.apache.cloudstack.api.command.admin.storage.DeleteSecondaryStagingStoreCmd;
-import org.apache.cloudstack.api.command.admin.storage.UpdateStoragePoolCmd;
 import org.apache.cloudstack.api.command.admin.storage.SyncStoragePoolCmd;
+import org.apache.cloudstack.api.command.admin.storage.UpdateStoragePoolCmd;
 
 import com.cloud.exception.DiscoveryException;
 import com.cloud.exception.InsufficientCapacityException;
@@ -105,6 +105,8 @@ public interface StorageService {
 
     ImageStore updateImageStoreStatus(Long id, Boolean readonly);
 
+    void updateStorageCapabilities(Long poolId, boolean failOnChecks);
+
     StoragePool syncStoragePool(SyncStoragePoolCmd cmd);
 
 }
diff --git a/api/src/main/java/org/apache/cloudstack/api/BaseListCmd.java b/api/src/main/java/org/apache/cloudstack/api/BaseListCmd.java
index 36fa36f..bcebbb8 100644
--- a/api/src/main/java/org/apache/cloudstack/api/BaseListCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/BaseListCmd.java
@@ -94,7 +94,7 @@ public abstract class BaseListCmd extends BaseCmd implements IBaseListCmd {
         if (pageSizeInt != null) {
             defaultPageSize = pageSizeInt.longValue();
         }
-        if (defaultPageSize.longValue() == s_pageSizeUnlimited) {
+        if (s_pageSizeUnlimited.equals(defaultPageSize)) {
             defaultPageSize = null;
         }
 
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListStoragePoolsCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListStoragePoolsCmd.java
index ed123db..2450ac7 100644
--- a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListStoragePoolsCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListStoragePoolsCmd.java
@@ -99,7 +99,10 @@ public class ListStoragePoolsCmd extends BaseListCmd {
         return id;
     }
 
-    /////////////////////////////////////////////////////
+    public void setId(Long id) {
+        this.id = id;
+    }
+    /////////////////////////////////////////////////////
     /////////////// API Implementation///////////////////
     /////////////////////////////////////////////////////
 
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/UpdateStorageCapabilitiesCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/UpdateStorageCapabilitiesCmd.java
new file mode 100644
index 0000000..b6fb03d
--- /dev/null
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/UpdateStorageCapabilitiesCmd.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.api.command.admin.storage;
+
+import com.cloud.exception.ConcurrentOperationException;
+import com.cloud.exception.InsufficientCapacityException;
+import com.cloud.exception.NetworkRuleConflictException;
+import com.cloud.exception.ResourceAllocationException;
+import com.cloud.exception.ResourceUnavailableException;
+import org.apache.cloudstack.api.APICommand;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.BaseCmd;
+import org.apache.cloudstack.api.Parameter;
+import org.apache.cloudstack.api.ServerApiException;
+import org.apache.cloudstack.api.response.ListResponse;
+import org.apache.cloudstack.api.response.StoragePoolResponse;
+import org.apache.cloudstack.context.CallContext;
+import org.apache.log4j.Logger;
+
+import java.util.Locale;
+
+@APICommand(name = UpdateStorageCapabilitiesCmd.APINAME, description = "Syncs capabilities of storage pools",
+        responseObject = StoragePoolResponse.class,
+        requestHasSensitiveInfo = false, responseHasSensitiveInfo = false, since = "4.16.0")
+public class UpdateStorageCapabilitiesCmd extends BaseCmd {
+    public static final String APINAME = "updateStorageCapabilities";
+    private static final Logger LOG = Logger.getLogger(UpdateStorageCapabilitiesCmd.class.getName());
+
+    /////////////////////////////////////////////////////
+    //////////////// API parameters /////////////////////
+    /////////////////////////////////////////////////////
+
+    @Parameter(name = ApiConstants.ID, type = CommandType.UUID, entityType = StoragePoolResponse.class, required = true, description = "Storage pool id")
+    private Long poolId;
+
+    /////////////////////////////////////////////////////
+    /////////////////// Accessors ///////////////////////
+    /////////////////////////////////////////////////////
+
+    public Long getPoolId() {
+        return poolId;
+    }
+
+    public void setPoolId(Long poolId) {
+        this.poolId = poolId;
+    }
+
+    /////////////////////////////////////////////////////
+    /////////////// API Implementation///////////////////
+    /////////////////////////////////////////////////////
+
+
+    @Override
+    public void execute() throws ResourceUnavailableException, InsufficientCapacityException, ServerApiException, ConcurrentOperationException, ResourceAllocationException, NetworkRuleConflictException {
+        _storageService.updateStorageCapabilities(poolId, true);
+        ListStoragePoolsCmd listStoragePoolCmd = new ListStoragePoolsCmd();
+        listStoragePoolCmd.setId(poolId);
+        ListResponse<StoragePoolResponse> listResponse = _queryService.searchForStoragePools(listStoragePoolCmd);
+        listResponse.setResponseName(getCommandName());
+        this.setResponseObject(listResponse);
+    }
+
+    @Override
+    public String getCommandName() {
+        return APINAME.toLowerCase(Locale.ROOT) + "response" ;
+    }
+
+    @Override
+    public long getEntityOwnerId() {
+        return CallContext.current().getCallingAccountId();
+    }
+}
diff --git a/core/src/main/java/com/cloud/agent/api/GetStoragePoolCapabilitiesAnswer.java b/core/src/main/java/com/cloud/agent/api/GetStoragePoolCapabilitiesAnswer.java
new file mode 100644
index 0000000..65db9b6
--- /dev/null
+++ b/core/src/main/java/com/cloud/agent/api/GetStoragePoolCapabilitiesAnswer.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent.api;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class GetStoragePoolCapabilitiesAnswer extends Answer {
+
+    private Map<String, String> poolDetails;
+
+    public GetStoragePoolCapabilitiesAnswer(GetStoragePoolCapabilitiesCommand cmd) {
+        super(cmd);
+        poolDetails = new HashMap<>();
+    }
+
+    public void setResult(boolean result){
+        this.result = result;
+    }
+
+    public void setDetails(String details){
+        this.details = details;
+    }
+
+    public Map<String, String> getPoolDetails() {
+        return poolDetails;
+    }
+
+    public void setPoolDetails(Map<String, String> poolDetails) {
+        this.poolDetails = poolDetails;
+    }
+
+}
diff --git a/core/src/main/java/com/cloud/agent/api/GetStoragePoolCapabilitiesCommand.java b/core/src/main/java/com/cloud/agent/api/GetStoragePoolCapabilitiesCommand.java
new file mode 100644
index 0000000..b7dd731
--- /dev/null
+++ b/core/src/main/java/com/cloud/agent/api/GetStoragePoolCapabilitiesCommand.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent.api;
+
+import com.cloud.agent.api.to.StorageFilerTO;
+
+public class GetStoragePoolCapabilitiesCommand extends Command {
+
+    public StorageFilerTO getPool() {
+        return pool;
+    }
+
+    public void setPool(StorageFilerTO pool) {
+        this.pool = pool;
+    }
+
+    private StorageFilerTO pool;
+
+    @Override
+    public boolean executeInSequence() {
+        return false;
+    }
+}
diff --git a/core/src/main/java/org/apache/cloudstack/storage/to/PrimaryDataStoreTO.java b/core/src/main/java/org/apache/cloudstack/storage/to/PrimaryDataStoreTO.java
index 0bb5b79..9df2a6c 100644
--- a/core/src/main/java/org/apache/cloudstack/storage/to/PrimaryDataStoreTO.java
+++ b/core/src/main/java/org/apache/cloudstack/storage/to/PrimaryDataStoreTO.java
@@ -52,6 +52,7 @@ public class PrimaryDataStoreTO implements DataStoreTO {
     private Map<String, String> details;
     private static final String pathSeparator = "/";
     private Boolean fullCloneFlag;
+    private Boolean diskProvisioningStrictnessFlag;
     private final boolean isManaged;
 
     public PrimaryDataStoreTO(PrimaryDataStore dataStore) {
@@ -163,4 +164,12 @@ public class PrimaryDataStoreTO implements DataStoreTO {
     public boolean isManaged() {
         return isManaged;
     }
+
+    public Boolean getDiskProvisioningStrictnessFlag() {
+        return diskProvisioningStrictnessFlag;
+    }
+
+    public void setDiskProvisioningStrictnessFlag(Boolean diskProvisioningStrictnessFlag) {
+        this.diskProvisioningStrictnessFlag = diskProvisioningStrictnessFlag;
+    }
 }
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/StoragePoolAllocator.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/StoragePoolAllocator.java
index c8fcf5f..fde71fe 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/StoragePoolAllocator.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/StoragePoolAllocator.java
@@ -60,5 +60,5 @@ public interface StoragePoolAllocator extends Adapter {
 
     static int RETURN_UPTO_ALL = -1;
 
-    List<StoragePool> reorderPools(List<StoragePool> pools, VirtualMachineProfile vmProfile, DeploymentPlan plan);
+    List<StoragePool> reorderPools(List<StoragePool> pools, VirtualMachineProfile vmProfile, DeploymentPlan plan, DiskProfile dskCh);
 }
diff --git a/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java b/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
index b9a45f0..3d6d062 100644
--- a/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
+++ b/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
@@ -145,7 +145,10 @@ public interface StorageManager extends StorageService {
 
     ConfigKey<Integer> MaxDataMigrationWaitTime = new ConfigKey<Integer>("Advanced", Integer.class, "max.data.migration.wait.time", "15",
             "Maximum wait time for a data migration task before spawning a new SSVM", false, ConfigKey.Scope.Global);
-
+    ConfigKey<Boolean> DiskProvisioningStrictness = new ConfigKey<Boolean>("Storage", Boolean.class, "disk.provisioning.type.strictness", "false",
+            "If set to true, the disk is created only when there is a suitable storage pool that supports the disk provisioning type specified by the service/disk offering. " +
+                    "If set to false, the disk is created with a disk provisioning type supported by the pool. Default value is false, and this is currently supported for VMware only.",
+            true, ConfigKey.Scope.Zone);
     /**
      * Returns a comma separated list of tags for the specified storage pool
      * @param poolId
diff --git a/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/VolumeOrchestrator.java b/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/VolumeOrchestrator.java
index 1c4aead..5a42b3a 100644
--- a/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/VolumeOrchestrator.java
+++ b/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/VolumeOrchestrator.java
@@ -349,7 +349,7 @@ public class VolumeOrchestrator extends ManagerBase implements VolumeOrchestrati
         VirtualMachineProfile profile = new VirtualMachineProfileImpl(vm);
         for (StoragePoolAllocator allocator : _storagePoolAllocators) {
             DataCenterDeployment plan = new DataCenterDeployment(dc.getId(), podId, clusterId, hostId, null, null);
-            final List<StoragePool> poolList = allocator.reorderPools(suitablePools, profile, plan);
+            final List<StoragePool> poolList = allocator.reorderPools(suitablePools, profile, plan, null);
 
             if (poolList != null && !poolList.isEmpty()) {
                 return (StoragePool)dataStoreMgr.getDataStore(poolList.get(0).getId(), DataStoreRole.Primary);
diff --git a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/StoragePoolDetailVO.java b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/StoragePoolDetailVO.java
index 8a746ff..8c1428b 100644
--- a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/StoragePoolDetailVO.java
+++ b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/StoragePoolDetailVO.java
@@ -70,6 +70,10 @@ public class StoragePoolDetailVO implements ResourceDetail {
         return name;
     }
 
+    public void setValue(String value) {
+        this.value = value;
+    }
+
     @Override
     public String getValue() {
         return value;
diff --git a/engine/storage/datamotion/src/main/java/org/apache/cloudstack/storage/motion/AncientDataMotionStrategy.java b/engine/storage/datamotion/src/main/java/org/apache/cloudstack/storage/motion/AncientDataMotionStrategy.java
index c49ffba..e9c6fc1 100644
--- a/engine/storage/datamotion/src/main/java/org/apache/cloudstack/storage/motion/AncientDataMotionStrategy.java
+++ b/engine/storage/datamotion/src/main/java/org/apache/cloudstack/storage/motion/AncientDataMotionStrategy.java
@@ -86,6 +86,9 @@ public class AncientDataMotionStrategy implements DataMotionStrategy {
     @Inject
     StorageCacheManager cacheMgr;
 
+    @Inject
+    StorageManager storageManager;
+
     @Override
     public StrategyPriority canHandle(DataObject srcData, DataObject destData) {
         return StrategyPriority.DEFAULT;
@@ -156,7 +159,7 @@ public class AncientDataMotionStrategy implements DataMotionStrategy {
                 srcForCopy = cacheData = cacheMgr.createCacheObject(srcData, destScope);
             }
 
-            CopyCommand cmd = new CopyCommand(srcForCopy.getTO(), addFullCloneFlagOnVMwareDest(destData.getTO()), primaryStorageDownloadWait,
+            CopyCommand cmd = new CopyCommand(srcForCopy.getTO(), addFullCloneAndDiskprovisiongStrictnessFlagOnVMwareDest(destData.getTO()), primaryStorageDownloadWait,
                     VirtualMachineManager.ExecuteInSequence.value());
             EndPoint ep = destHost != null ? RemoteHostEndPoint.getHypervisorHostEndPoint(destHost) : selector.select(srcForCopy, destData);
             if (ep == null) {
@@ -210,18 +213,20 @@ public class AncientDataMotionStrategy implements DataMotionStrategy {
      * @param dataTO Dest data store TO
      * @return dataTO including fullCloneFlag, if provided
      */
-    protected DataTO addFullCloneFlagOnVMwareDest(DataTO dataTO) {
+    protected DataTO addFullCloneAndDiskprovisiongStrictnessFlagOnVMwareDest(DataTO dataTO) {
         if (dataTO != null && dataTO.getHypervisorType().equals(Hypervisor.HypervisorType.VMware)){
             DataStoreTO dataStoreTO = dataTO.getDataStore();
             if (dataStoreTO != null && dataStoreTO instanceof PrimaryDataStoreTO){
                 PrimaryDataStoreTO primaryDataStoreTO = (PrimaryDataStoreTO) dataStoreTO;
-                Boolean value = CapacityManager.VmwareCreateCloneFull.valueIn(primaryDataStoreTO.getId());
-                primaryDataStoreTO.setFullCloneFlag(value);
+                primaryDataStoreTO.setFullCloneFlag(CapacityManager.VmwareCreateCloneFull.valueIn(primaryDataStoreTO.getId()));
+                StoragePool pool = storageManager.getStoragePool(primaryDataStoreTO.getId());
+                primaryDataStoreTO.setDiskProvisioningStrictnessFlag(storageManager.DiskProvisioningStrictness.valueIn(pool.getDataCenterId()));
             }
         }
         return dataTO;
     }
 
+
     protected Answer copyObject(DataObject srcData, DataObject destData) {
         return copyObject(srcData, destData, null);
     }
@@ -278,7 +283,7 @@ public class AncientDataMotionStrategy implements DataMotionStrategy {
                 ep = selector.select(srcData, volObj);
             }
 
-            CopyCommand cmd = new CopyCommand(srcData.getTO(), addFullCloneFlagOnVMwareDest(volObj.getTO()), _createVolumeFromSnapshotWait, VirtualMachineManager.ExecuteInSequence.value());
+            CopyCommand cmd = new CopyCommand(srcData.getTO(), addFullCloneAndDiskprovisiongStrictnessFlagOnVMwareDest(volObj.getTO()), _createVolumeFromSnapshotWait, VirtualMachineManager.ExecuteInSequence.value());
             Answer answer = null;
             if (ep == null) {
                 String errMsg = "No remote endpoint to send command, check if host or ssvm is down?";
@@ -301,7 +306,7 @@ public class AncientDataMotionStrategy implements DataMotionStrategy {
     }
 
     protected Answer cloneVolume(DataObject template, DataObject volume) {
-        CopyCommand cmd = new CopyCommand(template.getTO(), addFullCloneFlagOnVMwareDest(volume.getTO()), 0, VirtualMachineManager.ExecuteInSequence.value());
+        CopyCommand cmd = new CopyCommand(template.getTO(), addFullCloneAndDiskprovisiongStrictnessFlagOnVMwareDest(volume.getTO()), 0, VirtualMachineManager.ExecuteInSequence.value());
         try {
             EndPoint ep = selector.select(volume.getDataStore());
             Answer answer = null;
@@ -373,7 +378,7 @@ public class AncientDataMotionStrategy implements DataMotionStrategy {
 
                 objOnImageStore.processEvent(Event.CopyingRequested);
 
-                CopyCommand cmd = new CopyCommand(objOnImageStore.getTO(), addFullCloneFlagOnVMwareDest(destData.getTO()), _copyvolumewait, VirtualMachineManager.ExecuteInSequence.value());
+                CopyCommand cmd = new CopyCommand(objOnImageStore.getTO(), addFullCloneAndDiskprovisiongStrictnessFlagOnVMwareDest(destData.getTO()), _copyvolumewait, VirtualMachineManager.ExecuteInSequence.value());
                 EndPoint ep = selector.select(objOnImageStore, destData);
                 if (ep == null) {
                     String errMsg = "No remote endpoint to send command, check if host or ssvm is down?";
@@ -526,7 +531,7 @@ public class AncientDataMotionStrategy implements DataMotionStrategy {
             ep = selector.select(srcData, destData);
         }
 
-        CopyCommand cmd = new CopyCommand(srcData.getTO(), addFullCloneFlagOnVMwareDest(destData.getTO()), _createprivatetemplatefromsnapshotwait, VirtualMachineManager.ExecuteInSequence.value());
+        CopyCommand cmd = new CopyCommand(srcData.getTO(), addFullCloneAndDiskprovisiongStrictnessFlagOnVMwareDest(destData.getTO()), _createprivatetemplatefromsnapshotwait, VirtualMachineManager.ExecuteInSequence.value());
         Answer answer = null;
         if (ep == null) {
             String errMsg = "No remote endpoint to send command, check if host or ssvm is down?";
@@ -562,7 +567,7 @@ public class AncientDataMotionStrategy implements DataMotionStrategy {
                 Scope selectedScope = pickCacheScopeForCopy(srcData, destData);
                 cacheData = cacheMgr.getCacheObject(srcData, selectedScope);
 
-                CopyCommand cmd = new CopyCommand(srcData.getTO(), addFullCloneFlagOnVMwareDest(destData.getTO()), _backupsnapshotwait, VirtualMachineManager.ExecuteInSequence.value());
+                CopyCommand cmd = new CopyCommand(srcData.getTO(), addFullCloneAndDiskprovisiongStrictnessFlagOnVMwareDest(destData.getTO()), _backupsnapshotwait, VirtualMachineManager.ExecuteInSequence.value());
                 cmd.setCacheTO(cacheData.getTO());
                 cmd.setOptions(options);
                 EndPoint ep = selector.select(srcData, destData);
@@ -574,7 +579,7 @@ public class AncientDataMotionStrategy implements DataMotionStrategy {
                     answer = ep.sendMessage(cmd);
                 }
             } else {
-                addFullCloneFlagOnVMwareDest(destData.getTO());
+                addFullCloneAndDiskprovisiongStrictnessFlagOnVMwareDest(destData.getTO());
                 CopyCommand cmd = new CopyCommand(srcData.getTO(), destData.getTO(), _backupsnapshotwait, VirtualMachineManager.ExecuteInSequence.value());
                 cmd.setOptions(options);
                 EndPoint ep = selector.select(srcData, destData, StorageAction.BACKUPSNAPSHOT);
diff --git a/engine/storage/datamotion/src/test/java/org/apache/cloudstack/storage/motion/AncientDataMotionStrategyTest.java b/engine/storage/datamotion/src/test/java/org/apache/cloudstack/storage/motion/AncientDataMotionStrategyTest.java
index dccb6b4..cd46fc5 100755
--- a/engine/storage/datamotion/src/test/java/org/apache/cloudstack/storage/motion/AncientDataMotionStrategyTest.java
+++ b/engine/storage/datamotion/src/test/java/org/apache/cloudstack/storage/motion/AncientDataMotionStrategyTest.java
@@ -27,6 +27,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.any;
 
+import com.cloud.storage.StorageManager;
+import com.cloud.storage.StoragePool;
 import org.apache.cloudstack.framework.config.ConfigKey;
 import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
 import org.junit.Before;
@@ -57,6 +59,10 @@ public class AncientDataMotionStrategyTest {
     PrimaryDataStoreTO dataStoreTO;
     @Mock
     ConfigKey<Boolean> vmwareKey;
+    @Mock
+    StorageManager storageManager;
+    @Mock
+    StoragePool storagePool;
 
     private static final long POOL_ID = 1l;
     private static final Boolean FULL_CLONE_FLAG = true;
@@ -72,6 +78,7 @@ public class AncientDataMotionStrategyTest {
         when(dataTO.getHypervisorType()).thenReturn(HypervisorType.VMware);
         when(dataTO.getDataStore()).thenReturn(dataStoreTO);
         when(dataStoreTO.getId()).thenReturn(POOL_ID);
+        when(storageManager.getStoragePool(POOL_ID)).thenReturn(storagePool);
     }
 
     private void replaceVmwareCreateCloneFullField() throws Exception {
@@ -86,7 +93,7 @@ public class AncientDataMotionStrategyTest {
 
     @Test
     public void testAddFullCloneFlagOnVMwareDest(){
-        strategy.addFullCloneFlagOnVMwareDest(dataTO);
+        strategy.addFullCloneAndDiskprovisiongStrictnessFlagOnVMwareDest(dataTO);
         verify(dataStoreTO).setFullCloneFlag(FULL_CLONE_FLAG);
     }
 
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/AbstractStoragePoolAllocator.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/AbstractStoragePoolAllocator.java
index 10d39ee..3b07fe9 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/AbstractStoragePoolAllocator.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/allocator/AbstractStoragePoolAllocator.java
@@ -26,12 +26,17 @@ import java.util.Map;
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
+import com.cloud.exception.StorageUnavailableException;
+import com.cloud.storage.StoragePoolStatus;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailVO;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.log4j.Logger;
+
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
 import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
-import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
-import org.apache.log4j.Logger;
 
 import com.cloud.capacity.Capacity;
 import com.cloud.capacity.dao.CapacityDao;
@@ -39,12 +44,10 @@ import com.cloud.dc.ClusterVO;
 import com.cloud.dc.dao.ClusterDao;
 import com.cloud.deploy.DeploymentPlan;
 import com.cloud.deploy.DeploymentPlanner.ExcludeList;
-import com.cloud.exception.StorageUnavailableException;
 import com.cloud.hypervisor.Hypervisor.HypervisorType;
 import com.cloud.storage.Storage;
 import com.cloud.storage.StorageManager;
 import com.cloud.storage.StoragePool;
-import com.cloud.storage.StoragePoolStatus;
 import com.cloud.storage.StorageUtil;
 import com.cloud.storage.Volume;
 import com.cloud.storage.dao.VolumeDao;
@@ -68,6 +71,7 @@ public abstract class AbstractStoragePoolAllocator extends AdapterBase implement
     @Inject private ClusterDao clusterDao;
     @Inject private StorageManager storageMgr;
     @Inject private StorageUtil storageUtil;
+    @Inject private StoragePoolDetailsDao storagePoolDetailsDao;
 
     @Override
     public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
@@ -96,7 +100,7 @@ public abstract class AbstractStoragePoolAllocator extends AdapterBase implement
     @Override
     public List<StoragePool> allocateToPool(DiskProfile dskCh, VirtualMachineProfile vmProfile, DeploymentPlan plan, ExcludeList avoid, int returnUpTo, boolean bypassStorageTypeCheck) {
         List<StoragePool> pools = select(dskCh, vmProfile, plan, avoid, returnUpTo, bypassStorageTypeCheck);
-        return reorderPools(pools, vmProfile, plan);
+        return reorderPools(pools, vmProfile, plan, dskCh);
     }
 
     protected List<StoragePool> reorderPoolsByCapacity(DeploymentPlan plan,
@@ -163,7 +167,7 @@ public abstract class AbstractStoragePoolAllocator extends AdapterBase implement
     }
 
     @Override
-    public List<StoragePool> reorderPools(List<StoragePool> pools, VirtualMachineProfile vmProfile, DeploymentPlan plan) {
+    public List<StoragePool> reorderPools(List<StoragePool> pools, VirtualMachineProfile vmProfile, DeploymentPlan plan, DiskProfile dskCh) {
         if (pools == null) {
             return null;
         }
@@ -180,9 +184,36 @@ public abstract class AbstractStoragePoolAllocator extends AdapterBase implement
         } else if(allocationAlgorithm.equals("firstfitleastconsumed")){
             pools = reorderPoolsByCapacity(plan, pools);
         }
+
+        if (vmProfile.getHypervisorType() == HypervisorType.VMware &&
+                !storageMgr.DiskProvisioningStrictness.valueIn(plan.getDataCenterId())) {
+            pools = reorderPoolsByDiskProvisioningType(pools, dskCh);
+        }
+
         return pools;
     }
 
+    /**
+     * For non-thin disks, moves NFS pools without hardware acceleration (no detail row, or a value other than "true") behind
+     * all other pools, keeping the relative order of each group; thin or unspecified provisioning returns the list unchanged.
+     */
+    private List<StoragePool> reorderPoolsByDiskProvisioningType(List<StoragePool> pools, DiskProfile diskProfile) {
+        if (diskProfile == null || diskProfile.getProvisioningType() == null || diskProfile.getProvisioningType().equals(Storage.ProvisioningType.THIN)) {
+            return pools;
+        }
+        List<StoragePool> reorderedPools = new ArrayList<>();
+        int preferredIndex = 0;
+        for (StoragePool pool : pools) {
+            StoragePoolDetailVO hardwareAcceleration = storagePoolDetailsDao.findDetail(pool.getId(), Storage.Capability.HARDWARE_ACCELERATION.toString());
+            if (pool.getPoolType() == Storage.StoragePoolType.NetworkFilesystem && (hardwareAcceleration == null || !"true".equals(hardwareAcceleration.getValue()))) {
+                reorderedPools.add(pool); // least preferred: appended after every pool seen so far
+            } else { // constant-first equals above: a detail row with a null value means "not accelerated", not an NPE
+                reorderedPools.add(preferredIndex++, pool); // preferred: inserted right after the previous preferred pool
+            }
+        }
+        return reorderedPools;
+    }
+
     protected boolean filter(ExcludeList avoid, StoragePool pool, DiskProfile dskCh, DeploymentPlan plan) {
 
         if (s_logger.isDebugEnabled()) {
@@ -211,6 +242,10 @@ public abstract class AbstractStoragePoolAllocator extends AdapterBase implement
             return false;
         }
 
+        if (!checkDiskProvisioningSupport(dskCh, pool)) {
+            return false;
+        }
+
         if(!checkHypervisorCompatibility(dskCh.getHypervisorType(), dskCh.getType(), pool.getPoolType())){
             return false;
         }
@@ -253,6 +288,18 @@ public abstract class AbstractStoragePoolAllocator extends AdapterBase implement
         return storageMgr.storagePoolHasEnoughIops(requestVolumes, pool) && storageMgr.storagePoolHasEnoughSpace(requestVolumes, pool, plan.getClusterId());
     }
 
+    /**
+     * Only restricts VMware disks on NFS pools whose data center has DiskProvisioningStrictness enabled: such a pool is accepted
+     * for thin disks, or when its HARDWARE_ACCELERATION detail is "true". NOTE(review): a null provisioning type is rejected - confirm intended.
+     */
+    private boolean checkDiskProvisioningSupport(DiskProfile dskCh, StoragePool pool) {
+        if (dskCh.getHypervisorType() != HypervisorType.VMware || pool.getPoolType() != Storage.StoragePoolType.NetworkFilesystem || !storageMgr.DiskProvisioningStrictness.valueIn(pool.getDataCenterId())) {
+            return true;
+        }
+        StoragePoolDetailVO hardwareAcceleration = storagePoolDetailsDao.findDetail(pool.getId(), Storage.Capability.HARDWARE_ACCELERATION.toString());
+        return dskCh.getProvisioningType() != null && (dskCh.getProvisioningType().equals(Storage.ProvisioningType.THIN) || (hardwareAcceleration != null && "true".equals(hardwareAcceleration.getValue())));
+    }
+
     /*
     Check StoragePool and Volume type compatibility for the hypervisor
      */
diff --git a/engine/storage/volume/src/main/java/org/apache/cloudstack/storage/datastore/provider/DefaultHostListener.java b/engine/storage/volume/src/main/java/org/apache/cloudstack/storage/datastore/provider/DefaultHostListener.java
index eb2262f..30cd7ac 100644
--- a/engine/storage/volume/src/main/java/org/apache/cloudstack/storage/datastore/provider/DefaultHostListener.java
+++ b/engine/storage/volume/src/main/java/org/apache/cloudstack/storage/datastore/provider/DefaultHostListener.java
@@ -26,10 +26,11 @@ import com.cloud.alert.AlertManager;
 import com.cloud.exception.StorageConflictException;
 import com.cloud.storage.DataStoreRole;
 import com.cloud.storage.Storage;
+import com.cloud.storage.StorageManager;
 import com.cloud.storage.StoragePool;
 import com.cloud.storage.StoragePoolHostVO;
+import com.cloud.storage.StorageService;
 import com.cloud.storage.dao.StoragePoolHostDao;
-import com.cloud.storage.StorageManager;
 import com.cloud.utils.exception.CloudRuntimeException;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
 import org.apache.cloudstack.engine.subsystem.api.storage.HypervisorHostListener;
@@ -59,6 +60,8 @@ public class DefaultHostListener implements HypervisorHostListener {
     StoragePoolDetailsDao storagePoolDetailsDao;
     @Inject
     StorageManager storageManager;
+    @Inject
+    StorageService storageService;
 
     @Override
     public boolean hostAdded(long hostId) {
@@ -67,7 +70,7 @@ public class DefaultHostListener implements HypervisorHostListener {
 
     @Override
     public boolean hostConnect(long hostId, long poolId) throws StorageConflictException {
-        StoragePool pool = (StoragePool)this.dataStoreMgr.getDataStore(poolId, DataStoreRole.Primary);
+        StoragePool pool = (StoragePool) this.dataStoreMgr.getDataStore(poolId, DataStoreRole.Primary);
         ModifyStoragePoolCommand cmd = new ModifyStoragePoolCommand(true, pool);
         final Answer answer = agentMgr.easySend(hostId, cmd);
 
@@ -84,7 +87,7 @@ public class DefaultHostListener implements HypervisorHostListener {
 
         assert (answer instanceof ModifyStoragePoolAnswer) : "Well, now why won't you actually return the ModifyStoragePoolAnswer when it's ModifyStoragePoolCommand? Pool=" +
             pool.getId() + "Host=" + hostId;
-        ModifyStoragePoolAnswer mspAnswer = (ModifyStoragePoolAnswer)answer;
+        ModifyStoragePoolAnswer mspAnswer = (ModifyStoragePoolAnswer) answer;
         if (mspAnswer.getLocalDatastoreName() != null && pool.isShared()) {
             String datastoreName = mspAnswer.getLocalDatastoreName();
             List<StoragePoolVO> localStoragePools = this.primaryStoreDao.listLocalStoragePoolByPath(pool.getDataCenterId(), datastoreName);
@@ -103,6 +106,8 @@ public class DefaultHostListener implements HypervisorHostListener {
             storageManager.syncDatastoreClusterStoragePool(poolId, ((ModifyStoragePoolAnswer) answer).getDatastoreClusterChildren(), hostId);
         }
 
+        storageService.updateStorageCapabilities(poolId, false);
+
         s_logger.info("Connection established between storage pool " + pool + " and host " + hostId);
         return true;
     }
diff --git a/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/vmware/resource/VmwareResource.java b/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/vmware/resource/VmwareResource.java
index e2a7969..3e234f2 100644
--- a/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/vmware/resource/VmwareResource.java
+++ b/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/vmware/resource/VmwareResource.java
@@ -48,6 +48,8 @@ import java.util.stream.Collectors;
 import javax.naming.ConfigurationException;
 import javax.xml.datatype.XMLGregorianCalendar;
 
+import com.cloud.agent.api.GetStoragePoolCapabilitiesAnswer;
+import com.cloud.agent.api.GetStoragePoolCapabilitiesCommand;
 import org.apache.cloudstack.api.ApiConstants;
 import org.apache.cloudstack.storage.command.CopyCommand;
 import org.apache.cloudstack.storage.command.StorageSubSystemCommand;
@@ -296,6 +298,8 @@ import com.vmware.vim25.DynamicProperty;
 import com.vmware.vim25.GuestInfo;
 import com.vmware.vim25.GuestNicInfo;
 import com.vmware.vim25.HostCapability;
+import com.vmware.vim25.HostConfigInfo;
+import com.vmware.vim25.HostFileSystemMountInfo;
 import com.vmware.vim25.HostHostBusAdapter;
 import com.vmware.vim25.HostInternetScsiHba;
 import com.vmware.vim25.HostPortGroupSpec;
@@ -505,6 +509,8 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
                 answer = execute((ModifyTargetsCommand) cmd);
             } else if (clz == ModifyStoragePoolCommand.class) {
                 answer = execute((ModifyStoragePoolCommand) cmd);
+            } else if (clz == GetStoragePoolCapabilitiesCommand.class) {
+                answer = execute((GetStoragePoolCapabilitiesCommand) cmd);
             } else if (clz == DeleteStoragePoolCommand.class) {
                 answer = execute((DeleteStoragePoolCommand) cmd);
             } else if (clz == CopyVolumeCommand.class) {
@@ -694,6 +700,9 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
                 if (dest.isFullCloneFlag() != null) {
                     paramsCopy.put(VmwareStorageProcessorConfigurableFields.FULL_CLONE_FLAG, dest.isFullCloneFlag().booleanValue());
                 }
+                if (dest.getDiskProvisioningStrictnessFlag() != null) {
+                    paramsCopy.put(VmwareStorageProcessorConfigurableFields.DISK_PROVISIONING_STRICTNESS, dest.getDiskProvisioningStrictnessFlag().booleanValue());
+                }
             }
         }
         return paramsCopy;
@@ -5045,6 +5054,63 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
         }
     }
 
+    /**
+     * Handles GetStoragePoolCapabilitiesCommand: locates (mounting it if it cannot be found) the datastore behind
+     * the command's pool and reports, under Storage.Capability.HARDWARE_ACCELERATION, the result of
+     * getHardwareAccelerationSupportForDataStore for this host and that datastore.
+     *
+     * @param cmd command carrying the storage pool to inspect
+     * @return a successful GetStoragePoolCapabilitiesAnswer holding the capability in its pool details, or a
+     *         failed one whose details carry the error message when a Throwable is raised while querying
+     */
+    protected Answer execute(GetStoragePoolCapabilitiesCommand cmd) {
+        try {
+            VmwareHypervisorHost hyperHost = getHyperHost(getServiceContext());
+            // the host reference is needed further down; a failed cast ends up in the catch below
+            HostMO hostMo = (HostMO) hyperHost;
+            StorageFilerTO storagePool = cmd.getPool();
+
+            ManagedObjectReference morDatastore = HypervisorHostHelper.findDatastoreWithBackwardsCompatibility(hyperHost, storagePool.getUuid());
+            if (morDatastore == null) {
+                // datastore not found through the host: mount it using the pool's connection data
+                boolean vmfsPreSetupOrCluster = storagePool.getType() == StoragePoolType.VMFS || storagePool.getType() == StoragePoolType.PreSetup || storagePool.getType() == StoragePoolType.DatastoreCluster;
+                morDatastore = hyperHost.mountDatastore(vmfsPreSetupOrCluster, storagePool.getHost(), storagePool.getPort(), storagePool.getPath(), storagePool.getUuid().replace("-", ""), true);
+            }
+            assert (morDatastore != null);
+
+            DatastoreMO datastoreMo = new DatastoreMO(getServiceContext(), morDatastore);
+            GetStoragePoolCapabilitiesAnswer capabilitiesAnswer = new GetStoragePoolCapabilitiesAnswer(cmd);
+            boolean accelerated = getHardwareAccelerationSupportForDataStore(hostMo.getMor(), datastoreMo.getName());
+            Map<String, String> details = capabilitiesAnswer.getPoolDetails();
+            details.put(Storage.Capability.HARDWARE_ACCELERATION.toString(), String.valueOf(accelerated));
+            capabilitiesAnswer.setPoolDetails(details);
+            capabilitiesAnswer.setResult(true);
+            return capabilitiesAnswer;
+        } catch (Throwable e) {
+            if (e instanceof RemoteException) {
+                s_logger.warn("Encounter remote exception to vCenter, invalidate VMware session context");
+                invalidateServiceContext();
+            }
+            String failure = "GetStoragePoolCapabilitiesCommand failed due to " + VmwareHelper.getExceptionMessage(e);
+            s_logger.error(failure, e);
+            GetStoragePoolCapabilitiesAnswer failedAnswer = new GetStoragePoolCapabilitiesAnswer(cmd);
+            failedAnswer.setResult(false);
+            failedAnswer.setDetails(failure);
+            return failedAnswer;
+        }
+    }
+
+    /** Returns true only when the first mount of the host whose volume name equals dataStoreName reports vStorageSupport "vStorageSupported". */
+    private boolean getHardwareAccelerationSupportForDataStore(ManagedObjectReference host, String dataStoreName) throws Exception {
+        HostConfigInfo config = getServiceContext().getVimClient().getDynamicProperty(host, "config");
+        for (HostFileSystemMountInfo mountInfo : config.getFileSystemVolume().getMountInfo()) {
+            if (mountInfo.getVolume().getName().equals(dataStoreName)) {
+                return "vStorageSupported".equals(mountInfo.getVStorageSupport()); // vStorageSupport is optional in the vSphere API and may be null
+            }
+        }
+        return false;
+    }
+
     private void handleTargets(boolean add, ModifyTargetsCommand.TargetTypeToRemove targetTypeToRemove, boolean isRemoveAsync,
                                List<Map<String, String>> targets, List<HostMO> hosts) {
         if (targets != null && targets.size() > 0) {
diff --git a/plugins/hypervisors/vmware/src/main/java/com/cloud/storage/resource/VmwareStorageProcessor.java b/plugins/hypervisors/vmware/src/main/java/com/cloud/storage/resource/VmwareStorageProcessor.java
index da67137..7884a03 100644
--- a/plugins/hypervisors/vmware/src/main/java/com/cloud/storage/resource/VmwareStorageProcessor.java
+++ b/plugins/hypervisors/vmware/src/main/java/com/cloud/storage/resource/VmwareStorageProcessor.java
@@ -92,6 +92,7 @@ import com.cloud.serializer.GsonHelper;
 import com.cloud.storage.DataStoreRole;
 import com.cloud.storage.JavaStorageLayer;
 import com.cloud.storage.Storage.ImageFormat;
+import com.cloud.storage.Storage.ProvisioningType;
 import com.cloud.storage.StorageLayer;
 import com.cloud.storage.Volume;
 import com.cloud.storage.template.OVAProcessor;
@@ -128,7 +129,6 @@ import com.vmware.vim25.VirtualDeviceConfigSpec;
 import com.vmware.vim25.VirtualDeviceConfigSpecOperation;
 import com.vmware.vim25.VirtualDisk;
 import com.vmware.vim25.VirtualDiskFlatVer2BackingInfo;
-import com.vmware.vim25.VirtualDiskType;
 import com.vmware.vim25.VirtualMachineConfigSpec;
 import com.vmware.vim25.VmConfigInfo;
 import com.vmware.vim25.VmfsDatastoreExpandSpec;
@@ -137,7 +137,7 @@ import com.vmware.vim25.VmfsDatastoreOption;
 public class VmwareStorageProcessor implements StorageProcessor {
 
     public enum VmwareStorageProcessorConfigurableFields {
-        NFS_VERSION("nfsVersion"), FULL_CLONE_FLAG("fullCloneFlag");
+        NFS_VERSION("nfsVersion"), FULL_CLONE_FLAG("fullCloneFlag"), DISK_PROVISIONING_STRICTNESS("diskProvisioningStrictness");
 
         private String name;
 
@@ -156,6 +156,7 @@ public class VmwareStorageProcessor implements StorageProcessor {
 
     private final VmwareHostService hostService;
     private boolean _fullCloneFlag;
+    private boolean _diskProvisioningStrictness;
     private final VmwareStorageMount mountService;
     private final VmwareResource resource;
     private final Integer _timeout;
@@ -775,10 +776,10 @@ public class VmwareStorageProcessor implements StorageProcessor {
     }
 
     private boolean createVMFullClone(VirtualMachineMO vmTemplate, DatacenterMO dcMo, DatastoreMO dsMo, String vmdkName, ManagedObjectReference morDatastore,
-                                      ManagedObjectReference morPool) throws Exception {
+                                      ManagedObjectReference morPool, ProvisioningType diskProvisioningType) throws Exception {
         s_logger.info("creating full clone from template");
 
-        if (!vmTemplate.createFullClone(vmdkName, dcMo.getVmFolder(), morPool, morDatastore)) {
+        if (!vmTemplate.createFullClone(vmdkName, dcMo.getVmFolder(), morPool, morDatastore, diskProvisioningType)) {
             String msg = "Unable to create full clone from the template";
 
             s_logger.error(msg);
@@ -866,7 +867,7 @@ public class VmwareStorageProcessor implements StorageProcessor {
                     if (dsMo.getDatastoreType().equalsIgnoreCase("VVOL")) {
                         vmdkFileBaseName = cloneVMforVvols(context, hyperHost, template, vmTemplate, volume, dcMo, dsMo);
                     } else {
-                        vmdkFileBaseName = createVMFolderWithVMName(context, hyperHost, template, vmTemplate, volume, dcMo, dsMo, searchExcludedFolders);
+                        vmdkFileBaseName = createVMAndFolderWithVMName(context, hyperHost, template, vmTemplate, volume, dcMo, dsMo, searchExcludedFolders);
                     }
                 }
                 // restoreVM - move the new ROOT disk into corresponding VM folder
@@ -915,9 +916,12 @@ public class VmwareStorageProcessor implements StorageProcessor {
         if (volume.getVolumeType() == Volume.Type.DATADISK)
             vmName = volume.getName();
         if (!_fullCloneFlag) {
+            if (_diskProvisioningStrictness && volume.getProvisioningType() != ProvisioningType.THIN) {
+                throw new CloudRuntimeException("Unable to create linked clones with strict disk provisioning enabled");
+            }
             createVMLinkedClone(vmTemplate, dcMo, vmName, morDatastore, morPool);
         } else {
-            createVMFullClone(vmTemplate, dcMo, dsMo, vmName, morDatastore, morPool);
+            createVMFullClone(vmTemplate, dcMo, dsMo, vmName, morDatastore, morPool, volume.getProvisioningType());
         }
 
         VirtualMachineMO vmMo = new ClusterMO(context, morCluster).findVmOnHyperHost(vmName);
@@ -931,21 +935,24 @@ public class VmwareStorageProcessor implements StorageProcessor {
         return vmdkFileBaseName;
     }
 
-    private String createVMFolderWithVMName(VmwareContext context, VmwareHypervisorHost hyperHost, TemplateObjectTO template,
-                                            VirtualMachineMO vmTemplate, VolumeObjectTO volume, DatacenterMO dcMo, DatastoreMO dsMo,
-                                            String searchExcludedFolders) throws Exception {
+    private String createVMAndFolderWithVMName(VmwareContext context, VmwareHypervisorHost hyperHost, TemplateObjectTO template,
+                                               VirtualMachineMO vmTemplate, VolumeObjectTO volume, DatacenterMO dcMo, DatastoreMO dsMo,
+                                               String searchExcludedFolders) throws Exception {
         String vmdkName = volume.getName();
         try {
             ManagedObjectReference morDatastore = dsMo.getMor();
             ManagedObjectReference morPool = hyperHost.getHyperHostOwnerResourcePool();
             ManagedObjectReference morCluster = hyperHost.getHyperHostCluster();
-            if (template.getSize() != null){
+            if (template.getSize() != null) {
                 _fullCloneFlag = volume.getSize() > template.getSize() ? true : _fullCloneFlag;
             }
             if (!_fullCloneFlag) {
+                if (_diskProvisioningStrictness && volume.getProvisioningType() != ProvisioningType.THIN) {
+                    throw new CloudRuntimeException("Unable to create linked clones with strict disk provisioning enabled");
+                }
                 createVMLinkedClone(vmTemplate, dcMo, vmdkName, morDatastore, morPool);
             } else {
-                createVMFullClone(vmTemplate, dcMo, dsMo, vmdkName, morDatastore, morPool);
+                createVMFullClone(vmTemplate, dcMo, dsMo, vmdkName, morDatastore, morPool, volume.getProvisioningType());
             }
 
             VirtualMachineMO vmMo = new ClusterMO(context, morCluster).findVmOnHyperHost(vmdkName);
@@ -956,7 +963,7 @@ public class VmwareStorageProcessor implements StorageProcessor {
             String[] vmwareLayoutFilePair = VmwareStorageLayoutHelper.getVmdkFilePairDatastorePath(dsMo, vmdkName, vmdkFileBaseName, VmwareStorageLayoutType.VMWARE, !_fullCloneFlag);
             String[] legacyCloudStackLayoutFilePair = VmwareStorageLayoutHelper.getVmdkFilePairDatastorePath(dsMo, vmdkName, vmdkFileBaseName, VmwareStorageLayoutType.CLOUDSTACK_LEGACY, !_fullCloneFlag);
 
-            for (int i=0; i<vmwareLayoutFilePair.length; i++) {
+            for (int i = 0; i < vmwareLayoutFilePair.length; i++) {
                 dsMo.moveDatastoreFile(vmwareLayoutFilePair[i], dcMo.getMor(), dsMo.getMor(), legacyCloudStackLayoutFilePair[i], dcMo.getMor(), true);
             }
 
@@ -999,9 +1006,12 @@ public class VmwareStorageProcessor implements StorageProcessor {
             _fullCloneFlag = volume.getSize() > template.getSize() || _fullCloneFlag;
         }
         if (!_fullCloneFlag) {
+            if (_diskProvisioningStrictness && volume.getProvisioningType() != ProvisioningType.THIN) {
+                throw new CloudRuntimeException("Unable to create linked clones with strict disk provisioning enabled");
+            }
             createVMLinkedClone(vmMo, dcMo, cloneName, morDatastore, morPool);
         } else {
-            createVMFullClone(vmMo, dcMo, dsMo, cloneName, morDatastore, morPool);
+            createVMFullClone(vmMo, dcMo, dsMo, cloneName, morDatastore, morPool, volume.getProvisioningType());
         }
     }
 
@@ -2534,7 +2544,7 @@ public class VmwareStorageProcessor implements StorageProcessor {
 
             try {
                 VirtualStorageObjectManagerMO vStorageObjectManagerMO = new VirtualStorageObjectManagerMO(context);
-                VStorageObject virtualDisk = vStorageObjectManagerMO.createDisk(morDatastore, VirtualDiskType.THIN, volume.getSize(), volumeDatastorePath, volumeUuid);
+                VStorageObject virtualDisk = vStorageObjectManagerMO.createDisk(morDatastore, volume.getProvisioningType(), volume.getSize(), volumeDatastorePath, volumeUuid);
                 DatastoreFile file = new DatastoreFile(((BaseConfigInfoDiskFileBackingInfo)virtualDisk.getConfig().getBacking()).getFilePath());
                 newVol.setPath(file.getFileBaseName());
                 newVol.setSize(volume.getSize());
@@ -3889,6 +3899,11 @@ public class VmwareStorageProcessor implements StorageProcessor {
         s_logger.debug("VmwareProcessor instance - create full clone = " + (value ? "TRUE" : "FALSE"));
     }
 
+    void setDiskProvisioningStrictness(boolean value){ // package-private setter for the strict disk provisioning flag
+        this._diskProvisioningStrictness = value; // when set, the clone paths refuse to create linked clones for non-thin volumes
+        s_logger.debug("VmwareProcessor instance - diskProvisioningStrictness = " + (value ? "TRUE" : "FALSE"));
+    }
+
     @Override
     public Answer handleDownloadTemplateToPrimaryStorage(DirectDownloadCommand cmd) {
         return null;
@@ -3950,9 +3965,9 @@ public class VmwareStorageProcessor implements StorageProcessor {
             }
             s_logger.info("Cloning VM " + cloneName + " from template " + templateName + " into datastore " + templatePrimaryStoreUuid);
             if (!_fullCloneFlag) {
-                createVMLinkedClone(templateMo, dcMo, cloneName, morDatastore, morPool);
+                createVMLinkedClone(templateMo, dcMo, cloneName, morDatastore, morPool, null);
             } else {
-                createVMFullClone(templateMo, dcMo, dsMo, cloneName, morDatastore, morPool);
+                createVMFullClone(templateMo, dcMo, dsMo, cloneName, morDatastore, morPool, null);
             }
             VirtualMachineMO vm = dcMo.findVm(cloneName);
             if (vm == null) {
diff --git a/plugins/hypervisors/vmware/src/main/java/com/cloud/storage/resource/VmwareStorageSubsystemCommandHandler.java b/plugins/hypervisors/vmware/src/main/java/com/cloud/storage/resource/VmwareStorageSubsystemCommandHandler.java
index 122a034..15caa1d 100644
--- a/plugins/hypervisors/vmware/src/main/java/com/cloud/storage/resource/VmwareStorageSubsystemCommandHandler.java
+++ b/plugins/hypervisors/vmware/src/main/java/com/cloud/storage/resource/VmwareStorageSubsystemCommandHandler.java
@@ -83,6 +83,10 @@ public class VmwareStorageSubsystemCommandHandler extends StorageSubsystemComman
                 boolean fullClone = (boolean) params.get(key);
                 processor.setFullCloneFlag(fullClone);
                 break;
+            case DISK_PROVISIONING_STRICTNESS:
+                boolean diskProvisioningStrictness = (boolean) params.get(key);
+                processor.setDiskProvisioningStrictness(diskProvisioningStrictness);
+                break;
             default:
                 s_logger.error("Unknown reconfigurable field " + key.getName() + " for VmwareStorageProcessor");
                 return false;
diff --git a/plugins/hypervisors/vmware/src/test/java/com/cloud/hypervisor/vmware/resource/VmwareResourceTest.java b/plugins/hypervisors/vmware/src/test/java/com/cloud/hypervisor/vmware/resource/VmwareResourceTest.java
index 3dc8d01..7a2a45d 100644
--- a/plugins/hypervisors/vmware/src/test/java/com/cloud/hypervisor/vmware/resource/VmwareResourceTest.java
+++ b/plugins/hypervisors/vmware/src/test/java/com/cloud/hypervisor/vmware/resource/VmwareResourceTest.java
@@ -363,7 +363,7 @@ public class VmwareResourceTest {
         EnumMap<VmwareStorageProcessorConfigurableFields, Object> params2 = _resource.examineStorageSubSystemCommandFullCloneFlagForVmware(storageCmd, params);
         verify(destDataTO).getDataStore();
         verify(destDataStoreTO, times(2)).isFullCloneFlag();
-        assertEquals(1, params2.size());
+        assertEquals(2, params2.size());
         assertEquals(FULL_CLONE_FLAG, params2.get(VmwareStorageProcessorConfigurableFields.FULL_CLONE_FLAG));
     }
 
diff --git a/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/CloudStackPrimaryDataStoreLifeCycleImpl.java b/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/CloudStackPrimaryDataStoreLifeCycleImpl.java
index 1b2e41a..f39b170 100644
--- a/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/CloudStackPrimaryDataStoreLifeCycleImpl.java
+++ b/plugins/storage/volume/default/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/CloudStackPrimaryDataStoreLifeCycleImpl.java
@@ -359,6 +359,7 @@ public class CloudStackPrimaryDataStoreLifeCycleImpl implements PrimaryDataStore
         parameters.setName(poolName);
         parameters.setClusterId(clusterId);
         parameters.setProviderName(providerName);
+        parameters.setHypervisorType(hypervisorType);
 
         return dataStoreHelper.createPrimaryDataStore(parameters);
     }
@@ -404,6 +405,7 @@ public class CloudStackPrimaryDataStoreLifeCycleImpl implements PrimaryDataStore
         CreateStoragePoolCommand cmd = new CreateStoragePoolCommand(true, pool);
         final Answer answer = agentMgr.easySend(hostId, cmd);
         if (answer != null && answer.getResult()) {
+            storageMgr.updateStorageCapabilities(pool.getId(), false);
             return true;
         } else {
             primaryDataStoreDao.expunge(pool.getId());
diff --git a/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java b/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
index 0c8a7f7..d034c4e 100644
--- a/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
+++ b/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
@@ -119,6 +119,8 @@ import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.query.QueryService;
 import org.apache.cloudstack.resourcedetail.dao.DiskOfferingDetailsDao;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailVO;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
 import org.apache.commons.collections.CollectionUtils;
@@ -422,6 +424,9 @@ public class QueryManagerImpl extends MutualExclusiveIdsManagerBase implements Q
     private PrimaryDataStoreDao _storagePoolDao;
 
     @Inject
+    private StoragePoolDetailsDao _storagePoolDetailsDao;
+
+    @Inject
     private ProjectInvitationDao projectInvitationDao;
 
     @Inject
@@ -2420,7 +2425,16 @@ public class QueryManagerImpl extends MutualExclusiveIdsManagerBase implements Q
             if (store != null) {
                 DataStoreDriver driver = store.getDriver();
                 if (driver != null && driver.getCapabilities() != null) {
-                    poolResponse.setCaps(driver.getCapabilities());
+                    Map<String, String> caps = driver.getCapabilities();
+                    if (Storage.StoragePoolType.NetworkFilesystem.toString().equals(poolResponse.getType()) &&
+                        HypervisorType.VMware.toString().equals(poolResponse.getHypervisor())) {
+                        StoragePoolVO pool = _storagePoolDao.findPoolByUUID(poolResponse.getId());
+                        StoragePoolDetailVO detail = _storagePoolDetailsDao.findDetail(pool.getId(), Storage.Capability.HARDWARE_ACCELERATION.toString());
+                        if (detail != null) {
+                            caps.put(Storage.Capability.HARDWARE_ACCELERATION.toString(), detail.getValue());
+                        }
+                    }
+                    poolResponse.setCaps(caps);
                 }
             }
         }
diff --git a/server/src/main/java/com/cloud/server/ManagementServerImpl.java b/server/src/main/java/com/cloud/server/ManagementServerImpl.java
index 1d8c740..c6f7c44 100644
--- a/server/src/main/java/com/cloud/server/ManagementServerImpl.java
+++ b/server/src/main/java/com/cloud/server/ManagementServerImpl.java
@@ -206,8 +206,8 @@ import org.apache.cloudstack.api.command.admin.storage.MigrateSecondaryStorageDa
 import org.apache.cloudstack.api.command.admin.storage.PreparePrimaryStorageForMaintenanceCmd;
 import org.apache.cloudstack.api.command.admin.storage.UpdateCloudToUseObjectStoreCmd;
 import org.apache.cloudstack.api.command.admin.storage.UpdateImageStoreCmd;
+import org.apache.cloudstack.api.command.admin.storage.UpdateStorageCapabilitiesCmd;
 import org.apache.cloudstack.api.command.admin.storage.UpdateStoragePoolCmd;
-import org.apache.cloudstack.api.command.admin.storage.SyncStoragePoolCmd;
 import org.apache.cloudstack.api.command.admin.swift.AddSwiftCmd;
 import org.apache.cloudstack.api.command.admin.swift.ListSwiftsCmd;
 import org.apache.cloudstack.api.command.admin.systemvm.DestroySystemVmCmd;
@@ -3037,7 +3037,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
         cmdList.add(FindStoragePoolsForMigrationCmd.class);
         cmdList.add(PreparePrimaryStorageForMaintenanceCmd.class);
         cmdList.add(UpdateStoragePoolCmd.class);
-        cmdList.add(SyncStoragePoolCmd.class);
+        cmdList.add(UpdateStorageCapabilitiesCmd.class);
         cmdList.add(UpdateImageStoreCmd.class);
         cmdList.add(DestroySystemVmCmd.class);
         cmdList.add(ListSystemVMsCmd.class);
diff --git a/server/src/main/java/com/cloud/storage/StorageManagerImpl.java b/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
index 743e90f..8cb586d 100644
--- a/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
+++ b/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
@@ -43,6 +43,8 @@ import java.util.concurrent.TimeUnit;
 
 import javax.inject.Inject;
 
+import com.cloud.agent.api.GetStoragePoolCapabilitiesAnswer;
+import com.cloud.agent.api.GetStoragePoolCapabilitiesCommand;
 import org.apache.cloudstack.api.ApiConstants;
 import org.apache.cloudstack.api.command.admin.storage.CancelPrimaryStorageMaintenanceCmd;
 import org.apache.cloudstack.api.command.admin.storage.CreateSecondaryStagingStoreCmd;
@@ -98,6 +100,7 @@ import org.apache.cloudstack.storage.datastore.db.ImageStoreVO;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailVO;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
@@ -316,8 +319,12 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
     @Inject
     SnapshotService _snapshotService;
     @Inject
+    public StorageService storageService;
+    @Inject
     StoragePoolTagsDao _storagePoolTagsDao;
     @Inject
+    PrimaryDataStoreDao primaryStoreDao;
+    @Inject
     DiskOfferingDetailsDao _diskOfferingDetailsDao;
     @Inject
     ServiceOfferingDetailsDao _serviceOfferingDetailsDao;
@@ -2773,6 +2780,77 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         return imageStoreVO;
     }
 
+    /**
+     * @param poolId - Storage pool id for pool to update.
+     * @param failOnChecks - If true, throw an error if pool type and state checks fail.
+     */
+    @Override
+    public void updateStorageCapabilities(Long poolId, boolean failOnChecks) {
+        StoragePoolVO pool = _storagePoolDao.findById(poolId);
+
+        if (pool == null) {
+            throw new CloudRuntimeException("Primary storage not found for id: " + poolId);
+        }
+
+        // Only checking NFS for now - required for disk provisioning type support for vmware.
+        if (pool.getPoolType() != StoragePoolType.NetworkFilesystem) {
+            if (failOnChecks) {
+                throw new CloudRuntimeException("Storage capabilities update only supported on NFS storage mounted.");
+            }
+            return;
+        }
+
+        if (pool.getStatus() != StoragePoolStatus.Initialized && pool.getStatus() != StoragePoolStatus.Up) {
+            if (failOnChecks){
+                throw new CloudRuntimeException("Primary storage is not in the right state to update capabilities");
+            }
+            return;
+        }
+
+        HypervisorType hypervisor = pool.getHypervisor();
+
+        if (hypervisor == null){
+            if (pool.getClusterId() != null) {
+                ClusterVO cluster = _clusterDao.findById(pool.getClusterId());
+                hypervisor = cluster.getHypervisorType();
+            }
+        }
+
+        if (!HypervisorType.VMware.equals(hypervisor)) {
+            if (failOnChecks) {
+                throw new CloudRuntimeException("Storage capabilities update only supported on VMWare.");
+            }
+            return;
+        }
+
+        // find the host
+        List<Long> poolIds = new ArrayList<Long>();
+        poolIds.add(pool.getId());
+        List<Long> hosts = _storagePoolHostDao.findHostsConnectedToPools(poolIds);
+        if (hosts.size() > 0) {
+            GetStoragePoolCapabilitiesCommand cmd = new GetStoragePoolCapabilitiesCommand();
+            cmd.setPool(new StorageFilerTO(pool));
+            GetStoragePoolCapabilitiesAnswer answer = (GetStoragePoolCapabilitiesAnswer) _agentMgr.easySend(hosts.get(0), cmd);
+            if (answer.getPoolDetails() != null && answer.getPoolDetails().containsKey(Storage.Capability.HARDWARE_ACCELERATION.toString())) {
+                StoragePoolDetailVO hardwareAccelerationSupported = _storagePoolDetailsDao.findDetail(pool.getId(), Storage.Capability.HARDWARE_ACCELERATION.toString());
+                if (hardwareAccelerationSupported == null) {
+                    StoragePoolDetailVO storagePoolDetailVO = new StoragePoolDetailVO(pool.getId(), Storage.Capability.HARDWARE_ACCELERATION.toString(), answer.getPoolDetails().get(Storage.Capability.HARDWARE_ACCELERATION.toString()), false);
+                    _storagePoolDetailsDao.persist(storagePoolDetailVO);
+                } else {
+                    hardwareAccelerationSupported.setValue(answer.getPoolDetails().get(Storage.Capability.HARDWARE_ACCELERATION.toString()));
+                    _storagePoolDetailsDao.update(hardwareAccelerationSupported.getId(), hardwareAccelerationSupported);
+                }
+            } else {
+                if (answer != null && !answer.getResult()) {
+                    s_logger.error("Failed to update storage pool capabilities: " + answer.getDetails());
+                    if (failOnChecks) {
+                        throw new CloudRuntimeException(answer.getDetails());
+                    }
+                }
+            }
+        }
+    }
+
     private void duplicateCacheStoreRecordsToRegionStore(long storeId) {
         _templateStoreDao.duplicateCacheRecordsOnRegionStore(storeId);
         _snapshotStoreDao.duplicateCacheRecordsOnRegionStore(storeId);
@@ -3132,7 +3210,8 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
                 STORAGE_POOL_CLIENT_MAX_CONNECTIONS,
                 PRIMARY_STORAGE_DOWNLOAD_WAIT,
                 SecStorageMaxMigrateSessions,
-                MaxDataMigrationWaitTime
+                MaxDataMigrationWaitTime,
+                DiskProvisioningStrictness
         };
     }
 
diff --git a/test/integration/smoke/test_disk_offerings.py b/test/integration/smoke/test_disk_offerings.py
index d0d3433..660dd30 100644
--- a/test/integration/smoke/test_disk_offerings.py
+++ b/test/integration/smoke/test_disk_offerings.py
@@ -19,7 +19,6 @@
 #Import Local Modules
 import marvin
 from marvin.cloudstackTestCase import *
-from marvin.cloudstackAPI import *
 from marvin.lib.utils import *
 from marvin.lib.base import *
 from marvin.lib.common import *
@@ -134,7 +133,7 @@ class TestCreateDiskOffering(cloudstackTestCase):
     @attr(hypervisor="kvm")
     @attr(tags = ["advanced", "basic", "eip", "sg", "advancedns", "simulator", "smoke"])
     def test_04_create_fat_type_disk_offering(self):
-        """Test to create  a sparse type disk offering"""
+        """Test to create a fat type disk offering"""
 
         # Validate the following:
         # 1. createDiskOfferings should return valid info for new offering
diff --git a/test/integration/smoke/test_disk_provisioning_types.py b/test/integration/smoke/test_disk_provisioning_types.py
new file mode 100644
index 0000000..c87b2e4
--- /dev/null
+++ b/test/integration/smoke/test_disk_provisioning_types.py
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from marvin.cloudstackTestCase import cloudstackTestCase, unittest
+from marvin.lib.utils import cleanup_resources
+from marvin.lib.base import DiskOffering, Iso, Account, VirtualMachine, ServiceOffering, Volume
+from marvin.codes import FAILED
+from marvin.lib.common import list_disk_offering, get_zone, get_suitable_test_template, get_domain
+from marvin.cloudstackAPI import listStoragePools, updateStorageCapabilities
+from nose.plugins.attrib import attr
+
+
+class TestDiskProvisioningTypes(cloudstackTestCase):
+
+    def setUp(self):
+
+        if self.testClient.getHypervisorInfo().lower() != "vmware":
+            raise unittest.SkipTest("VMWare tests only valid on VMWare hypervisor")
+
+        self.services = self.testClient.getParsedTestDataConfig()
+        self.apiclient = self.testClient.getApiClient()
+        self.dbclient = self.testClient.getDbConnection()
+        self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests())
+        self.domain = get_domain(self.apiclient)
+        self.services['mode'] = self.zone.networktype
+        self.hypervisor = self.hypervisor = self.testClient.getHypervisorInfo()
+
+        template = get_suitable_test_template(
+            self.apiclient,
+            self.zone.id,
+            self.services["ostype"],
+            self.hypervisor
+        )
+
+        if template == FAILED:
+            assert False, "get_suitable_test_template() failed to return template with description %s" % self.services["ostype"]
+
+        self.account = Account.create(
+            self.apiclient,
+            self.services["account"],
+            domainid=self.domain.id
+        )
+
+        self.services["small"]["zoneid"] = self.zone.id
+        self.services["small"]["template"] = template.id
+
+        self.services["iso1"]["zoneid"] = self.zone.id
+
+        iso = Iso.create(
+            self.apiclient,
+            self.services["iso1"],
+            account=self.account.name,
+            domainid=self.account.domainid
+        )
+
+        self.cleanup = [
+            self.account
+        ]
+
+
+    def tearDown(self):
+        cleanup_resources(self.apiclient, self.cleanup)
+
+    @attr(tags=["advanced", "basic", "eip", "sg", "advancedns", "smoke"], required_hardware="false")
+    def test_01_vm_with_thin_disk_offering(self):
+        self.runner("thin")
+
+    @attr(tags=["advanced", "basic", "eip", "sg", "advancedns", "smoke"], required_hardware="false")
+    def test_02_vm_with_fat_disk_offering(self):
+        self.runner("fat")
+
+    @attr(tags=["advanced", "basic", "eip", "sg", "advancedns", "smoke"], required_hardware="false")
+    def test_03_vm_with_sparse_disk_offering(self):
+        self.runner("sparse")
+
+    @attr(tags=["advanced", "basic", "eip", "sg", "advancedns", "smoke"], required_hardware="false")
+    def test_05_update_cmd(self):
+        cmd = listStoragePools.listStoragePoolsCmd()
+        storagePools = self.apiclient.listStoragePools(cmd)
+
+        for pool in storagePools:
+            if pool.type == 'NetworkFilesystem':
+                cmd = updateStorageCapabilities.updateStorageCapabilitiesCmd()
+                cmd.id = pool.id
+                response = self.apiclient.updateStorageCapabilities(cmd)
+                acceleration = getattr(response[0].storagecapabilities, "HARDWARE_ACCELERATION")
+                self.assertNotEqual(
+                    acceleration,
+                    None,
+                    "Check Updated storage pool capabilities"
+                )
+
+    def runner(self, provisioning_type):
+        self.services["disk_offering"]['provisioningtype'] = provisioning_type
+        self.services["small"]['size'] = "1"
+        disk_offering = DiskOffering.create(
+            self.apiclient,
+            self.services["disk_offering"],
+            custom=True,
+        )
+        self.cleanup.append(disk_offering)
+
+        self.debug("Created Disk offering with ID: %s" % disk_offering.id)
+
+        self.services["service_offerings"]["small"]["provisioningtype"] = provisioning_type
+        small_offering = ServiceOffering.create(
+            self.apiclient,
+            self.services["service_offerings"]["small"]
+        )
+
+        self.cleanup.append(small_offering)
+
+        self.debug("Created service offering with ID: %s" % small_offering.id)
+
+        virtual_machine = VirtualMachine.create(
+            self.apiclient,
+            self.services["small"],
+            accountid=self.account.name,
+            domainid=self.account.domainid,
+            serviceofferingid=small_offering.id,
+            diskofferingid=disk_offering.id,
+            mode=self.services["mode"]
+        )
+
+        self.debug("Created virtual machine with ID: %s" % virtual_machine.id)
+
+        volumes = Volume.list(self.apiclient, virtualMachineId=virtual_machine.id, listAll='true')
+
+        for volume in volumes:
+            if volume["type"] == "DATADISK":
+                VirtualMachine.detach_volume(virtual_machine, self.apiclient, volume)
+                currentVolume = Volume({})
+                currentVolume.id = volume.id
+                Volume.resize(currentVolume, self.apiclient, size='2')
+                VirtualMachine.attach_volume(virtual_machine, self.apiclient, volume)
diff --git a/tools/apidoc/gen_toc.py b/tools/apidoc/gen_toc.py
index 9107b7c..6d7841e 100644
--- a/tools/apidoc/gen_toc.py
+++ b/tools/apidoc/gen_toc.py
@@ -95,7 +95,7 @@ known_categories = {
     'StorageMaintenance': 'Storage Pool',
     'StoragePool': 'Storage Pool',
     'StorageProvider': 'Storage Pool',
-    'syncStoragePool': 'Storage Pool',
+    'updateStorageCapabilities' : 'Storage Pool',
     'SecurityGroup': 'Security Group',
     'SSH': 'SSH',
     'register': 'Registration',
diff --git a/tools/marvin/marvin/lib/base.py b/tools/marvin/marvin/lib/base.py
index 916af64..37ef928 100755
--- a/tools/marvin/marvin/lib/base.py
+++ b/tools/marvin/marvin/lib/base.py
@@ -635,6 +635,9 @@ class VirtualMachine:
         if rootdiskcontroller:
             cmd.details[0]["rootDiskController"] = rootdiskcontroller
 
+        if "size" in services:
+            cmd.size = services["size"]
+
         if group:
             cmd.group = group
 
@@ -2296,6 +2299,9 @@ class ServiceOffering:
         if "offerha" in services:
             cmd.offerha = services["offerha"]
 
+        if "provisioningtype" in services:
+            cmd.provisioningtype = services["provisioningtype"]
+
         if "dynamicscalingenabled" in services:
             cmd.dynamicscalingenabled = services["dynamicscalingenabled"]
 
diff --git a/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/VirtualMachineMO.java b/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/VirtualMachineMO.java
index 0d01931..04fe65d 100644
--- a/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/VirtualMachineMO.java
+++ b/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/VirtualMachineMO.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import com.cloud.storage.Storage;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.vmware.vim25.InvalidStateFaultMsg;
 import com.vmware.vim25.RuntimeFaultFaultMsg;
@@ -777,7 +778,7 @@ public class VirtualMachineMO extends BaseMO {
         return false;
     }
 
-    public boolean createFullClone(String cloneName, ManagedObjectReference morFolder, ManagedObjectReference morResourcePool, ManagedObjectReference morDs)
+    public boolean createFullClone(String cloneName, ManagedObjectReference morFolder, ManagedObjectReference morResourcePool, ManagedObjectReference morDs, Storage.ProvisioningType diskProvisioningType)
             throws Exception {
 
         VirtualMachineCloneSpec cloneSpec = new VirtualMachineCloneSpec();
@@ -788,6 +789,9 @@ public class VirtualMachineMO extends BaseMO {
 
         relocSpec.setDatastore(morDs);
         relocSpec.setPool(morResourcePool);
+
+        setDiskProvisioningType(relocSpec, morDs, diskProvisioningType);
+
         ManagedObjectReference morTask = _context.getService().cloneVMTask(_mor, morFolder, cloneName, cloneSpec);
 
         boolean result = _context.getVimClient().waitForTask(morTask);
@@ -801,6 +805,33 @@ public class VirtualMachineMO extends BaseMO {
         return false;
     }
 
+    /** Adds a disk locator for every flat-backed disk so the clone gets the given provisioning type; no-op when it is null. */
+    private void setDiskProvisioningType(VirtualMachineRelocateSpec relocSpec, ManagedObjectReference morDs,
+                                         Storage.ProvisioningType diskProvisioningType) throws Exception {
+        if (diskProvisioningType == null) {
+            return;
+        }
+        List<VirtualMachineRelocateSpecDiskLocator> relocateDisks = relocSpec.getDisk();
+        for (VirtualDisk disk : getVirtualDisks()) {
+            // Skip other backing types: a blind cast would throw ClassCastException and abort the whole clone.
+            if (!(disk.getBacking() instanceof VirtualDiskFlatVer2BackingInfo)) {
+                continue;
+            }
+            VirtualDiskFlatVer2BackingInfo backing = (VirtualDiskFlatVer2BackingInfo) disk.getBacking();
+            if (diskProvisioningType == Storage.ProvisioningType.THIN) {
+                backing.setThinProvisioned(true);
+            } else if (diskProvisioningType == Storage.ProvisioningType.FAT || diskProvisioningType == Storage.ProvisioningType.SPARSE) {
+                backing.setThinProvisioned(false);
+                backing.setEagerlyScrub(diskProvisioningType == Storage.ProvisioningType.FAT); // FAT scrubs eagerly, SPARSE does not
+            }
+            VirtualMachineRelocateSpecDiskLocator diskLocator = new VirtualMachineRelocateSpecDiskLocator();
+            diskLocator.setDiskId(disk.getKey());
+            diskLocator.setDiskBackingInfo(backing);
+            diskLocator.setDatastore(morDs);
+            relocateDisks.add(diskLocator);
+        }
+    }
+
     public boolean createLinkedClone(String cloneName, ManagedObjectReference morBaseSnapshot, ManagedObjectReference morFolder, ManagedObjectReference morResourcePool,
             ManagedObjectReference morDs) throws Exception {
 
diff --git a/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/VirtualStorageObjectManagerMO.java b/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/VirtualStorageObjectManagerMO.java
index d5f4eb3..c4c93a0 100644
--- a/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/VirtualStorageObjectManagerMO.java
+++ b/vmware-base/src/main/java/com/cloud/hypervisor/vmware/mo/VirtualStorageObjectManagerMO.java
@@ -16,10 +16,11 @@
 // under the License.
 package com.cloud.hypervisor.vmware.mo;
 
+import com.cloud.storage.Storage;
 import com.vmware.vim25.ID;
 import com.vmware.vim25.TaskInfo;
 import com.vmware.vim25.VStorageObject;
-import com.vmware.vim25.VirtualDiskType;
+import com.vmware.vim25.BaseConfigInfoDiskFileBackingInfoProvisioningType;
 import com.vmware.vim25.VslmCreateSpec;
 import com.vmware.vim25.VslmCreateSpecDiskFileBackingSpec;
 import org.apache.log4j.Logger;
@@ -60,12 +61,21 @@ public class VirtualStorageObjectManagerMO extends BaseMO {
         return _context.getService().retrieveVStorageObject(_mor, id, morDS);
     }
 
-    public VStorageObject createDisk(ManagedObjectReference morDS, VirtualDiskType diskType, long currentSizeInBytes, String datastoreFilepath, String filename) throws Exception {
+    public VStorageObject createDisk(ManagedObjectReference morDS, Storage.ProvisioningType diskProvisioningType, long currentSizeInBytes, String datastoreFilepath, String filename) throws Exception {
         long currentSizeInMB = currentSizeInBytes/(1024*1024);
 
         VslmCreateSpecDiskFileBackingSpec diskFileBackingSpec = new VslmCreateSpecDiskFileBackingSpec();
         diskFileBackingSpec.setDatastore(morDS);
-        diskFileBackingSpec.setProvisioningType(diskType.value());
+        if (diskProvisioningType != null) {
+            if (diskProvisioningType == Storage.ProvisioningType.FAT) {
+                diskFileBackingSpec.setProvisioningType(BaseConfigInfoDiskFileBackingInfoProvisioningType.EAGER_ZEROED_THICK.value());
+            } else if (diskProvisioningType == Storage.ProvisioningType.THIN) {
+                diskFileBackingSpec.setProvisioningType(BaseConfigInfoDiskFileBackingInfoProvisioningType.THIN.value());
+            } else if (diskProvisioningType == Storage.ProvisioningType.SPARSE) {
+                diskFileBackingSpec.setProvisioningType(BaseConfigInfoDiskFileBackingInfoProvisioningType.LAZY_ZEROED_THICK.value());
+            }
+        }
+
         // path should be just the folder name. For example, instead of '[datastore1] folder1/filename.vmdk' you would just do 'folder1'.
         // path is introduced from 6.7. In 6.5 disk will be created in the default folder "fcd"
         diskFileBackingSpec.setPath(null);