Posted to commits@cloudstack.apache.org by ro...@apache.org on 2019/07/25 08:43:16 UTC

[cloudstack] branch master updated: storage: Datera storage plugin (#3470)

This is an automated email from the ASF dual-hosted git repository.

rohit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/master by this push:
     new e3d70b7  storage: Datera storage plugin (#3470)
e3d70b7 is described below

commit e3d70b7dcc1a319fac7bb30d6e8e5e79e236ad67
Author: manojkverma <31...@users.noreply.github.com>
AuthorDate: Thu Jul 25 01:43:04 2019 -0700

    storage: Datera storage plugin (#3470)
    
    Features:
    
    - Zone-wide and cluster-wide primary storage support
    - Automatic VM template caching on Datera; subsequent VMs can be created
      near-instantly by fast-cloning the root volume
    - Rapid storage-native snapshots
    - Multiple managed primary storages can be created on a single Datera
      cluster, each with its own:
      - Total provisioned capacity
      - Default storage QoS values
      - Replica size (1 to 5)
      - IP pool assignment for the iSCSI target
      - Volume placement (hybrid, single_flash, all_flash)
    - Volume snapshot to VM template
    - Volume to VM template
    - Volume size increase using a service policy
    - Volume QoS change using a service policy
    - Enabled KVM support
    - New Datera app_instance name format that includes the ACS volume name
      (see the example below)
    - VM live migration
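    
    For example (illustrative): with the new name format, a ROOT volume maps
    to a Datera app_instance named like
    CS-V-ROOT-123-6db58e3f-14c4-45ac-95e9-60e3a00ce7d0.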
---
 client/pom.xml                                     |    5 +
 plugins/pom.xml                                    |    1 +
 plugins/storage/volume/datera/pom.xml              |   66 +
 .../driver/DateraPrimaryDataStoreDriver.java       | 1828 ++++++++++++++++++++
 .../lifecycle/DateraPrimaryDataStoreLifeCycle.java |  420 +++++
 .../datastore/provider/DateraHostListener.java     |  342 ++++
 .../provider/DateraPrimaryDataStoreProvider.java   |   83 +
 .../storage/datastore/util/DateraObject.java       |  469 +++++
 .../storage/datastore/util/DateraUtil.java         | 1028 +++++++++++
 .../storage-volume-datera/module.properties        |   18 +
 .../spring-storage-volume-datera-context.xml       |   33 +
 test/integration/plugins/datera/TestVolumes.py     | 1127 ++++++++++++
 12 files changed, 5420 insertions(+)

diff --git a/client/pom.xml b/client/pom.xml
index 5c58635..d486458 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -85,6 +85,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.cloudstack</groupId>
+            <artifactId>cloud-plugin-storage-volume-datera</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cloudstack</groupId>
             <artifactId>cloud-server</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/plugins/pom.xml b/plugins/pom.xml
index ff84955..cea6899 100755
--- a/plugins/pom.xml
+++ b/plugins/pom.xml
@@ -112,6 +112,7 @@
         <module>storage/image/sample</module>
         <module>storage/image/swift</module>
         <module>storage/volume/cloudbyte</module>
+        <module>storage/volume/datera</module>
         <module>storage/volume/default</module>
         <module>storage/volume/nexenta</module>
         <module>storage/volume/sample</module>
diff --git a/plugins/storage/volume/datera/pom.xml b/plugins/storage/volume/datera/pom.xml
new file mode 100644
index 0000000..ccd1f0c
--- /dev/null
+++ b/plugins/storage/volume/datera/pom.xml
@@ -0,0 +1,66 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+  license agreements. See the NOTICE file distributed with this work for additional 
+  information regarding copyright ownership. The ASF licenses this file to 
+  you under the Apache License, Version 2.0 (the "License"); you may not use 
+  this file except in compliance with the License. You may obtain a copy of 
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+  by applicable law or agreed to in writing, software distributed under the 
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+  OF ANY KIND, either express or implied. See the License for the specific 
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>cloud-plugin-storage-volume-datera</artifactId>
+  <name>Apache CloudStack Plugin - Storage Volume Datera Provider</name>
+  <parent>
+    <groupId>org.apache.cloudstack</groupId>
+    <artifactId>cloudstack-plugins</artifactId>
+    <version>4.13.0.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.cloudstack</groupId>
+      <artifactId>cloud-plugin-storage-volume-default</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.cloudstack</groupId>
+      <artifactId>cloud-engine-storage-volume</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.aspectj</groupId>
+      <artifactId>aspectjtools</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>1.17.0-rc</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <skipTests>true</skipTests>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>integration-test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/driver/DateraPrimaryDataStoreDriver.java b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/driver/DateraPrimaryDataStoreDriver.java
new file mode 100644
index 0000000..9a9a165
--- /dev/null
+++ b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/driver/DateraPrimaryDataStoreDriver.java
@@ -0,0 +1,1828 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.datastore.driver;
+
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.to.DataObjectType;
+import com.cloud.agent.api.to.DataStoreTO;
+import com.cloud.agent.api.to.DataTO;
+import com.cloud.agent.api.to.DiskTO;
+import com.cloud.dc.ClusterDetailsDao;
+import com.cloud.dc.ClusterDetailsVO;
+import com.cloud.dc.ClusterVO;
+import com.cloud.dc.dao.ClusterDao;
+import com.cloud.host.Host;
+import com.cloud.host.HostVO;
+import com.cloud.host.dao.HostDao;
+import com.cloud.hypervisor.Hypervisor;
+import com.cloud.storage.ResizeVolumePayload;
+import com.cloud.storage.Snapshot;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.Storage;
+import com.cloud.storage.StoragePool;
+import com.cloud.storage.VMTemplateStoragePoolVO;
+import com.cloud.storage.VolumeDetailVO;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.SnapshotDetailsDao;
+import com.cloud.storage.dao.SnapshotDetailsVO;
+import com.cloud.storage.dao.VMTemplatePoolDao;
+import com.cloud.storage.dao.VolumeDao;
+import com.cloud.storage.dao.VolumeDetailsDao;
+import com.cloud.utils.StringUtils;
+import com.cloud.utils.db.GlobalLock;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.cloudstack.engine.subsystem.api.storage.ChapInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.CopyCommandResult;
+import org.apache.cloudstack.engine.subsystem.api.storage.CreateCmdResult;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreCapabilities;
+import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreDriver;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
+import org.apache.cloudstack.framework.async.AsyncCompletionCallback;
+import org.apache.cloudstack.storage.command.CommandResult;
+import org.apache.cloudstack.storage.command.CreateObjectAnswer;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailVO;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.storage.datastore.util.DateraObject;
+import org.apache.cloudstack.storage.datastore.util.DateraUtil;
+import org.apache.cloudstack.storage.to.SnapshotObjectTO;
+import org.apache.log4j.Logger;
+
+import javax.inject.Inject;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DateraPrimaryDataStoreDriver implements PrimaryDataStoreDriver {
+    private static final Logger s_logger = Logger.getLogger(DateraPrimaryDataStoreDriver.class);
+    private static final int s_lockTimeInSeconds = 300;
+    private static final int s_lowestHypervisorSnapshotReserve = 10;
+
+    @Inject
+    private ClusterDao _clusterDao;
+    @Inject
+    private ClusterDetailsDao _clusterDetailsDao;
+    @Inject
+    private HostDao _hostDao;
+    @Inject
+    private SnapshotDao _snapshotDao;
+    @Inject
+    private SnapshotDetailsDao _snapshotDetailsDao;
+    @Inject
+    private PrimaryDataStoreDao _storagePoolDao;
+    @Inject
+    private StoragePoolDetailsDao _storagePoolDetailsDao;
+    @Inject
+    private VolumeDao _volumeDao;
+    @Inject
+    private VMTemplatePoolDao tmpltPoolDao;
+    @Inject
+    private PrimaryDataStoreDao storagePoolDao;
+    @Inject
+    private VolumeDetailsDao volumeDetailsDao;
+    @Inject
+    private SnapshotDetailsDao snapshotDetailsDao;
+    @Inject
+    private VolumeDataFactory volumeDataFactory;
+
+    /**
+     * Returns a map which lists the capabilities that this storage device can
+     * offer. Currently supported:
+     * STORAGE_SYSTEM_SNAPSHOT: has the ability to create native snapshots.
+     * CAN_CREATE_VOLUME_FROM_SNAPSHOT: can create new volumes from native snapshots.
+     * CAN_CREATE_VOLUME_FROM_VOLUME: can clone volumes; this is used for template caching.
+     * CAN_REVERT_VOLUME_TO_SNAPSHOT: can revert a volume to a native snapshot.
+     * @return a Map<String,String> which describes the capabilities of the driver
+     */
+    @Override
+    public Map<String, String> getCapabilities() {
+        Map<String, String> mapCapabilities = new HashMap<>();
+
+        mapCapabilities.put(DataStoreCapabilities.STORAGE_SYSTEM_SNAPSHOT.toString(), Boolean.TRUE.toString());
+        mapCapabilities.put(DataStoreCapabilities.CAN_CREATE_VOLUME_FROM_SNAPSHOT.toString(), Boolean.TRUE.toString());
+        mapCapabilities.put(DataStoreCapabilities.CAN_CREATE_VOLUME_FROM_VOLUME.toString(), Boolean.TRUE.toString());
+        mapCapabilities.put(DataStoreCapabilities.CAN_REVERT_VOLUME_TO_SNAPSHOT.toString(), Boolean.TRUE.toString());
+
+        return mapCapabilities;
+    }
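+
+    // Illustrative note (not part of this change): the storage subsystem reads
+    // these flags as strings, e.g.
+    //   Boolean.parseBoolean(getCapabilities()
+    //           .get(DataStoreCapabilities.STORAGE_SYSTEM_SNAPSHOT.toString()))
+    // determines whether a storage-native snapshot can be taken.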
+
+    @Override
+    public DataTO getTO(DataObject data) {
+        return null;
+    }
+
+    @Override
+    public DataStoreTO getStoreTO(DataStore store) {
+        return null;
+    }
+
+    @Override
+    public ChapInfo getChapInfo(DataObject dataObject) {
+        // We don't support auth yet
+        return null;
+    }
+
+    /**
+     * Fetches an App Instance from Datera; throws an exception if it cannot be found
+     * @param conn            Datera Connection
+     * @param appInstanceName Name of the Application Instance
+     * @return application instance
+     */
+    public DateraObject.AppInstance getDateraAppInstance(DateraObject.DateraConnection conn, String appInstanceName) {
+
+        DateraObject.AppInstance appInstance = null;
+        try {
+            appInstance = DateraUtil.getAppInstance(conn, appInstanceName);
+        } catch (DateraObject.DateraError dateraError) {
+            s_logger.warn("Error getting appInstance " + appInstanceName, dateraError);
+            throw new CloudRuntimeException(dateraError.getMessage());
+        }
+
+        if (appInstance == null) {
+            throw new CloudRuntimeException("App instance not found " + appInstanceName);
+        }
+
+        return appInstance;
+    }
+
+    /**
+     * Given a {@code dataObject}, this function makes sure that the {@code host} has
+     * access to it. All hosts in the same cluster are added to an initiator group,
+     * and that group is assigned to the appInstance. If the initiator group does
+     * not exist, it is created. If a host does not have an initiator registered on
+     * Datera, one is created and added to the initiator group.
+     * @param dataObject The volume that needs to be accessed
+     * @param host       The host which needs to access the volume
+     * @param dataStore  Identifies which primary storage the volume resides in
+     * @return True if access is granted. False otherwise
+     */
+    @Override
+    public boolean grantAccess(DataObject dataObject, Host host, DataStore dataStore) {
+
+        s_logger.debug("grantAccess() called");
+
+        Preconditions.checkArgument(dataObject != null, "'dataObject' should not be 'null'");
+        Preconditions.checkArgument(host != null, "'host' should not be 'null'");
+        Preconditions.checkArgument(dataStore != null, "'dataStore' should not be 'null'");
+
+        long storagePoolId = dataStore.getId();
+
+        DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePoolId, _storagePoolDetailsDao);
+
+        String appInstanceName = getAppInstanceName(dataObject);
+        DateraObject.AppInstance appInstance = getDateraAppInstance(conn, appInstanceName);
+
+        Preconditions.checkArgument(appInstance != null);
+
+        long clusterId = host.getClusterId();
+
+        ClusterVO cluster = _clusterDao.findById(clusterId);
+
+        GlobalLock lock = GlobalLock.getInternLock(cluster.getUuid());
+
+        if (!lock.lock(s_lockTimeInSeconds)) {
+            s_logger.debug("Couldn't lock the DB (in grantAccess) on the following string: " + cluster.getUuid());
+        }
+
+        try {
+
+            DateraObject.InitiatorGroup initiatorGroup = null;
+            String initiatorGroupKey = DateraUtil.getInitiatorGroupKey(storagePoolId);
+
+            List<HostVO> hosts = _hostDao.findByClusterId(clusterId);
+
+            if (!DateraUtil.hostsSupport_iScsi(hosts)) {
+                s_logger.debug("hostsSupport_iScsi(): hosts do NOT support iSCSI");
+                return false;
+            }
+
+            // Look up this cluster's initiator group; create it below if it doesn't exist
+            String initiatorGroupName = DateraUtil.INITIATOR_GROUP_PREFIX + "-" + cluster.getUuid();
+            s_logger.debug("Will use initiator group " + String.valueOf(initiatorGroupName));
+
+            initiatorGroup = DateraUtil.getInitiatorGroup(conn, initiatorGroupName);
+
+            if (initiatorGroup == null) {
+                s_logger.debug("create initiator group " + String.valueOf(initiatorGroupName));
+                initiatorGroup = DateraUtil.createInitiatorGroup(conn, initiatorGroupName);
+                // Save it to the DB
+                ClusterDetailsVO clusterDetail = new ClusterDetailsVO(clusterId, initiatorGroupKey, initiatorGroupName);
+                _clusterDetailsDao.persist(clusterDetail);
+
+            } else {
+                initiatorGroup = DateraUtil.getInitiatorGroup(conn, initiatorGroupName);
+            }
+
+            Preconditions.checkNotNull(initiatorGroup, "initiatorGroup should not be Null");
+
+            // We create an initiator for every host in this cluster and add it to the
+            // initiator group
+            addClusterHostsToInitiatorGroup(conn, clusterId, initiatorGroupName);
+
+            // Assign the initiator group to the appInstance
+
+            if (!isInitiatorGroupAssignedToAppInstance(conn, initiatorGroup, appInstance)) {
+                DateraUtil.assignGroupToAppInstance(conn, initiatorGroupName, appInstanceName);
+                int retries = DateraUtil.DEFAULT_RETRIES;
+                while (!isInitiatorGroupAssignedToAppInstance(conn, initiatorGroup, appInstance) && retries > 0) {
+                    Thread.sleep(DateraUtil.POLL_TIMEOUT_MS);
+                    retries--;
+                }
+
+                Preconditions.checkArgument(isInitiatorGroupAssignedToAppInstance(conn, initiatorGroup, appInstance),
+                        "Initgroup is not assigned to appinstance");
+                // FIXME: Sleep anyways
+                s_logger.debug("sleep " + String.valueOf(DateraUtil.POLL_TIMEOUT_MS) + " msec for ACL to be applied");
+
+                Thread.sleep(DateraUtil.POLL_TIMEOUT_MS); // ms
+                s_logger.debug(
+                        "Initiator group " + String.valueOf(initiatorGroupName) + " is assigned to " + appInstanceName);
+
+            }
+
+            return true;
+        } catch (DateraObject.DateraError | UnsupportedEncodingException | InterruptedException dateraError) {
+            s_logger.warn(dateraError.getMessage(), dateraError);
+            throw new CloudRuntimeException("Unable to grant access to volume " + dateraError.getMessage());
+        } finally {
+            lock.unlock();
+            lock.releaseRef();
+        }
+    }
+
+    private void addClusterHostsToInitiatorGroup(DateraObject.DateraConnection conn, long clusterId,
+            String initiatorGroupName) throws DateraObject.DateraError, UnsupportedEncodingException {
+
+        List<HostVO> clusterHosts = _hostDao.findByClusterId(clusterId);
+        DateraObject.InitiatorGroup initiatorGroup = DateraUtil.getInitiatorGroup(conn, initiatorGroupName);
+
+        for (HostVO host : clusterHosts) {
+
+            // check if we have an initiator for the host
+            String iqn = host.getStorageUrl();
+
+            DateraObject.Initiator initiator = DateraUtil.getInitiator(conn, iqn);
+            String initiatorName = "";
+            // initiator can not be found, create it
+            if (initiator == null) {
+
+                initiatorName = DateraUtil.INITIATOR_PREFIX + "-" + host.getUuid();
+                initiator = DateraUtil.createInitiator(conn, initiatorName, iqn);
+                s_logger.debug("Initiator " + initiatorName + " with " + iqn + " added");
+
+            }
+            Preconditions.checkNotNull(initiator);
+
+            if (!DateraUtil.isInitiatorPresentInGroup(initiator, initiatorGroup)) {
+                s_logger.debug("Add " + initiatorName + " to " + initiatorGroupName);
+                DateraUtil.addInitiatorToGroup(conn, initiator.getPath(), initiatorGroupName);
+            }
+        }
+    }
+
+    /**
+     * Checks if an initiator group is assigned to an appInstance
+     * @param conn           Datera connection
+     * @param initiatorGroup Initiator group to check
+     * @param appInstance    App Instance
+     * @return True if the initiator group is assigned to the app instance, false otherwise
+     * @throws DateraObject.DateraError
+     */
+
+    private boolean isInitiatorGroupAssignedToAppInstance(DateraObject.DateraConnection conn,
+            DateraObject.InitiatorGroup initiatorGroup, DateraObject.AppInstance appInstance)
+            throws DateraObject.DateraError {
+
+        Map<String, DateraObject.InitiatorGroup> assignedInitiatorGroups = DateraUtil
+                .getAppInstanceInitiatorGroups(conn, appInstance.getName());
+
+        Preconditions.checkNotNull(assignedInitiatorGroups);
+
+        for (DateraObject.InitiatorGroup ig : assignedInitiatorGroups.values()) {
+            if (initiatorGroup.getName().equals(ig.getName())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Removes access of the initiator group to which {@code host} belongs from the
+     * appInstance given by {@code dataObject}
+     * @param dataObject Datera volume
+     * @param host       the host which is currently having access to the volume
+     * @param dataStore  The primary store to which volume belongs
+     */
+    @Override
+    public void revokeAccess(DataObject dataObject, Host host, DataStore dataStore) {
+        s_logger.debug("revokeAccess() called");
+
+        Preconditions.checkArgument(dataObject != null, "'dataObject' should not be 'null'");
+        Preconditions.checkArgument(host != null, "'host' should not be 'null'");
+        Preconditions.checkArgument(dataStore != null, "'dataStore' should not be 'null'");
+
+        String appInstanceName = getAppInstanceName(dataObject);
+        long clusterId = host.getClusterId();
+        long storagePoolId = dataStore.getId();
+
+        ClusterVO cluster = _clusterDao.findById(clusterId);
+
+        GlobalLock lock = GlobalLock.getInternLock(cluster.getUuid());
+
+        if (!lock.lock(s_lockTimeInSeconds)) {
+            s_logger.debug("Couldn't lock the DB (in revokeAccess) on the following string: " + cluster.getUuid());
+        }
+
+        try {
+
+            String initiatorGroupName = DateraUtil.INITIATOR_GROUP_PREFIX + "-" + cluster.getUuid();
+
+            DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePoolId, _storagePoolDetailsDao);
+
+            DateraObject.AppInstance appInstance = DateraUtil.getAppInstance(conn, appInstanceName);
+            DateraObject.InitiatorGroup initiatorGroup = DateraUtil.getInitiatorGroup(conn, initiatorGroupName);
+
+            if (initiatorGroup != null && appInstance != null) {
+
+                DateraUtil.removeGroupFromAppInstance(conn, initiatorGroupName, appInstanceName);
+                int retries = DateraUtil.DEFAULT_RETRIES;
+                while (isInitiatorGroupAssignedToAppInstance(conn, initiatorGroup, appInstance) && retries > 0) {
+                    Thread.sleep(DateraUtil.POLL_TIMEOUT_MS);
+                    retries--;
+                }
+            }
+
+        } catch (DateraObject.DateraError | UnsupportedEncodingException | InterruptedException dateraError) {
+            String errMesg = "Error revoking access for Volume : " + dataObject.getId();
+            s_logger.warn(errMesg, dateraError);
+            throw new CloudRuntimeException(errMesg);
+        } finally {
+            lock.unlock();
+            lock.releaseRef();
+        }
+    }
+
+    /**
+     * Returns the number of bytes required to store the template on this primary
+     * storage. If we already have the template on this storage, we return 0.
+     * @param templateInfo Information about the template
+     * @param storagePool  The pool where we want to store the template
+     * @return Size in bytes
+     */
+    @Override
+    public long getBytesRequiredForTemplate(TemplateInfo templateInfo, StoragePool storagePool) {
+
+        List<VMTemplateStoragePoolVO> lstTemplatePoolRefs = tmpltPoolDao.listByPoolId(storagePool.getId());
+
+        if (lstTemplatePoolRefs != null) {
+            for (VMTemplateStoragePoolVO templatePoolRef : lstTemplatePoolRefs) {
+                if (templatePoolRef.getTemplateId() == templateInfo.getId()) {
+                    // This indicates that we already have this template stored on this
+                    // primary storage, so we do not require additional space.
+                    return 0;
+                }
+            }
+        }
+
+        // This indicates that we do not have a copy of this template on this primary
+        // storage, so we need to take it into consideration from a space standpoint
+        // (e.g. when a new VM is spun up and wants to use this template for its root disk).
+        return getDataObjectSizeIncludingHypervisorSnapshotReserve(templateInfo, storagePool);
+    }
+
+    /**
+     * Returns Datera appInstanceName
+     * @param dataObject volume or template
+     * @return Derived Datera appInstanceName based on the dataObject, e.g.
+     *         CS-V-ROOT-123-6db58e3f-14c4-45ac-95e9-60e3a00ce7d0
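+     *         (VOLUME). TEMPLATE and SNAPSHOT objects follow the same scheme:
+     *         CS-T-<template-uuid>-<pool-id> and CS-S-<snapshot-uuid>.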
+     */
+    private String getAppInstanceName(DataObject dataObject) {
+
+        ArrayList<String> name = new ArrayList<>();
+
+        name.add(DateraUtil.APPINSTANCE_PREFIX); // CS
+
+        String dataObjectTypeString = dataObject.getType().name(); // TEMPLATE, VOLUME, SNAPSHOT
+        String dataObjectTypeBrief;
+        dataObjectTypeBrief = org.apache.commons.lang.StringUtils.substring(dataObjectTypeString, 0, 1);
+        name.add(dataObjectTypeBrief); // T, V
+
+        switch (dataObject.getType()) {
+        case TEMPLATE:
+            TemplateInfo templateInfo = (TemplateInfo) dataObject;
+
+            name.add(dataObject.getUuid()); // 6db58e3f-14c4-45ac-95e9-60e3a00ce7d0
+
+            // For cached templates, we will also add the storage pool ID
+            name.add(String.valueOf(dataObject.getDataStore().getId()));
+            break;
+
+        case VOLUME:
+            VolumeInfo volumeInfo = (VolumeInfo) dataObject;
+            String volumeName = volumeInfo.getName();
+            name.add(String.valueOf(volumeName));
+            name.add(dataObject.getUuid()); // 6db58e3f-14c4-45ac-95e9-60e3a00ce7d0
+
+            VolumeVO volumeVo = _volumeDao.findById(dataObject.getId());
+            s_logger.debug("volumeName : " + volumeName);
+            break;
+
+        case SNAPSHOT:
+            name.add(dataObject.getUuid()); // 6db58e3f-14c4-45ac-95e9-60e3a00ce7d0
+
+        }
+
+        String appInstanceName = StringUtils.join("-", name.toArray());
+        return org.apache.commons.lang.StringUtils.substring(appInstanceName, 0, DateraUtil.APPINSTANCE_MAX_LENTH);
+    }
+
+    // Not being used right now as Datera doesn't support min IOPS
+    private long getDefaultMinIops(long storagePoolId) {
+        StoragePoolDetailVO storagePoolDetail = _storagePoolDetailsDao.findDetail(storagePoolId,
+                DateraUtil.CLUSTER_DEFAULT_MIN_IOPS);
+
+        String clusterDefaultMinIops = storagePoolDetail.getValue();
+
+        return Long.parseLong(clusterDefaultMinIops);
+    }
+
+    /**
+     * If the user doesn't specify IOPS, this default is used
+     * @param storagePoolId the primary storage
+     * @return default max IOPS for this storage configured when the storage is
+     *         added
+     */
+    private long getDefaultMaxIops(long storagePoolId) {
+        StoragePoolDetailVO storagePoolDetail = _storagePoolDetailsDao.findDetail(storagePoolId,
+                DateraUtil.CLUSTER_DEFAULT_MAX_IOPS);
+
+        String clusterDefaultMaxIops = storagePoolDetail.getValue();
+
+        return Long.parseLong(clusterDefaultMaxIops);
+    }
+
+    /**
+     * Return the default number of replicas to use (configured at storage addition
+     * time)
+     * @param storagePoolId the primary storage
+     * @return the number of replicas to use
+     */
+    private int getNumReplicas(long storagePoolId) {
+        StoragePoolDetailVO storagePoolDetail = _storagePoolDetailsDao.findDetail(storagePoolId,
+                DateraUtil.NUM_REPLICAS);
+
+        String clusterDefaultReplicas = storagePoolDetail.getValue();
+
+        return Integer.parseInt(clusterDefaultReplicas);
+
+    }
+
+    /**
+     * Return the default volume placement to use (configured at storage addition
+     * time)
+     * @param storagePoolId the primary storage
+     * @return volume placement string
+     */
+    private String getVolPlacement(long storagePoolId) {
+        StoragePoolDetailVO storagePoolDetail = _storagePoolDetailsDao.findDetail(storagePoolId,
+                DateraUtil.VOL_PLACEMENT);
+
+        String clusterDefaultVolPlacement = storagePoolDetail.getValue();
+
+        return clusterDefaultVolPlacement;
+
+    }
+
+    /**
+     * Return the default IP pool name to use (configured at storage addition time)
+     * @param storagePoolId the primary storage
+     * @return IP pool name
+     */
+    private String getIpPool(long storagePoolId) {
+        String ipPool = DateraUtil.DEFAULT_IP_POOL;
+        StoragePoolDetailVO storagePoolDetail = _storagePoolDetailsDao.findDetail(storagePoolId, DateraUtil.IP_POOL);
+        if (storagePoolDetail != null) {
+            ipPool = storagePoolDetail.getValue();
+        }
+        s_logger.debug("ipPool: " + ipPool);
+        return ipPool;
+
+    }
+
+    @Override
+    public long getUsedBytes(StoragePool storagePool) {
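+        // Long.MIN_VALUE is a sentinel that matches no volume id, so no volume is ignored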
+        return getUsedBytes(storagePool, Long.MIN_VALUE);
+    }
+
+    /**
+     * Get the total space used by all the entities on the storage.
+     * Total space = volume space + snapshot space + template space
+     * @param storagePool      Primary storage
+     * @param volumeIdToIgnore Ignore this volume (used when we delete a volume and
+     *                         want to update the space)
+     * @return size in bytes
+     */
+    private long getUsedBytes(StoragePool storagePool, long volumeIdToIgnore) {
+        long usedSpaceBytes = 0;
+
+        List<VolumeVO> lstVolumes = _volumeDao.findByPoolId(storagePool.getId(), null);
+
+        if (lstVolumes != null) {
+            for (VolumeVO volume : lstVolumes) {
+                if (volume.getId() == volumeIdToIgnore) {
+                    continue;
+                }
+
+                VolumeDetailVO volumeDetail = volumeDetailsDao.findDetail(volume.getId(), DateraUtil.VOLUME_SIZE);
+
+                if (volumeDetail != null && volumeDetail.getValue() != null) {
+                    long volumeSizeGib = Long.parseLong(volumeDetail.getValue());
+                    long volumeSizeBytes = DateraUtil.gibToBytes((int) (volumeSizeGib));
+                    usedSpaceBytes += volumeSizeBytes;
+                } else {
+                    DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePool.getId(),
+                            _storagePoolDetailsDao);
+                    try {
+
+                        String appInstanceName = getAppInstanceName(volumeDataFactory.getVolume(volume.getId()));
+                        DateraObject.AppInstance appInstance = DateraUtil.getAppInstance(conn, appInstanceName);
+                        if (appInstance != null) {
+                            usedSpaceBytes += DateraUtil.gibToBytes(appInstance.getSize());
+                        }
+                    } catch (DateraObject.DateraError dateraError) {
+                        String errMesg = "Error getting used bytes for storage pool : " + storagePool.getId();
+                        s_logger.warn(errMesg, dateraError);
+                        throw new CloudRuntimeException(errMesg);
+                    }
+                }
+            }
+        }
+
+        List<SnapshotVO> lstSnapshots = _snapshotDao.listAll();
+
+        if (lstSnapshots != null) {
+            for (SnapshotVO snapshot : lstSnapshots) {
+                SnapshotDetailsVO snapshotDetails = _snapshotDetailsDao.findDetail(snapshot.getId(),
+                        DateraUtil.STORAGE_POOL_ID);
+
+                // if this snapshot belongs to the storagePool that was passed in
+                if (snapshotDetails != null && snapshotDetails.getValue() != null
+                        && Long.parseLong(snapshotDetails.getValue()) == storagePool.getId()) {
+                    snapshotDetails = _snapshotDetailsDao.findDetail(snapshot.getId(), DateraUtil.VOLUME_SIZE);
+
+                    if (snapshotDetails != null && snapshotDetails.getValue() != null) {
+                        long snapshotSize = Long.parseLong(snapshotDetails.getValue());
+
+                        usedSpaceBytes += snapshotSize;
+                    }
+                }
+            }
+        }
+
+        List<VMTemplateStoragePoolVO> lstTemplatePoolRefs = tmpltPoolDao.listByPoolId(storagePool.getId());
+
+        if (lstTemplatePoolRefs != null) {
+            for (VMTemplateStoragePoolVO templatePoolRef : lstTemplatePoolRefs) {
+                usedSpaceBytes += templatePoolRef.getTemplateSize();
+            }
+        }
+        s_logger.debug("usedSpaceBytes: " + String.valueOf(usedSpaceBytes));
+
+        return usedSpaceBytes;
+    }
+
+    /**
+     * Get total IOPS used by the storage array. Since Datera doesn't support min
+     * IOPS, return zero for now
+     * @param storagePool primary storage
+     * @return total IOPS used
+     */
+    @Override
+    public long getUsedIops(StoragePool storagePool) {
+        long usedIops = 0;
+        return usedIops;
+    }
+
+    /**
+     * Returns the size of the volume including the hypervisor snapshot reserve
+     * (HSR).
+     * @param dataObject Volume or a Template
+     * @param pool       primary storage where it resides
+     * @return size in bytes
+     */
+
+    @Override
+    public long getDataObjectSizeIncludingHypervisorSnapshotReserve(DataObject dataObject, StoragePool pool) {
+
+        long volumeSize = 0;
+
+        switch (dataObject.getType()) {
+        case VOLUME:
+
+            VolumeInfo volume = (VolumeInfo) dataObject;
+            volumeSize = volume.getSize();
+            Integer hypervisorSnapshotReserve = volume.getHypervisorSnapshotReserve();
+
+            if (hypervisorSnapshotReserve != null) {
+                hypervisorSnapshotReserve = Math.max(hypervisorSnapshotReserve, s_lowestHypervisorSnapshotReserve);
+                volumeSize += volumeSize * (hypervisorSnapshotReserve / 100f);
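+                // e.g. a 100 GiB volume with a 10% reserve is provisioned as 110 GiB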
+            }
+            s_logger.debug("Volume size:" + String.valueOf(volumeSize));
+            break;
+
+        case TEMPLATE:
+
+            TemplateInfo templateInfo = (TemplateInfo) dataObject;
+            long templateSize = templateInfo.getSize() != null ? templateInfo.getSize() : 0;
+
+            if (templateInfo.getHypervisorType() == Hypervisor.HypervisorType.KVM) {
+                volumeSize = templateSize;
+            } else {
+                volumeSize = (long) (templateSize + templateSize * (s_lowestHypervisorSnapshotReserve / 100f));
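+                // e.g. with the 10% minimum reserve, a 2 GiB template is sized as 2.2 GiB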
+            }
+            s_logger.debug("Template volume size:" + String.valueOf(volumeSize));
+
+            break;
+        }
+        return volumeSize;
+    }
+
+    /**
+     * Deletes a volume from Datera. If we are using native snapshots, we first
+     * check whether the volume holds a native snapshot; if it does, we don't
+     * delete it from Datera but instead mark it so that the volume is deleted
+     * when the snapshot is deleted.
+     *
+     * @param volumeInfo    The volume which needs to be deleted
+     * @param storagePoolId Primary storage where volume resides
+     */
+    private void deleteVolume(VolumeInfo volumeInfo, long storagePoolId) {
+
+        DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePoolId, _storagePoolDetailsDao);
+        Long volumeStoragePoolId = volumeInfo.getPoolId();
+        long volumeId = volumeInfo.getId();
+
+        if (volumeStoragePoolId == null) {
+            return; // this volume was never assigned to a storage pool, so no SAN volume should
+                    // exist for it
+        }
+
+        try {
+
+            // If there are native snapshots on this appInstance, we want to keep it
+            // on Datera but remove it from CloudStack
+            if (shouldDeleteVolume(volumeId, null)) {
+                DateraUtil.deleteAppInstance(conn, getAppInstanceName(volumeInfo));
+            }
+
+            volumeDetailsDao.removeDetails(volumeId);
+
+            StoragePoolVO storagePool = storagePoolDao.findById(storagePoolId);
+
+            long usedBytes = getUsedBytes(storagePool, volumeId);
+            storagePool.setUsedBytes(usedBytes < 0 ? 0 : usedBytes);
+            storagePoolDao.update(storagePoolId, storagePool);
+
+        } catch (UnsupportedEncodingException | DateraObject.DateraError e) {
+            String errMesg = "Error deleting app instance for Volume : " + volumeInfo.getId();
+            s_logger.warn(errMesg, e);
+            throw new CloudRuntimeException(errMesg);
+        }
+    }
+
+    /**
+     * Given a {@code volumeInfo} and {@code storagePoolId}, creates an App instance
+     * on Datera. Updates the usedBytes count in the DB for this storage pool. A
+     * volume could be created in 3 ways
+     *
+     * 1) A fresh volume with no data: New volume created from Cloudstack
+     *
+     * 2) A volume created from a native snapshot: used when creating a volume
+     * from a snapshot and native snapshots are supported
+     *
+     * 3) A volume created by cloning another volume: used when creating a volume
+     * from a template, or from a snapshot stored as another volume when native
+     * snapshots are not supported by the hypervisor
+     *
+     *
+     * @param volumeInfo    Info about the volume, like size and QoS
+     * @param storagePoolId The pool to create the volume on
+     * @return the IQN path which will be used by the storage subsystem
+     *
+     */
+
+    private String createVolume(VolumeInfo volumeInfo, long storagePoolId) {
+        s_logger.debug("createVolume() called");
+
+        Preconditions.checkArgument(volumeInfo != null, "volumeInfo cannot be null");
+        Preconditions.checkArgument(storagePoolId > 0, "storagePoolId should be > 0");
+
+        verifySufficientBytesForStoragePool(volumeInfo, storagePoolId);
+
+        DateraObject.AppInstance appInstance;
+
+        DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePoolId, _storagePoolDetailsDao);
+
+        long csSnapshotId = getCsIdForCloning(volumeInfo.getId(), "cloneOfSnapshot");
+        long csTemplateId = getCsIdForCloning(volumeInfo.getId(), "cloneOfTemplate");
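+        // The "cloneOfSnapshot" / "cloneOfTemplate" volume details carry the id of
+        // the source snapshot/template when this volume is being created as a clone;
+        // getCsIdForCloning() (defined later in this class) returns that id, or a
+        // non-positive value when the detail is not set.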
+        s_logger.debug("csTemplateId is " + String.valueOf(csTemplateId));
+
+        try {
+
+            if (csSnapshotId > 0) {
+                // creating volume from snapshot. The snapshot could either be a native snapshot
+                // or another volume.
+                s_logger.debug("Creating volume from snapshot ");
+                appInstance = createDateraClone(conn, csSnapshotId, volumeInfo, storagePoolId, DataObjectType.SNAPSHOT);
+
+            } else if (csTemplateId > 0) {
+
+                // create volume from template. Invoked when creating new ROOT volume
+                s_logger.debug("Creating volume from template ");
+
+                appInstance = createDateraClone(conn, csTemplateId, volumeInfo, storagePoolId, DataObjectType.TEMPLATE);
+                String appInstanceName = appInstance.getName();
+
+                long volumeSize = getDataObjectSizeIncludingHypervisorSnapshotReserve(volumeInfo,
+                        storagePoolDao.findById(storagePoolId));
+
+                // expand the template
+                if (volumeSize > DateraUtil.gibToBytes(appInstance.getSize())) {
+
+                    // Expand the volume to include HSR depending on the volume's service offering
+                    DateraUtil.updateAppInstanceSize(conn, appInstanceName, DateraUtil.bytesToGib(volumeSize));
+
+                    // refresh appInstance
+                    appInstance = DateraUtil.getAppInstance(conn, appInstanceName);
+
+                    Preconditions.checkNotNull(appInstance);
+                    // update IOPS
+                    if ((volumeInfo.getMaxIops() != null) && (volumeInfo.getMaxIops() != appInstance.getTotalIops())) {
+                        int newIops = Ints.checkedCast(volumeInfo.getMaxIops());
+                        DateraUtil.updateAppInstanceIops(conn, appInstanceName, newIops);
+                    }
+                    // refresh appInstance
+                    appInstance = DateraUtil.getAppInstance(conn, appInstanceName);
+                }
+
+            } else {
+                // Just create a standard volume
+                s_logger.debug("Creating a standard volume ");
+                appInstance = createDateraVolume(conn, volumeInfo, storagePoolId);
+            }
+        } catch (UnsupportedEncodingException | DateraObject.DateraError e) {
+            String errMesg = "Unable to create Volume Error: " + e.getMessage();
+            s_logger.warn(errMesg);
+            throw new CloudRuntimeException(errMesg, e);
+        }
+
+        if (appInstance == null) {
+            String errMesg = "appInstance returned null";
+            s_logger.warn(errMesg);
+            throw new CloudRuntimeException(errMesg);
+        }
+
+        Preconditions.checkNotNull(appInstance);
+        String iqn = appInstance.getIqn();
+        String iqnPath = DateraUtil.generateIqnPath(iqn);
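+        // generateIqnPath() wraps the raw IQN in the path form CloudStack stores for
+        // managed iSCSI volumes (assumed here to be "/<iqn>/<lun>", as with other
+        // managed-storage drivers)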
+
+        VolumeVO volumeVo = _volumeDao.findById(volumeInfo.getId());
+        s_logger.debug("volume ID : " + volumeInfo.getId());
+        s_logger.debug("volume uuid : " + volumeInfo.getUuid());
+
+        volumeVo.set_iScsiName(iqnPath);
+        volumeVo.setFolder(appInstance.getName());
+        volumeVo.setPoolType(Storage.StoragePoolType.IscsiLUN);
+        volumeVo.setPoolId(storagePoolId);
+
+        _volumeDao.update(volumeVo.getId(), volumeVo);
+
+        updateVolumeDetails(volumeVo.getId(), appInstance.getSize());
+
+        StoragePoolVO storagePool = _storagePoolDao.findById(storagePoolId);
+
+        long capacityBytes = storagePool.getCapacityBytes();
+        long usedBytes = getUsedBytes(storagePool);
+
+        storagePool.setUsedBytes(usedBytes > capacityBytes ? capacityBytes : usedBytes);
+
+        _storagePoolDao.update(storagePoolId, storagePool);
+
+        return appInstance.getIqn();
+    }
+
+    /**
+     * Helper function to create a Datera app instance. Throws an exception if
+     * unsuccessful
+     * @param conn          Datera connection
+     * @param volumeInfo    Volume information
+     * @param storagePoolId primary storage
+     * @return The AppInstance which is created
+     * @throws UnsupportedEncodingException
+     * @throws DateraObject.DateraError
+     */
+    private DateraObject.AppInstance createDateraVolume(DateraObject.DateraConnection conn, VolumeInfo volumeInfo,
+            long storagePoolId) throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        s_logger.debug("createDateraVolume() called");
+        DateraObject.AppInstance appInstance = null;
+        try {
+
+            int minIops = Ints.checkedCast(
+                    volumeInfo.getMinIops() != null ? volumeInfo.getMinIops() : getDefaultMinIops(storagePoolId));
+
+            // int minIops = Ints.checkedCast(volumeInfo.getMinIops());
+
+            int maxIops = Ints.checkedCast(
+                    volumeInfo.getMaxIops() != null ? volumeInfo.getMaxIops() : getDefaultMaxIops(storagePoolId));
+
+            // int maxIops = Ints.checkedCast(volumeInfo.getMaxIops());
+
+            if (maxIops <= 0) { // We don't care about min iops for now
+                maxIops = Ints.checkedCast(getDefaultMaxIops(storagePoolId));
+            }
+
+            int replicas = getNumReplicas(storagePoolId);
+            String volumePlacement = getVolPlacement(storagePoolId);
+            String ipPool = getIpPool(storagePoolId);
+
+            long volumeSizeBytes = getDataObjectSizeIncludingHypervisorSnapshotReserve(volumeInfo,
+                    _storagePoolDao.findById(storagePoolId));
+            int volumeSizeGib = DateraUtil.bytesToGib(volumeSizeBytes);
+            if (volumePlacement == null) {
+                appInstance = DateraUtil.createAppInstance(conn, getAppInstanceName(volumeInfo), volumeSizeGib, maxIops,
+                        replicas);
+            } else {
+                appInstance = DateraUtil.createAppInstance(conn, getAppInstanceName(volumeInfo), volumeSizeGib, maxIops,
+                        replicas, volumePlacement, ipPool);
+            }
+        } catch (Exception ex) {
+            s_logger.debug("createDateraVolume() failed");
+            s_logger.error(ex);
+        }
+        return appInstance;
+    }
+
+    /**
+     * This function creates a new AppInstance on Datera by cloning. We can clone
+     * either from a volume snapshot (in the case of native snapshots) or from
+     * another appInstance (for templates, or for snapshots stored as volumes)
+     *
+     * @param conn          Datera Connection
+     * @param dataObjectId  The CloudStack ID of the source snapshot or template,
+     *                      used to fetch details on how to clone
+     * @param volumeInfo    Information about the new volume (the clone)
+     * @param storagePoolId Primary store to create the clone on
+     * @param dataType      Type of the source (snapshot or template)
+     * @return The cloned AppInstance
+     */
+    private DateraObject.AppInstance createDateraClone(DateraObject.DateraConnection conn, long dataObjectId,
+            VolumeInfo volumeInfo, long storagePoolId, DataObjectType dataType)
+            throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        s_logger.debug("createDateraClone() called");
+
+        String clonedAppInstanceName = getAppInstanceName(volumeInfo);
+        String baseAppInstanceName = null;
+        DateraObject.AppInstance appInstance = null;
+        String ipPool = getIpPool(storagePoolId);
+
+        if (dataType == DataObjectType.SNAPSHOT) {
+            SnapshotDetailsVO snapshotDetails = snapshotDetailsDao.findDetail(dataObjectId, DateraUtil.SNAPSHOT_ID);
+
+            // Clone volume from a snapshot
+            if (snapshotDetails != null && snapshotDetails.getValue() != null) {
+                s_logger.debug("Clone volume from a snapshot");
+
+                appInstance = DateraUtil.cloneAppInstanceFromSnapshot(conn, clonedAppInstanceName,
+                        snapshotDetails.getValue(), ipPool);
+
+                if (volumeInfo.getMaxIops() != null) {
+
+                    int totalIops = Math.min(DateraUtil.MAX_IOPS, Ints.checkedCast(volumeInfo.getMaxIops()));
+                    DateraUtil.updateAppInstanceIops(conn, clonedAppInstanceName, totalIops);
+                    appInstance = DateraUtil.getAppInstance(conn, clonedAppInstanceName);
+                }
+
+                if (appInstance == null) {
+                    throw new CloudRuntimeException("Unable to create an app instance from snapshot "
+                            + volumeInfo.getId() + " type " + dataType);
+                }
+                return appInstance;
+
+            } else {
+
+                // Clone volume from an appInstance
+                s_logger.debug("Clone volume from an appInstance");
+
+                snapshotDetails = snapshotDetailsDao.findDetail(dataObjectId, DateraUtil.VOLUME_ID);
+                baseAppInstanceName = snapshotDetails.getValue();
+
+            }
+        } else if (dataType == DataObjectType.TEMPLATE) {
+            s_logger.debug("Clone volume from a template");
+
+            VMTemplateStoragePoolVO templatePoolRef = tmpltPoolDao.findByPoolTemplate(storagePoolId, dataObjectId);
+
+            if (templatePoolRef != null) {
+                baseAppInstanceName = templatePoolRef.getLocalDownloadPath();
+            }
+        }
+
+        if (baseAppInstanceName == null) {
+            throw new CloudRuntimeException(
+                    "Unable to find a base volume to clone " + volumeInfo.getId() + " type " + dataType);
+        }
+
+        // Clone the app Instance
+        appInstance = DateraUtil.cloneAppInstanceFromVolume(conn, clonedAppInstanceName, baseAppInstanceName, ipPool);
+
+        if (dataType == DataObjectType.TEMPLATE) {
+            // Only update volume parameters if clone from cached template
+            // Update maxIops
+            if (volumeInfo.getMaxIops() != null) {
+
+                int totalIops = Math.min(DateraUtil.MAX_IOPS, Ints.checkedCast(volumeInfo.getMaxIops()));
+
+                DateraUtil.updateAppInstanceIops(conn, clonedAppInstanceName, totalIops);
+                appInstance = DateraUtil.getAppInstance(conn, clonedAppInstanceName);
+            }
+            // Update placementMode
+            String newPlacementMode = getVolPlacement(storagePoolId);
+            if (newPlacementMode != null) {
+                DateraUtil.updateAppInstancePlacement(conn, clonedAppInstanceName, newPlacementMode);
+            }
+            appInstance = DateraUtil.getAppInstance(conn, clonedAppInstanceName);
+        }
+        if (appInstance == null) {
+            throw new CloudRuntimeException("Unable to create an app instance from snapshot or template "
+                    + volumeInfo.getId() + " type " + dataType);
+        }
+        s_logger.debug("Datera - Cloned " + baseAppInstanceName + " to " + clonedAppInstanceName);
+
+        return appInstance;
+    }
+
+    /**
+     * This function gets invoked when you want to do operations on a snapshot. The
+     * snapshot could be a native snapshot and you want to create a template out of
+     * it. Since snapshots don't have an IQN, we create a temp volume for this
+     * snapshot which will be used to carry out further operations. This function
+     * also handles deletion of temp volumes. A flag in the snapshot details table
+     * decides which action is performed.
+     *
+     * @param snapshotInfo  snapshot on Datera
+     * @param storagePoolId primary store ID
+     */
+    private void createTempVolume(SnapshotInfo snapshotInfo, long storagePoolId) {
+        s_logger.debug("createTempVolume() from snapshot called");
+        String ipPool = getIpPool(storagePoolId);
+        long csSnapshotId = snapshotInfo.getId();
+
+        SnapshotDetailsVO snapshotDetails = snapshotDetailsDao.findDetail(csSnapshotId, DateraUtil.SNAPSHOT_ID);
+
+        if (snapshotDetails == null || snapshotDetails.getValue() == null) {
+            throw new CloudRuntimeException("'createTempVolume(SnapshotInfo, long)' should not be invoked unless "
+                    + DateraUtil.SNAPSHOT_ID + " exists.");
+        }
+
+        DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePoolId, _storagePoolDetailsDao);
+
+        snapshotDetails = snapshotDetailsDao.findDetail(csSnapshotId, "tempVolume");
+
+        if (snapshotDetails != null && snapshotDetails.getValue() != null
+                && snapshotDetails.getValue().equalsIgnoreCase("create")) {
+
+            snapshotDetails = snapshotDetailsDao.findDetail(csSnapshotId, DateraUtil.SNAPSHOT_ID);
+            String snapshotName = snapshotDetails.getValue();
+
+            String clonedAppInstanceName = getAppInstanceName(snapshotInfo);
+            DateraObject.AppInstance clonedAppInstance;
+
+            try {
+                clonedAppInstance = DateraUtil.cloneAppInstanceFromSnapshot(conn, clonedAppInstanceName, snapshotName,
+                        ipPool);
+                DateraUtil.pollAppInstanceAvailable(conn, clonedAppInstanceName);
+            } catch (DateraObject.DateraError | UnsupportedEncodingException e) {
+                String errMesg = "Unable to create temp volume " + csSnapshotId + ". Error: " + e.getMessage();
+                s_logger.error(errMesg, e);
+                throw new CloudRuntimeException(errMesg, e);
+            }
+
+            if (clonedAppInstance == null) {
+                throw new CloudRuntimeException("Unable to clone volume for snapshot " + snapshotName);
+            }
+            s_logger.debug("Temp app_instance " + clonedAppInstanceName + " created");
+            addTempVolumeToDb(csSnapshotId, clonedAppInstanceName);
+            handleSnapshotDetails(csSnapshotId, DiskTO.IQN, DateraUtil.generateIqnPath(clonedAppInstance.getIqn()));
+
+        } else if (snapshotDetails != null && snapshotDetails.getValue() != null
+                && snapshotDetails.getValue().equalsIgnoreCase("delete")) {
+
+            snapshotDetails = snapshotDetailsDao.findDetail(csSnapshotId, DateraUtil.VOLUME_ID);
+            try {
+                s_logger.debug("Deleting temp app_instance " + snapshotDetails.getValue());
+                DateraUtil.deleteAppInstance(conn, snapshotDetails.getValue());
+            } catch (UnsupportedEncodingException | DateraObject.DateraError dateraError) {
+                String errMesg = "Error deleting temp volume " + dateraError.getMessage();
+                throw new CloudRuntimeException(errMesg, dateraError);
+            }
+
+            removeTempVolumeFromDb(csSnapshotId);
+
+            snapshotDetails = snapshotDetailsDao.findDetail(csSnapshotId, DiskTO.IQN);
+            snapshotDetailsDao.remove(snapshotDetails.getId());
+        } else {
+            throw new CloudRuntimeException("Invalid state in 'createTempVolume(SnapshotInfo, long)'");
+        }
+    }
+
+    /**
+     * This function gets invoked when we want to create a volume that caches the
+     * template on the primary storage. This 'template volume' will then be cloned
+     * to create new ROOT volumes.
+     *
+     * @param templateInfo  Information about the template like id, size
+     * @param storagePoolId the primary store to create this volume on
+     * @return IQN of the template volume
+     */
+    public String createTemplateVolume(TemplateInfo templateInfo, long storagePoolId) {
+        s_logger.debug("createTemplateVolume() as cache template called");
+
+        verifySufficientBytesForStoragePool(templateInfo, storagePoolId);
+
+        DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePoolId, _storagePoolDetailsDao);
+
+        String iqn = null;
+        String appInstanceName = null;
+        try {
+
+            long templateSizeBytes = getDataObjectSizeIncludingHypervisorSnapshotReserve(templateInfo,
+                    storagePoolDao.findById(storagePoolId));
+
+            s_logger.debug("cached VM template sizeBytes: " + String.valueOf(templateSizeBytes));
+
+            int templateSizeGib = DateraUtil.bytesToGib(templateSizeBytes);
+
+            int templateIops = DateraUtil.MAX_IOPS;
+            int replicaCount = getNumReplicas(storagePoolId);
+            appInstanceName = getAppInstanceName(templateInfo);
+            String volumePlacement = getVolPlacement(storagePoolId);
+            String ipPool = getIpPool(storagePoolId);
+
+            s_logger.debug("cached VM template app_instance: " + appInstanceName + " ipPool: " + ipPool + " sizeGib: " + String.valueOf(templateSizeGib));
+            DateraObject.AppInstance appInstance = DateraUtil.createAppInstance(conn, appInstanceName, templateSizeGib,
+                    templateIops, replicaCount, volumePlacement, ipPool);
+
+            if (appInstance == null) {
+                throw new CloudRuntimeException("Unable to create Template volume " + templateInfo.getId());
+            }
+
+            iqn = appInstance.getIqn();
+
+            VMTemplateStoragePoolVO templatePoolRef = tmpltPoolDao.findByPoolTemplate(storagePoolId,
+                    templateInfo.getId());
+
+            templatePoolRef.setInstallPath(DateraUtil.generateIqnPath(iqn));
+            templatePoolRef.setLocalDownloadPath(appInstance.getName());
+            templatePoolRef.setTemplateSize(DateraUtil.gibToBytes(appInstance.getSize()));
+
+            tmpltPoolDao.update(templatePoolRef.getId(), templatePoolRef);
+
+            StoragePoolVO storagePool = storagePoolDao.findById(storagePoolId);
+
+            long capacityBytes = storagePool.getCapacityBytes();
+
+            long usedBytes = getUsedBytes(storagePool);
+
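+            // Clamp used bytes at capacity so utilization is never reported above 100%.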
+            storagePool.setUsedBytes(usedBytes > capacityBytes ? capacityBytes : usedBytes);
+
+            storagePoolDao.update(storagePoolId, storagePool);
+
+        } catch (UnsupportedEncodingException | DateraObject.DateraError dateraError) {
+            if (DateraObject.DateraErrorTypes.ConflictError.equals(dateraError)) {
+                String errMesg = "template app Instance " + appInstanceName + " exists";
+                s_logger.debug(errMesg, dateraError);
+            } else {
+                String errMesg = "Unable to create template app Instance " + dateraError.getMessage();
+                s_logger.error(errMesg, dateraError);
+                throw new CloudRuntimeException(errMesg, dateraError);
+            }
+        }
+        return DateraUtil.generateIqnPath(iqn);
+    }
+
+    /**
+     * Entry point into the create logic. The storage subsystem calls this method
+     * to create various data objects (volume/snapshot/template).
+     *
+     * @param dataStore  the primary data store
+     * @param dataObject the object to create (volume, snapshot, or template)
+     * @param callback   async completion callback
+     */
+    @Override
+    public void createAsync(DataStore dataStore, DataObject dataObject,
+            AsyncCompletionCallback<CreateCmdResult> callback) {
+        String iqn = null;
+        String errMsg = null;
+
+        try {
+            if (dataObject.getType() == DataObjectType.VOLUME) {
+                s_logger.debug("createAsync - creating volume");
+                iqn = createVolume((VolumeInfo) dataObject, dataStore.getId());
+            } else if (dataObject.getType() == DataObjectType.SNAPSHOT) {
+                s_logger.debug("createAsync - creating snapshot");
+                createTempVolume((SnapshotInfo) dataObject, dataStore.getId());
+            } else if (dataObject.getType() == DataObjectType.TEMPLATE) {
+                s_logger.debug("createAsync - creating template");
+                iqn = createTemplateVolume((TemplateInfo) dataObject, dataStore.getId());
+            } else {
+                errMsg = "Invalid DataObjectType (" + dataObject.getType() + ") passed to createAsync";
+                s_logger.error(errMsg);
+            }
+        } catch (Exception ex) {
+            errMsg = ex.getMessage();
+
+            s_logger.error(errMsg);
+
+            if (callback == null) {
+                throw ex;
+            }
+        }
+
+        if (callback != null) {
+
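+            // The result carries the new object's IQN path; for snapshots iqn stays null
+            // because the temp volume's IQN is recorded in snapshot_details instead.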
+            CreateCmdResult result = new CreateCmdResult(iqn, new Answer(null, errMsg == null, errMsg));
+
+            result.setResult(errMsg);
+
+            callback.complete(result);
+        }
+    }
+
+    /**
+     * Helper function that records the volume size in the volume_details table;
+     * the detail is only persisted if it is not already present.
+     *
+     * @param volumeId   Volume ID
+     * @param volumeSize Size in GB
+     */
+    private void updateVolumeDetails(long volumeId, long volumeSize) {
+        VolumeDetailVO volumeDetailVo = volumeDetailsDao.findDetail(volumeId, DateraUtil.VOLUME_SIZE);
+
+        if (volumeDetailVo == null || volumeDetailVo.getValue() == null) {
+            volumeDetailVo = new VolumeDetailVO(volumeId, DateraUtil.VOLUME_SIZE, String.valueOf(volumeSize), false);
+
+            volumeDetailsDao.persist(volumeDetailVo);
+        }
+    }
+
+    /**
+     * Entry point for delete operations.
+     *
+     * @param dataStore  Primary storage
+     * @param dataObject object to delete
+     * @param callback   async context; the callback is completed once the operation finishes
+     */
+    @Override
+    public void deleteAsync(DataStore dataStore, DataObject dataObject,
+            AsyncCompletionCallback<CommandResult> callback) {
+        String errMsg = null;
+
+        try {
+            if (dataObject.getType() == DataObjectType.VOLUME) {
+                s_logger.debug("deleteAsync - deleting volume");
+                deleteVolume((VolumeInfo) dataObject, dataStore.getId());
+            } else if (dataObject.getType() == DataObjectType.SNAPSHOT) {
+                s_logger.debug("deleteAsync - deleting snapshot");
+                deleteSnapshot((SnapshotInfo) dataObject, dataStore.getId());
+            } else if (dataObject.getType() == DataObjectType.TEMPLATE) {
+                s_logger.debug("deleteAsync - deleting template");
+                deleteTemplate((TemplateInfo) dataObject, dataStore.getId());
+            } else {
+                errMsg = "Invalid DataObjectType (" + dataObject.getType() + ") passed to deleteAsync";
+            }
+        } catch (Exception ex) {
+            errMsg = ex.getMessage();
+
+            s_logger.error(errMsg);
+        }
+
+        CommandResult result = new CommandResult();
+
+        result.setResult(errMsg);
+
+        callback.complete(result);
+
+    }
+
+    @Override
+    public void copyAsync(DataObject srcData, DataObject destData,
+            AsyncCompletionCallback<CopyCommandResult> callback) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean canCopy(DataObject srcData, DataObject destData) {
+        return false;
+    }
+
+    /**
+     * Entry point for taking a snapshot. A native snapshot is taken if the
+     * hypervisor supports it; otherwise a volume is created, the data is copied
+     * via the hypervisor, and CloudStack treats that volume as a snapshot.
+     *
+     * @param snapshotInfo Snapshot information
+     * @param callback     Async context
+     */
+    @Override
+    public void takeSnapshot(SnapshotInfo snapshotInfo, AsyncCompletionCallback<CreateCmdResult> callback) {
+        s_logger.debug("takeSnapshot() called");
+
+        CreateCmdResult result;
+
+        try {
+
+            VolumeInfo volumeInfo = snapshotInfo.getBaseVolume();
+            VolumeVO volumeVO = _volumeDao.findById(volumeInfo.getId());
+
+            long storagePoolId = volumeVO.getPoolId();
+
+            DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePoolId, _storagePoolDetailsDao);
+
+            String baseAppInstanceName = getAppInstanceName(volumeInfo);
+
+            DateraObject.AppInstance baseAppInstance = DateraUtil.getAppInstance(conn, baseAppInstanceName);
+
+            Preconditions.checkNotNull(baseAppInstance);
+
+            SnapshotObjectTO snapshotObjectTo = (SnapshotObjectTO) snapshotInfo.getTO();
+
+            if (shouldTakeSnapshot(snapshotInfo.getId())) {
+
+                DateraObject.VolumeSnapshot volumeSnapshot = DateraUtil.takeVolumeSnapshot(conn, baseAppInstanceName);
+                if (volumeSnapshot == null) {
+                    s_logger.error("Unable to take native snapshot appInstance name:" + baseAppInstanceName
+                            + " volume ID " + volumeInfo.getId());
+                    throw new CloudRuntimeException("Unable to take native snapshot for volume " + volumeInfo.getId());
+                }
+
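+                // The native snapshot ID takes the form "<appInstanceName>:<timestamp>"; it
+                // is stored in snapshot_details and used later to delete or revert the snapshot.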
+                String snapshotName = baseAppInstanceName + ":" + volumeSnapshot.getTimestamp();
+                updateSnapshotDetails(snapshotInfo.getId(), baseAppInstanceName, snapshotName, storagePoolId,
+                        baseAppInstance.getSize());
+
+                snapshotObjectTo.setPath("DateraSnapshotId=" + snapshotName);
+                s_logger.info(" snapshot taken: " + snapshotName);
+
+            } else {
+
+                StoragePoolVO storagePool = _storagePoolDao.findById(storagePoolId);
+
+                long capacityBytes = storagePool.getCapacityBytes();
+                long usedBytes = getUsedBytes(storagePool);
+                int volumeSizeGib = baseAppInstance.getSize();
+                long volumeSizeBytes = DateraUtil.gibToBytes(volumeSizeGib);
+                String volumePlacement = getVolPlacement(storagePoolId);
+                String ipPool = getIpPool(storagePoolId);
+
+                usedBytes += volumeSizeBytes;
+
+                if (usedBytes > capacityBytes) {
+                    throw new CloudRuntimeException(
+                            "Insufficient amount of space remains in this primary storage to create a snapshot volume");
+                }
+
+                String appInstanceName = getAppInstanceName(snapshotInfo);
+                DateraObject.AppInstance snapshotAppInstance = DateraUtil.createAppInstance(conn, appInstanceName,
+                        volumeSizeGib, DateraUtil.MAX_IOPS, getNumReplicas(storagePoolId), volumePlacement, ipPool);
+
+                String iqnPath = DateraUtil.generateIqnPath(snapshotAppInstance.getIqn());
+                updateSnapshotDetails(snapshotInfo.getId(), snapshotAppInstance.getName(), storagePoolId,
+                        snapshotAppInstance.getSize(), iqnPath);
+
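+                // The "DateraVolumeId=" prefix (vs. "DateraSnapshotId=" above) marks this
+                // snapshot as backed by a full volume rather than a native snapshot.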
+                snapshotObjectTo.setPath("DateraVolumeId=" + snapshotAppInstance.getName());
+
+                storagePool.setUsedBytes(usedBytes);
+                // update size in storage pool
+                _storagePoolDao.update(storagePoolId, storagePool);
+            }
+
+            CreateObjectAnswer createObjectAnswer = new CreateObjectAnswer(snapshotObjectTo);
+
+            result = new CreateCmdResult(null, createObjectAnswer);
+
+            result.setResult(null);
+        } catch (Exception ex) {
+            s_logger.debug("Failed to take CloudStack snapshot: " + snapshotInfo.getId(), ex);
+
+            result = new CreateCmdResult(null, new CreateObjectAnswer(ex.toString()));
+
+            result.setResult(ex.toString());
+        }
+
+        callback.complete(result);
+    }
+
+    /**
+     * If a native snapshot is used, this function updates the snapshot_details
+     * table with the correct attributes
+     *
+     * @param csSnapshotId  Cloudstack snapshot ID
+     * @param volumeId      Base volume ID
+     * @param newSnapshotId SnapshotID on Datera (appInstanceName:Timestamp)
+     * @param storagePoolId Primary storage
+     * @param newVolumeSize VolumeSize in GB
+     */
+    private void updateSnapshotDetails(long csSnapshotId, String volumeId, String newSnapshotId, long storagePoolId,
+            long newVolumeSize) {
+        SnapshotDetailsVO snapshotDetail = new SnapshotDetailsVO(csSnapshotId, DateraUtil.VOLUME_ID,
+                String.valueOf(volumeId), false);
+
+        snapshotDetailsDao.persist(snapshotDetail);
+
+        snapshotDetail = new SnapshotDetailsVO(csSnapshotId, DateraUtil.SNAPSHOT_ID, String.valueOf(newSnapshotId),
+                false);
+
+        snapshotDetailsDao.persist(snapshotDetail);
+
+        snapshotDetail = new SnapshotDetailsVO(csSnapshotId, DateraUtil.STORAGE_POOL_ID, String.valueOf(storagePoolId),
+                false);
+
+        snapshotDetailsDao.persist(snapshotDetail);
+
+        snapshotDetail = new SnapshotDetailsVO(csSnapshotId, DateraUtil.VOLUME_SIZE, String.valueOf(newVolumeSize),
+                false);
+
+        snapshotDetailsDao.persist(snapshotDetail);
+    }
+
+    /**
+     * If a snapshot is represented as a volume, this function updates the
+     * snapshot_details table with the right attributes so that CloudStack knows
+     * that this snapshot is a volume on the backend
+     *
+     * @param csSnapshotId            Snapshot ID on CloudStack
+     * @param snapshotAppInstanceName snapshot name on Datera
+     *                                <appInstanceName>:<Timestamp>
+     * @param storagePoolId           primary storage
+     * @param snapshotSizeGb          snapshotSize
+     * @param snapshotIqn             IQN of snapshot
+     */
+    private void updateSnapshotDetails(long csSnapshotId, String snapshotAppInstanceName, long storagePoolId,
+            long snapshotSizeGb, String snapshotIqn) {
+        SnapshotDetailsVO snapshotDetail = new SnapshotDetailsVO(csSnapshotId, DateraUtil.VOLUME_ID,
+                String.valueOf(snapshotAppInstanceName), false);
+
+        _snapshotDetailsDao.persist(snapshotDetail);
+
+        snapshotDetail = new SnapshotDetailsVO(csSnapshotId, DateraUtil.STORAGE_POOL_ID, String.valueOf(storagePoolId),
+                false);
+
+        _snapshotDetailsDao.persist(snapshotDetail);
+
+        snapshotDetail = new SnapshotDetailsVO(csSnapshotId, DateraUtil.VOLUME_SIZE, String.valueOf(snapshotSizeGb),
+                false);
+
+        _snapshotDetailsDao.persist(snapshotDetail);
+
+        snapshotDetail = new SnapshotDetailsVO(csSnapshotId, DiskTO.IQN, snapshotIqn, false);
+
+        _snapshotDetailsDao.persist(snapshotDetail);
+    }
+
+    /**
+     * Deletes snapshot on Datera
+     * @param snapshotInfo  snapshot information
+     * @param storagePoolId primary storage
+     * @throws UnsupportedEncodingException
+     * @throws DateraObject.DateraError
+     */
+    private void deleteSnapshot(SnapshotInfo snapshotInfo, long storagePoolId)
+            throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        long csSnapshotId = snapshotInfo.getId();
+
+        try {
+            DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePoolId, _storagePoolDetailsDao);
+
+            SnapshotDetailsVO snapshotDetails = snapshotDetailsDao.findDetail(csSnapshotId, DateraUtil.SNAPSHOT_ID);
+
+            if (snapshotDetails != null && snapshotDetails.getValue() != null) {
+                // Native snapshot being used, delete that
+
+                String snapshotName = snapshotDetails.getValue();
+
+                DateraUtil.deleteVolumeSnapshot(conn, snapshotName);
+
+                // check if the underlying volume needs to be deleted
+                SnapshotVO snapshot = _snapshotDao.findById(csSnapshotId);
+                VolumeVO volume = _volumeDao.findById(snapshot.getVolumeId());
+
+                if (volume == null) {
+
+                    // Volume already deleted from CloudStack; check if other snapshots are using it.
+                    volume = _volumeDao.findByIdIncludingRemoved(snapshot.getVolumeId());
+
+                    if (shouldDeleteVolume(snapshot.getVolumeId(), snapshot.getId())) {
+                        DateraUtil.deleteAppInstance(conn, volume.getFolder());
+                    }
+                }
+            } else {
+
+                // An App Instance is being used to support the CloudStack volume snapshot.
+
+                snapshotDetails = snapshotDetailsDao.findDetail(csSnapshotId, DateraUtil.VOLUME_ID);
+                String appInstanceName = snapshotDetails.getValue();
+
+                DateraUtil.deleteAppInstance(conn, appInstanceName);
+            }
+
+            snapshotDetailsDao.removeDetails(csSnapshotId);
+
+            StoragePoolVO storagePool = storagePoolDao.findById(storagePoolId);
+
+            // getUsedBytes(StoragePool) will not include the snapshot to delete because it
+            // has already been deleted by this point
+            long usedBytes = getUsedBytes(storagePool);
+
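+            // Guard against the computed usage going negative.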
+            storagePool.setUsedBytes(usedBytes < 0 ? 0 : usedBytes);
+
+            storagePoolDao.update(storagePoolId, storagePool);
+        } catch (Exception ex) {
+            s_logger.debug("Error in 'deleteSnapshot(SnapshotInfo, long)'. CloudStack snapshot ID: " + csSnapshotId,
+                    ex);
+            throw ex;
+        }
+    }
+
+    /**
+     * Deletes a template from Datera
+     * @param templateInfo  Information about Template
+     * @param storagePoolId Primary storage
+     * @throws UnsupportedEncodingException
+     * @throws DateraObject.DateraError
+     */
+    private void deleteTemplate(TemplateInfo templateInfo, long storagePoolId)
+            throws UnsupportedEncodingException, DateraObject.DateraError {
+        try {
+            DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePoolId, _storagePoolDetailsDao);
+
+            String appInstanceName = getAppInstanceName(templateInfo);
+
+            DateraUtil.deleteAppInstance(conn, appInstanceName);
+
+            VMTemplateStoragePoolVO templatePoolRef = tmpltPoolDao.findByPoolTemplate(storagePoolId,
+                    templateInfo.getId());
+
+            tmpltPoolDao.remove(templatePoolRef.getId());
+
+            StoragePoolVO storagePool = storagePoolDao.findById(storagePoolId);
+
+            // getUsedBytes(StoragePool) will not include the template to delete because the
+            // "template_spool_ref" table has already been updated by this point
+            long usedBytes = getUsedBytes(storagePool);
+
+            storagePool.setUsedBytes(usedBytes < 0 ? 0 : usedBytes);
+
+            storagePoolDao.update(storagePoolId, storagePool);
+        } catch (Exception ex) {
+            s_logger.debug("Failed to delete template volume. CloudStack template ID: " + templateInfo.getId(), ex);
+
+            throw ex;
+        }
+    }
+
+    /**
+     * Revert snapshot for a volume
+     * @param snapshotInfo           Information about volume snapshot
+     * @param snapshotOnPrimaryStore Not used
+     * @throws CloudRuntimeException
+     */
+    @Override
+    public void revertSnapshot(SnapshotInfo snapshotInfo, SnapshotInfo snapshotOnPrimaryStore,
+            AsyncCompletionCallback<CommandResult> callback) {
+
+        VolumeInfo volumeInfo = snapshotInfo.getBaseVolume();
+        VolumeVO volumeVO = _volumeDao.findById(volumeInfo.getId());
+
+        long csSnapshotId = snapshotInfo.getId();
+
+        // Check the base volume before dereferencing it; a removed volume would
+        // otherwise trigger a NullPointerException below.
+        if (volumeVO == null || volumeVO.getRemoved() != null) {
+            String errMsg = "The volume that the snapshot belongs to no longer exists.";
+
+            CommandResult commandResult = new CommandResult();
+
+            commandResult.setResult(errMsg);
+
+            callback.complete(commandResult);
+
+            return;
+        }
+
+        long storagePoolId = volumeVO.getPoolId();
+        s_logger.info("Datera - restoreVolumeSnapshot from snapshotId " + String.valueOf(csSnapshotId) + " to volume "
+                + volumeVO.getName());
+
+        DateraObject.AppInstance appInstance;
+
+        try {
+
+            DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePoolId, _storagePoolDetailsDao);
+
+            SnapshotDetailsVO snapshotDetails = snapshotDetailsDao.findDetail(csSnapshotId, DateraUtil.SNAPSHOT_ID);
+
+            if (snapshotDetails != null && snapshotDetails.getValue() != null) {
+                // Native snapshot being used, restore snapshot from Datera AppInstance
+
+                String snapshotName = snapshotDetails.getValue();
+
+                s_logger.info("Datera - restoreVolumeSnapshot: " + snapshotName);
+
+                appInstance = DateraUtil.restoreVolumeSnapshot(conn, snapshotName);
+
+                Preconditions.checkNotNull(appInstance);
+
+                updateVolumeDetails(volumeInfo.getId(), appInstance.getSize());
+            }
+
+            CommandResult commandResult = new CommandResult();
+
+            callback.complete(commandResult);
+
+        } catch (Exception ex) {
+            s_logger.debug("Error in 'revertSnapshot()'. CloudStack snapshot ID: " + csSnapshotId, ex);
+            throw new CloudRuntimeException(ex.getMessage());
+        }
+
+    }
+
+    /**
+     * Resizes a volume on Datera; shrinking is not allowed. The resize also
+     * accounts for the hypervisor snapshot reserve (HSR).
+     * @param dataObject volume to resize
+     * @param callback   async context
+     */
+    @Override
+    public void resize(DataObject dataObject, AsyncCompletionCallback<CreateCmdResult> callback) {
+        String iqn = null;
+        String errMsg = null;
+
+        if (dataObject.getType() == DataObjectType.VOLUME) {
+            VolumeInfo volumeInfo = (VolumeInfo) dataObject;
+            String iqnPath = volumeInfo.get_iScsiName();
+            iqn = DateraUtil.extractIqn(iqnPath);
+
+            long storagePoolId = volumeInfo.getPoolId();
+            ResizeVolumePayload payload = (ResizeVolumePayload) volumeInfo.getpayload();
+            String appInstanceName = getAppInstanceName(volumeInfo);
+            long newSizeBytes = payload.newSize;
+
+            Integer hsr = volumeInfo.getHypervisorSnapshotReserve();
+
+            if (payload.newSize != null || payload.newHypervisorSnapshotReserve != null) {
+                if (payload.newHypervisorSnapshotReserve != null) {
+                    if (hsr != null) {
+                        if (payload.newHypervisorSnapshotReserve > hsr) {
+                            hsr = payload.newHypervisorSnapshotReserve;
+                        }
+                    } else {
+                        hsr = payload.newHypervisorSnapshotReserve;
+                    }
+                }
+
+                newSizeBytes = getVolumeSizeIncludingHypervisorSnapshotReserve(payload.newSize, hsr);
+            }
+
+            int newSize = DateraUtil.bytesToGib(newSizeBytes);
+
+            DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePoolId, _storagePoolDetailsDao);
+
+            try {
+
+                DateraObject.AppInstance appInstance = DateraUtil.getAppInstance(conn, appInstanceName);
+
+                Preconditions.checkNotNull(appInstance);
+
+                if (appInstance.getSize() < newSize) {
+                    DateraUtil.updateAppInstanceSize(conn, appInstanceName, Ints.checkedCast(newSize));
+                }
+
+                if (payload.newMaxIops != null && appInstance.getTotalIops() != payload.newMaxIops) {
+                    DateraUtil.updateAppInstanceIops(conn, appInstanceName, Ints.checkedCast(payload.newMaxIops));
+                }
+
+                appInstance = DateraUtil.getAppInstance(conn, appInstanceName);
+
+                Preconditions.checkNotNull(appInstance);
+
+                VolumeVO volume = _volumeDao.findById(volumeInfo.getId());
+
+                volume.setMinIops(payload.newMinIops);
+                volume.setMaxIops(payload.newMaxIops);
+
+                _volumeDao.update(volume.getId(), volume);
+
+                updateVolumeDetails(volume.getId(), appInstance.getSize());
+
+            } catch (DateraObject.DateraError | UnsupportedEncodingException dateraError) {
+                errMsg = "Failed to resize volume " + appInstanceName + ": " + dateraError.getMessage();
+                s_logger.error(errMsg, dateraError);
+            }
+
+        } else {
+            errMsg = "Invalid DataObjectType (" + dataObject.getType() + ") passed to resize";
+        }
+
+        CreateCmdResult result = new CreateCmdResult(iqn, new Answer(null, errMsg == null, errMsg));
+
+        result.setResult(errMsg);
+
+        callback.complete(result);
+    }
+
+    /**
+     * Adding temp volume to the snapshot_details table. This is used if we are
+     * using a native snapshot and we want to create a template out of the snapshot
+     *
+     * @param csSnapshotId   Source snapshot
+     * @param tempVolumeName temp volume app instance on Datera
+     */
+    private void addTempVolumeToDb(long csSnapshotId, String tempVolumeName) {
+        SnapshotDetailsVO snapshotDetails = snapshotDetailsDao.findDetail(csSnapshotId, DateraUtil.VOLUME_ID);
+
+        if (snapshotDetails == null || snapshotDetails.getValue() == null) {
+            throw new CloudRuntimeException(
+                    "'addTempVolumeId' should not be invoked unless " + DateraUtil.VOLUME_ID + " exists.");
+        }
+
+        String originalVolumeId = snapshotDetails.getValue();
+
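+        // Stash the original volume ID under TEMP_VOLUME_ID and point VOLUME_ID at the
+        // temp clone; removeTempVolumeFromDb() reverses this swap once the copy is done.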
+        handleSnapshotDetails(csSnapshotId, DateraUtil.TEMP_VOLUME_ID, originalVolumeId);
+        handleSnapshotDetails(csSnapshotId, DateraUtil.VOLUME_ID, tempVolumeName);
+    }
+
+    private void removeTempVolumeFromDb(long csSnapshotId) {
+        SnapshotDetailsVO snapshotDetails = snapshotDetailsDao.findDetail(csSnapshotId, DateraUtil.TEMP_VOLUME_ID);
+
+        if (snapshotDetails == null || snapshotDetails.getValue() == null) {
+            throw new CloudRuntimeException(
+                    "'removeTempVolumeId' should not be invoked unless " + DateraUtil.TEMP_VOLUME_ID + " exists.");
+        }
+
+        String originalVolumeId = snapshotDetails.getValue();
+
+        handleSnapshotDetails(csSnapshotId, DateraUtil.VOLUME_ID, originalVolumeId);
+
+        snapshotDetailsDao.remove(snapshotDetails.getId());
+    }
+
+    /**
+     * Helper function to update snapshot_details table
+     *
+     * @param csSnapshotId Snapshot
+     * @param name         attribute name
+     * @param value        attribute value
+     */
+    private void handleSnapshotDetails(long csSnapshotId, String name, String value) {
+        snapshotDetailsDao.removeDetail(csSnapshotId, name);
+        SnapshotDetailsVO snapshotDetails = new SnapshotDetailsVO(csSnapshotId, name, value, false);
+        snapshotDetailsDao.persist(snapshotDetails);
+    }
+
+    private void verifySufficientBytesForStoragePool(DataObject dataObject, long storagePoolId) {
+        StoragePoolVO storagePool = storagePoolDao.findById(storagePoolId);
+
+        long requestedBytes = getDataObjectSizeIncludingHypervisorSnapshotReserve(dataObject, storagePool);
+
+        verifySufficientBytesForStoragePool(requestedBytes, storagePoolId);
+    }
+
+    private void verifySufficientBytesForStoragePool(long requestedBytes, long storagePoolId) {
+        StoragePoolVO storagePool = storagePoolDao.findById(storagePoolId);
+
+        long capacityBytes = storagePool.getCapacityBytes();
+        long usedBytes = getUsedBytes(storagePool);
+
+        usedBytes += requestedBytes;
+
+        if (usedBytes > capacityBytes) {
+            throw new CloudRuntimeException("Insufficient amount of space remains in this primary storage");
+        }
+    }
+
+    /**
+     * Returns true if we can take a native snapshot, false otherwise. The
+     * "takeSnapshot" detail is set by StorageSystemSnapshotStrategy.
+     * @param snapshotId Snapshot
+     * @return true if native snapshot, false otherwise
+     */
+    private boolean shouldTakeSnapshot(long snapshotId) {
+        SnapshotDetailsVO snapshotDetails = snapshotDetailsDao.findDetail(snapshotId, "takeSnapshot");
+
+        if (snapshotDetails != null && snapshotDetails.getValue() != null) {
+            return Boolean.parseBoolean(snapshotDetails.getValue());
+        }
+
+        return false;
+    }
+
+    private long getCsIdForCloning(long volumeId, String cloneOf) {
+        VolumeDetailVO volumeDetail = volumeDetailsDao.findDetail(volumeId, cloneOf);
+
+        if (volumeDetail != null && volumeDetail.getValue() != null) {
+            return Long.parseLong(volumeDetail.getValue());
+        }
+
+        return Long.MIN_VALUE;
+    }
+
+    /**
+     * Checks if the volume can be safely deleted, i.e., it has no native snapshots.
+     * @param csVolumeId         Volume
+     * @param snapshotToIgnoreId Used to check if this is the only snapshot on the
+     *                           volume
+     * @return true if we can delete, false otherwise
+     */
+    private boolean shouldDeleteVolume(Long csVolumeId, Long snapshotToIgnoreId) {
+
+        List<SnapshotVO> lstSnapshots = getNonDestroyedSnapshots(csVolumeId);
+
+        for (SnapshotVO snapshot : lstSnapshots) {
+            if (snapshotToIgnoreId != null && snapshot.getId() == snapshotToIgnoreId) {
+                continue;
+            }
+            SnapshotDetailsVO snapshotDetails = snapshotDetailsDao.findDetail(snapshot.getId(), DateraUtil.SNAPSHOT_ID);
+
+            if (snapshotDetails != null && snapshotDetails.getValue() != null) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private List<SnapshotVO> getNonDestroyedSnapshots(long csVolumeId) {
+        List<SnapshotVO> lstSnapshots = _snapshotDao.listByVolumeId(csVolumeId);
+
+        if (lstSnapshots == null) {
+            lstSnapshots = new ArrayList<>();
+        }
+
+        List<SnapshotVO> lstSnapshots2 = new ArrayList<>();
+
+        for (SnapshotVO snapshot : lstSnapshots) {
+            if (!Snapshot.State.Destroyed.equals(snapshot.getState())) {
+                lstSnapshots2.add(snapshot);
+            }
+        }
+
+        return lstSnapshots2;
+    }
+
+    private long getVolumeSizeIncludingHypervisorSnapshotReserve(long volumeSize, Integer hypervisorSnapshotReserve) {
+        if (hypervisorSnapshotReserve != null) {
+            hypervisorSnapshotReserve = Math.max(hypervisorSnapshotReserve, s_lowestHypervisorSnapshotReserve);
+
+            volumeSize += volumeSize * (hypervisorSnapshotReserve / 100f);
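+            // Example: a 100 GiB request with a 25% reserve grows to 125 GiB.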
+        }
+
+        return volumeSize;
+    }
+
+    @Override
+    public void handleQualityOfServiceForVolumeMigration(VolumeInfo volumeInfo,
+            QualityOfServiceState qualityOfServiceState) {
+
+    }
+}
diff --git a/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/DateraPrimaryDataStoreLifeCycle.java b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/DateraPrimaryDataStoreLifeCycle.java
new file mode 100644
index 0000000..ff253fc
--- /dev/null
+++ b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/lifecycle/DateraPrimaryDataStoreLifeCycle.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.storage.datastore.lifecycle;
+
+import com.cloud.agent.api.StoragePoolInfo;
+import com.cloud.capacity.CapacityManager;
+import com.cloud.dc.ClusterVO;
+import com.cloud.dc.DataCenterVO;
+import com.cloud.dc.dao.DataCenterDao;
+import com.cloud.dc.ClusterDetailsDao;
+import com.cloud.dc.dao.ClusterDao;
+import com.cloud.host.Host;
+import com.cloud.host.HostVO;
+import com.cloud.host.dao.HostDao;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.resource.ResourceManager;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.Storage.StoragePoolType;
+import com.cloud.storage.StorageManager;
+import com.cloud.storage.StoragePool;
+import com.cloud.storage.StoragePoolAutomation;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.SnapshotDetailsDao;
+import com.cloud.storage.dao.SnapshotDetailsVO;
+import com.cloud.storage.dao.StoragePoolHostDao;
+import com.cloud.utils.exception.CloudRuntimeException;
+import org.apache.cloudstack.engine.subsystem.api.storage.ClusterScope;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.HostScope;
+import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreLifeCycle;
+import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreParameters;
+import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
+import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreInfo;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.storage.datastore.util.DateraUtil;
+import org.apache.cloudstack.storage.volume.datastore.PrimaryDataStoreHelper;
+import org.apache.log4j.Logger;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class DateraPrimaryDataStoreLifeCycle implements PrimaryDataStoreLifeCycle {
+    private static final Logger s_logger = Logger.getLogger(DateraPrimaryDataStoreLifeCycle.class);
+
+    @Inject
+    private CapacityManager _capacityMgr;
+    @Inject
+    private DataCenterDao zoneDao;
+    @Inject
+    private ClusterDao _clusterDao;
+    @Inject
+    private ClusterDetailsDao _clusterDetailsDao;
+    @Inject
+    private PrimaryDataStoreDao storagePoolDao;
+    @Inject
+    private HostDao _hostDao;
+    @Inject
+    private PrimaryDataStoreHelper dataStoreHelper;
+    @Inject
+    private ResourceManager _resourceMgr;
+    @Inject
+    private SnapshotDao _snapshotDao;
+    @Inject
+    private SnapshotDetailsDao _snapshotDetailsDao;
+    @Inject
+    private StorageManager _storageMgr;
+    @Inject
+    private StoragePoolHostDao _storagePoolHostDao;
+    @Inject
+    private StoragePoolAutomation storagePoolAutomation;
+
+    @Override
+    public DataStore initialize(Map<String, Object> dsInfos) {
+
+        String url = (String) dsInfos.get("url");
+        Long zoneId = (Long) dsInfos.get("zoneId");
+        Long podId = (Long) dsInfos.get("podId");
+        Long clusterId = (Long) dsInfos.get("clusterId");
+        String storagePoolName = (String) dsInfos.get("name");
+        String providerName = (String) dsInfos.get("providerName");
+        Long capacityBytes = (Long) dsInfos.get("capacityBytes");
+        Long capacityIops = (Long) dsInfos.get("capacityIops");
+        String tags = (String) dsInfos.get("tags");
+        @SuppressWarnings("unchecked")
+        Map<String, String> details = (Map<String, String>) dsInfos.get("details");
+        String domainName = details.get("domainname");
+
+        String storageVip = DateraUtil.getStorageVip(url);
+
+        int storagePort = DateraUtil.getStoragePort(url);
+        int numReplicas = DateraUtil.getNumReplicas(url);
+        String volPlacement = DateraUtil.getVolPlacement(url);
+        String clusterAdminUsername = DateraUtil.getValue(DateraUtil.CLUSTER_ADMIN_USERNAME, url);
+        String clusterAdminPassword = DateraUtil.getValue(DateraUtil.CLUSTER_ADMIN_PASSWORD, url);
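+        // The url parameter carries the Datera connection settings (management and
+        // storage VIP/port, admin credentials, replica count, volume placement, IP
+        // pool); the exact key names are defined in DateraUtil.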
+        String uuid;
+        String randomString;
+
+        PrimaryDataStoreParameters parameters = new PrimaryDataStoreParameters();
+
+        // Checks whether the primary datastore is cluster-wide. If so, builds the
+        // uuid from the cluster UUID and sets the podId and clusterId parameters.
+        if (clusterId != null) {
+            if (podId == null) {
+                throw new CloudRuntimeException("The Pod ID must be specified.");
+            }
+            if (zoneId == null) {
+                throw new CloudRuntimeException("The Zone ID must be specified.");
+            }
+            ClusterVO cluster = _clusterDao.findById(clusterId);
+            String clusterUuid = cluster.getUuid();
+            randomString = DateraUtil.generateUUID(clusterUuid);
+            // uuid = DateraUtil.PROVIDER_NAME + "_" + cluster.getUuid() + "_" + storageVip
+            // + "_" + clusterAdminUsername + "_" + numReplicas + "_" + volPlacement;
+            uuid = DateraUtil.PROVIDER_NAME + "_" + clusterUuid + "_" + randomString;
+            s_logger.debug("Datera - Setting Datera cluster-wide primary storage uuid to " + uuid);
+            parameters.setPodId(podId);
+            parameters.setClusterId(clusterId);
+
+            HypervisorType hypervisorType = getHypervisorTypeForCluster(clusterId);
+
+            if (!isSupportedHypervisorType(hypervisorType)) {
+                throw new CloudRuntimeException(hypervisorType + " is not a supported hypervisor type.");
+            }
+
+        }
+        // Zone-wide datastore: build the uuid from the zone UUID.
+        else {
+            DataCenterVO zone = zoneDao.findById(zoneId);
+            String zoneUuid = zone.getUuid();
+            randomString = DateraUtil.generateUUID(zoneUuid);
+            // uuid = DateraUtil.PROVIDER_NAME + "_" + zone.getUuid() + "_" + storageVip +
+            // "_" + clusterAdminUsername + "_" + numReplicas + "_" + volPlacement;
+            uuid = DateraUtil.PROVIDER_NAME + "_" + zoneUuid + "_" + randomString;
+
+            s_logger.debug("Datera - Setting Datera zone-wide primary storage uuid to " + uuid);
+        }
+        if (capacityBytes == null || capacityBytes <= 0) {
+            throw new IllegalArgumentException("'capacityBytes' must be present and greater than 0.");
+        }
+
+        if (capacityIops == null || capacityIops <= 0) {
+            throw new IllegalArgumentException("'capacityIops' must be present and greater than 0.");
+        }
+
+        if (domainName == null) {
+            domainName = "ROOT";
+            s_logger.debug("setting the domain to ROOT");
+        }
+        s_logger.debug("Datera - domainName: " + domainName);
+
+        parameters.setHost(storageVip);
+        parameters.setPort(storagePort);
+        parameters.setPath(DateraUtil.getModifiedUrl(url));
+        parameters.setType(StoragePoolType.Iscsi);
+        parameters.setUuid(uuid);
+        parameters.setZoneId(zoneId);
+        parameters.setName(storagePoolName);
+        parameters.setProviderName(providerName);
+        parameters.setManaged(true);
+        parameters.setCapacityBytes(capacityBytes);
+        parameters.setUsedBytes(0);
+        parameters.setCapacityIops(capacityIops);
+        parameters.setHypervisorType(HypervisorType.Any);
+        parameters.setTags(tags);
+        parameters.setDetails(details);
+
+        String managementVip = DateraUtil.getManagementVip(url);
+        int managementPort = DateraUtil.getManagementPort(url);
+
+        details.put(DateraUtil.MANAGEMENT_VIP, managementVip);
+        details.put(DateraUtil.MANAGEMENT_PORT, String.valueOf(managementPort));
+        details.put(DateraUtil.CLUSTER_ADMIN_USERNAME, clusterAdminUsername);
+        details.put(DateraUtil.CLUSTER_ADMIN_PASSWORD, clusterAdminPassword);
+
+        long lClusterDefaultMinIops = 100;
+        long lClusterDefaultMaxIops = 15000;
+
+        try {
+            String clusterDefaultMinIops = DateraUtil.getValue(DateraUtil.CLUSTER_DEFAULT_MIN_IOPS, url);
+
+            if (clusterDefaultMinIops != null && clusterDefaultMinIops.trim().length() > 0) {
+                lClusterDefaultMinIops = Long.parseLong(clusterDefaultMinIops);
+            }
+        } catch (NumberFormatException ex) {
+            s_logger.warn("Cannot parse the setting of " + DateraUtil.CLUSTER_DEFAULT_MIN_IOPS
+                    + ", using default value: " + lClusterDefaultMinIops + ". Exception: " + ex);
+        }
+
+        try {
+            String clusterDefaultMaxIops = DateraUtil.getValue(DateraUtil.CLUSTER_DEFAULT_MAX_IOPS, url);
+
+            if (clusterDefaultMaxIops != null && clusterDefaultMaxIops.trim().length() > 0) {
+                lClusterDefaultMaxIops = Long.parseLong(clusterDefaultMaxIops);
+            }
+        } catch (NumberFormatException ex) {
+            s_logger.warn("Cannot parse the setting of " + DateraUtil.CLUSTER_DEFAULT_MAX_IOPS
+                    + ", using default value: " + lClusterDefaultMaxIops + ". Exception: " + ex);
+        }
+
+        if (lClusterDefaultMinIops > lClusterDefaultMaxIops) {
+            throw new CloudRuntimeException("The parameter '" + DateraUtil.CLUSTER_DEFAULT_MIN_IOPS
+                    + "' must be less than or equal to the parameter '" + DateraUtil.CLUSTER_DEFAULT_MAX_IOPS + "'.");
+        }
+
+        if (numReplicas < DateraUtil.MIN_NUM_REPLICAS || numReplicas > DateraUtil.MAX_NUM_REPLICAS) {
+            throw new CloudRuntimeException("The parameter '" + DateraUtil.NUM_REPLICAS + "' must be between  "
+                    + DateraUtil.CLUSTER_DEFAULT_MAX_IOPS + "' and " + DateraUtil.MAX_NUM_REPLICAS);
+        }
+
+        details.put(DateraUtil.CLUSTER_DEFAULT_MIN_IOPS, String.valueOf(lClusterDefaultMinIops));
+        details.put(DateraUtil.CLUSTER_DEFAULT_MAX_IOPS, String.valueOf(lClusterDefaultMaxIops));
+
+        details.put(DateraUtil.NUM_REPLICAS, String.valueOf(DateraUtil.getNumReplicas(url)));
+        details.put(DateraUtil.VOL_PLACEMENT, String.valueOf(DateraUtil.getVolPlacement(url)));
+        details.put(DateraUtil.IP_POOL, String.valueOf(DateraUtil.getIpPool(url)));
+
+        return dataStoreHelper.createPrimaryDataStore(parameters);
+    }
+
+    @Override
+    public boolean attachHost(DataStore store, HostScope scope, StoragePoolInfo existingInfo) {
+        return true; // should be ignored for zone-wide-only plug-ins
+    }
+
+    @Override
+    public boolean attachCluster(DataStore datastore, ClusterScope scope) {
+        PrimaryDataStoreInfo primaryDataStoreInfo = (PrimaryDataStoreInfo) datastore;
+
+        // check if there is at least one host up in this cluster
+        List<HostVO> allHosts = _resourceMgr.listAllUpAndEnabledHosts(Host.Type.Routing,
+                primaryDataStoreInfo.getClusterId(), primaryDataStoreInfo.getPodId(),
+                primaryDataStoreInfo.getDataCenterId());
+
+        if (allHosts.isEmpty()) {
+            storagePoolDao.expunge(primaryDataStoreInfo.getId());
+
+            throw new CloudRuntimeException(
+                    "No host is up in cluster " + primaryDataStoreInfo.getClusterId() + " to associate the storage pool with");
+        }
+
+        List<HostVO> poolHosts = new ArrayList<HostVO>();
+
+        for (HostVO host : allHosts) {
+            try {
+                _storageMgr.connectHostToSharedPool(host.getId(), primaryDataStoreInfo.getId());
+
+                poolHosts.add(host);
+            } catch (Exception e) {
+                s_logger.warn("Unable to establish a connection between " + host + " and " + primaryDataStoreInfo, e);
+            }
+        }
+
+        if (poolHosts.isEmpty()) {
+            s_logger.warn("No host can access storage pool '" + primaryDataStoreInfo + "' on cluster '"
+                    + primaryDataStoreInfo.getClusterId() + "'.");
+
+            storagePoolDao.expunge(primaryDataStoreInfo.getId());
+
+            throw new CloudRuntimeException("Failed to access storage pool");
+        }
+
+        dataStoreHelper.attachCluster(datastore);
+
+        return true;
+        // throw new UnsupportedOperationException("Only Zone-wide scope is supported
+        // with the Datera Storage driver");
+    }
+
+    @Override
+    public boolean attachZone(DataStore dataStore, ZoneScope scope, HypervisorType hypervisorType) {
+        dataStoreHelper.attachZone(dataStore);
+
+        List<HostVO> xenServerHosts = _resourceMgr
+                .listAllUpAndEnabledHostsInOneZoneByHypervisor(HypervisorType.XenServer, scope.getScopeId());
+        List<HostVO> vmWareServerHosts = _resourceMgr
+                .listAllUpAndEnabledHostsInOneZoneByHypervisor(HypervisorType.VMware, scope.getScopeId());
+        List<HostVO> kvmHosts = _resourceMgr.listAllUpAndEnabledHostsInOneZoneByHypervisor(HypervisorType.KVM,
+                scope.getScopeId());
+        List<HostVO> hosts = new ArrayList<HostVO>();
+
+        hosts.addAll(xenServerHosts);
+        hosts.addAll(vmWareServerHosts);
+        hosts.addAll(kvmHosts);
+
+        for (HostVO host : hosts) {
+            try {
+                _storageMgr.connectHostToSharedPool(host.getId(), dataStore.getId());
+            } catch (Exception e) {
+                s_logger.warn("Unable to establish a connection between " + host + " and " + dataStore, e);
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public boolean maintain(DataStore dataStore) {
+        storagePoolAutomation.maintain(dataStore);
+        dataStoreHelper.maintain(dataStore);
+
+        return true;
+    }
+
+    @Override
+    public boolean cancelMaintain(DataStore store) {
+        dataStoreHelper.cancelMaintain(store);
+        storagePoolAutomation.cancelMaintain(store);
+
+        return true;
+    }
+
+    @Override
+    public boolean deleteDataStore(DataStore store) {
+        List<SnapshotVO> lstSnapshots = _snapshotDao.listAll();
+
+        if (lstSnapshots != null) {
+            for (SnapshotVO snapshot : lstSnapshots) {
+                SnapshotDetailsVO snapshotDetails = _snapshotDetailsDao.findDetail(snapshot.getId(),
+                        DateraUtil.STORAGE_POOL_ID);
+
+                // if this snapshot belongs to the storagePool that was passed in
+                if (snapshotDetails != null && snapshotDetails.getValue() != null
+                        && Long.parseLong(snapshotDetails.getValue()) == store.getId()) {
+                    throw new CloudRuntimeException(
+                            "This primary storage cannot be deleted because it currently contains one or more snapshots.");
+                }
+            }
+        }
+
+        return dataStoreHelper.deletePrimaryDataStore(store);
+    }
+
+    @Override
+    public boolean migrateToObjectStore(DataStore store) {
+        return false;
+    }
+
+    @Override
+    public void updateStoragePool(StoragePool storagePool, Map<String, String> details) {
+        StoragePoolVO storagePoolVo = storagePoolDao.findById(storagePool.getId());
+
+        String strCapacityBytes = details.get(PrimaryDataStoreLifeCycle.CAPACITY_BYTES);
+        Long capacityBytes = strCapacityBytes != null ? Long.parseLong(strCapacityBytes) : null;
+
+        if (capacityBytes != null) {
+            long usedBytes = _capacityMgr.getUsedBytes(storagePoolVo);
+
+            if (capacityBytes < usedBytes) {
+                throw new CloudRuntimeException(
+                        "Cannot reduce the number of bytes for this storage pool as it would lead to an insufficient number of bytes");
+            }
+        }
+
+        String strCapacityIops = details.get(PrimaryDataStoreLifeCycle.CAPACITY_IOPS);
+        Long capacityIops = strCapacityIops != null ? Long.parseLong(strCapacityIops) : null;
+
+        if (capacityIops != null) {
+            long usedIops = _capacityMgr.getUsedIops(storagePoolVo);
+
+            if (capacityIops < usedIops) {
+                throw new CloudRuntimeException(
+                        "Cannot reduce the number of IOPS for this storage pool as it would lead to an insufficient number of IOPS");
+            }
+        }
+    }
+
+    @Override
+    public void enableStoragePool(DataStore dataStore) {
+        dataStoreHelper.enable(dataStore);
+    }
+
+    @Override
+    public void disableStoragePool(DataStore dataStore) {
+        dataStoreHelper.disable(dataStore);
+    }
+
+    private HypervisorType getHypervisorTypeForCluster(long clusterId) {
+        ClusterVO cluster = _clusterDao.findById(clusterId);
+
+        if (cluster == null) {
+            throw new CloudRuntimeException("Cluster ID '" + clusterId + "' was not found in the database.");
+        }
+
+        return cluster.getHypervisorType();
+    }
+
+    private static boolean isSupportedHypervisorType(HypervisorType hypervisorType) {
+        return HypervisorType.XenServer.equals(hypervisorType) || HypervisorType.VMware.equals(hypervisorType)
+                || HypervisorType.KVM.equals(hypervisorType);
+    }
+
+    private HypervisorType getHypervisorType(long hostId) {
+        HostVO host = _hostDao.findById(hostId);
+
+        if (host != null) {
+            return host.getHypervisorType();
+        }
+
+        return HypervisorType.None;
+    }
+}
\ No newline at end of file
diff --git a/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/provider/DateraHostListener.java b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/provider/DateraHostListener.java
new file mode 100644
index 0000000..2cb4e8c
--- /dev/null
+++ b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/provider/DateraHostListener.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.storage.datastore.provider;
+
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.ModifyStoragePoolAnswer;
+import com.cloud.agent.api.ModifyStoragePoolCommand;
+import com.cloud.agent.api.ModifyTargetsCommand;
+import com.cloud.alert.AlertManager;
+import com.cloud.dc.ClusterDetailsDao;
+import com.cloud.dc.ClusterDetailsVO;
+import com.cloud.dc.ClusterVO;
+import com.cloud.dc.dao.ClusterDao;
+import com.cloud.host.HostVO;
+import com.cloud.host.dao.HostDao;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.StoragePool;
+import com.cloud.storage.StoragePoolHostVO;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.StoragePoolHostDao;
+import com.cloud.storage.dao.VolumeDao;
+import com.cloud.utils.db.GlobalLock;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.VMInstanceVO;
+import com.cloud.vm.dao.VMInstanceDao;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.HypervisorHostListener;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.storage.datastore.util.DateraObject;
+import org.apache.cloudstack.storage.datastore.util.DateraUtil;
+import org.apache.log4j.Logger;
+
+import javax.inject.Inject;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DateraHostListener implements HypervisorHostListener {
+    private static final Logger s_logger = Logger.getLogger(DateraHostListener.class);
+
+    @Inject private AgentManager _agentMgr;
+    @Inject private AlertManager _alertMgr;
+    @Inject private ClusterDao _clusterDao;
+    @Inject private ClusterDetailsDao _clusterDetailsDao;
+    @Inject private DataStoreManager _dataStoreMgr;
+    @Inject private HostDao _hostDao;
+    @Inject private PrimaryDataStoreDao _storagePoolDao;
+    @Inject private StoragePoolDetailsDao _storagePoolDetailsDao;
+    @Inject private StoragePoolHostDao storagePoolHostDao;
+    @Inject private VMInstanceDao _vmDao;
+    @Inject private VolumeDao _volumeDao;
+
+    @Override
+    public boolean hostAdded(long hostId) {
+        return true;
+    }
+
+    @Override
+    public boolean hostConnect(long hostId, long storagePoolId) {
+
+        HostVO host = _hostDao.findById(hostId);
+
+        if (host == null) {
+            s_logger.error("Failed to add host by HostListener as host was not found with id : " + hostId);
+            return false;
+        }
+        StoragePoolHostVO storagePoolHost = storagePoolHostDao.findByPoolHost(storagePoolId, hostId);
+
+        if (storagePoolHost == null) {
+            storagePoolHost = new StoragePoolHostVO(storagePoolId, hostId, "");
+
+            storagePoolHostDao.persist(storagePoolHost);
+        }
+
+        if (host.getHypervisorType().equals(HypervisorType.XenServer)) {
+            handleXenServer(host.getClusterId(), host.getId(), storagePoolId);
+        }
+        else if (host.getHypervisorType().equals(HypervisorType.KVM)) {
+            handleKVM(hostId, storagePoolId);
+        }
+
+        return true;
+    }
+
+    @Override
+    public boolean hostDisconnected(long hostId, long storagePoolId) {
+        StoragePoolHostVO storagePoolHost = storagePoolHostDao.findByPoolHost(storagePoolId, hostId);
+
+        if (storagePoolHost != null) {
+            storagePoolHostDao.deleteStoragePoolHostDetails(hostId, storagePoolId);
+        }
+
+        return true;
+    }
+
+    @Override
+    public boolean hostAboutToBeRemoved(long hostId) {
+        HostVO host = _hostDao.findById(hostId);
+
+        handleVMware(host, false);
+
+        return true;
+
+    }
+
+    @Override
+    public boolean hostRemoved(long hostId, long clusterId) {
+
+        ClusterVO clusterVO = _clusterDao.findById(clusterId);
+        HostVO hostVO = _hostDao.findByIdIncludingRemoved(hostId);
+        String initiatorName = DateraUtil.INITIATOR_PREFIX + "-" + hostVO.getUuid();
+
+        int lockTimeInSeconds = 5;
+
+        GlobalLock lock = GlobalLock.getInternLock(clusterVO.getUuid());
+
+        if (!lock.lock(lockTimeInSeconds)) {
+            String errMsg = "Couldn't lock the DB on the following string: " + clusterVO.getUuid();
+
+            s_logger.debug(errMsg);
+
+            throw new CloudRuntimeException(errMsg);
+        }
+
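+        // The lock on the cluster UUID serializes initiator-group updates when several
+        // hosts are removed from the same cluster concurrently.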
+        try {
+            List<StoragePoolVO> storagePools = _storagePoolDao.findPoolsByProvider(DateraUtil.PROVIDER_NAME);
+
+            if (storagePools != null && storagePools.size() > 0) {
+                for (StoragePoolVO storagePool : storagePools) {
+                    ClusterDetailsVO clusterDetail = _clusterDetailsDao.findDetail(clusterId, DateraUtil.getInitiatorGroupKey(storagePool.getId()));
+
+                    String initiatorGroupName = clusterDetail != null ? clusterDetail.getValue() : null;
+
+                    if (initiatorGroupName != null && DateraUtil.hostSupport_iScsi(hostVO) ) {
+                        DateraObject.DateraConnection conn = DateraUtil.getDateraConnection(storagePool.getId(), _storagePoolDetailsDao);
+                        DateraObject.Initiator initiator = DateraUtil.getInitiator(conn, hostVO.getStorageUrl());
+                        DateraObject.InitiatorGroup initiatorGroup = DateraUtil.getInitiatorGroup(conn, initiatorGroupName);
+
+                        if (initiator != null && DateraUtil.isInitiatorPresentInGroup(initiator, initiatorGroup)) {
+                            DateraUtil.removeInitiatorFromGroup(conn, initiator.getPath(), initiatorGroupName);
+                        }
+                    }
+                }
+            }
+
+        } catch (DateraObject.DateraError | UnsupportedEncodingException e) {
+            s_logger.warn("Error while removing host from initiator groups ", e);
+        } finally {
+            lock.unlock();
+            lock.releaseRef();
+        }
+
+        return true;
+    }
+
+    private void handleXenServer(long clusterId, long hostId, long storagePoolId) {
+        List<String> storagePaths = getStoragePaths(clusterId, storagePoolId);
+
+        StoragePool storagePool = (StoragePool)_dataStoreMgr.getDataStore(storagePoolId, DataStoreRole.Primary);
+
+        for (String storagePath : storagePaths) {
+            ModifyStoragePoolCommand cmd = new ModifyStoragePoolCommand(true, storagePool);
+
+            cmd.setStoragePath(storagePath);
+
+            sendModifyStoragePoolCommand(cmd, storagePool, hostId);
+        }
+    }
+
+    private void handleVMware(HostVO host, boolean add) {
+        if (HypervisorType.VMware.equals(host.getHypervisorType())) {
+            List<StoragePoolVO> storagePools = _storagePoolDao.findPoolsByProvider(DateraUtil.PROVIDER_NAME);
+
+            if (storagePools != null && storagePools.size() > 0) {
+                List<Map<String, String>> targets = new ArrayList<>();
+
+                for (StoragePoolVO storagePool : storagePools) {
+                    List<Map<String, String>> targetsForClusterAndStoragePool = getTargets(host.getClusterId(), storagePool.getId());
+
+                    targets.addAll(targetsForClusterAndStoragePool);
+                }
+
+                ModifyTargetsCommand cmd = new ModifyTargetsCommand();
+
+                cmd.setAdd(add);
+                cmd.setTargets(targets);
+
+                sendModifyTargetsCommand(cmd, host.getId());
+            }
+        }
+    }
+
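+    // For KVM, re-issue a ModifyStoragePoolCommand per storage path so the
+    // agent re-registers the pool's volumes for this cluster.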
+    private void handleKVM(long clusterId, long hostId, long storagePoolId) {
+        List<String> storagePaths = getStoragePaths(clusterId, storagePoolId);
+
+        StoragePool storagePool = (StoragePool)_dataStoreMgr.getDataStore(storagePoolId, DataStoreRole.Primary);
+
+        for (String storagePath : storagePaths) {
+            ModifyStoragePoolCommand cmd = new ModifyStoragePoolCommand(true, storagePool);
+
+            cmd.setStoragePath(storagePath);
+
+            sendModifyStoragePoolCommand(cmd, storagePool, hostId);
+        }
+    }
+
+    private void handleKVM(long hostId, long storagePoolId) {
+        StoragePool storagePool = (StoragePool)_dataStoreMgr.getDataStore(storagePoolId, DataStoreRole.Primary);
+
+        ModifyStoragePoolCommand cmd = new ModifyStoragePoolCommand(true, storagePool);
+
+        sendModifyStoragePoolCommand(cmd, storagePool, hostId);
+    }
+
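+    // Collects the iSCSI names of all volumes on the given pool whose owning
+    // VM runs (or last ran) on a host in the given cluster.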
+    private List<String> getStoragePaths(long clusterId, long storagePoolId) {
+        List<String> storagePaths = new ArrayList<>();
+
+        // Passing null as the second parameter returns all volumes on the pool, not just the applicable ROOT disks.
+        List<VolumeVO> volumes = _volumeDao.findByPoolId(storagePoolId, null);
+
+        if (volumes != null) {
+            for (VolumeVO volume : volumes) {
+                Long instanceId = volume.getInstanceId();
+
+                if (instanceId != null) {
+                    VMInstanceVO vmInstance = _vmDao.findById(instanceId);
+
+                    Long hostIdForVm = vmInstance.getHostId() != null ? vmInstance.getHostId() : vmInstance.getLastHostId();
+
+                    if (hostIdForVm != null) {
+                        HostVO hostForVm = _hostDao.findById(hostIdForVm);
+
+                        if (hostForVm.getClusterId().equals(clusterId)) {
+                            storagePaths.add(volume.get_iScsiName());
+                        }
+                    }
+                }
+            }
+        }
+
+        return storagePaths;
+    }
+
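+    // Sends the command synchronously; on failure, raises a host alert and
+    // throws a CloudRuntimeException.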
+    private void sendModifyTargetsCommand(ModifyTargetsCommand cmd, long hostId) {
+        Answer answer = _agentMgr.easySend(hostId, cmd);
+
+        if (answer == null) {
+            throw new CloudRuntimeException("Unable to get an answer to the modify targets command");
+        }
+
+        if (!answer.getResult()) {
+            String msg = "Unable to modify targets on the following host: " + hostId;
+
+            HostVO host = _hostDao.findById(hostId);
+
+            _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), msg, msg);
+
+            throw new CloudRuntimeException(msg);
+        }
+    }
+
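+    // Sends the command synchronously and expects a ModifyStoragePoolAnswer;
+    // on failure, raises a host alert and throws a CloudRuntimeException.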
+    private void sendModifyStoragePoolCommand(ModifyStoragePoolCommand cmd, StoragePool storagePool, long hostId) {
+        Answer answer = _agentMgr.easySend(hostId, cmd);
+
+        if (answer == null) {
+            throw new CloudRuntimeException("Unable to get an answer to the modify storage pool command (" + storagePool.getId() + ")");
+        }
+
+        if (!answer.getResult()) {
+            String msg = "Unable to attach storage pool " + storagePool.getId() + " to host " + hostId;
+
+            _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, storagePool.getDataCenterId(), storagePool.getPodId(), msg, msg);
+
+            throw new CloudRuntimeException("Unable to establish a connection from agent to storage pool " + storagePool.getId() + " due to " + answer.getDetails() +
+                " (" + storagePool.getId() + ")");
+        }
+
+        assert (answer instanceof ModifyStoragePoolAnswer) : "ModifyStoragePoolAnswer expected; Pool = " + storagePool.getId() + " Host = " + hostId;
+
+        s_logger.info("Connection established between storage pool " + storagePool + " and host + " + hostId);
+    }
+
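+    // Builds one IQN/storage-host/port entry per volume on the pool whose
+    // owning VM runs (or last ran) in the given cluster.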
+    private List<Map<String, String>> getTargets(long clusterId, long storagePoolId) {
+        List<Map<String, String>> targets = new ArrayList<>();
+
+        StoragePoolVO storagePool = _storagePoolDao.findById(storagePoolId);
+
+        // Passing null as the second parameter returns all volumes on the pool, not just the applicable ROOT disks.
+        List<VolumeVO> volumes = _volumeDao.findByPoolId(storagePoolId, null);
+
+        if (volumes != null) {
+            for (VolumeVO volume : volumes) {
+                Long instanceId = volume.getInstanceId();
+
+                if (instanceId != null) {
+                    VMInstanceVO vmInstance = _vmDao.findById(instanceId);
+
+                    Long hostIdForVm = vmInstance.getHostId() != null ? vmInstance.getHostId() : vmInstance.getLastHostId();
+
+                    if (hostIdForVm != null) {
+                        HostVO hostForVm = _hostDao.findById(hostIdForVm);
+
+                        if (hostForVm.getClusterId().equals(clusterId)) {
+                            Map<String, String> details = new HashMap<>();
+
+                            details.put(ModifyTargetsCommand.IQN, volume.get_iScsiName());
+                            details.put(ModifyTargetsCommand.STORAGE_HOST, storagePool.getHostAddress());
+                            details.put(ModifyTargetsCommand.STORAGE_PORT, String.valueOf(storagePool.getPort()));
+
+                            targets.add(details);
+                        }
+                    }
+                }
+            }
+        }
+
+        return targets;
+    }
+}
diff --git a/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/provider/DateraPrimaryDataStoreProvider.java b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/provider/DateraPrimaryDataStoreProvider.java
new file mode 100644
index 0000000..70f1f9c
--- /dev/null
+++ b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/provider/DateraPrimaryDataStoreProvider.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.storage.datastore.provider;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cloudstack.storage.datastore.driver.DateraPrimaryDataStoreDriver;
+import org.apache.cloudstack.storage.datastore.lifecycle.DateraPrimaryDataStoreLifeCycle;
+import org.apache.cloudstack.storage.datastore.util.DateraUtil;
+import org.springframework.stereotype.Component;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreLifeCycle;
+import org.apache.cloudstack.engine.subsystem.api.storage.HypervisorHostListener;
+import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreDriver;
+import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreProvider;
+
+import com.cloud.utils.component.ComponentContext;
+
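+/**
+ * Registers the Datera storage provider with CloudStack and wires up its
+ * lifecycle, driver, and host-listener components via injection.
+ */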
+@Component
+public class DateraPrimaryDataStoreProvider implements PrimaryDataStoreProvider {
+    private DataStoreLifeCycle lifecycle;
+    private PrimaryDataStoreDriver driver;
+    private HypervisorHostListener listener;
+
+    DateraPrimaryDataStoreProvider() {
+    }
+
+    @Override
+    public String getName() {
+        return DateraUtil.PROVIDER_NAME;
+    }
+
+    @Override
+    public DataStoreLifeCycle getDataStoreLifeCycle() {
+        return lifecycle;
+    }
+
+    @Override
+    public PrimaryDataStoreDriver getDataStoreDriver() {
+        return driver;
+    }
+
+    @Override
+    public HypervisorHostListener getHostListener() {
+        return listener;
+    }
+
+    @Override
+    public boolean configure(Map<String, Object> params) {
+        lifecycle = ComponentContext.inject(DateraPrimaryDataStoreLifeCycle.class);
+        driver = ComponentContext.inject(DateraPrimaryDataStoreDriver.class);
+        listener = ComponentContext.inject(DateraHostListener.class);
+
+        return true;
+    }
+
+    @Override
+    public Set<DataStoreProviderType> getTypes() {
+        Set<DataStoreProviderType> types = new HashSet<DataStoreProviderType>();
+
+        types.add(DataStoreProviderType.PRIMARY);
+
+        return types;
+    }
+}
diff --git a/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/util/DateraObject.java b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/util/DateraObject.java
new file mode 100644
index 0000000..4d56f4b
--- /dev/null
+++ b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/util/DateraObject.java
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.datastore.util;
+
+import com.cloud.utils.StringUtils;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
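+/**
+ * Gson-serializable wrappers for the Datera REST API (v2); field names and
+ * SerializedName annotations mirror the JSON payloads exchanged with the
+ * Datera management endpoint.
+ */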
+public class DateraObject {
+
+    public static final String DEFAULT_CREATE_MODE = "cloudstack";
+    public static final String DEFAULT_STORAGE_NAME = "storage-1";
+    public static final String DEFAULT_VOLUME_NAME = "volume-1";
+    public static final String DEFAULT_ACL = "deny_all";
+    public static final String DEFAULT_STORAGE_FORCE_BOOLEAN = "true";
+
+    public enum AppState {
+        ONLINE, OFFLINE;
+
+        @Override
+        public String toString() {
+            return this.name().toLowerCase();
+        }
+    }
+
+    public enum DateraOperation {
+        ADD, REMOVE;
+
+        @Override
+        public String toString() {
+            return this.name().toLowerCase();
+        }
+    }
+
+    public enum DateraErrorTypes {
+        PermissionDeniedError, InvalidRouteError, AuthFailedError, ValidationFailedError, InvalidRequestError,
+        NotFoundError, NotConnectedError, InvalidSessionKeyError, DatabaseError, InternalError, ConflictError;
+
+        public boolean equals(DateraError err) {
+            return this.name().equals(err.getName());
+        }
+    }
+
+    public static class DateraConnection {
+
+        private int managementPort;
+        private String managementIp;
+        private String username;
+        private String password;
+
+        public DateraConnection(String managementIp, int managementPort, String username, String password) {
+            this.managementPort = managementPort;
+            this.managementIp = managementIp;
+            this.username = username;
+            this.password = password;
+        }
+
+        public int getManagementPort() {
+            return managementPort;
+        }
+
+        public String getManagementIp() {
+            return managementIp;
+        }
+
+        public String getUsername() {
+            return username;
+        }
+
+        public String getPassword() {
+            return password;
+        }
+    }
+
+    public static class DateraLogin {
+
+        private final String name;
+        private final String password;
+
+        public DateraLogin(String username, String password) {
+            this.name = username;
+            this.password = password;
+        }
+    }
+
+    public static class DateraLoginResponse {
+
+        private String key;
+
+        public String getKey() {
+            return key;
+        }
+    }
+
+    public class Access {
+        private String iqn;
+        private List<String> ips;
+
+        public Access(String iqn, List<String> ips) {
+            this.iqn = iqn;
+            this.ips = ips;
+        }
+
+        public String getIqn() {
+            return iqn;
+        }
+    }
+
+    public static class PerformancePolicy {
+
+        @SerializedName("total_iops_max")
+        private Integer totalIops;
+
+        public PerformancePolicy(int totalIops) {
+            this.totalIops = totalIops;
+        }
+
+        public Integer getTotalIops() {
+            return totalIops;
+        }
+    }
+
+    public static class Volume {
+
+        private String name;
+        private String path;
+        private Integer size;
+
+        @SerializedName("replica_count")
+        private Integer replicaCount;
+
+        @SerializedName("performance_policy")
+        private PerformancePolicy performancePolicy;
+
+        @SerializedName("placement_mode")
+        private String placementMode;
+
+        @SerializedName("op_state")
+        private String opState;
+
+        public Volume(int size, int totalIops, int replicaCount) {
+            this.name = DEFAULT_VOLUME_NAME;
+            this.size = size;
+            this.replicaCount = replicaCount;
+            this.performancePolicy = new PerformancePolicy(totalIops);
+        }
+
+        public Volume(int size, int totalIops, int replicaCount, String placementMode) {
+            this.name = DEFAULT_VOLUME_NAME;
+            this.size = size;
+            this.replicaCount = replicaCount;
+            this.performancePolicy = new PerformancePolicy(totalIops);
+            this.placementMode = placementMode;
+        }
+
+        public Volume(Integer newSize) {
+            this.size = newSize;
+        }
+
+        public Volume(String newPlacementMode) {
+            this.placementMode = newPlacementMode;
+        }
+
+        public PerformancePolicy getPerformancePolicy() {
+            return performancePolicy;
+        }
+
+        public int getSize() {
+            return size;
+        }
+
+        public String getPlacementMode() {
+            return placementMode;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public String getOpState() {
+            return opState;
+        }
+    }
+
+    public static class StorageInstance {
+
+        private final String name = DEFAULT_STORAGE_NAME;
+        private Map<String, Volume> volumes;
+        private Access access;
+        private String force;
+
+        @SerializedName("ip_pool")
+        private String ipPool;
+
+        public StorageInstance(int size, int totalIops, int replicaCount) {
+            Volume volume = new Volume(size, totalIops, replicaCount);
+            volumes = new HashMap<String, Volume>();
+            volumes.put(DEFAULT_VOLUME_NAME, volume);
+        }
+
+        public StorageInstance(int size, int totalIops, int replicaCount, String placementMode, String ipPool) {
+            Volume volume = new Volume(size, totalIops, replicaCount, placementMode);
+            volumes = new HashMap<String, Volume>();
+            volumes.put(DEFAULT_VOLUME_NAME, volume);
+            this.ipPool = "/access_network_ip_pools/" + ipPool;
+        }
+
+        public StorageInstance(int size, int totalIops, int replicaCount, String placementMode, String ipPool, String force) {
+            Volume volume = new Volume(size, totalIops, replicaCount, placementMode);
+            volumes = new HashMap<String, Volume>();
+            volumes.put(DEFAULT_VOLUME_NAME, volume);
+            this.ipPool = "/access_network_ip_pools/" + ipPool;
+            this.force = force;
+        }
+
+        public Access getAccess() {
+            return access;
+        }
+
+        public Volume getVolume() {
+            return volumes.get(DEFAULT_VOLUME_NAME);
+        }
+
+        public int getSize() {
+            return getVolume().getSize();
+        }
+
+        public String getForce() {
+            return this.force;
+        }
+
+    }
+
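+    /**
+     * An app_instance is Datera's top-level provisioning unit; the plugin
+     * creates one per CloudStack volume, each holding a single storage
+     * instance with a single volume.
+     */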
+    public static class AppInstance {
+
+        private String name;
+
+        @SerializedName("access_control_mode")
+        private String accessControlMode;
+
+        @SerializedName("create_mode")
+        private String createMode;
+
+        @SerializedName("storage_instances")
+        private Map<String, StorageInstance> storageInstances;
+
+        @SerializedName("clone_src")
+        private String cloneSrc;
+
+        @SerializedName("admin_state")
+        private String adminState;
+        private Boolean force;
+
+        public AppInstance(String name, int size, int totalIops, int replicaCount) {
+            this.name = name;
+            StorageInstance storageInstance = new StorageInstance(size, totalIops, replicaCount);
+            this.storageInstances = new HashMap<String, StorageInstance>();
+            this.storageInstances.put(DEFAULT_STORAGE_NAME, storageInstance);
+            this.accessControlMode = DEFAULT_ACL;
+            this.createMode = DEFAULT_CREATE_MODE;
+        }
+
+        public AppInstance(String name, int size, int totalIops, int replicaCount, String placementMode,
+                String ipPool) {
+            this.name = name;
+            StorageInstance storageInstance = new StorageInstance(size, totalIops, replicaCount, placementMode, ipPool);
+            this.storageInstances = new HashMap<String, StorageInstance>();
+            this.storageInstances.put(DEFAULT_STORAGE_NAME, storageInstance);
+            this.accessControlMode = DEFAULT_ACL;
+            this.createMode = DEFAULT_CREATE_MODE;
+        }
+
+        public AppInstance(AppState state) {
+            this.adminState = state.toString();
+            this.force = true;
+        }
+
+        public AppInstance(String name, String cloneSrc) {
+            this.name = name;
+            this.cloneSrc = cloneSrc;
+        }
+
+        public String getIqn() {
+            StorageInstance storageInstance = storageInstances.get(DEFAULT_STORAGE_NAME);
+            return storageInstance.getAccess().getIqn();
+        }
+
+        public int getTotalIops() {
+            StorageInstance storageInstance = storageInstances.get(DEFAULT_STORAGE_NAME);
+            return storageInstance.getVolume().getPerformancePolicy().getTotalIops();
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public int getSize() {
+            StorageInstance storageInstance = storageInstances.get(DEFAULT_STORAGE_NAME);
+            return storageInstance.getSize();
+        }
+
+        public String getVolumePath() {
+            StorageInstance storageInstance = storageInstances.get(DEFAULT_STORAGE_NAME);
+            return storageInstance.getVolume().getPath();
+        }
+
+        public String getVolumeOpState() {
+            StorageInstance storageInstance = storageInstances.get(DEFAULT_STORAGE_NAME);
+            return storageInstance.getVolume().getOpState();
+        }
+    }
+
+    public static class AccessNetworkIpPool {
+        @SerializedName("ip_pool")
+        private String ipPool;
+
+        public AccessNetworkIpPool(String ipPool) {
+            this.ipPool = "/access_network_ip_pools/" + ipPool;
+        }
+    }
+
+    public static class Initiator {
+
+        private String id; // IQN
+        private String name;
+        private String path;
+        private String op;
+
+        public Initiator(String name, String id) {
+            this.id = id;
+            this.name = name;
+        }
+
+        public Initiator(String path, DateraOperation op) {
+            this.path = path;
+            this.op = op.toString();
+        }
+
+        public String getPath() {
+            return path;
+        }
+    }
+
+    public static class InitiatorGroup {
+
+        private String name;
+        private List<String> members;
+        private String path;
+        private String op;
+
+        public InitiatorGroup(String name, List<String> members) {
+            this.name = name;
+            this.members = members;
+        }
+
+        public InitiatorGroup(String path, DateraOperation op) {
+            this.path = path;
+            this.op = op.toString();
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public List<String> getMembers() {
+            return members;
+        }
+    }
+
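+    /**
+     * A storage-native snapshot. Datera identifies it by its creation
+     * timestamp, which CloudStack persists as part of the snapshot name.
+     */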
+    public static class VolumeSnapshot {
+
+        private String uuid;
+        private String timestamp;
+        private String path;
+
+        @SerializedName("op_state")
+        private String opState;
+
+        VolumeSnapshot(String uuid) {
+            this.uuid = uuid;
+        }
+
+        public String getTimestamp() {
+            return timestamp;
+        }
+
+        public String getOpState() {
+            return opState;
+        }
+
+        public String getPath() {
+            return path;
+        }
+    }
+
+    public static class VolumeSnapshotRestore {
+
+        @SerializedName("restore_point")
+        private String restorePoint;
+
+        VolumeSnapshotRestore(String restorePoint) {
+            this.restorePoint = restorePoint;
+        }
+    }
+
+    public static class DateraError extends Exception {
+
+        private String name;
+        private int code;
+        private List<String> errors;
+        private String message;
+
+        public DateraError(String name, int code, List<String> errors, String message) {
+            this.name = name;
+            this.code = code;
+            this.errors = errors;
+            this.message = message;
+        }
+
+        public List<String> getErrors() {
+            return errors;
+        }
+
+        public boolean isError() {
+            return message != null && name.endsWith("Error");
+        }
+
+        public String getMessage() {
+
+            String errMsg = name + "\n";
+            if (message != null) {
+                errMsg += message + "\n";
+            }
+
+            if (errors != null) {
+                errMsg += StringUtils.join(errors, "\n");
+            }
+
+            return errMsg;
+        }
+
+        public String getName() {
+            return name;
+        }
+    }
+}
diff --git a/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/util/DateraUtil.java b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/util/DateraUtil.java
new file mode 100644
index 0000000..01ff893
--- /dev/null
+++ b/plugins/storage/volume/datera/src/main/java/org/apache/cloudstack/storage/datastore/util/DateraUtil.java
@@ -0,0 +1,1028 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.datastore.util;
+
+import com.cloud.host.Host;
+import com.cloud.host.HostVO;
+import com.cloud.utils.StringUtils;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailVO;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolDetailsDao;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.StringTokenizer;
+import java.util.UUID;
+
+public class DateraUtil {
+
+    private static final Logger s_logger = Logger.getLogger(DateraUtil.class);
+    private static final String API_VERSION = "v2";
+
+    public static final String PROVIDER_NAME = "Datera";
+    public static final String DRIVER_VERSION = "4.13.0-v2.0.0";
+
+    private static final String HEADER_AUTH_TOKEN = "auth-token";
+    private static final String HEADER_CONTENT_TYPE = "Content-type";
+    private static final String HEADER_VALUE_JSON = "application/json";
+
+    public static final String MANAGEMENT_VIP = "mVip";
+    public static final String STORAGE_VIP = "sVip";
+
+    public static final String MANAGEMENT_PORT = "mPort";
+    public static final String STORAGE_PORT = "sPort";
+    public static final String DEFAULT_IP_POOL = "default";
+
+    private static final int DEFAULT_MANAGEMENT_PORT = 7717;
+    private static final int DEFAULT_STORAGE_PORT = 3260;
+    private static final int DEFAULT_NUM_REPLICAS = 3;
+
+    private static final long ONEGIB_BYTES = 1073741824;
+
+    private static final String DEFAULT_VOL_PLACEMENT = "hybrid";
+
+    public static final String CLUSTER_ADMIN_USERNAME = "clusterAdminUsername";
+    public static final String CLUSTER_ADMIN_PASSWORD = "clusterAdminPassword";
+
+    public static final String CLUSTER_DEFAULT_MIN_IOPS = "clusterDefaultMinIops";
+    public static final String CLUSTER_DEFAULT_MAX_IOPS = "clusterDefaultMaxIops";
+    public static final String NUM_REPLICAS = "numReplicas";
+    public static final String VOL_PLACEMENT = "volPlacement";
+
+    public static final String STORAGE_POOL_ID = "DateraStoragePoolId";
+    public static final String VOLUME_SIZE = "DateraVolumeSize";
+    public static final String VOLUME_ID = "DateraVolumeId";
+    public static final String SNAPSHOT_ID = "DateraSnapshotId";
+    public static final String TEMP_VOLUME_ID = "tempVolumeId";
+    public static final String IP_POOL = "ipPool";
+
+    public static final int MAX_IOPS = 10000; // max IOPS that can be assigned to a volume
+
+    public static final String INITIATOR_GROUP_PREFIX = "CS-InitiatorGroup";
+    public static final String INITIATOR_PREFIX = "CS-Initiator";
+    public static final String APPINSTANCE_PREFIX = "CS";
+    public static final int APPINSTANCE_MAX_LENTH = 64;
+
+    public static final int MIN_NUM_REPLICAS = 1;
+    public static final int MAX_NUM_REPLICAS = 5;
+
+    public static final int POLL_TIMEOUT_MS = 3000;
+    public static final String STATE_AVAILABLE = "available";
+    public static final int DEFAULT_RETRIES = 10;
+
+    private static Gson gson = new GsonBuilder().create();
+
+    private int managementPort;
+    private String managementIp;
+    private String username;
+    private String password;
+
+    private static final String SCHEME_HTTP = "http";
+    private static final int UUID_LENGTH = 8;
+
+    public DateraUtil(String managementIp, int managementPort, String username, String password) {
+        this.managementPort = managementPort;
+        this.managementIp = managementIp;
+        this.username = username;
+        this.password = password;
+    }
+
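+    /**
+     * Logs in to the Datera management endpoint and returns the session key
+     * that every subsequent API call sends in the auth-token header.
+     */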
+    public static String login(DateraObject.DateraConnection conn)
+            throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        DateraObject.DateraLogin loginParams = new DateraObject.DateraLogin(conn.getUsername(), conn.getPassword());
+        HttpPut loginReq = new HttpPut(generateApiUrl("login"));
+
+        StringEntity jsonParams = new StringEntity(gson.toJson(loginParams));
+        loginReq.setEntity(jsonParams);
+
+        String response = executeHttp(conn, loginReq);
+        DateraObject.DateraLoginResponse loginResponse = gson.fromJson(response,
+                DateraObject.DateraLoginResponse.class);
+
+        return loginResponse.getKey();
+
+    }
+
+    public static Map<String, DateraObject.AppInstance> getAppInstances(DateraObject.DateraConnection conn)
+            throws DateraObject.DateraError {
+
+        HttpGet getAppInstancesReq = new HttpGet(generateApiUrl("app_instances"));
+        String response = executeApiRequest(conn, getAppInstancesReq);
+
+        Type responseType = new TypeToken<Map<String, DateraObject.AppInstance>>() {
+        }.getType();
+
+        return gson.fromJson(response, responseType);
+    }
+
+    public static DateraObject.AppInstance getAppInstance(DateraObject.DateraConnection conn, String name)
+            throws DateraObject.DateraError {
+
+        HttpGet url = new HttpGet(generateApiUrl("app_instances", name));
+
+        String response = null;
+        try {
+            response = executeApiRequest(conn, url);
+            return gson.fromJson(response, DateraObject.AppInstance.class);
+        } catch (DateraObject.DateraError dateraError) {
+            if (DateraObject.DateraErrorTypes.NotFoundError.equals(dateraError)) {
+                return null;
+            } else {
+                throw dateraError;
+            }
+        }
+    }
+
+    public static DateraObject.PerformancePolicy getAppInstancePerformancePolicy(DateraObject.DateraConnection conn,
+            String appInstanceName) throws DateraObject.DateraError {
+
+        HttpGet url = new HttpGet(generateApiUrl("app_instances", appInstanceName, "storage_instances",
+                DateraObject.DEFAULT_STORAGE_NAME, "volumes", DateraObject.DEFAULT_VOLUME_NAME, "performance_policy"));
+
+        try {
+            String response = executeApiRequest(conn, url);
+            return gson.fromJson(response, DateraObject.PerformancePolicy.class);
+        } catch (DateraObject.DateraError dateraError) {
+            if (DateraObject.DateraErrorTypes.NotFoundError.equals(dateraError)) {
+                return null;
+            } else {
+                throw dateraError;
+            }
+        }
+
+    }
+
+    public static DateraObject.PerformancePolicy createAppInstancePerformancePolicy(DateraObject.DateraConnection conn,
+            String appInstanceName, int totalIops) throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        HttpPost url = new HttpPost(generateApiUrl("app_instances", appInstanceName, "storage_instances",
+                DateraObject.DEFAULT_STORAGE_NAME, "volumes", DateraObject.DEFAULT_VOLUME_NAME, "performance_policy"));
+
+        DateraObject.PerformancePolicy performancePolicy = new DateraObject.PerformancePolicy(totalIops);
+
+        url.setEntity(new StringEntity(gson.toJson(performancePolicy)));
+
+        String response = executeApiRequest(conn, url);
+
+        return gson.fromJson(response, DateraObject.PerformancePolicy.class);
+    }
+
+    public static void updateAppInstanceIops(DateraObject.DateraConnection conn, String appInstance, int totalIops)
+            throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        if (getAppInstancePerformancePolicy(conn, appInstance) == null) {
+            createAppInstancePerformancePolicy(conn, appInstance, totalIops);
+        } else {
+
+            HttpPut url = new HttpPut(
+                    generateApiUrl("app_instances", appInstance, "storage_instances", DateraObject.DEFAULT_STORAGE_NAME,
+                            "volumes", DateraObject.DEFAULT_VOLUME_NAME, "performance_policy"));
+
+            DateraObject.PerformancePolicy performancePolicy = new DateraObject.PerformancePolicy(totalIops);
+
+            url.setEntity(new StringEntity(gson.toJson(performancePolicy)));
+            executeApiRequest(conn, url);
+        }
+    }
+
+    public static void updateAppInstanceSize(DateraObject.DateraConnection conn, String appInstanceName, int newSize)
+            throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        HttpPut url = new HttpPut(generateApiUrl("app_instances", appInstanceName, "storage_instances",
+                DateraObject.DEFAULT_STORAGE_NAME, "volumes", DateraObject.DEFAULT_VOLUME_NAME));
+
+        DateraObject.Volume volume = new DateraObject.Volume(newSize);
+        url.setEntity(new StringEntity(gson.toJson(volume)));
+        executeApiRequest(conn, url);
+
+    }
+
+    public static void updateAppInstancePlacement(DateraObject.DateraConnection conn, String appInstanceName,
+            String newPlacementMode) throws UnsupportedEncodingException, DateraObject.DateraError {
+        HttpPut url = new HttpPut(generateApiUrl("app_instances", appInstanceName, "storage_instances",
+                DateraObject.DEFAULT_STORAGE_NAME, "volumes", DateraObject.DEFAULT_VOLUME_NAME));
+
+        DateraObject.Volume volume = new DateraObject.Volume(newPlacementMode);
+        url.setEntity(new StringEntity(gson.toJson(volume)));
+        executeApiRequest(conn, url);
+
+    }
+
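+    // Shared by the create and clone paths: POSTs the given app_instance
+    // payload, then waits for its volume to become available.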
+    private static DateraObject.AppInstance createAppInstance(DateraObject.DateraConnection conn, String name,
+            StringEntity appInstanceEntity) throws DateraObject.DateraError {
+
+        HttpPost createAppInstance = new HttpPost(generateApiUrl("app_instances"));
+        HttpGet getAppInstance = new HttpGet(generateApiUrl("app_instances", name));
+        createAppInstance.setEntity(appInstanceEntity);
+
+        executeApiRequest(conn, createAppInstance);
+
+        // creation is async: confirm the app instance exists, then poll until its volume is available
+        executeApiRequest(conn, getAppInstance);
+
+        return pollAppInstanceAvailable(conn, name);
+    }
+
+    public static DateraObject.AppInstance createAppInstance(DateraObject.DateraConnection conn, String name, int size,
+            int totalIops, int replicaCount) throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        DateraObject.AppInstance appInstance = new DateraObject.AppInstance(name, size, totalIops, replicaCount,
+                DEFAULT_VOL_PLACEMENT, DEFAULT_IP_POOL);
+        StringEntity appInstanceEntity = new StringEntity(gson.toJson(appInstance));
+
+        return createAppInstance(conn, name, appInstanceEntity);
+    }
+
+    public static DateraObject.AppInstance createAppInstance(DateraObject.DateraConnection conn, String name, int size,
+            int totalIops, int replicaCount, String placementMode, String ipPool)
+            throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        DateraObject.AppInstance appInstance = new DateraObject.AppInstance(name, size, totalIops, replicaCount,
+                placementMode, ipPool);
+        StringEntity appInstanceEntity = new StringEntity(gson.toJson(appInstance));
+
+        return createAppInstance(conn, name, appInstanceEntity);
+    }
+
+    public static DateraObject.AppInstance cloneAppInstanceFromVolume(DateraObject.DateraConnection conn, String name,
+            String srcCloneName) throws UnsupportedEncodingException, DateraObject.DateraError {
+        return cloneAppInstanceFromVolume(conn, name, srcCloneName, DEFAULT_IP_POOL);
+    }
+
+    public static DateraObject.AppInstance cloneAppInstanceFromVolume(DateraObject.DateraConnection conn, String name,
+            String srcCloneName, String ipPool) throws UnsupportedEncodingException, DateraObject.DateraError {
+        s_logger.debug("cloneAppInstanceFromVolume() called");
+        DateraObject.AppInstance srcAppInstance = getAppInstance(conn, srcCloneName);
+
+        if (srcAppInstance == null) {
+            throw new DateraObject.DateraError("NotFoundError", 404, null,
+                    "Unable to find the base app instance to clone from");
+        }
+
+        String srcClonePath = srcAppInstance.getVolumePath();
+
+        DateraObject.AppInstance appInstanceObj = new DateraObject.AppInstance(name, srcClonePath);
+
+        StringEntity appInstanceEntity = new StringEntity(gson.toJson(appInstanceObj));
+        DateraObject.AppInstance appInstance = createAppInstance(conn, name, appInstanceEntity);
+
+        // Update ipPool
+        updateAppInstanceIpPool(conn, name, ipPool);
+
+        // bring it online
+        updateAppInstanceAdminState(conn, name, DateraObject.AppState.ONLINE);
+
+        return getAppInstance(conn, name);
+    }
+
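+    // Polls up to DEFAULT_RETRIES times, sleeping POLL_TIMEOUT_MS between
+    // attempts, until the volume op_state reports "available".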
+    public static DateraObject.AppInstance pollAppInstanceAvailable(DateraObject.DateraConnection conn,
+            String appInstanceName) throws DateraObject.DateraError {
+
+        int retries = DateraUtil.DEFAULT_RETRIES;
+        DateraObject.AppInstance appInstance = null;
+        do {
+            appInstance = getAppInstance(conn, appInstanceName);
+            try {
+                Thread.sleep(DateraUtil.POLL_TIMEOUT_MS);
+            } catch (InterruptedException e) {
+                return null;
+            }
+            retries--;
+        } while ((appInstance != null && !Objects.equals(appInstance.getVolumeOpState(), DateraUtil.STATE_AVAILABLE))
+                && retries > 0);
+        return appInstance;
+    }
+
+    public static DateraObject.Initiator createInitiator(DateraObject.DateraConnection conn, String name, String iqn)
+            throws DateraObject.DateraError, UnsupportedEncodingException {
+
+        HttpPost req = new HttpPost(generateApiUrl("initiators"));
+
+        DateraObject.Initiator initiator = new DateraObject.Initiator(name, iqn);
+        StringEntity httpEntity = new StringEntity(gson.toJson(initiator));
+        req.setEntity(httpEntity);
+
+        return gson.fromJson(executeApiRequest(conn, req), DateraObject.Initiator.class);
+    }
+
+    public static DateraObject.Initiator getInitiator(DateraObject.DateraConnection conn, String iqn)
+            throws DateraObject.DateraError {
+
+        try {
+            HttpGet getReq = new HttpGet(generateApiUrl("initiators", iqn));
+            String response = executeApiRequest(conn, getReq);
+            return gson.fromJson(response, DateraObject.Initiator.class);
+        } catch (DateraObject.DateraError dateraError) {
+            if (DateraObject.DateraErrorTypes.NotFoundError.equals(dateraError)) {
+                return null;
+            } else {
+                throw dateraError;
+            }
+        }
+    }
+
+    public static void deleteInitiator(DateraObject.DateraConnection conn, String iqn) throws DateraObject.DateraError {
+
+        HttpDelete req = new HttpDelete(generateApiUrl("initiators", iqn));
+        executeApiRequest(conn, req);
+    }
+
+    public static DateraObject.InitiatorGroup createInitiatorGroup(DateraObject.DateraConnection conn, String name)
+            throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        HttpPost createReq = new HttpPost(generateApiUrl("initiator_groups"));
+
+        DateraObject.InitiatorGroup group = new DateraObject.InitiatorGroup(name, Collections.<String>emptyList());
+
+        StringEntity httpEntity = new StringEntity(gson.toJson(group));
+        createReq.setEntity(httpEntity);
+
+        String response = executeApiRequest(conn, createReq);
+        return gson.fromJson(response, DateraObject.InitiatorGroup.class);
+    }
+
+    public static void deleteInitatorGroup(DateraObject.DateraConnection conn, String name)
+            throws DateraObject.DateraError {
+        HttpDelete delReq = new HttpDelete(generateApiUrl("initiator_groups", name));
+        executeApiRequest(conn, delReq);
+    }
+
+    public static DateraObject.InitiatorGroup getInitiatorGroup(DateraObject.DateraConnection conn, String name)
+            throws DateraObject.DateraError {
+        try {
+            HttpGet getReq = new HttpGet(generateApiUrl("initiator_groups", name));
+            String response = executeApiRequest(conn, getReq);
+            return gson.fromJson(response, DateraObject.InitiatorGroup.class);
+
+        } catch (DateraObject.DateraError dateraError) {
+            if (DateraObject.DateraErrorTypes.NotFoundError.equals(dateraError)) {
+                return null;
+            } else {
+                throw dateraError;
+            }
+        }
+    }
+
+    public static void updateInitiatorGroup(DateraObject.DateraConnection conn, String initiatorPath, String groupName,
+            DateraObject.DateraOperation op) throws DateraObject.DateraError, UnsupportedEncodingException {
+
+        DateraObject.InitiatorGroup initiatorGroup = getInitiatorGroup(conn, groupName);
+
+        if (initiatorGroup == null) {
+            throw new CloudRuntimeException("Unable to find initiator group by name " + groupName);
+        }
+
+        HttpPut addReq = new HttpPut(generateApiUrl("initiator_groups", groupName, "members"));
+
+        DateraObject.Initiator initiator = new DateraObject.Initiator(initiatorPath, op);
+
+        addReq.setEntity(new StringEntity(gson.toJson(initiator)));
+        executeApiRequest(conn, addReq);
+    }
+
+    public static void addInitiatorToGroup(DateraObject.DateraConnection conn, String initiatorPath, String groupName)
+            throws UnsupportedEncodingException, DateraObject.DateraError {
+        updateInitiatorGroup(conn, initiatorPath, groupName, DateraObject.DateraOperation.ADD);
+    }
+
+    public static void removeInitiatorFromGroup(DateraObject.DateraConnection conn, String initiatorPath,
+            String groupName) throws DateraObject.DateraError, UnsupportedEncodingException {
+        updateInitiatorGroup(conn, initiatorPath, groupName, DateraObject.DateraOperation.REMOVE);
+    }
+
+    public static Map<String, DateraObject.InitiatorGroup> getAppInstanceInitiatorGroups(
+            DateraObject.DateraConnection conn, String appInstance) throws DateraObject.DateraError {
+        HttpGet req = new HttpGet(generateApiUrl("app_instances", appInstance, "storage_instances",
+                DateraObject.DEFAULT_STORAGE_NAME, "acl_policy", "initiator_groups"));
+
+        String response = executeApiRequest(conn, req);
+
+        if (response == null) {
+            return null;
+        }
+
+        Type responseType = new TypeToken<Map<String, DateraObject.InitiatorGroup>>() {
+        }.getType();
+
+        return gson.fromJson(response, responseType);
+    }
+
+    public static void assignGroupToAppInstance(DateraObject.DateraConnection conn, String group, String appInstance)
+            throws DateraObject.DateraError, UnsupportedEncodingException {
+
+        DateraObject.InitiatorGroup initiatorGroup = getInitiatorGroup(conn, group);
+
+        if (initiatorGroup == null) {
+            throw new CloudRuntimeException("Initator group " + group + " not found ");
+        }
+
+        Map<String, DateraObject.InitiatorGroup> initiatorGroups = getAppInstanceInitiatorGroups(conn, appInstance);
+
+        if (initiatorGroups == null) {
+            throw new CloudRuntimeException("Initator group not found for appInstnace " + appInstance);
+        }
+
+        for (DateraObject.InitiatorGroup ig : initiatorGroups.values()) {
+            if (ig.getName().equals(group)) {
+                // already assigned
+                return;
+            }
+        }
+
+        HttpPut url = new HttpPut(generateApiUrl("app_instances", appInstance, "storage_instances",
+                DateraObject.DEFAULT_STORAGE_NAME, "acl_policy", "initiator_groups"));
+
+        url.setEntity(new StringEntity(gson
+                .toJson(new DateraObject.InitiatorGroup(initiatorGroup.getPath(), DateraObject.DateraOperation.ADD))));
+
+        executeApiRequest(conn, url);
+    }
+
+    public static void removeGroupFromAppInstance(DateraObject.DateraConnection conn, String group, String appInstance)
+            throws DateraObject.DateraError, UnsupportedEncodingException {
+
+        DateraObject.InitiatorGroup initiatorGroup = getInitiatorGroup(conn, group);
+
+        if (initiatorGroup == null) {
+            throw new CloudRuntimeException("Initator groups not found for appInstnace " + appInstance);
+        }
+
+        Map<String, DateraObject.InitiatorGroup> initiatorGroups = getAppInstanceInitiatorGroups(conn, appInstance);
+
+        if (initiatorGroups == null) {
+            throw new CloudRuntimeException("Initator group not found for appInstnace " + appInstance);
+        }
+
+        boolean groupAssigned = false;
+
+        for (DateraObject.InitiatorGroup ig : initiatorGroups.values()) {
+            if (ig.getName().equals(group)) {
+                groupAssigned = true;
+                break;
+            }
+        }
+
+        if (!groupAssigned) {
+            return; // already removed
+        }
+
+        HttpPut url = new HttpPut(generateApiUrl("app_instances", appInstance, "storage_instances",
+                DateraObject.DEFAULT_STORAGE_NAME, "acl_policy", "initiator_groups"));
+
+        url.setEntity(new StringEntity(gson.toJson(
+                new DateraObject.InitiatorGroup(initiatorGroup.getPath(), DateraObject.DateraOperation.REMOVE))));
+
+        executeApiRequest(conn, url);
+    }
+
+    public static void updateAppInstanceAdminState(DateraObject.DateraConnection conn, String appInstanceName,
+            DateraObject.AppState appState) throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        DateraObject.AppInstance appInstance = new DateraObject.AppInstance(appState);
+        HttpPut updateAppInstanceReq = new HttpPut(generateApiUrl("app_instances", appInstanceName));
+
+        updateAppInstanceReq.setEntity(new StringEntity(gson.toJson(appInstance)));
+        executeApiRequest(conn, updateAppInstanceReq);
+    }
+
+    public static void updateAppInstanceIpPool(DateraObject.DateraConnection conn, String appInstanceName,
+            String ipPool) throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        HttpPut url = new HttpPut(generateApiUrl("app_instances", appInstanceName, "storage_instances",
+                DateraObject.DEFAULT_STORAGE_NAME));
+
+        url.setEntity(new StringEntity(gson.toJson(new DateraObject.AccessNetworkIpPool(ipPool))));
+
+        executeApiRequest(conn, url);
+    }
+
+    public static void deleteAppInstance(DateraObject.DateraConnection conn, String name)
+            throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        HttpDelete deleteAppInstanceReq = new HttpDelete(generateApiUrl("app_instances", name));
+        updateAppInstanceAdminState(conn, name, DateraObject.AppState.OFFLINE);
+        executeApiRequest(conn, deleteAppInstanceReq);
+    }
+
+    public static DateraObject.AppInstance cloneAppInstanceFromSnapshot(DateraObject.DateraConnection conn,
+            String newAppInstanceName, String snapshotName)
+            throws DateraObject.DateraError, UnsupportedEncodingException {
+
+        return cloneAppInstanceFromSnapshot(conn, newAppInstanceName, snapshotName, DEFAULT_IP_POOL);
+    }
+
+    public static DateraObject.AppInstance cloneAppInstanceFromSnapshot(DateraObject.DateraConnection conn,
+            String newAppInstanceName, String snapshotName, String ipPool)
+            throws DateraObject.DateraError, UnsupportedEncodingException {
+
+        // split the snapshot name to appInstanceName and the snapshot timestamp
+        String[] tokens = snapshotName.split(":");
+        Preconditions.checkArgument(tokens.length == 2);
+
+        // A snapshot is stored in Cloudstack as <AppInstanceName>:<SnapshotTime>
+        String appInstanceName = tokens[0];
+        String snapshotTime = tokens[1];
+
+        // get the snapshot from Datera
+        HttpGet getSnapshotReq = new HttpGet(
+                generateApiUrl("app_instances", appInstanceName, "storage_instances", DateraObject.DEFAULT_STORAGE_NAME,
+                        "volumes", DateraObject.DEFAULT_VOLUME_NAME, "snapshots", snapshotTime));
+
+        String resp = executeApiRequest(conn, getSnapshotReq);
+
+        DateraObject.VolumeSnapshot snapshot = gson.fromJson(resp, DateraObject.VolumeSnapshot.class);
+
+        String snapshotPath = snapshot.getPath();
+
+        DateraObject.AppInstance appInstanceObj = new DateraObject.AppInstance(newAppInstanceName, snapshotPath);
+
+        StringEntity appInstanceEntity = new StringEntity(gson.toJson(appInstanceObj));
+
+        DateraObject.AppInstance appInstance = createAppInstance(conn, newAppInstanceName, appInstanceEntity);
+
+        // Update ipPool
+        updateAppInstanceIpPool(conn, newAppInstanceName, ipPool);
+
+        // bring it online
+        updateAppInstanceAdminState(conn, newAppInstanceName, DateraObject.AppState.ONLINE);
+
+        return getAppInstance(conn, newAppInstanceName);
+    }
+
+    public static void deleteVolumeSnapshot(DateraObject.DateraConnection conn, String snapshotName)
+            throws DateraObject.DateraError {
+
+        // split the snapshot name to appInstanceName and the snapshot timestamp
+        String[] tokens = snapshotName.split(":");
+        Preconditions.checkArgument(tokens.length == 2);
+
+        // A snapshot is stored in Cloudstack as <AppInstanceName>:<SnapshotTime>
+        String appInstanceName = tokens[0];
+        String snapshotTime = tokens[1];
+
+        HttpDelete deleteSnapshotReq = new HttpDelete(
+                generateApiUrl("app_instances", appInstanceName, "storage_instances", DateraObject.DEFAULT_STORAGE_NAME,
+                        "volumes", DateraObject.DEFAULT_VOLUME_NAME, "snapshots", snapshotTime));
+
+        executeApiRequest(conn, deleteSnapshotReq);
+    }
+
+    public static DateraObject.VolumeSnapshot getVolumeSnapshot(DateraObject.DateraConnection conn,
+            String appInstanceName, String snapshotTime) throws DateraObject.DateraError {
+
+        HttpGet getSnapshotReq = new HttpGet(
+                generateApiUrl("app_instances", appInstanceName, "storage_instances", DateraObject.DEFAULT_STORAGE_NAME,
+                        "volumes", DateraObject.DEFAULT_VOLUME_NAME, "snapshots", snapshotTime));
+
+        String resp = executeApiRequest(conn, getSnapshotReq);
+        return gson.fromJson(resp, DateraObject.VolumeSnapshot.class);
+    }
+
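+    // Creates a snapshot named by a random UUID, then polls until Datera
+    // reports it available; returns null if interrupted while waiting.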
+    public static DateraObject.VolumeSnapshot takeVolumeSnapshot(DateraObject.DateraConnection conn,
+            String baseAppInstanceName) throws UnsupportedEncodingException, DateraObject.DateraError {
+
+        HttpPost takeSnapshotReq = new HttpPost(
+                generateApiUrl("app_instances", baseAppInstanceName, "storage_instances",
+                        DateraObject.DEFAULT_STORAGE_NAME, "volumes", DateraObject.DEFAULT_VOLUME_NAME, "snapshots"));
+
+        String snapshotUuid = UUID.randomUUID().toString();
+        DateraObject.VolumeSnapshot volumeSnapshot = new DateraObject.VolumeSnapshot(snapshotUuid);
+        takeSnapshotReq.setEntity(new StringEntity(gson.toJson(volumeSnapshot)));
+        String snapshotResponse = executeApiRequest(conn, takeSnapshotReq);
+        volumeSnapshot = gson.fromJson(snapshotResponse, DateraObject.VolumeSnapshot.class);
+        String snapshotTime = volumeSnapshot.getTimestamp();
+
+        int retries = DateraUtil.DEFAULT_RETRIES;
+
+        do {
+            try {
+                Thread.sleep(DateraUtil.POLL_TIMEOUT_MS);
+            } catch (InterruptedException e) {
+                return null;
+            }
+            volumeSnapshot = getVolumeSnapshot(conn, baseAppInstanceName, snapshotTime);
+        } while ((!Objects.equals(volumeSnapshot.getOpState(), DateraUtil.STATE_AVAILABLE)) && --retries > 0);
+
+        return volumeSnapshot;
+    }
+
+    public static DateraObject.AppInstance restoreVolumeSnapshot(DateraObject.DateraConnection conn,
+            String snapshotName) throws DateraObject.DateraError {
+
+        // split the snapshot name to appInstanceName and the snapshot timestamp
+        String[] tokens = snapshotName.split(":");
+        Preconditions.checkArgument(tokens.length == 2);
+
+        // A snapshot is stored in Cloudstack as <AppInstanceName>:<SnapshotTime>
+        String appInstanceName = tokens[0];
+        String snapshotTime = tokens[1];
+
+        HttpPut restoreSnapshotReq = new HttpPut(generateApiUrl("app_instances", appInstanceName, "storage_instances",
+                DateraObject.DEFAULT_STORAGE_NAME, "volumes", DateraObject.DEFAULT_VOLUME_NAME));
+
+        try {
+            // bring appInstance offline
+            updateAppInstanceAdminState(conn, appInstanceName, DateraObject.AppState.OFFLINE);
+
+            DateraObject.VolumeSnapshotRestore volumeSnapshotRestore = new DateraObject.VolumeSnapshotRestore(
+                    snapshotTime);
+
+            StringEntity jsonParams = new StringEntity(gson.toJson(volumeSnapshotRestore));
+            restoreSnapshotReq.setEntity(jsonParams);
+            executeApiRequest(conn, restoreSnapshotReq);
+            // bring appInstance online
+            updateAppInstanceAdminState(conn, appInstanceName, DateraObject.AppState.ONLINE);
+
+        } catch (UnsupportedEncodingException e) {
+            throw new CloudRuntimeException("Failed to restore volume snapshot" + e.getMessage());
+        }
+        return getAppInstance(conn, appInstanceName);
+
+    }
+
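+    // Every API call re-authenticates: fetch a fresh session key and attach
+    // it as the auth-token header before executing the request.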
+    private static String executeApiRequest(DateraObject.DateraConnection conn, HttpRequest apiReq)
+            throws DateraObject.DateraError {
+
+        // Get the token first
+        String authToken = null;
+        try {
+            authToken = login(conn);
+        } catch (UnsupportedEncodingException e) {
+            throw new CloudRuntimeException("Unable to login to Datera " + e.getMessage());
+        }
+
+        if (authToken == null) {
+            throw new CloudRuntimeException("Unable to login to Datera: error getting auth token ");
+        }
+
+        apiReq.addHeader(HEADER_AUTH_TOKEN, authToken);
+
+        return executeHttp(conn, apiReq);
+    }
+
+    private static String executeHttp(DateraObject.DateraConnection conn, HttpRequest request)
+            throws DateraObject.DateraError {
+
+        String response;
+
+        // try-with-resources ensures the HTTP client is always closed
+        try (CloseableHttpClient httpclient = HttpClientBuilder.create().build()) {
+
+            request.setHeader(HEADER_CONTENT_TYPE, HEADER_VALUE_JSON);
+
+            HttpHost target = new HttpHost(conn.getManagementIp(), conn.getManagementPort(), SCHEME_HTTP);
+
+            HttpResponse httpResponse = httpclient.execute(target, request);
+
+            HttpEntity entity = httpResponse.getEntity();
+            StatusLine status = httpResponse.getStatusLine();
+            response = EntityUtils.toString(entity);
+
+            if (status.getStatusCode() != HttpStatus.SC_OK) {
+                // non-200 responses carry a JSON error body; surface it as a DateraError
+                DateraObject.DateraError error = gson.fromJson(response, DateraObject.DateraError.class);
+                if (error != null && error.isError()) {
+                    throw error;
+                } else {
+                    throw new CloudRuntimeException("Unexpected HTTP " + status.getStatusCode() + " response from Datera");
+                }
+            }
+
+        } catch (IOException e) {
+            throw new CloudRuntimeException("Error while sending request to Datera. Error " + e.getMessage());
+        }
+
+        return response;
+    }
+
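+    /**
+     * Joins path components into an API path prefixed with the API version,
+     * e.g. generateApiUrl("app_instances", "foo") returns
+     * "/<API_VERSION>/app_instances/foo".
+     */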
+    protected static String generateApiUrl(String... args) {
+        ArrayList<String> urlList = new ArrayList<String>(Arrays.asList(args));
+
+        urlList.add(0, API_VERSION);
+        urlList.add(0, "");
+
+        return StringUtils.join(urlList, "/");
+    }
+
+    public static String getManagementVip(String url) {
+        return getVip(DateraUtil.MANAGEMENT_VIP, url);
+    }
+
+    public static String getStorageVip(String url) {
+        return getVip(DateraUtil.STORAGE_VIP, url);
+    }
+
+    public static int getManagementPort(String url) {
+        return getPort(DateraUtil.MANAGEMENT_VIP, url, DEFAULT_MANAGEMENT_PORT);
+    }
+
+    public static int getStoragePort(String url) {
+        return getPort(DateraUtil.STORAGE_VIP, url, DEFAULT_STORAGE_PORT);
+    }
+
+    public static int getNumReplicas(String url) {
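+        // getValue() returns null when the key is absent; Integer.parseInt(null)
+        // then throws NumberFormatException, so the default replica count is used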
+        try {
+            String value = getValue(DateraUtil.NUM_REPLICAS, url, false);
+            return Integer.parseInt(value);
+        } catch (NumberFormatException ex) {
+            return DEFAULT_NUM_REPLICAS;
+        }
+
+    }
+
+    public static String getVolPlacement(String url) {
+        String volPlacement = getValue(DateraUtil.VOL_PLACEMENT, url, false);
+        if (volPlacement == null) {
+            return DEFAULT_VOL_PLACEMENT;
+        } else {
+            return volPlacement;
+        }
+    }
+
+    public static String getIpPool(String url) {
+        String ipPool = getValue(DateraUtil.IP_POOL, url, false);
+        if (ipPool == null) {
+            return DEFAULT_IP_POOL;
+        } else {
+            return ipPool;
+        }
+    }
+
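+    /**
+     * Extracts the host part from a "key=host[:port]" entry in the storage URL,
+     * e.g. for "MVIP=172.19.2.214" or "MVIP=172.19.2.214:1234" (port shown for
+     * illustration) this returns "172.19.2.214".
+     */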
+    private static String getVip(String keyToMatch, String url) {
+        String delimiter = ":";
+
+        String storageVip = getValue(keyToMatch, url);
+
+        int index = storageVip.indexOf(delimiter);
+
+        if (index != -1) {
+            return storageVip.substring(0, index);
+        }
+
+        return storageVip;
+    }
+
+    private static int getPort(String keyToMatch, String url, int defaultPortNumber) {
+        String delimiter = ":";
+
+        String storageVip = getValue(keyToMatch, url);
+
+        int index = storageVip.indexOf(delimiter);
+
+        int portNumber = defaultPortNumber;
+
+        if (index != -1) {
+            String port = storageVip.substring(index + delimiter.length());
+
+            try {
+                portNumber = Integer.parseInt(port);
+            } catch (NumberFormatException ex) {
+                throw new IllegalArgumentException("Invalid URL format (port is not an integer)");
+            }
+        }
+
+        return portNumber;
+    }
+
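+    /**
+     * Looks up a key in a semicolon-delimited key=value URL, e.g. for
+     * "MVIP=172.19.2.214;numReplicas=3" and key "numReplicas" this returns "3".
+     */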
+    public static String getValue(String keyToMatch, String url) {
+        return getValue(keyToMatch, url, true);
+    }
+
+    public static String getValue(String keyToMatch, String url, boolean throwExceptionIfNotFound) {
+        String delimiter1 = ";";
+        String delimiter2 = "=";
+
+        StringTokenizer st = new StringTokenizer(url, delimiter1);
+
+        while (st.hasMoreElements()) {
+            String token = st.nextElement().toString();
+
+            int index = token.indexOf(delimiter2);
+
+            if (index == -1) {
+                throw new CloudRuntimeException("Invalid URL format");
+            }
+
+            String key = token.substring(0, index);
+
+            if (key.equalsIgnoreCase(keyToMatch)) {
+                return token.substring(index + delimiter2.length());
+            }
+        }
+
+        if (throwExceptionIfNotFound) {
+            throw new CloudRuntimeException("Key not found in URL");
+        }
+
+        return null;
+    }
+
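+    /**
+     * Reduces the storage URL to just its MVIP and SVIP entries (uppercased),
+     * e.g. "MVIP=10.0.0.1;clusterAdminUsername=admin;SVIP=10.0.0.2" becomes
+     * "MVIP=10.0.0.1;SVIP=10.0.0.2".
+     */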
+    public static String getModifiedUrl(String originalUrl) {
+        StringBuilder sb = new StringBuilder();
+
+        String delimiter = ";";
+
+        StringTokenizer st = new StringTokenizer(originalUrl, delimiter);
+
+        while (st.hasMoreElements()) {
+            String token = st.nextElement().toString().toUpperCase();
+
+            if (token.startsWith(DateraUtil.MANAGEMENT_VIP.toUpperCase())
+                    || token.startsWith(DateraUtil.STORAGE_VIP.toUpperCase())) {
+                sb.append(token).append(delimiter);
+            }
+        }
+
+        String modifiedUrl = sb.toString();
+        int lastIndexOf = modifiedUrl.lastIndexOf(delimiter);
+
+        if (lastIndexOf == (modifiedUrl.length() - delimiter.length())) {
+            return modifiedUrl.substring(0, lastIndexOf);
+        }
+
+        return modifiedUrl;
+    }
+
+    public static DateraObject.DateraConnection getDateraConnection(long storagePoolId,
+            StoragePoolDetailsDao storagePoolDetailsDao) {
+        StoragePoolDetailVO storagePoolDetail = storagePoolDetailsDao.findDetail(storagePoolId,
+                DateraUtil.MANAGEMENT_VIP);
+
+        String mVip = storagePoolDetail.getValue();
+
+        storagePoolDetail = storagePoolDetailsDao.findDetail(storagePoolId, DateraUtil.MANAGEMENT_PORT);
+
+        int mPort = Integer.parseInt(storagePoolDetail.getValue());
+
+        storagePoolDetail = storagePoolDetailsDao.findDetail(storagePoolId, DateraUtil.CLUSTER_ADMIN_USERNAME);
+
+        String clusterAdminUsername = storagePoolDetail.getValue();
+
+        storagePoolDetail = storagePoolDetailsDao.findDetail(storagePoolId, DateraUtil.CLUSTER_ADMIN_PASSWORD);
+
+        String clusterAdminPassword = storagePoolDetail.getValue();
+
+        return new DateraObject.DateraConnection(mVip, mPort, clusterAdminUsername, clusterAdminPassword);
+    }
+
+    public static boolean hostsSupport_iScsi(List<HostVO> hosts) {
+        if (hosts == null || hosts.isEmpty()) {
+            return false;
+        }
+
+        for (Host host : hosts) {
+            if (!hostSupport_iScsi(host)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public static boolean hostSupport_iScsi(Host host) {
+        return host != null && host.getStorageUrl() != null && !host.getStorageUrl().trim().isEmpty()
+                && host.getStorageUrl().startsWith("iqn");
+    }
+
+    public static String getInitiatorGroupKey(long storagePoolId) {
+        return "DateraInitiatorGroup-" + storagePoolId;
+    }
+
+    /**
+     * Checks whether a host initiator is present in an initiator group
+     *
+     * @param initiator      Host initiator to check
+     * @param initiatorGroup the initiator group
+     * @return true if host initiator is in the group, false otherwise
+     */
+    public static boolean isInitiatorPresentInGroup(DateraObject.Initiator initiator,
+            DateraObject.InitiatorGroup initiatorGroup) {
+
+        for (String memberPath : initiatorGroup.getMembers()) {
+            if (memberPath.equals(initiator.getPath())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
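+    // Rounds up to whole GiB: e.g. bytesToGib(ONEGIB_BYTES + 1) == 2.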
+    public static int bytesToGib(long volumeSizeBytes) {
+        return (int) Math.ceil(volumeSizeBytes / (double) ONEGIB_BYTES);
+    }
+
+    public static long gibToBytes(int volumeSizeGb) {
+        return volumeSizeGb * ONEGIB_BYTES;
+    }
+
+    /**
+     * The IQN path stored in the DB by CloudStack is of the form /<IQN>/0
+     *
+     * @param iqn IQN of the LUN
+     * @return IQN path as defined above
+     */
+    public static String generateIqnPath(String iqn) {
+        if (iqn != null) {
+            return "/" + iqn + "/0";
+        }
+        return null;
+    }
+
+    /**
+     * Does the opposite of {@link #generateIqnPath}: strips the leading slash
+     * and the LUN suffix from an IQN path.
+     *
+     * @param iqnPath IQN path of the form /targetIQN/LUN
+     * @return trimmed IQN, or null if the path is malformed
+     */
+    public static String extractIqn(String iqnPath) {
+
+        if (iqnPath == null) {
+            return null;
+        }
+
+        if (iqnPath.endsWith("/")) {
+            iqnPath = iqnPath.substring(0, iqnPath.length() - 1);
+        }
+
+        final String[] tokens = iqnPath.split("/");
+        if (tokens.length != 3) {
+            final String msg = "Invalid iSCSI path " + iqnPath + "; it should be of the form /targetIQN/LUN";
+            s_logger.warn(msg);
+            return null;
+        }
+
+        return tokens[1].trim();
+    }
+
+    /**
+     * Generate a short random UUID string.
+     *
+     * @param seed a valid UUID string; parsed only to validate the input
+     * @return the first UUID_LENGTH (8) characters of a freshly generated random UUID
+     */
+    public static String generateUUID(String seed) {
+        // validate the seed format; the random UUID below does not depend on it
+        UUID.fromString(seed);
+
+        return UUID.randomUUID().toString().substring(0, UUID_LENGTH);
+    }
+
+}
diff --git a/plugins/storage/volume/datera/src/main/resources/META-INF/cloudstack/storage-volume-datera/module.properties b/plugins/storage/volume/datera/src/main/resources/META-INF/cloudstack/storage-volume-datera/module.properties
new file mode 100644
index 0000000..cecb8b1
--- /dev/null
+++ b/plugins/storage/volume/datera/src/main/resources/META-INF/cloudstack/storage-volume-datera/module.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+name=storage-volume-datera
+parent=storage
diff --git a/plugins/storage/volume/datera/src/main/resources/META-INF/cloudstack/storage-volume-datera/spring-storage-volume-datera-context.xml b/plugins/storage/volume/datera/src/main/resources/META-INF/cloudstack/storage-volume-datera/spring-storage-volume-datera-context.xml
new file mode 100644
index 0000000..84ca8fd
--- /dev/null
+++ b/plugins/storage/volume/datera/src/main/resources/META-INF/cloudstack/storage-volume-datera/spring-storage-volume-datera-context.xml
@@ -0,0 +1,33 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:context="http://www.springframework.org/schema/context"
+       xmlns:aop="http://www.springframework.org/schema/aop"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+                      http://www.springframework.org/schema/beans/spring-beans.xsd
+                      http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
+                      http://www.springframework.org/schema/context
+                      http://www.springframework.org/schema/context/spring-context.xsd"
+                      >
+
+    <bean id="dateraDataStoreProvider"
+        class="org.apache.cloudstack.storage.datastore.provider.DateraPrimaryDataStoreProvider" />
+
+</beans>
diff --git a/test/integration/plugins/datera/TestVolumes.py b/test/integration/plugins/datera/TestVolumes.py
new file mode 100644
index 0000000..2dc9ebe
--- /dev/null
+++ b/test/integration/plugins/datera/TestVolumes.py
@@ -0,0 +1,1127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import unittest
+import random
+import os
+import json
+import time
+import math
+import XenAPI
+import collections
+import distutils.util
+
+logger = logging.getLogger(__name__)
+logger_handler = logging.FileHandler('/var/tmp/{}.log'.format(__name__))
+logger_formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
+logger_handler.setFormatter(logger_formatter)
+logger.addHandler(logger_handler)
+logger.setLevel(logging.INFO)
+
+# All tests inherit from cloudstackTestCase
+from marvin.cloudstackTestCase import cloudstackTestCase
+
+from nose.plugins.attrib import attr
+
+# Import Integration Libraries
+
+# base - contains all resources as entities and defines create, delete, list operations on them
+from marvin.lib.base import (Account, DiskOffering, ServiceOffering,
+                             StoragePool, User, VirtualMachine, Volume)
+
+# common - commonly used methods for all tests are listed here
+from marvin.lib.common import (get_domain, get_template, get_zone,
+                               list_clusters, list_hosts,
+                               list_virtual_machines,
+                               list_volumes, list_disk_offering)
+
+# utils - utility classes for common cleanup, external library wrappers, etc.
+from marvin.lib.utils import cleanup_resources
+
+from marvin.cloudstackAPI import resizeVolume
+
+#from dfs_sdk import DateraApi
+from dfs_sdk import get_api
+
+
+class TestData():
+    account = "account"
+    capacityBytes = "capacitybytes"
+    capacityIops = "capacityiops"
+    clusterId = "clusterId"
+    managedComputeOffering = "managedComputeoffering"
+    nonManagedComputeOffering = "nonManagedComputeoffering"
+    diskName = "diskname"
+    diskOffering = "diskoffering"
+    domainId = "domainId"
+    hypervisor = "hypervisor"
+    login = "login"
+    mvip = "mvip"
+    password = "password"
+    port = "port"
+    primaryStorage = "primarystorage"
+    provider = "provider"
+    scope = "scope"
+    Datera = "datera"
+    storageTag = "Datera_SAN_1"
+    tags = "tags"
+    templateCacheName = "centos56-x86-64-xen"  # TODO
+    templateName = "templatename"
+    testAccount = "testaccount"
+    url = "url"
+    user = "user"
+    username = "username"
+    virtualMachine = "virtualmachine"
+    virtualMachine2 = "virtualmachine2"
+    volume_1 = "volume_1"
+    volume_2 = "volume_2"
+    xenServer = "xenserver"
+    zoneId = "zoneId"
+
+    def __init__(self):
+        self.testdata = {
+            TestData.Datera: {
+                TestData.mvip: "172.19.2.214",
+                TestData.login: "admin",
+                TestData.password: "password",
+                TestData.port: 80,
+                TestData.url: "https://172.19.2.214:443"
+            },
+            TestData.xenServer: {
+                TestData.username: "root",
+                TestData.password: "password"
+            },
+            TestData.account: {
+                "email": "test@test.com",
+                "firstname": "John",
+                "lastname": "Doe",
+                "username": "test",
+                "password": "test"
+            },
+            TestData.testAccount: {
+                "email": "test2@test2.com",
+                "firstname": "Jane",
+                "lastname": "Doe",
+                "username": "test2",
+                "password": "test"
+            },
+            TestData.user: {
+                "email": "user@test.com",
+                "firstname": "Jane",
+                "lastname": "Doe",
+                "username": "testuser",
+                "password": "password"
+            },
+            TestData.primaryStorage: {
+                "name": "Datera-%d" % random.randint(0, 100),
+                TestData.scope: "ZONE",
+                "url": "MVIP=172.19.2.214;SVIP=172.28.214.9;" +
+                       "clusterAdminUsername=admin;clusterAdminPassword=password;" +
+                       "clusterDefaultMinIops=10000;clusterDefaultMaxIops=15000;" +
+                       "numReplicas=3;",
+                TestData.provider: "Datera",
+                TestData.tags: TestData.storageTag,
+                TestData.capacityIops: 4500000,
+                TestData.capacityBytes: 2251799813685248,
+                TestData.hypervisor: "Any"
+            },
+            TestData.virtualMachine: {
+                "name": "TestVM",
+                "displayname": "TestVM",
+                "privateport": 22,
+                "publicport": 22,
+                "protocol": "tcp"
+            },
+            TestData.virtualMachine2: {
+                "name": "TestVM2",
+                "displayname": "TestVM2",
+                "privateport": 22,
+                "publicport": 22,
+                "protocol": "tcp"
+            },
+            TestData.managedComputeOffering: {
+                "name": "DT_CO_1",
+                "displaytext": "DT_CO_1 (Min IOPS = 10,000; Max IOPS = 15,000)",
+                "cpunumber": 1,
+                "cpuspeed": 100,
+                "memory": 128,
+                "storagetype": "shared",
+                "customizediops": False,
+                "miniops": "10000",
+                "maxiops": "15000",
+                "hypervisorsnapshotreserve": 200,
+                "tags": TestData.storageTag
+            },
+            TestData.nonManagedComputeOffering: {
+                "name": "DT_CO_2",
+                "displaytext": "DT_CO_2 (Min IOPS = 10,000; Max IOPS = 15,000)",
+                "cpunumber": 1,
+                "cpuspeed": 100,
+                "memory": 128,
+                "storagetype": "shared",
+                "customizediops": False,
+                "miniops": "10000",
+                "maxiops": "15000",
+                "hypervisorsnapshotreserve": 200,
+                "tags": TestData.storageTag
+            },
+
+            TestData.diskOffering: {
+                "name": "DT_DO_1",
+                "displaytext": "DT_DO_1 (5GB Min IOPS = 300; Max IOPS = 500)",
+                "disksize": 5,
+                "customizediops": False,
+                "miniops": 300,
+                "maxiops": 500,
+                "hypervisorsnapshotreserve": 200,
+                TestData.tags: TestData.storageTag,
+                "storagetype": "shared"
+            },
+            "testdiskofferings": {
+                "customiopsdo": {
+                    "name": "DT_Custom_Iops_DO",
+                    "displaytext": "Customized Iops DO",
+                    "disksize": 5,
+                    "customizediops": True,
+                    "miniops": 500,
+                    "maxiops": 1000,
+                    "hypervisorsnapshotreserve": 200,
+                    TestData.tags: TestData.storageTag,
+                    "storagetype": "shared"
+                },
+                "customsizedo": {
+                    "name": "DT_Custom_Size_DO",
+                    "displaytext": "Customized Size DO",
+                    "disksize": 5,
+                    "customizediops": False,
+                    "miniops": 500,
+                    "maxiops": 1000,
+                    "hypervisorsnapshotreserve": 200,
+                    TestData.tags: TestData.storageTag,
+                    "storagetype": "shared"
+                },
+                "customsizeandiopsdo": {
+                    "name": "DT_Custom_Iops_Size_DO",
+                    "displaytext": "Customized Size and Iops DO",
+                    "disksize": 10,
+                    "customizediops": True,
+                    "miniops": 400,
+                    "maxiops": 800,
+                    "hypervisorsnapshotreserve": 200,
+                    TestData.tags: TestData.storageTag,
+                    "storagetype": "shared"
+                },
+                "newiopsdo": {
+                    "name": "DT_New_Iops_DO",
+                    "displaytext": "New Iops (min=350, max = 700)",
+                    "disksize": 5,
+                    "miniops": 350,
+                    "maxiops": 700,
+                    "hypervisorsnapshotreserve": 200,
+                    TestData.tags: TestData.storageTag,
+                    "storagetype": "shared"
+                },
+                "newsizedo": {
+                    "name": "DT_New_Size_DO",
+                    "displaytext": "New Size: 10",
+                    "disksize": 10,
+                    "customizediops": False,
+                    "miniops": 400,
+                    "maxiops": 800,
+                    "hypervisorsnapshotreserve": 200,
+                    TestData.tags: TestData.storageTag,
+                    "storagetype": "shared"
+                },
+                "newsizeandiopsdo": {
+                    "name": "DT_New_Size_Iops_DO",
+                    "displaytext": "New Size and Iops",
+                    "disksize": 10,
+                    "customizediops": False,
+                    "miniops": 200,
+                    "maxiops": 800,
+                    "hypervisorsnapshotreserve": 200,
+                    TestData.tags: TestData.storageTag,
+                    "storagetype": "shared"
+                }
+            },
+            TestData.volume_1: {
+                TestData.diskName: "test-volume",
+            },
+            TestData.volume_2: {
+                TestData.diskName: "test-volume-2",
+            },
+            TestData.templateName: "tiny linux kvm",  # TODO
+            TestData.zoneId: 1,
+            TestData.clusterId: 1,
+            TestData.domainId: 1,
+        }
+
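+    # Deep-merges overrides from a JSON file into self.testdata; e.g. a file
+    # containing {"datera": {"mvip": "10.0.0.5"}} replaces only the management VIP.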
+    def update(self, overrideFileName):
+        if os.path.exists(overrideFileName):
+            with open(overrideFileName) as fd:
+                self.testdata = self._update(self.testdata, json.loads(fd.read()))
+
+    def _update(self, d, u):
+
+        for k, v in u.iteritems():
+            if isinstance(v, collections.Mapping):
+                r = self._update(d.get(k, {}), v)
+                d[k] = r
+            else:
+                d[k] = u[k]
+        return d
+
+
+class TestVolumes(cloudstackTestCase):
+    _should_only_be_one_vm_in_list_err_msg = "There should only be one VM in this list."
+    _should_only_be_one_volume_in_list_err_msg = "There should only be one volume in this list."
+    _volume_vm_id_and_vm_id_do_not_match_err_msg = "The volume's VM ID and the VM's ID do not match."
+    _vm_not_in_running_state_err_msg = "The VM is not in the 'Running' state."
+    _vm_not_in_stopped_state_err_msg = "The VM is not in the 'Stopped' state."
+    _sr_not_shared_err_msg = "The SR is not shared."
+    _list_should_be_empty = "The list should be empty."
+    _volume_resize_err = "The Volume was not resized correctly."
+
+    @classmethod
+    def setUpXenServer(cls):
+
+        # Set up xenAPI connection
+        hosts = list_hosts(cls.apiClient, clusterid=cls.testdata[TestData.clusterId])
+        xenserver_info = cls.testdata[TestData.xenServer]
+
+        for h in hosts:
+            host_ip = "https://" + h.ipaddress
+            try:
+                cls.xen_session = XenAPI.Session(host_ip)
+                cls.xen_session.xenapi.login_with_password(xenserver_info[TestData.username],
+                                                           xenserver_info[TestData.password])
+                break
+            except XenAPI.Failure as e:
+                pass
+
+        cls.compute_offering = ServiceOffering.create(
+            cls.apiClient,
+            cls.testdata[TestData.managedComputeOffering]
+        )
+
+        cls.device_name = 'xvdb'
+
+    @classmethod
+    def setUpKVM(cls):
+        logger.info("Setting up KVM")
+        # KVM does not support managed root disks, so use the non-managed compute offering
+        cls.compute_offering = ServiceOffering.create(
+            cls.apiClient,
+            cls.testdata[TestData.nonManagedComputeOffering]
+        )
+
+        cls.device_name = 'vdb'
+
+    @classmethod
+    def setUpClass(cls):
+        """
+        1. Init ACS API and DB connection
+        2. Init Datera API connection
+        3. Create ACS Primary storage
+        4. Create ACS compute and disk offering.
+        5. Create ACS data disk without attaching to a VM 
+        """
+        logger.info("Setting up Class")
+
+        # Set up API client
+        testclient = super(TestVolumes, cls).getClsTestClient()
+        cls.apiClient = testclient.getApiClient()
+        cls.dbConnection = testclient.getDbConnection()
+
+        # Setup test data
+        td = TestData()
+        if cls.config.TestData and cls.config.TestData.Path:
+            td.update(cls.config.TestData.Path)
+        cls.testdata = td.testdata
+
+        # Get Resources from Cloud Infrastructure
+        cls.zone = get_zone(cls.apiClient, zone_name=cls.config.zones[0].name)
+        cls.cluster = list_clusters(cls.apiClient)[0]
+        cls.template = get_template(cls.apiClient, cls.zone.id)
+        cls.domain = get_domain(cls.apiClient, cls.testdata[TestData.domainId])
+
+        # Set up datera connection
+        datera = cls.testdata[TestData.Datera]
+        cls.dt_client = get_api(
+            username=datera[TestData.login],
+            password=datera[TestData.password],
+            hostname=datera[TestData.mvip],
+            version="v2"
+        )
+
+        # Create test account
+        cls.account = Account.create(
+            cls.apiClient,
+            cls.testdata["account"],
+            admin=1
+        )
+
+        # Set up connection to make customized API calls
+        cls.user = User.create(
+            cls.apiClient,
+            cls.testdata["user"],
+            account=cls.account.name,
+            domainid=cls.domain.id
+        )
+
+        primarystorage = cls.testdata[TestData.primaryStorage]
+
+        cls.primary_storage = StoragePool.create(
+            cls.apiClient,
+            primarystorage,
+            scope=primarystorage[TestData.scope],
+            zoneid=cls.zone.id,
+            provider=primarystorage[TestData.provider],
+            tags=primarystorage[TestData.tags],
+            capacityiops=primarystorage[TestData.capacityIops],
+            capacitybytes=primarystorage[TestData.capacityBytes],
+            hypervisor=primarystorage[TestData.hypervisor]
+        )
+
+        cls.disk_offering = DiskOffering.create(
+            cls.apiClient,
+            cls.testdata[TestData.diskOffering]
+        )
+
+        cls.disk_offering_new = DiskOffering.create(
+            cls.apiClient,
+            cls.testdata['testdiskofferings']['newsizeandiopsdo']
+        )
+
+        cls.supports_resign = cls._get_supports_resign()
+
+        # Set up hypervisor specific connections
+        if cls.cluster.hypervisortype.lower() == 'xenserver':
+            cls.setUpXenServer()
+        if cls.cluster.hypervisortype.lower() == 'kvm':
+            cls.setUpKVM()
+
+        # Create 1 data volume_1
+        cls.volume = Volume.create(
+            cls.apiClient,
+            cls.testdata[TestData.volume_1],
+            account=cls.account.name,
+            domainid=cls.domain.id,
+            zoneid=cls.zone.id,
+            diskofferingid=cls.disk_offering.id
+        )
+
+        # Resources that are to be destroyed
+        cls._cleanup = [
+            cls.volume,
+            cls.compute_offering,
+            cls.disk_offering,
+            cls.disk_offering_new,
+            cls.user,
+            cls.account
+        ]
+
+    @classmethod
+    def tearDownClass(cls):
+        logger.info("Tearing Down Class")
+        try:
+            cleanup_resources(cls.apiClient, cls._cleanup)
+
+            cls.primary_storage.delete(cls.apiClient)
+
+            cls._purge_datera_volumes()
+
+        except Exception as e:
+            logger.debug("Exception in tearDownClass(cls): %s" % e)
+
+    def setUp(self):
+        logger.info("Setup test")
+        self.attached = False
+        self.cleanup = []
+
+    def tearDown(self):
+        logger.info("Tearing Down test")
+        cleanup_resources(self.apiClient, self.cleanup)
+
+    @classmethod
+    def _set_supports_resign(cls, val):
+
+        supports_resign = str(val).lower()
+        cls.supports_resign = val
+
+        # make sure you can connect to MySQL: https://teamtreehouse.com/community/cant-connect-remotely-to-mysql-server-with-mysql-workbench
+
+        sql_query = "Update host_details Set value = '" + supports_resign + "' Where name = 'supportsResign'"
+        cls.dbConnection.execute(sql_query)
+
+        sql_query = "Update cluster_details Set value = '" + supports_resign + "' Where name = 'supportsResign'"
+        cls.dbConnection.execute(sql_query)
+
+    @classmethod
+    def _get_supports_resign(cls):
+
+        sql_query = "SELECT value from cluster_details Where name='supportsResign' AND cluster_id=%d" % cls.testdata[
+            TestData.clusterId]
+
+        sql_result = cls.dbConnection.execute(sql_query)
+        logger.warn(sql_result)
+
+        if len(sql_result) < 1:
+            return False
+
+        return bool(distutils.util.strtobool(sql_result[0][0].lower()))
+
+    def _get_cs_storage_pool_db_id(self, storage_pool):
+        return self._get_db_id("storage_pool", storage_pool)
+
+    def _get_db_id(self, table, db_obj):
+        sql_query = "Select id From " + table + " Where uuid = '" + str(db_obj.id) + "'"
+        sql_result = self.dbConnection.execute(sql_query)
+        return sql_result[0][0]
+
+    @classmethod
+    def _purge_datera_volumes(cls):
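+        # Template-cache app instances (named 'CS-T-...') may remain after the
+        # primary storage pool is deleted, so purge them here.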
+        logger.warn("Deleting all volumes")
+        for ai in cls.dt_client.app_instances.get().values():
+            logger.warn(ai)
+            if 'CS-T' in ai['name']:
+                ai.set(admin_state="offline")
+                ai.delete()
+
+    def test_01_attach_new_volume_to_stopped_VM(self):
+
+        '''Attach a volume to a stopped virtual machine, then start VM'''
+        # Create VM and volume for tests
+        virtual_machine = VirtualMachine.create(
+            self.apiClient,
+            self.testdata[TestData.virtualMachine],
+            accountid=self.account.name,
+            zoneid=self.zone.id,
+            serviceofferingid=self.compute_offering.id,
+            templateid=self.template.id,
+            domainid=self.domain.id,
+            startvm=True,
+            mode='advanced'
+        )
+        self.cleanup.append(virtual_machine)
+
+        template_volume_name = \
+            self._get_app_instance_name_from_cs_volume(self.template,
+                                                       vol_type='TEMPLATE')
+        dt_volume = self._check_and_get_dt_volume(template_volume_name)
+
+        virtual_machine.stop(self.apiClient, forced=True)
+
+        new_volume = Volume.create(
+            self.apiClient,
+            self.testdata[TestData.volume_2],
+            account=self.account.name,
+            domainid=self.domain.id,
+            zoneid=self.zone.id,
+            diskofferingid=self.disk_offering.id
+        )
+
+        self.cleanup.append(new_volume)
+
+        self._check_and_get_cs_volume(new_volume.id, self.testdata[TestData.volume_2][TestData.diskName])
+
+        new_volume = virtual_machine.attach_volume(
+            self.apiClient,
+            new_volume
+        )
+
+        newvolume = self._check_and_get_cs_volume(new_volume.id, self.testdata[TestData.volume_2][TestData.diskName])
+
+        virtual_machine.start(self.apiClient)
+
+        vm = self._get_vm(virtual_machine.id)
+
+        self.assertEqual(
+            newvolume.virtualmachineid,
+            vm.id,
+            TestVolumes._volume_vm_id_and_vm_id_do_not_match_err_msg
+        )
+
+        self.assertEqual(
+            vm.state.lower(),
+            "running",
+            TestVolumes._vm_not_in_running_state_err_msg
+        )
+
+        dt_volume_size = self._get_volume_size_with_hsr(newvolume)
+
+        iqn = self._get_iqn(newvolume)
+
+        dt_new_volname = self._get_app_instance_name_from_cs_volume(newvolume)
+
+        dt_volume = self._check_and_get_dt_volume(dt_new_volname)
+
+        self._check_size_and_iops(dt_volume, newvolume, dt_volume_size)
+
+        initiator_group_name = self._get_initiator_group_name()
+
+        self._check_initiator_group(dt_volume, initiator_group_name)
+
+        self._check_hypervisor(iqn)
+        logger.info("Detach volume from the VM")
+        virtual_machine.detach_volume(
+            self.apiClient,
+            new_volume
+        )
+
+    def test_02_attach_detach_attach_volume(self):
+        '''Attach, detach, and attach volume to a running VM'''
+
+        # Create VM and volume for tests
+        virtual_machine = VirtualMachine.create(
+            self.apiClient,
+            self.testdata[TestData.virtualMachine],
+            accountid=self.account.name,
+            zoneid=self.zone.id,
+            serviceofferingid=self.compute_offering.id,
+            templateid=self.template.id,
+            domainid=self.domain.id,
+            startvm=True,
+            mode='advanced'
+        )
+        self.cleanup.append(virtual_machine)
+
+        self._check_and_get_cs_volume(self.volume.id, self.testdata[TestData.volume_1][TestData.diskName])
+
+        #######################################
+        #######################################
+        # STEP 1: Attach volume to running VM #
+        #######################################
+        #######################################
+        self.volume = virtual_machine.attach_volume(
+            self.apiClient,
+            self.volume
+        )
+
+        self.attached = True
+
+        vol = self._check_and_get_cs_volume(self.volume.id, self.testdata[TestData.volume_1][TestData.diskName])
+
+        vm = self._get_vm(virtual_machine.id)
+
+        initiator_group_name = self._get_initiator_group_name()
+
+        self.assertEqual(
+            vol.virtualmachineid,
+            vm.id,
+            TestVolumes._volume_vm_id_and_vm_id_do_not_match_err_msg
+        )
+
+        self.assertEqual(
+            vm.state.lower(),
+            'running',
+            TestVolumes._vm_not_in_running_state_err_msg
+        )
+
+        iqn = self._get_iqn(self.volume)
+
+        dt_volume_size = self._get_volume_size_with_hsr(self.volume)
+
+        dt_volume_name = self._get_app_instance_name_from_cs_volume(self.volume)
+
+        dt_volume = self._check_and_get_dt_volume(dt_volume_name)
+
+        self._check_initiator_group(dt_volume, initiator_group_name)
+
+        self._check_size_and_iops(dt_volume, vol, dt_volume_size)
+
+        self._check_hypervisor(iqn)
+
+        #########################################
+        #########################################
+        # STEP 2: Detach volume from running VM #
+        #########################################
+        #########################################
+
+        self.volume = virtual_machine.detach_volume(
+            self.apiClient,
+            self.volume
+        )
+
+        self.attached = False
+
+        vol = self._check_and_get_cs_volume(self.volume.id, self.testdata[TestData.volume_1][TestData.diskName])
+
+        vm = self._get_vm(virtual_machine.id)
+
+        self.assertEqual(
+            vol.virtualmachineid,
+            None,
+            "The volume should not be attached to a VM."
+        )
+
+        self.assertEqual(
+            vm.state.lower(),
+            'running',
+            str(vm.state)
+        )
+
+        dt_volume = self._check_and_get_dt_volume(dt_volume_name)
+
+        self._check_initiator_group(dt_volume, initiator_group_name, False)
+
+        self._check_hypervisor(iqn, False)
+
+        #######################################
+        #######################################
+        # STEP 3: Attach volume to running VM #
+        #######################################
+        #######################################
+
+        time.sleep(30)
+        self.volume = virtual_machine.attach_volume(
+            self.apiClient,
+            self.volume
+        )
+
+        self.attached = True
+
+        vol = self._check_and_get_cs_volume(self.volume.id, self.testdata[TestData.volume_1][TestData.diskName])
+
+        vm = self._get_vm(virtual_machine.id)
+
+        self.assertEqual(
+            vol.virtualmachineid,
+            vm.id,
+            TestVolumes._volume_vm_id_and_vm_id_do_not_match_err_msg
+        )
+
+        self.assertEqual(
+            vm.state.lower(),
+            'running',
+            TestVolumes._vm_not_in_running_state_err_msg
+        )
+
+        dt_volume = self._check_and_get_dt_volume(dt_volume_name)
+
+        self._check_initiator_group(dt_volume, initiator_group_name)
+
+        self._check_hypervisor(iqn)
+
+    def test_03_attached_volume_reboot_VM(self):
+        '''Attach volume to running VM, then reboot.'''
+        # Create VM and volume for tests
+        virtual_machine = VirtualMachine.create(
+            self.apiClient,
+            self.testdata[TestData.virtualMachine],
+            accountid=self.account.name,
+            zoneid=self.zone.id,
+            serviceofferingid=self.compute_offering.id,
+            templateid=self.template.id,
+            domainid=self.domain.id,
+            startvm=True,
+            mode='advanced'
+        )
+        self.cleanup.append(virtual_machine)
+
+        self._check_and_get_cs_volume(self.volume.id, self.testdata[TestData.volume_1][TestData.diskName])
+
+        #######################################
+        #######################################
+        # STEP 1: Attach volume to running VM #
+        #######################################
+        #######################################
+
+        self.volume = virtual_machine.attach_volume(
+            self.apiClient,
+            self.volume
+        )
+
+        self.attached = True
+
+        dt_volume_name = self._get_app_instance_name_from_cs_volume(self.volume)
+
+        vol = self._check_and_get_cs_volume(self.volume.id, self.testdata[TestData.volume_1][TestData.diskName])
+
+        vm = self._get_vm(virtual_machine.id)
+
+        initiator_group_name = self._get_initiator_group_name()
+
+        self.assertEqual(
+            vol.virtualmachineid,
+            vm.id,
+            TestVolumes._volume_vm_id_and_vm_id_do_not_match_err_msg
+        )
+
+        self.assertEqual(
+            vm.state.lower(),
+            'running',
+            TestVolumes._vm_not_in_running_state_err_msg
+        )
+
+        iqn = self._get_iqn(self.volume)
+
+        volume_size_gb = self._get_volume_size_with_hsr(self.volume)
+
+        dt_volume = self._check_and_get_dt_volume(dt_volume_name)
+
+        self._check_size_and_iops(dt_volume, vol, volume_size_gb)
+
+        self._check_initiator_group(dt_volume, initiator_group_name)
+
+        self._check_hypervisor(iqn)
+
+        #######################################
+        #######################################
+        # STEP 2: Reboot VM with attached vol #
+        #######################################
+        #######################################
+        virtual_machine.reboot(self.apiClient)
+
+        vol = self._check_and_get_cs_volume(self.volume.id, self.testdata[TestData.volume_1][TestData.diskName])
+
+        vm = self._get_vm(virtual_machine.id)
+
+        iqn = self._get_iqn(self.volume)
+
+        dt_volume_size = self._get_volume_size_with_hsr(self.volume)
+
+        dt_volume = self._check_and_get_dt_volume(dt_volume_name)
+
+        self._check_size_and_iops(dt_volume, vol, dt_volume_size)
+
+        self._check_initiator_group(dt_volume, initiator_group_name)
+
+        self._check_hypervisor(iqn)
+
+    def _check_if_device_visible_in_vm(self, vm, dev_name):
+
+        try:
+            ssh_client = vm.get_ssh_client()
+        except Exception as e:
+            self.fail("SSH failed for virtual machine: %s - %s" %
+                      (vm.ipaddress, e))
+
+        cmd = "iostat | grep %s" % dev_name
+        res = ssh_client.execute(cmd)
+        logger.warn(cmd)
+        logger.warn(res)
+
+        if not res:
+            self.fail("Device %s not found on VM: %s" % (dev_name, vm.ipaddress))
+
+    def _check_list(self, in_list, expected_size_of_list, err_msg):
+        self.assertEqual(
+            isinstance(in_list, list),
+            True,
+            "'in_list' is not a list."
+        )
+
+        self.assertEqual(
+            len(in_list),
+            expected_size_of_list,
+            err_msg
+        )
+
+    def _check_initiator_group(self, dt_volume, initiator_group_name, should_exist=True):
+
+        volume_initiator_groups = dt_volume['storage_instances']['storage-1']['acl_policy']['initiator_groups']
+
+        if should_exist:
+            self.assertTrue(
+                initiator_group_name in volume_initiator_groups[0],
+                "Initiator group not assigned to volume"
+            )
+
+        else:
+
+            self.assertTrue(
+                len(volume_initiator_groups) == 0,
+                "Initiator group still asigined to volume, should have been removed"
+            )
+
+    def _check_volume(self, volume, volume_name, disk_offering):
+        self.assertTrue(
+            volume.name.startswith(volume_name),
+            "The volume name is incorrect."
+        )
+
+        self.assertEqual(
+            volume.diskofferingid,
+            disk_offering.id,
+            "The disk offering is incorrect."
+        )
+
+        self.assertEqual(
+            volume.zoneid,
+            self.zone.id,
+            "The zone is incorrect."
+        )
+
+        self.assertEqual(
+            volume.storagetype,
+            self.disk_offering.storagetype,
+            "The storage type is incorrect."
+        )
+
+    def _check_size_and_iops(self, dt_volume, cs_volume, size):
+
+        dt_max_total_iops = dt_volume['storage_instances']['storage-1']['volumes']['volume-1']['performance_policy'][
+            'total_iops_max']
+        self.assertEqual(
+            dt_max_total_iops,
+            cs_volume.maxiops,
+            "Check QOS - Max IOPS: " + str(dt_max_total_iops)
+        )
+
+        dt_volume_size = dt_volume['storage_instances']['storage-1']['volumes']['volume-1']['size']
+        self.assertEqual(
+            dt_volume_size,
+            size,
+            "Check volume size: " + str(dt_volume_size)
+        )
+
+    def _check_and_get_cs_volume(self, volume_id, volume_name, disk_offering=None):
+
+        if not disk_offering:
+            disk_offering = self.disk_offering
+
+        list_volumes_response = list_volumes(
+            self.apiClient,
+            id=volume_id
+        )
+
+        self._check_list(list_volumes_response, 1, TestVolumes._should_only_be_one_volume_in_list_err_msg)
+
+        cs_volume = list_volumes_response[0]
+
+        self._check_volume(cs_volume, volume_name, disk_offering)
+
+        return cs_volume
+
+    def _get_app_instance_name_from_cs_volume(self, cs_volume, vol_type='VOLUME'):
+        """
+        Get Datera app_instance name based on ACS data object types
+        Eg. CS-V-test-volume-7XWJ5Q-dfc41254-371a-40b3-b410-129eb79893c0
+        """
+        app_inst_prefix = 'CS'
+
+        if vol_type == 'VOLUME':
+            vol_type_char = 'V'
+            uuid = cs_volume.id
+            name = cs_volume.name
+            app_instance_name = app_inst_prefix + '-' + vol_type_char + '-' + name + '-' + uuid
+
+        if vol_type == 'TEMPLATE':
+            vol_type_char = 'T'
+            uuid = cs_volume.id
+            primary_storage_db_id = str(self._get_cs_storage_pool_db_id(self.primary_storage))
+            app_instance_name = app_inst_prefix + '-' + vol_type_char + '-' + uuid + '-' + primary_storage_db_id
+
+        return app_instance_name
+
+    def _get_iqn(self, cs_volume):
+        """
+        Get IQN for the CS volume from Datera
+        """
+        app_instance_name = self._get_app_instance_name_from_cs_volume(cs_volume)
+        app_instance = self.dt_client.app_instances.get(app_instance_name)
+        return app_instance['storage_instances']['storage-1']['access']['iqn']
+
+    def _get_cs_volume_size_with_hsr(self, cs_volume):
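+        """
+        Expected volume size in GB including the hypervisor snapshot reserve
+        (HSR), e.g. a 5 GB disk offering with a 200% HSR provisions
+        ceil(5 + 5 * 200 / 100) = 15 GB.
+        """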
+
+        disk_size_bytes = cs_volume.size
+
+        disk_offering_id = cs_volume.diskofferingid
+
+        disk_offering = list_disk_offering(self.apiClient, id=disk_offering_id)[0]
+
+        hsr = disk_offering.hypervisorsnapshotreserve
+
+        disk_size_with_hsr_bytes = disk_size_bytes + (disk_size_bytes * hsr) / 100
+
+        disk_size_with_hsr_gb = int(math.ceil(disk_size_with_hsr_bytes / float(1024 ** 3)))
+
+        return disk_size_with_hsr_gb
+
+    def _get_volume_size_with_hsr(self, cs_volume):
+
+        app_instance_name = self._get_app_instance_name_from_cs_volume(cs_volume)
+        app_instance = self.dt_client.app_instances.get(app_instance_name)
+
+        volume_size_gb = app_instance['storage_instances']['storage-1']['volumes']['volume-1']['size']
+
+        self.assertEqual(
+            isinstance(volume_size_gb, int),
+            True,
+            "The volume size should be a non-zero integer."
+        )
+
+        return volume_size_gb
+
+    def _get_initiator_group_name(self):
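+        """Return the initiator group name for this cluster, e.g.
+        'CS-InitiatorGroup-<cluster id>', and verify it exists on Datera."""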
+        init_group_prefix = 'CS-InitiatorGroup'
+        initiator_group_name = init_group_prefix + '-' + self.cluster.id
+        self.dt_client.initiator_groups.get(initiator_group_name)
+        return initiator_group_name
+
+    def _get_dt_volumes(self):
+        return self.dt_client.app_instances.get()
+
+    def _get_vm(self, vm_id):
+        list_vms_response = list_virtual_machines(self.apiClient, id=vm_id)
+
+        self._check_list(list_vms_response, 1, TestVolumes._should_only_be_one_vm_in_list_err_msg)
+
+        return list_vms_response[0]
+
+    def _check_and_get_dt_volume(self, dt_volume_name, should_exist=True):
+        dt_volume = None
+        dt_volumes = self._get_dt_volumes()
+
+        for volume in dt_volumes.values():
+            if volume['name'] == dt_volume_name:
+                dt_volume = volume
+                break
+
+        if should_exist:
+            self.assertNotEqual(
+                dt_volume,
+                None,
+                "Check if Datera volume was created: " + str(dt_volumes)
+            )
+        else:
+            self.assertEqual(
+                dt_volume,
+                None,
+                "Check if volume was deleted: " + str(dt_volumes)
+            )
+
+        return dt_volume
+
+    def _resize_volume(self, volume, new_disk_offering):
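+        """Resize the volume via the resizeVolume API, then poll (up to 3
+        tries, 10 s apart) until it reports the new size in the 'Ready' state."""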
+
+        cmd = resizeVolume.resizeVolumeCmd()
+        cmd.id = volume.id
+        cmd.diskofferingid = new_disk_offering.id
+
+        self.apiClient.resizeVolume(cmd)
+
+        do_size_bytes = int(new_disk_offering.disksize * (1024 ** 3))
+        retries = 3
+        success = False
+
+        while retries > 0:
+            retries -= 1
+
+            list_volumes_response = list_volumes(
+                self.apiClient,
+                id=volume.id
+            )
+
+            for vol in list_volumes_response:
+                if vol.id == volume.id and \
+                                int(vol.size) == do_size_bytes and \
+                                vol.state == 'Ready':
+                    success = True
+
+            if success:
+                break
+            else:
+                time.sleep(10)
+
+        self.assertEqual(success, True, self._volume_resize_err)
+
+    def _check_hypervisor(self, iqn, should_exist=True):
+        if self.cluster.hypervisortype.lower() == 'xenserver':
+            self._check_xen_sr(iqn, should_exist)
+        else:
+            return
+
+    def _check_xen_sr(self, iqn, should_exist=True):
+
+        xen_sr_name = "/" + iqn + "/0"
+        if should_exist:
+            xen_sr = self.xen_session.xenapi.SR.get_by_name_label(xen_sr_name)[0]
+
+            self.sr_shared = self.xen_session.xenapi.SR.get_shared(xen_sr)
+
+            self.assertEqual(
+                self.sr_shared,
+                True,
+                TestVolumes._sr_not_shared_err_msg
+            )
+        else:
+            xen_sr = self.xen_session.xenapi.SR.get_by_name_label(xen_sr_name)
+
+            self._check_list(xen_sr, 0, TestVolumes._list_should_be_empty)
+
+    def _check_if_device_removed_in_vm(self, vm, dev_name):
+
+        try:
+            ssh_client = vm.get_ssh_client()
+        except Exception as e:
+            self.fail("SSH failed for virtual machine: %s - %s" %
+                      (vm.ipaddress, e))
+
+        cmd = "iostat | grep %s" % dev_name
+        res = ssh_client.execute(cmd)
+        logger.warn(cmd)
+        logger.warn(res)
+
+        if res:
+            self.fail("Device %s still attached on VM: %s" % (dev_name, vm.ipaddress))
+
+    def _start_device_io(self, vm, dev_name):
+
+        try:
+            ssh_client = vm.get_ssh_client()
+        except Exception as e:
+            self.fail("SSH failed for virtual machine: %s - %s" %
+                      (vm.ipaddress, e))
+
+        cmd = "dd if=/dev/urandom of=/dev/%s &" % dev_name
+        res = ssh_client.execute(cmd)
+        logger.warn(cmd)
+        logger.warn(res)
+
+    def _stop_device_io(self, vm, dev_name):
+
+        try:
+            ssh_client = vm.get_ssh_client()
+        except Exception as e:
+            self.fail("SSH failed for virtual machine: %s - %s" %
+                      (vm.ipaddress, e))
+
+        cmd = "killall -9 dd"
+        res = ssh_client.execute(cmd)
+        logger.warn(cmd)
+        logger.warn(res)
+
+    def _get_bytes_written(self, vm, dev_name):
+
+        try:
+            ssh_client = vm.get_ssh_client()
+        except Exception as e:
+            self.fail("SSH failed for virtual machine: %s - %s" %
+                      (vm.ipaddress, e))
+
+        cmd = "iostat | grep %s " % dev_name
+        res = ssh_client.execute(cmd)
+        logger.warn(cmd)
+        logger.warn(res)
+
+        self.assertNotEqual(res, None, "Error getting iostat info")
+
+        ret_data = ' '.join(map(str, res)).strip()
+        return int(ret_data.split()[-1])