You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ra...@apache.org on 2016/11/02 09:14:31 UTC

[3/4] git commit: updated refs/heads/master to f7733b4

    Support Backup of Snapshots for Managed Storage

    This PR adds an ability to Pass a new parameter, locationType,
    to the \u201ccreateSnapshot\u201d API command. Depending on the locationType,
    we decide where the snapshot should go in case of managed storage.

    There are two possible values for the locationType param

    1) `Standard`: The standard operation for managed storage is to
    keep the snapshot on the device. For non-managed storage, this will
    be to upload it to secondary storage. This option will be the
    default.

    2) `Archive`: Applicable only to managed storage. This will
    keep the snapshot on the secondary storage. For non-managed
    storage, this will result in an error.

    The reason for implementing this feature is to avoid a single
    point of failure for primary storage. Right now in case of managed
    storage, if the primary storage goes down, there is no easy way
    to recover data as all snapshots are also stored on the primary.
    This features allows us to mitigate that risk.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/f46651e6
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/f46651e6
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/f46651e6

Branch: refs/heads/master
Commit: f46651e6721106941deeb6b5e6bf51d7e9efc61c
Parents: 054a717
Author: Syed <sy...@gmail.com>
Authored: Thu Jun 30 13:37:33 2016 -0400
Committer: Mike Tutkowski <mi...@solidfire.com>
Committed: Sun Oct 30 23:19:58 2016 -0600

----------------------------------------------------------------------
 api/src/com/cloud/storage/Snapshot.java         |  12 +-
 api/src/com/cloud/storage/VolumeApiService.java |   8 +-
 .../storage/snapshot/SnapshotApiService.java    |   2 +-
 .../org/apache/cloudstack/api/ApiConstants.java |   1 +
 .../user/snapshot/CreateSnapshotCmd.java        |  56 ++-
 .../api/response/SnapshotResponse.java          |  18 +-
 .../api/command/test/CreateSnapshotCmdTest.java | 132 ++++++
 .../storage/to/PrimaryDataStoreTO.java          |  13 +-
 .../subsystem/api/storage/SnapshotResult.java   |  12 +-
 .../com/cloud/vm/VmWorkTakeVolumeSnapshot.java  |   8 +-
 .../src/com/cloud/storage/SnapshotVO.java       |  29 +-
 .../motion/StorageSystemDataMotionStrategy.java | 424 +++++++++++++++----
 .../storage/snapshot/SnapshotObject.java        |   3 +
 .../storage/snapshot/SnapshotServiceImpl.java   |   2 +-
 .../storage/snapshot/SnapshotStrategyBase.java  |   2 +-
 .../snapshot/StorageSystemSnapshotStrategy.java | 148 +++++--
 .../snapshot/XenserverSnapshotStrategy.java     |   4 +-
 .../image/db/SnapshotDataStoreDaoImpl.java      |  37 +-
 .../storage/snapshot/SnapshotEntityImpl.java    |   5 +
 .../xenserver/resource/CitrixResourceBase.java  | 378 +++++++++--------
 .../resource/XenServerStorageProcessor.java     |  77 ++--
 .../resource/Xenserver625StorageProcessor.java  | 102 +++--
 .../CitrixResizeVolumeCommandWrapper.java       |   2 +-
 server/src/com/cloud/api/ApiDBUtils.java        |   6 +
 server/src/com/cloud/api/ApiResponseHelper.java |   1 +
 server/src/com/cloud/configuration/Config.java  |  17 +-
 .../configuration/ConfigurationManagerImpl.java |   1 +
 .../cloud/storage/CreateSnapshotPayload.java    |   8 +-
 .../com/cloud/storage/VolumeApiServiceImpl.java | 181 ++++----
 .../storage/snapshot/SnapshotManagerImpl.java   |  95 +++--
 .../cloud/storage/VolumeApiServiceImplTest.java | 117 +++--
 .../storage/snapshot/SnapshotManagerTest.java   |  71 ++--
 .../resource/NfsSecondaryStorageResource.java   | 227 +++++-----
 setup/db/db/schema-481to490.sql                 |   2 +
 .../plugins/solidfire/TestSnapshots.py          |   2 +-
 tools/marvin/marvin/lib/base.py                 |   7 +-
 36 files changed, 1464 insertions(+), 746 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/api/src/com/cloud/storage/Snapshot.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/storage/Snapshot.java b/api/src/com/cloud/storage/Snapshot.java
index 7f0168e..9e44e57 100644
--- a/api/src/com/cloud/storage/Snapshot.java
+++ b/api/src/com/cloud/storage/Snapshot.java
@@ -16,14 +16,13 @@
 // under the License.
 package com.cloud.storage;
 
-import java.util.Date;
-
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.utils.fsm.StateObject;
 import org.apache.cloudstack.acl.ControlledEntity;
 import org.apache.cloudstack.api.Identity;
 import org.apache.cloudstack.api.InternalIdentity;
 
-import com.cloud.hypervisor.Hypervisor.HypervisorType;
-import com.cloud.utils.fsm.StateObject;
+import java.util.Date;
 
 public interface Snapshot extends ControlledEntity, Identity, InternalIdentity, StateObject<Snapshot.State> {
     public enum Type {
@@ -67,6 +66,10 @@ public interface Snapshot extends ControlledEntity, Identity, InternalIdentity,
         CreateRequested, OperationNotPerformed, BackupToSecondary, BackedupToSecondary, DestroyRequested, CopyingRequested, OperationSucceeded, OperationFailed
     }
 
+    enum LocationType {
+        PRIMARY, SECONDARY
+    }
+
     public static final long MANUAL_POLICY_ID = 0L;
 
     @Override
@@ -89,4 +92,5 @@ public interface Snapshot extends ControlledEntity, Identity, InternalIdentity,
 
     short getsnapshotType();
 
+    LocationType getLocationType(); // This type is in reference to the location where the snapshot resides (ex. primary storage, archive on secondary storage, etc.)
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/api/src/com/cloud/storage/VolumeApiService.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/storage/VolumeApiService.java b/api/src/com/cloud/storage/VolumeApiService.java
index 7832b89..f562ce2 100644
--- a/api/src/com/cloud/storage/VolumeApiService.java
+++ b/api/src/com/cloud/storage/VolumeApiService.java
@@ -70,8 +70,6 @@ public interface VolumeApiService {
     /**
      * Uploads the volume to secondary storage
      *
-     * @param UploadVolumeCmdByAdmin cmd
-     *
      * @return Volume object
      */
     Volume uploadVolume(UploadVolumeCmd cmd)    throws ResourceAllocationException;
@@ -82,11 +80,11 @@ public interface VolumeApiService {
 
     Volume attachVolumeToVM(AttachVolumeCmd command);
 
-    Volume detachVolumeFromVM(DetachVolumeCmd cmmd);
+    Volume detachVolumeFromVM(DetachVolumeCmd cmd);
 
-    Snapshot takeSnapshot(Long volumeId, Long policyId, Long snapshotId, Account account, boolean quiescevm) throws ResourceAllocationException;
+    Snapshot takeSnapshot(Long volumeId, Long policyId, Long snapshotId, Account account, boolean quiescevm, Snapshot.LocationType locationType) throws ResourceAllocationException;
 
-    Snapshot allocSnapshot(Long volumeId, Long policyId, String snapshotName) throws ResourceAllocationException;
+    Snapshot allocSnapshot(Long volumeId, Long policyId, String snapshotName, Snapshot.LocationType locationType) throws ResourceAllocationException;
 
     Volume updateVolume(long volumeId, String path, String state, Long storageId, Boolean displayVolume, String customId, long owner, String chainInfo);
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/api/src/com/cloud/storage/snapshot/SnapshotApiService.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/storage/snapshot/SnapshotApiService.java b/api/src/com/cloud/storage/snapshot/SnapshotApiService.java
index fb48f47..013704d 100644
--- a/api/src/com/cloud/storage/snapshot/SnapshotApiService.java
+++ b/api/src/com/cloud/storage/snapshot/SnapshotApiService.java
@@ -86,7 +86,7 @@ public interface SnapshotApiService {
 
     boolean deleteSnapshotPolicies(DeleteSnapshotPoliciesCmd cmd);
 
-    Snapshot allocSnapshot(Long volumeId, Long policyId, String snapshotName) throws ResourceAllocationException;
+    Snapshot allocSnapshot(Long volumeId, Long policyId, String snapshotName, Snapshot.LocationType locationType) throws ResourceAllocationException;
 
     /**
      * Create a snapshot of a volume

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/api/src/org/apache/cloudstack/api/ApiConstants.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/api/ApiConstants.java b/api/src/org/apache/cloudstack/api/ApiConstants.java
index 0e83849..ad68658 100644
--- a/api/src/org/apache/cloudstack/api/ApiConstants.java
+++ b/api/src/org/apache/cloudstack/api/ApiConstants.java
@@ -131,6 +131,7 @@ public class ApiConstants {
     public static final String INTERNAL_DNS1 = "internaldns1";
     public static final String INTERNAL_DNS2 = "internaldns2";
     public static final String INTERVAL_TYPE = "intervaltype";
+    public static final String LOCATION_TYPE = "locationtype";
     public static final String IOPS_READ_RATE = "iopsreadrate";
     public static final String IOPS_WRITE_RATE = "iopswriterate";
     public static final String IP_ADDRESS = "ipaddress";

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/api/src/org/apache/cloudstack/api/command/user/snapshot/CreateSnapshotCmd.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/api/command/user/snapshot/CreateSnapshotCmd.java b/api/src/org/apache/cloudstack/api/command/user/snapshot/CreateSnapshotCmd.java
index 306ab5f..4523889 100644
--- a/api/src/org/apache/cloudstack/api/command/user/snapshot/CreateSnapshotCmd.java
+++ b/api/src/org/apache/cloudstack/api/command/user/snapshot/CreateSnapshotCmd.java
@@ -16,8 +16,15 @@
 // under the License.
 package org.apache.cloudstack.api.command.user.snapshot;
 
-import org.apache.log4j.Logger;
-
+import com.cloud.event.EventTypes;
+import com.cloud.exception.InvalidParameterValueException;
+import com.cloud.exception.PermissionDeniedException;
+import com.cloud.exception.ResourceAllocationException;
+import com.cloud.projects.Project;
+import com.cloud.storage.Snapshot;
+import com.cloud.storage.Volume;
+import com.cloud.user.Account;
+import com.cloud.utils.exception.CloudRuntimeException;
 import org.apache.cloudstack.api.APICommand;
 import org.apache.cloudstack.api.ApiCommandJobType;
 import org.apache.cloudstack.api.ApiConstants;
@@ -30,16 +37,7 @@ import org.apache.cloudstack.api.response.DomainResponse;
 import org.apache.cloudstack.api.response.SnapshotPolicyResponse;
 import org.apache.cloudstack.api.response.SnapshotResponse;
 import org.apache.cloudstack.api.response.VolumeResponse;
-import org.apache.cloudstack.context.CallContext;
-
-import com.cloud.event.EventTypes;
-import com.cloud.exception.InvalidParameterValueException;
-import com.cloud.exception.PermissionDeniedException;
-import com.cloud.exception.ResourceAllocationException;
-import com.cloud.projects.Project;
-import com.cloud.storage.Snapshot;
-import com.cloud.storage.Volume;
-import com.cloud.user.Account;
+import org.apache.log4j.Logger;
 
 @APICommand(name = "createSnapshot", description = "Creates an instant snapshot of a volume.", responseObject = SnapshotResponse.class, entityType = {Snapshot.class},
         requestHasSensitiveInfo = false, responseHasSensitiveInfo = false)
@@ -74,6 +72,10 @@ public class CreateSnapshotCmd extends BaseAsyncCreateCmd {
     @Parameter(name = ApiConstants.SNAPSHOT_QUIESCEVM, type = CommandType.BOOLEAN, required = false, description = "quiesce vm if true")
     private Boolean quiescevm;
 
+    @Parameter(name = ApiConstants.LOCATION_TYPE, type = CommandType.STRING, required = false, description = "Currently applicable only for managed storage. " +
+            "Valid location types: 'primary', 'secondary'. Default = 'primary'.")
+    private String locationType;
+
     @Parameter(name = ApiConstants.NAME, type = CommandType.STRING, description = "the name of the snapshot")
     private String snapshotName;
 
@@ -108,7 +110,7 @@ public class CreateSnapshotCmd extends BaseAsyncCreateCmd {
     }
 
     public String getVolumeUuid() {
-        Volume volume = (Volume)this._entityMgr.findById(Volume.class, getVolumeId());
+        Volume volume = _entityMgr.findById(Volume.class, getVolumeId());
         if (volume == null) {
             throw new InvalidParameterValueException("Unable to find volume's UUID");
         }
@@ -184,7 +186,7 @@ public class CreateSnapshotCmd extends BaseAsyncCreateCmd {
 
     @Override
     public void create() throws ResourceAllocationException {
-        Snapshot snapshot = _volumeService.allocSnapshot(getVolumeId(), getPolicyId(), getSnapshotName());
+        Snapshot snapshot = _volumeService.allocSnapshot(getVolumeId(), getPolicyId(), getSnapshotName(), getLocationType());
         if (snapshot != null) {
             setEntityId(snapshot.getId());
             setEntityUuid(snapshot.getUuid());
@@ -195,21 +197,37 @@ public class CreateSnapshotCmd extends BaseAsyncCreateCmd {
 
     @Override
     public void execute() {
-        s_logger.info("VOLSS: createSnapshotCmd starts:" + System.currentTimeMillis());
-        CallContext.current().setEventDetails("Volume Id: " + getVolumeUuid());
         Snapshot snapshot;
         try {
             snapshot =
-                _volumeService.takeSnapshot(getVolumeId(), getPolicyId(), getEntityId(), _accountService.getAccount(getEntityOwnerId()), getQuiescevm());
+                _volumeService.takeSnapshot(getVolumeId(), getPolicyId(), getEntityId(), _accountService.getAccount(getEntityOwnerId()), getQuiescevm(), getLocationType());
+
             if (snapshot != null) {
                 SnapshotResponse response = _responseGenerator.createSnapshotResponse(snapshot);
                 response.setResponseName(getCommandName());
                 setResponseObject(response);
             } else {
-                throw new ServerApiException(ApiErrorCode.INTERNAL_ERROR, "Failed to create snapshot due to an internal error creating snapshot for volume " + volumeId);
+                throw new ServerApiException(ApiErrorCode.INTERNAL_ERROR, "Failed to create snapshot due to an internal error creating snapshot for volume " + getVolumeId());
             }
         } catch (Exception e) {
-            throw new ServerApiException(ApiErrorCode.INTERNAL_ERROR, "Failed to create snapshot due to an internal error creating snapshot for volume " + volumeId);
+            throw new ServerApiException(ApiErrorCode.INTERNAL_ERROR, "Failed to create snapshot due to an internal error creating snapshot for volume " + getVolumeId());
+        }
+    }
+
+    private Snapshot.LocationType getLocationType() {
+
+        if (Snapshot.LocationType.values() == null || Snapshot.LocationType.values().length == 0 || locationType == null) {
+            return null;
+        }
+
+        try {
+            String lType = locationType.trim().toUpperCase();
+            return Snapshot.LocationType.valueOf(lType);
+        } catch (IllegalArgumentException e) {
+            String errMesg = "Invalid locationType " + locationType + "Specified for volume " + getVolumeId()
+                        + " Valid values are: primary,secondary ";
+            s_logger.warn(errMesg);
+            throw  new CloudRuntimeException(errMesg);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java b/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java
index aa8de5c..f560f43 100644
--- a/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java
+++ b/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java
@@ -16,17 +16,15 @@
 // under the License.
 package org.apache.cloudstack.api.response;
 
-import java.util.Date;
-import java.util.List;
-
+import com.cloud.serializer.Param;
+import com.cloud.storage.Snapshot;
 import com.google.gson.annotations.SerializedName;
-
 import org.apache.cloudstack.api.ApiConstants;
 import org.apache.cloudstack.api.BaseResponse;
 import org.apache.cloudstack.api.EntityReference;
 
-import com.cloud.serializer.Param;
-import com.cloud.storage.Snapshot;
+import java.util.Date;
+import java.util.List;
 
 @EntityReference(value = Snapshot.class)
 public class SnapshotResponse extends BaseResponse implements ControlledEntityResponse {
@@ -82,6 +80,10 @@ public class SnapshotResponse extends BaseResponse implements ControlledEntityRe
     @Param(description = "valid types are hourly, daily, weekly, monthy, template, and none.")
     private String intervalType;
 
+    @SerializedName(ApiConstants.LOCATION_TYPE)
+    @Param(description = "valid location types are primary and secondary.")
+    private String locationType;
+
     @SerializedName(ApiConstants.STATE)
     @Param(description = "the state of the snapshot. BackedUp means that snapshot is ready to be used; Creating - the snapshot is being allocated on the primary storage; BackingUp - the snapshot is being backed up on secondary storage")
     private Snapshot.State state;
@@ -166,6 +168,10 @@ public class SnapshotResponse extends BaseResponse implements ControlledEntityRe
         this.intervalType = intervalType;
     }
 
+    public void setLocationType(String locationType) {
+        this.locationType = locationType;
+    }
+
     public void setState(Snapshot.State state) {
         this.state = state;
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/api/test/org/apache/cloudstack/api/command/test/CreateSnapshotCmdTest.java
----------------------------------------------------------------------
diff --git a/api/test/org/apache/cloudstack/api/command/test/CreateSnapshotCmdTest.java b/api/test/org/apache/cloudstack/api/command/test/CreateSnapshotCmdTest.java
new file mode 100644
index 0000000..5e2b222
--- /dev/null
+++ b/api/test/org/apache/cloudstack/api/command/test/CreateSnapshotCmdTest.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.api.command.test;
+
+import com.cloud.storage.Snapshot;
+import com.cloud.storage.VolumeApiService;
+import com.cloud.user.Account;
+import com.cloud.user.AccountService;
+import junit.framework.TestCase;
+import org.apache.cloudstack.api.ResponseGenerator;
+import org.apache.cloudstack.api.ServerApiException;
+import org.apache.cloudstack.api.command.user.snapshot.CreateSnapshotCmd;
+import org.apache.cloudstack.api.response.SnapshotResponse;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.isNull;
+
+public class CreateSnapshotCmdTest extends TestCase {
+
+    private CreateSnapshotCmd createSnapshotCmd;
+    private ResponseGenerator responseGenerator;
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Override
+    @Before
+    public void setUp() {
+
+        createSnapshotCmd = new CreateSnapshotCmd() {
+
+            @Override
+            public String getCommandName() {
+                return "createsnapshotresponse";
+            }
+
+            @Override
+            public Long getVolumeId(){
+                return 1L;
+            }
+
+            @Override
+            public long getEntityOwnerId(){
+                return 1L;
+            }
+        };
+
+    }
+
+    @Test
+    public void testCreateSuccess() {
+
+        AccountService accountService = Mockito.mock(AccountService.class);
+        Account account = Mockito.mock(Account.class);
+        Mockito.when(accountService.getAccount(anyLong())).thenReturn(account);
+
+        VolumeApiService volumeApiService = Mockito.mock(VolumeApiService.class);
+        Snapshot snapshot = Mockito.mock(Snapshot.class);
+        try {
+
+            Mockito.when(volumeApiService.takeSnapshot(anyLong(), anyLong(), anyLong(),
+                    any(Account.class), anyBoolean(), isNull(Snapshot.LocationType.class))).thenReturn(snapshot);
+
+        } catch (Exception e) {
+            Assert.fail("Received exception when success expected " + e.getMessage());
+        }
+
+        responseGenerator = Mockito.mock(ResponseGenerator.class);
+        SnapshotResponse snapshotResponse = Mockito.mock(SnapshotResponse.class);
+        Mockito.when(responseGenerator.createSnapshotResponse(snapshot)).thenReturn(snapshotResponse);
+        Mockito.doNothing().when(snapshotResponse).setAccountName(anyString());
+
+        createSnapshotCmd._accountService = accountService;
+        createSnapshotCmd._responseGenerator = responseGenerator;
+        createSnapshotCmd._volumeService = volumeApiService;
+
+        try {
+            createSnapshotCmd.execute();
+        } catch (Exception e) {
+            Assert.fail("Received exception when success expected " + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testCreateFailure() {
+
+        AccountService accountService = Mockito.mock(AccountService.class);
+        Account account = Mockito.mock(Account.class);
+        Mockito.when(accountService.getAccount(anyLong())).thenReturn(account);
+
+        VolumeApiService volumeApiService = Mockito.mock(VolumeApiService.class);
+
+        try {
+                Mockito.when(volumeApiService.takeSnapshot(anyLong(), anyLong(), anyLong(),
+                        any(Account.class), anyBoolean(), isNull(Snapshot.LocationType.class))).thenReturn(null);
+        } catch (Exception e) {
+            Assert.fail("Received exception when success expected " + e.getMessage());
+        }
+
+        createSnapshotCmd._accountService = accountService;
+        createSnapshotCmd._volumeService = volumeApiService;
+
+        try {
+            createSnapshotCmd.execute();
+        } catch (ServerApiException exception) {
+            Assert.assertEquals("Failed to create snapshot due to an internal error creating snapshot for volume 1", exception.getDescription());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/core/src/org/apache/cloudstack/storage/to/PrimaryDataStoreTO.java
----------------------------------------------------------------------
diff --git a/core/src/org/apache/cloudstack/storage/to/PrimaryDataStoreTO.java b/core/src/org/apache/cloudstack/storage/to/PrimaryDataStoreTO.java
index 67ff0d7..1572efe 100644
--- a/core/src/org/apache/cloudstack/storage/to/PrimaryDataStoreTO.java
+++ b/core/src/org/apache/cloudstack/storage/to/PrimaryDataStoreTO.java
@@ -19,13 +19,12 @@
 
 package org.apache.cloudstack.storage.to;
 
-import java.util.Map;
-
-import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStore;
-
 import com.cloud.agent.api.to.DataStoreTO;
 import com.cloud.storage.DataStoreRole;
 import com.cloud.storage.Storage.StoragePoolType;
+import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStore;
+
+import java.util.Map;
 
 public class PrimaryDataStoreTO implements DataStoreTO {
     public static final String MANAGED = PrimaryDataStore.MANAGED;
@@ -52,6 +51,7 @@ public class PrimaryDataStoreTO implements DataStoreTO {
     private Map<String, String> details;
     private static final String pathSeparator = "/";
     private Boolean fullCloneFlag;
+    private final boolean isManaged;
 
     public PrimaryDataStoreTO(PrimaryDataStore dataStore) {
         this.uuid = dataStore.getUuid();
@@ -63,6 +63,7 @@ public class PrimaryDataStoreTO implements DataStoreTO {
         this.setPort(dataStore.getPort());
         this.url = dataStore.getUri();
         this.details = dataStore.getDetails();
+        this.isManaged = dataStore.isManaged();
     }
 
     public long getId() {
@@ -153,4 +154,8 @@ public class PrimaryDataStoreTO implements DataStoreTO {
     public void setFullCloneFlag(Boolean fullCloneFlag) {
         this.fullCloneFlag = fullCloneFlag;
     }
+
+    public boolean isManaged() {
+        return isManaged;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/engine/api/src/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotResult.java
----------------------------------------------------------------------
diff --git a/engine/api/src/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotResult.java b/engine/api/src/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotResult.java
index ca505aa..7622693 100644
--- a/engine/api/src/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotResult.java
+++ b/engine/api/src/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotResult.java
@@ -21,21 +21,21 @@ import org.apache.cloudstack.storage.command.CommandResult;
 import com.cloud.agent.api.Answer;
 
 public class SnapshotResult extends CommandResult {
-    private SnapshotInfo snashot;
+    private SnapshotInfo snapshot;
     private Answer answer;
 
     public SnapshotResult(SnapshotInfo snapshot, Answer answer) {
         super();
-        this.setSnashot(snapshot);
+        this.setSnapshot(snapshot);
         this.setAnswer(answer);
     }
 
-    public SnapshotInfo getSnashot() {
-        return snashot;
+    public SnapshotInfo getSnapshot() {
+        return snapshot;
     }
 
-    public void setSnashot(SnapshotInfo snashot) {
-        this.snashot = snashot;
+    public void setSnapshot(SnapshotInfo snapshot) {
+        this.snapshot = snapshot;
     }
 
     public Answer getAnswer() {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/engine/components-api/src/com/cloud/vm/VmWorkTakeVolumeSnapshot.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/vm/VmWorkTakeVolumeSnapshot.java b/engine/components-api/src/com/cloud/vm/VmWorkTakeVolumeSnapshot.java
index 3160be1..495dc46 100644
--- a/engine/components-api/src/com/cloud/vm/VmWorkTakeVolumeSnapshot.java
+++ b/engine/components-api/src/com/cloud/vm/VmWorkTakeVolumeSnapshot.java
@@ -16,6 +16,8 @@
 // under the License.
 package com.cloud.vm;
 
+import com.cloud.storage.Snapshot;
+
 public class VmWorkTakeVolumeSnapshot extends VmWork {
 
     private static final long serialVersionUID = 341816293003023823L;
@@ -24,14 +26,16 @@ public class VmWorkTakeVolumeSnapshot extends VmWork {
     private Long policyId;
     private Long snapshotId;
     private boolean quiesceVm;
+    private Snapshot.LocationType locationType;
 
     public VmWorkTakeVolumeSnapshot(long userId, long accountId, long vmId, String handlerName,
-            Long volumeId, Long policyId, Long snapshotId, boolean quiesceVm) {
+            Long volumeId, Long policyId, Long snapshotId, boolean quiesceVm, Snapshot.LocationType locationType) {
         super(userId, accountId, vmId, handlerName);
         this.volumeId = volumeId;
         this.policyId = policyId;
         this.snapshotId = snapshotId;
         this.quiesceVm = quiesceVm;
+        this.locationType = locationType;
     }
 
     public Long getVolumeId() {
@@ -49,4 +53,6 @@ public class VmWorkTakeVolumeSnapshot extends VmWork {
     public boolean isQuiesceVm() {
         return quiesceVm;
     }
+
+    public Snapshot.LocationType getLocationType() { return locationType; }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/engine/schema/src/com/cloud/storage/SnapshotVO.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/com/cloud/storage/SnapshotVO.java b/engine/schema/src/com/cloud/storage/SnapshotVO.java
index 950c5e9..8e5c0b6 100644
--- a/engine/schema/src/com/cloud/storage/SnapshotVO.java
+++ b/engine/schema/src/com/cloud/storage/SnapshotVO.java
@@ -16,8 +16,9 @@
 // under the License.
 package com.cloud.storage;
 
-import java.util.Date;
-import java.util.UUID;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.utils.db.GenericDao;
+import com.google.gson.annotations.Expose;
 
 import javax.persistence.Column;
 import javax.persistence.Entity;
@@ -27,11 +28,8 @@ import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.Table;
-
-import com.google.gson.annotations.Expose;
-
-import com.cloud.hypervisor.Hypervisor.HypervisorType;
-import com.cloud.utils.db.GenericDao;
+import java.util.Date;
+import java.util.UUID;
 
 @Entity
 @Table(name = "snapshots")
@@ -69,6 +67,11 @@ public class SnapshotVO implements Snapshot {
     @Column(name = "snapshot_type")
     short snapshotType;
 
+    @Expose
+    @Column(name = "location_type", updatable = true, nullable = true)
+    @Enumerated(value = EnumType.STRING)
+    private LocationType locationType;
+
     @Column(name = "type_description")
     String typeDescription;
 
@@ -103,7 +106,7 @@ public class SnapshotVO implements Snapshot {
     }
 
     public SnapshotVO(long dcId, long accountId, long domainId, Long volumeId, Long diskOfferingId, String name, short snapshotType, String typeDescription, long size,
-            Long minIops, Long maxIops, HypervisorType hypervisorType) {
+            Long minIops, Long maxIops, HypervisorType hypervisorType, LocationType locationType) {
         dataCenterId = dcId;
         this.accountId = accountId;
         this.domainId = domainId;
@@ -119,6 +122,7 @@ public class SnapshotVO implements Snapshot {
         this.hypervisorType = hypervisorType;
         version = "2.2";
         uuid = UUID.randomUUID().toString();
+        this.locationType = locationType;
     }
 
     @Override
@@ -172,6 +176,15 @@ public class SnapshotVO implements Snapshot {
     }
 
     @Override
+    public LocationType getLocationType() {
+        return locationType;
+    }
+
+    public void setLocationType(LocationType locationType) {
+        this.locationType = locationType;
+    }
+
+    @Override
     public HypervisorType getHypervisorType() {
         return hypervisorType;
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/engine/storage/datamotion/src/org/apache/cloudstack/storage/motion/StorageSystemDataMotionStrategy.java
----------------------------------------------------------------------
diff --git a/engine/storage/datamotion/src/org/apache/cloudstack/storage/motion/StorageSystemDataMotionStrategy.java b/engine/storage/datamotion/src/org/apache/cloudstack/storage/motion/StorageSystemDataMotionStrategy.java
index 7a59ad0..b636524 100644
--- a/engine/storage/datamotion/src/org/apache/cloudstack/storage/motion/StorageSystemDataMotionStrategy.java
+++ b/engine/storage/datamotion/src/org/apache/cloudstack/storage/motion/StorageSystemDataMotionStrategy.java
@@ -18,76 +18,91 @@
  */
 package org.apache.cloudstack.storage.motion;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ExecutionException;
-
-import javax.inject.Inject;
-
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.to.DataStoreTO;
+import com.cloud.agent.api.to.DataTO;
+import com.cloud.agent.api.to.DiskTO;
+import com.cloud.agent.api.to.NfsTO;
+import com.cloud.agent.api.to.VirtualMachineTO;
+import com.cloud.configuration.Config;
 import com.cloud.dc.dao.ClusterDao;
 import com.cloud.exception.AgentUnavailableException;
 import com.cloud.exception.OperationTimedoutException;
-
+import com.cloud.host.Host;
+import com.cloud.host.HostVO;
+import com.cloud.host.dao.HostDao;
+import com.cloud.host.dao.HostDetailsDao;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.server.ManagementService;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.DiskOfferingVO;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.Storage.ImageFormat;
+import com.cloud.storage.Volume;
+import com.cloud.storage.VolumeDetailVO;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.DiskOfferingDao;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.SnapshotDetailsDao;
+import com.cloud.storage.dao.SnapshotDetailsVO;
+import com.cloud.storage.dao.VolumeDao;
+import com.cloud.storage.dao.VolumeDetailsDao;
+import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.VirtualMachineManager;
+import com.google.common.base.Preconditions;
 import org.apache.cloudstack.engine.subsystem.api.storage.ChapInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.ClusterScope;
 import org.apache.cloudstack.engine.subsystem.api.storage.CopyCommandResult;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataMotionStrategy;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreCapabilities;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
+import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector;
+import org.apache.cloudstack.engine.subsystem.api.storage.HostScope;
 import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine.Event;
+import org.apache.cloudstack.engine.subsystem.api.storage.Scope;
 import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.StorageCacheManager;
 import org.apache.cloudstack.engine.subsystem.api.storage.StrategyPriority;
 import org.apache.cloudstack.engine.subsystem.api.storage.TemplateInfo;
 import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
 import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
 import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService;
 import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService.VolumeApiResult;
+import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
 import org.apache.cloudstack.framework.async.AsyncCallFuture;
 import org.apache.cloudstack.framework.async.AsyncCompletionCallback;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.storage.command.CopyCmdAnswer;
 import org.apache.cloudstack.storage.command.CopyCommand;
+import org.apache.cloudstack.storage.command.DeleteCommand;
+import org.apache.cloudstack.storage.command.DettachAnswer;
+import org.apache.cloudstack.storage.command.DettachCommand;
 import org.apache.cloudstack.storage.command.ResignatureAnswer;
 import org.apache.cloudstack.storage.command.ResignatureCommand;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
+import org.apache.cloudstack.storage.to.SnapshotObjectTO;
 import org.apache.cloudstack.storage.to.VolumeObjectTO;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
 
-import com.cloud.agent.AgentManager;
-import com.cloud.agent.api.to.DiskTO;
-import com.cloud.agent.api.to.VirtualMachineTO;
-import com.cloud.configuration.Config;
-import com.cloud.host.Host;
-import com.cloud.host.HostVO;
-import com.cloud.host.dao.HostDao;
-import com.cloud.host.dao.HostDetailsDao;
-import com.cloud.hypervisor.Hypervisor.HypervisorType;
-import com.cloud.server.ManagementService;
-import com.cloud.storage.DataStoreRole;
-import com.cloud.storage.DiskOfferingVO;
-import com.cloud.storage.SnapshotVO;
-import com.cloud.storage.Storage.ImageFormat;
-import com.cloud.storage.VolumeDetailVO;
-import com.cloud.storage.VolumeVO;
-import com.cloud.storage.dao.DiskOfferingDao;
-import com.cloud.storage.dao.SnapshotDao;
-import com.cloud.storage.dao.SnapshotDetailsDao;
-import com.cloud.storage.dao.SnapshotDetailsVO;
-import com.cloud.storage.dao.VolumeDao;
-import com.cloud.storage.dao.VolumeDetailsDao;
-import com.cloud.utils.NumbersUtil;
-import com.cloud.utils.exception.CloudRuntimeException;
-import com.cloud.vm.VirtualMachineManager;
-
-import com.google.common.base.Preconditions;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 @Component
 public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
@@ -109,7 +124,8 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
     @Inject private VolumeDataFactory _volumeDataFactory;
     @Inject private VolumeDetailsDao volumeDetailsDao;
     @Inject private VolumeService _volumeService;
-
+    @Inject private StorageCacheManager cacheMgr;
+    @Inject private EndPointSelector selector;
     @Override
     public StrategyPriority canHandle(DataObject srcData, DataObject destData) {
         if (srcData instanceof SnapshotInfo) {
@@ -180,9 +196,9 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
 
             boolean canHandleSrc = canHandle(srcData);
 
-            if (canHandleSrc && destData instanceof TemplateInfo &&
+            if (canHandleSrc && (destData instanceof TemplateInfo || destData instanceof SnapshotInfo) &&
                     (destData.getDataStore().getRole() == DataStoreRole.Image || destData.getDataStore().getRole() == DataStoreRole.ImageCache)) {
-                handleCreateTemplateFromSnapshot(snapshotInfo, (TemplateInfo)destData, callback);
+                handleCopyDataToSecondaryStorage(snapshotInfo, destData, callback);
 
                 return;
             }
@@ -207,16 +223,12 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
                     }
                 }
 
-                if (canHandleSrc) {
-                    String errMsg = "This operation is not supported (DataStoreCapabilities.STORAGE_SYSTEM_SNAPSHOT " +
-                            "not supported by destination storage plug-in). " + getDestDataStoreMsg(destData);
-
-                    LOGGER.warn(errMsg);
-
-                    throw new UnsupportedOperationException(errMsg);
+                if (canHandleDest) {
+                    handleCreateVolumeFromSnapshotOnSecondaryStorage(snapshotInfo, volumeInfo, callback);
+                    return;
                 }
 
-                if (canHandleDest) {
+                if (canHandleSrc) {
                     String errMsg = "This operation is not supported (DataStoreCapabilities.STORAGE_SYSTEM_SNAPSHOT " +
                             "not supported by source storage plug-in). " + getSrcDataStoreMsg(srcData);
 
@@ -280,7 +292,72 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
         return Boolean.parseBoolean(property);
     }
 
-    private void handleCreateTemplateFromSnapshot(SnapshotInfo snapshotInfo, TemplateInfo templateInfo, AsyncCompletionCallback<CopyCommandResult> callback) {
+    protected boolean needCacheStorage(DataObject srcData, DataObject destData) {
+        DataTO srcTO = srcData.getTO();
+        DataStoreTO srcStoreTO = srcTO.getDataStore();
+        DataTO destTO = destData.getTO();
+        DataStoreTO destStoreTO = destTO.getDataStore();
+
+        // both snapshot and volume are on primary datastore. No need for a cache storage as
+        // hypervisor will copy directly
+        if (srcStoreTO instanceof PrimaryDataStoreTO && destStoreTO instanceof PrimaryDataStoreTO) {
+            return false;
+        }
+
+        if (srcStoreTO instanceof NfsTO || srcStoreTO.getRole() == DataStoreRole.ImageCache) {
+            return false;
+        }
+
+
+        if (destStoreTO instanceof NfsTO || destStoreTO.getRole() == DataStoreRole.ImageCache) {
+            return false;
+        }
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("needCacheStorage true, dest at " + destTO.getPath() + " dest role " + destStoreTO.getRole().toString() + srcTO.getPath() + " src role " +
+                srcStoreTO.getRole().toString());
+        }
+        return true;
+    }
+
+    private Scope pickCacheScopeForCopy(DataObject srcData, DataObject destData) {
+        Scope srcScope = srcData.getDataStore().getScope();
+        Scope destScope = destData.getDataStore().getScope();
+
+        Scope selectedScope = null;
+        if (srcScope.getScopeId() != null) {
+            selectedScope = getZoneScope(srcScope);
+        } else if (destScope.getScopeId() != null) {
+            selectedScope = getZoneScope(destScope);
+        } else {
+            LOGGER.warn("Cannot find a zone-wide scope for movement that needs a cache storage");
+        }
+        return selectedScope;
+    }
+
+    private Scope getZoneScope(Scope scope) {
+        ZoneScope zoneScope;
+        if (scope instanceof ClusterScope) {
+            ClusterScope clusterScope = (ClusterScope)scope;
+            zoneScope = new ZoneScope(clusterScope.getZoneId());
+        } else if (scope instanceof HostScope) {
+            HostScope hostScope = (HostScope)scope;
+            zoneScope = new ZoneScope(hostScope.getZoneId());
+        } else {
+            zoneScope = (ZoneScope)scope;
+        }
+        return zoneScope;
+    }
+
+    /**
+     * This function is responsible for copying a volume from the managed store to a secondary store. This is used in two cases
+     * 1) When creating a template from a snapshot
+     * 2) When createSnapshot is called with location=SECONDARY
+     *
+     * @param snapshotInfo Source snapshot
+     * @param destData destination (can be template or snapshot)
+     * @param callback callback for async
+     */
+    private void handleCopyDataToSecondaryStorage(SnapshotInfo snapshotInfo, DataObject destData, AsyncCompletionCallback<CopyCommandResult> callback) {
         try {
             snapshotInfo.processEvent(Event.CopyingRequested);
         }
@@ -292,6 +369,16 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
 
         boolean usingBackendSnapshot = usingBackendSnapshotFor(snapshotInfo);
         boolean computeClusterSupportsResign = clusterDao.getSupportsResigning(hostVO.getClusterId());
+        boolean needCache = needCacheStorage(snapshotInfo, destData);
+
+        DataObject destOnStore = destData;
+
+        if (needCache) {
+            // creates an object in the DB for data to be cached
+            Scope selectedScope = pickCacheScopeForCopy(snapshotInfo, destData);
+            destOnStore = cacheMgr.getCacheObject(snapshotInfo, selectedScope);
+            destOnStore.processEvent(Event.CreateOnlyRequested);
+        }
 
         if (usingBackendSnapshot && !computeClusterSupportsResign) {
             String noSupportForResignErrMsg = "Unable to locate an applicable host with which to perform a resignature operation : Cluster ID = " + hostVO.getClusterId();
@@ -310,16 +397,15 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
 
             String value = _configDao.getValue(Config.PrimaryStorageDownloadWait.toString());
             int primaryStorageDownloadWait = NumbersUtil.parseInt(value, Integer.parseInt(Config.PrimaryStorageDownloadWait.getDefaultValue()));
-            CopyCommand copyCommand = new CopyCommand(snapshotInfo.getTO(), templateInfo.getTO(), primaryStorageDownloadWait, VirtualMachineManager.ExecuteInSequence.value());
+            CopyCommand copyCommand = new CopyCommand(snapshotInfo.getTO(), destOnStore.getTO(), primaryStorageDownloadWait, VirtualMachineManager.ExecuteInSequence.value());
 
             String errMsg = null;
-
             CopyCmdAnswer copyCmdAnswer = null;
 
             try {
                 // If we are using a back-end snapshot, then we should still have access to it from the hosts in the cluster that hostVO is in
                 // (because we passed in true as the third parameter to createVolumeFromSnapshot above).
-                if (usingBackendSnapshot == false) {
+                if (!usingBackendSnapshot) {
                     _volumeService.grantAccess(snapshotInfo, hostVO, srcDataStore);
                 }
 
@@ -328,21 +414,46 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
                 copyCommand.setOptions(srcDetails);
 
                 copyCmdAnswer = (CopyCmdAnswer)_agentMgr.send(hostVO.getId(), copyCommand);
-            }
-            catch (CloudRuntimeException | AgentUnavailableException | OperationTimedoutException ex) {
+
+                if (needCache) {
+
+                    // If cached storage was needed (in case of object store as secondary
+                    // storage), at this point, the data has been copied from the primary
+                    // to the NFS cache by the hypervisor. We now invoke another copy
+                    // command to copy this data from cache to secondary storage. We
+                    // then cleanup the cache
+
+                    destOnStore.processEvent(Event.OperationSuccessed, copyCmdAnswer);
+
+                    CopyCommand cmd = new CopyCommand(destOnStore.getTO(), destData.getTO(), primaryStorageDownloadWait, VirtualMachineManager.ExecuteInSequence.value());
+                    EndPoint ep = selector.select(destOnStore, destData);
+
+                    if (ep == null) {
+                        errMsg = "No remote endpoint to send command, check if host or ssvm is down?";
+                        LOGGER.error(errMsg);
+                        copyCmdAnswer = new CopyCmdAnswer(errMsg);
+                    } else {
+                        copyCmdAnswer = (CopyCmdAnswer) ep.sendMessage(cmd);
+                    }
+
+                    // clean up snapshot copied to staging
+                    performCleanupCacheStorage(destOnStore);
+                }
+
+
+            } catch (CloudRuntimeException | AgentUnavailableException | OperationTimedoutException ex) {
                 String msg = "Failed to create template from snapshot (Snapshot ID = " + snapshotInfo.getId() + ") : ";
 
                 LOGGER.warn(msg, ex);
 
                 throw new CloudRuntimeException(msg + ex.getMessage());
-            }
-            finally {
-                try {
-                    _volumeService.revokeAccess(snapshotInfo, hostVO, srcDataStore);
-                }
-                catch (Exception ex) {
-                    LOGGER.warn("Error revoking access to snapshot (Snapshot ID = " + snapshotInfo.getId() + "): " + ex.getMessage(), ex);
-                }
+
+            } finally {
+
+                // detach and remove access after the volume is created
+                SnapshotObjectTO snapshotObjectTO = (SnapshotObjectTO) snapshotInfo.getTO();
+                DiskTO disk = new DiskTO(snapshotObjectTO, null, snapshotInfo.getPath(), Volume.Type.UNKNOWN);
+                detachManagedStoreVolume(snapshotInfo, hostVO, getProperty(snapshotInfo.getId(), DiskTO.IQN), srcDataStore.getId(), disk);
 
                 if (copyCmdAnswer == null || !copyCmdAnswer.getResult()) {
                     if (copyCmdAnswer != null && !StringUtils.isEmpty(copyCmdAnswer.getDetails())) {
@@ -380,6 +491,141 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
     }
 
     /**
+     * Creates a volume on the storage from a snapshot that resides on the secondary storage (archived snapshot).
+     * @param snapshotInfo snapshot on secondary
+     * @param volumeInfo volume to be created on the storage
+     * @param callback  for async
+     */
+    private void handleCreateVolumeFromSnapshotOnSecondaryStorage(SnapshotInfo snapshotInfo, VolumeInfo volumeInfo, AsyncCompletionCallback<CopyCommandResult> callback) {
+
+        // at this point, the snapshotInfo and volumeInfo should have the same disk offering ID (so either one should be OK to get a DiskOfferingVO instance)
+        DiskOfferingVO diskOffering = _diskOfferingDao.findByIdIncludingRemoved(volumeInfo.getDiskOfferingId());
+        SnapshotVO snapshot = _snapshotDao.findById(snapshotInfo.getId());
+        DataStore destDataStore = volumeInfo.getDataStore();
+
+        // update the volume's hv_ss_reserve (hypervisor snapshot reserve) from a disk offering (used for managed storage)
+        _volumeService.updateHypervisorSnapshotReserveForVolume(diskOffering, volumeInfo.getId(), snapshot.getHypervisorType());
+
+        CopyCmdAnswer copyCmdAnswer = null;
+        String errMsg = null;
+
+        HostVO hostVO = null;
+        try {
+
+            //create a volume on the storage
+            AsyncCallFuture<VolumeApiResult> future = _volumeService.createVolumeAsync(volumeInfo, volumeInfo.getDataStore());
+            VolumeApiResult result = future.get();
+
+            if (result.isFailed()) {
+                LOGGER.error("Failed to create a volume: " + result.getResult());
+                throw new CloudRuntimeException(result.getResult());
+            }
+
+            volumeInfo = _volumeDataFactory.getVolume(volumeInfo.getId(), volumeInfo.getDataStore());
+
+            volumeInfo.processEvent(Event.MigrationRequested);
+
+            volumeInfo = _volumeDataFactory.getVolume(volumeInfo.getId(), volumeInfo.getDataStore());
+
+            hostVO = getHost(snapshotInfo.getDataCenterId(), false);
+
+            //copy the volume from secondary via the hypervisor
+            copyCmdAnswer = performCopyOfVdi(volumeInfo, snapshotInfo, hostVO);
+
+            if (copyCmdAnswer == null || !copyCmdAnswer.getResult()) {
+                if (copyCmdAnswer != null && !StringUtils.isEmpty(copyCmdAnswer.getDetails())) {
+                    errMsg = copyCmdAnswer.getDetails();
+                }
+                else {
+                    errMsg = "Unable to create volume from snapshot";
+                }
+            }
+        }
+        catch (Exception ex) {
+            errMsg = ex.getMessage() != null ? ex.getMessage() : "Copy operation failed in 'StorageSystemDataMotionStrategy.handleCreateVolumeFromSnapshotBothOnStorageSystem'";
+        } finally {
+
+
+            DiskTO disk = new DiskTO(volumeInfo.getTO(), volumeInfo.getDeviceId(), volumeInfo.getPath(),volumeInfo.getVolumeType());
+            long storagePoolId = volumeInfo.getPoolId();
+            detachManagedStoreVolume(volumeInfo, hostVO, volumeInfo.get_iScsiName(), storagePoolId, disk);
+        }
+
+        CopyCommandResult result = new CopyCommandResult(null, copyCmdAnswer);
+
+        result.setResult(errMsg);
+
+        callback.complete(result);
+
+
+    }
+
+    /**
+     * Detaches a managed volume from a host
+     *  @param dataObject Volume to be detached
+     * @param hostVO    Host where the volume is currently attached
+     * @param storagePoolId Storage where volume resides
+     * @param disk  Object which stores disk attributes to send to the agents
+     */
+    private void detachManagedStoreVolume(DataObject dataObject, HostVO hostVO, String iqn, long storagePoolId, DiskTO disk) {
+
+        DataStore destDataStore = dataObject.getDataStore();
+        DettachCommand cmd = new DettachCommand(disk, null);
+        StoragePoolVO storagePool = _storagePoolDao.findById(storagePoolId);
+
+        cmd.setManaged(true);
+        cmd.setStorageHost(storagePool.getHostAddress());
+        cmd.setStoragePort(storagePool.getPort());
+        cmd.set_iScsiName(iqn);
+
+        try {
+            DettachAnswer dettachAnswer = (DettachAnswer) _agentMgr.send(hostVO.getId(), cmd);
+
+            if (!dettachAnswer.getResult()) {
+                LOGGER.warn("Error detaching DataObject:" + dettachAnswer.getDetails());
+            }
+
+        } catch (Exception e) {
+            LOGGER.warn("Error detaching DataObject " + dataObject.getId() + " Error: " + e.getMessage());
+        }
+
+
+        try {
+            _volumeService.revokeAccess(dataObject, hostVO, destDataStore);
+        } catch (Exception e) {
+            LOGGER.warn("Error revoking access to DataObject (DataObject ID = " + dataObject.getId() + "): " + e.getMessage(), e);
+        }
+    }
+
+    private void performCleanupCacheStorage(DataObject destOnStore) {
+        destOnStore.processEvent(Event.DestroyRequested);
+
+        DeleteCommand deleteCommand = new DeleteCommand(destOnStore.getTO());
+        EndPoint ep = selector.select(destOnStore);
+        try {
+            if (ep == null) {
+                LOGGER.warn("Unable to cleanup staging NFS because no endpoint was found " +
+                "Object: " + destOnStore.getType() + " ID: " + destOnStore.getId());
+
+                destOnStore.processEvent(Event.OperationFailed);
+            } else {
+                Answer deleteAnswer = ep.sendMessage(deleteCommand);
+                if (deleteAnswer != null && deleteAnswer.getResult()) {
+                    LOGGER.warn("Unable to cleanup staging NFS " + deleteAnswer.getDetails() +
+                    "Object: " + destOnStore.getType() + " ID: " + destOnStore.getId());
+                    destOnStore.processEvent(Event.OperationFailed);
+                }
+            }
+
+            destOnStore.processEvent(Event.OperationSuccessed);
+        } catch (Exception e) {
+            destOnStore.processEvent(Event.OperationFailed);
+            LOGGER.warn("Unable to clean up staging cache Exception " + e.getMessage() +
+                    "Object: " + destOnStore.getType() + " ID: " + destOnStore.getId());
+        }
+    }
+
+    /**
      * Clones a template present on the storage to a new volume and resignatures it.
      *
      * @param templateInfo   source template
@@ -418,7 +664,9 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
             volumeDetail = volumeDetailsDao.persist(volumeDetail);
 
             AsyncCallFuture<VolumeApiResult> future = _volumeService.createVolumeAsync(volumeInfo, volumeInfo.getDataStore());
-            VolumeApiResult result = future.get();
+            int storagePoolMaxWaitSeconds = NumbersUtil.parseInt(_configDao.getValue(Config.StoragePoolMaxWaitSeconds.key()), 3600);
+
+            VolumeApiResult result = future.get(storagePoolMaxWaitSeconds, TimeUnit.SECONDS);
 
             if (volumeDetail != null) {
                 volumeDetailsDao.remove(volumeDetail.getId());
@@ -426,14 +674,11 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
 
             if (result.isFailed()) {
                 LOGGER.warn("Failed to create a volume: " + result.getResult());
-
                 throw new CloudRuntimeException(result.getResult());
             }
 
             volumeInfo = _volumeDataFactory.getVolume(volumeInfo.getId(), volumeInfo.getDataStore());
-
             volumeInfo.processEvent(Event.MigrationRequested);
-
             volumeInfo = _volumeDataFactory.getVolume(volumeInfo.getId(), volumeInfo.getDataStore());
 
             copyCmdAnswer = performResignature(volumeInfo, hostVO);
@@ -441,12 +686,11 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
             if (copyCmdAnswer == null || !copyCmdAnswer.getResult()) {
                 if (copyCmdAnswer != null && !StringUtils.isEmpty(copyCmdAnswer.getDetails())) {
                     throw new CloudRuntimeException(copyCmdAnswer.getDetails());
-                }
-                else {
+                } else {
                     throw new CloudRuntimeException("Unable to create a volume from a template");
                 }
             }
-        } catch (InterruptedException | ExecutionException ex) {
+        } catch (InterruptedException | ExecutionException | TimeoutException ex ) {
             volumeInfo.getDataStore().getDriver().deleteAsync(volumeInfo.getDataStore(), volumeInfo, null);
 
             throw new CloudRuntimeException("Create volume from template (ID = " + templateInfo.getId() + ") failed " + ex.getMessage());
@@ -574,8 +818,7 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
         if (copyCmdAnswer == null || !copyCmdAnswer.getResult()) {
             if (copyCmdAnswer != null && !StringUtils.isEmpty(copyCmdAnswer.getDetails())) {
                 throw new CloudRuntimeException(copyCmdAnswer.getDetails());
-            }
-            else {
+            } else {
                 throw new CloudRuntimeException("Unable to create volume from snapshot");
             }
         }
@@ -802,14 +1045,45 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
         return new CopyCmdAnswer(newVolume);
     }
 
+    protected DataObject cacheSnapshotChain(SnapshotInfo snapshot, Scope scope) {
+        DataObject leafData = null;
+        DataStore store = cacheMgr.getCacheStorage(snapshot, scope);
+        while (snapshot != null) {
+            DataObject cacheData = cacheMgr.createCacheObject(snapshot, store);
+            if (leafData == null) {
+                leafData = cacheData;
+            }
+            snapshot = snapshot.getParent();
+        }
+        return leafData;
+    }
+
+    /**
+     * Copies data from secondary storage to a primary volume
+     * @param volumeInfo The primary volume
+     * @param snapshotInfo  destination of the copy
+     * @param hostVO the host used to copy the data
+     * @return result of the copy
+     */
     private CopyCmdAnswer performCopyOfVdi(VolumeInfo volumeInfo, SnapshotInfo snapshotInfo, HostVO hostVO) {
         String value = _configDao.getValue(Config.PrimaryStorageDownloadWait.toString());
         int primaryStorageDownloadWait = NumbersUtil.parseInt(value, Integer.parseInt(Config.PrimaryStorageDownloadWait.getDefaultValue()));
-        CopyCommand copyCommand = new CopyCommand(snapshotInfo.getTO(), volumeInfo.getTO(), primaryStorageDownloadWait, VirtualMachineManager.ExecuteInSequence.value());
 
+        DataObject srcData = snapshotInfo;
         CopyCmdAnswer copyCmdAnswer = null;
+        DataObject cacheData = null;
+        boolean needCacheStorage = needCacheStorage(snapshotInfo, volumeInfo);
+
+        if (needCacheStorage) {
+            cacheData = cacheSnapshotChain(snapshotInfo, new ZoneScope(volumeInfo.getDataCenterId()));
+            srcData = cacheData;
+
+        }
+
+        CopyCommand copyCommand = new CopyCommand(srcData.getTO(), volumeInfo.getTO(), primaryStorageDownloadWait, VirtualMachineManager.ExecuteInSequence.value());
 
         try {
+
             _volumeService.grantAccess(snapshotInfo, hostVO, snapshotInfo.getDataStore());
             _volumeService.grantAccess(volumeInfo, hostVO, volumeInfo.getDataStore());
 
@@ -833,6 +1107,10 @@ public class StorageSystemDataMotionStrategy implements DataMotionStrategy {
         finally {
             _volumeService.revokeAccess(snapshotInfo, hostVO, snapshotInfo.getDataStore());
             _volumeService.revokeAccess(volumeInfo, hostVO, volumeInfo.getDataStore());
+
+            if (needCacheStorage && copyCmdAnswer != null && copyCmdAnswer.getResult()) {
+                performCleanupCacheStorage(cacheData);
+            }
         }
 
         return copyCmdAnswer;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotObject.java
----------------------------------------------------------------------
diff --git a/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotObject.java b/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotObject.java
index 60ff31e..c7e9aa1 100644
--- a/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotObject.java
+++ b/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotObject.java
@@ -239,6 +239,9 @@ public class SnapshotObject implements SnapshotInfo {
     }
 
     @Override
+    public LocationType getLocationType() { return snapshot.getLocationType(); }
+
+    @Override
     public State getState() {
         return snapshot.getState();
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotServiceImpl.java
----------------------------------------------------------------------
diff --git a/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotServiceImpl.java b/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotServiceImpl.java
index f7f044f..d13df92 100644
--- a/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotServiceImpl.java
+++ b/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotServiceImpl.java
@@ -284,7 +284,7 @@ public class SnapshotServiceImpl implements SnapshotService {
             if (res.isFailed()) {
                 throw new CloudRuntimeException(res.getResult());
             }
-            SnapshotInfo destSnapshot = res.getSnashot();
+            SnapshotInfo destSnapshot = res.getSnapshot();
             return destSnapshot;
         } catch (InterruptedException e) {
             s_logger.debug("failed copy snapshot", e);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotStrategyBase.java
----------------------------------------------------------------------
diff --git a/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotStrategyBase.java b/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotStrategyBase.java
index b08a837..ba16e75 100644
--- a/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotStrategyBase.java
+++ b/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/SnapshotStrategyBase.java
@@ -28,7 +28,7 @@ public abstract class SnapshotStrategyBase implements SnapshotStrategy {
 
     @Override
     public SnapshotInfo takeSnapshot(SnapshotInfo snapshot) {
-        return snapshotSvr.takeSnapshot(snapshot).getSnashot();
+        return snapshotSvr.takeSnapshot(snapshot).getSnapshot();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/StorageSystemSnapshotStrategy.java
----------------------------------------------------------------------
diff --git a/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/StorageSystemSnapshotStrategy.java b/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/StorageSystemSnapshotStrategy.java
index 02691ff..591e3a6 100644
--- a/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/StorageSystemSnapshotStrategy.java
+++ b/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/StorageSystemSnapshotStrategy.java
@@ -16,36 +16,6 @@
 // under the License.
 package org.apache.cloudstack.storage.snapshot;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import javax.inject.Inject;
-
-import org.apache.log4j.Logger;
-import org.apache.cloudstack.engine.subsystem.api.storage.ChapInfo;
-import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
-import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreCapabilities;
-import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
-import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
-import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
-import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
-import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotResult;
-import org.apache.cloudstack.engine.subsystem.api.storage.StrategyPriority;
-import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
-import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService;
-import org.apache.cloudstack.storage.command.SnapshotAndCopyAnswer;
-import org.apache.cloudstack.storage.command.SnapshotAndCopyCommand;
-import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
-import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
-
-import org.springframework.stereotype.Component;
-
-import com.google.common.base.Optional;
-
 import com.cloud.agent.AgentManager;
 import com.cloud.agent.api.to.DiskTO;
 import com.cloud.dc.dao.ClusterDao;
@@ -57,6 +27,7 @@ import com.cloud.org.Cluster;
 import com.cloud.org.Grouping.AllocationState;
 import com.cloud.resource.ResourceState;
 import com.cloud.server.ManagementService;
+import com.cloud.storage.CreateSnapshotPayload;
 import com.cloud.storage.DataStoreRole;
 import com.cloud.storage.Snapshot;
 import com.cloud.storage.SnapshotVO;
@@ -72,6 +43,33 @@ import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.fsm.NoTransitionException;
 import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.dao.VMInstanceDao;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.cloudstack.engine.subsystem.api.storage.ChapInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreCapabilities;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotResult;
+import org.apache.cloudstack.engine.subsystem.api.storage.StrategyPriority;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService;
+import org.apache.cloudstack.storage.command.SnapshotAndCopyAnswer;
+import org.apache.cloudstack.storage.command.SnapshotAndCopyCommand;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 
 @Component
 public class StorageSystemSnapshotStrategy extends SnapshotStrategyBase {
@@ -92,7 +90,34 @@ public class StorageSystemSnapshotStrategy extends SnapshotStrategyBase {
 
     @Override
     public SnapshotInfo backupSnapshot(SnapshotInfo snapshotInfo) {
-        return snapshotInfo;
+
+        Preconditions.checkArgument(snapshotInfo != null, "backupSnapshot expects a valid snapshot");
+
+        if (snapshotInfo.getLocationType() != Snapshot.LocationType.SECONDARY) {
+
+            markAsBackedUp((SnapshotObject)snapshotInfo);
+            return snapshotInfo;
+        }
+
+        // At this point the snapshot is either taken as a native
+        // snapshot on the storage or exists as a volume on the storage (clone).
+        // If archive flag is passed, we will copy this snapshot to secondary
+        // storage and delete it from the primary storage.
+
+        HostVO host = getHost(snapshotInfo.getVolumeId());
+        boolean canStorageSystemCreateVolumeFromSnapshot = canStorageSystemCreateVolumeFromSnapshot(snapshotInfo.getBaseVolume().getPoolId());
+        boolean computeClusterSupportsResign = clusterDao.getSupportsResigning(host.getClusterId());
+
+        if (!canStorageSystemCreateVolumeFromSnapshot || !computeClusterSupportsResign) {
+            String mesg = "Cannot archive snapshot: Either canStorageSystemCreateVolumeFromSnapshot or " +
+                    "computeClusterSupportsResign were false.  ";
+
+            s_logger.warn(mesg);
+            throw new CloudRuntimeException(mesg);
+        }
+
+        return snapshotSvr.backupSnapshot(snapshotInfo);
+
     }
 
     @Override
@@ -113,6 +138,19 @@ public class StorageSystemSnapshotStrategy extends SnapshotStrategyBase {
             throw new InvalidParameterValueException("Unable to delete snapshotshot " + snapshotId + " because it is in the following state: " + snapshotVO.getState());
         }
 
+        return cleanupSnapshotOnPrimaryStore(snapshotId);
+    }
+
+    /**
+     * Cleans up a snapshot which was taken on a primary store. This function
+     * removes
+     *
+     * @param snapshotId: ID of snapshot that needs to be removed
+     * @return true if snapshot is removed, false otherwise
+     */
+
+    private boolean cleanupSnapshotOnPrimaryStore(long snapshotId) {
+
         SnapshotObject snapshotObj = (SnapshotObject)snapshotDataFactory.getSnapshot(snapshotId, DataStoreRole.Primary);
 
         if (snapshotObj == null) {
@@ -153,7 +191,6 @@ public class StorageSystemSnapshotStrategy extends SnapshotStrategyBase {
 
             return false;
         }
-
         return true;
     }
 
@@ -178,6 +215,8 @@ public class StorageSystemSnapshotStrategy extends SnapshotStrategyBase {
         }
 
         SnapshotResult result = null;
+        SnapshotInfo snapshotOnPrimary = null;
+        SnapshotInfo backedUpSnapshot = null;
 
         try {
             volumeInfo.stateTransit(Volume.Event.SnapshotRequested);
@@ -212,20 +251,46 @@ public class StorageSystemSnapshotStrategy extends SnapshotStrategyBase {
                 performSnapshotAndCopyOnHostSide(volumeInfo, snapshotInfo);
             }
 
-            markAsBackedUp((SnapshotObject)result.getSnashot());
+            snapshotOnPrimary = result.getSnapshot();
+            backedUpSnapshot = backupSnapshot(snapshotOnPrimary);
+
+            updateLocationTypeInDb(backedUpSnapshot);
+
         }
         finally {
             if (result != null && result.isSuccess()) {
                 volumeInfo.stateTransit(Volume.Event.OperationSucceeded);
-            }
-            else {
+
+                if (snapshotOnPrimary != null && snapshotInfo.getLocationType() == Snapshot.LocationType.SECONDARY) {
+                    // cleanup the snapshot on primary
+                    try {
+                        snapshotSvr.deleteSnapshot(snapshotOnPrimary);
+                    } catch (Exception e) {
+                        s_logger.warn("Failed to clean up snapshot on primary Id:" + snapshotOnPrimary.getId() + " "
+                                + e.getMessage());
+                    }
+                }
+            } else {
                 volumeInfo.stateTransit(Volume.Event.OperationFailed);
             }
-
-            snapshotDao.releaseFromLockTable(snapshotInfo.getId());
         }
 
-        return snapshotInfo;
+        snapshotDao.releaseFromLockTable(snapshotInfo.getId());
+        return backedUpSnapshot;
+    }
+
+    private void updateLocationTypeInDb(SnapshotInfo snapshotInfo) {
+        Object objPayload = snapshotInfo.getPayload();
+
+        if (objPayload instanceof CreateSnapshotPayload) {
+            CreateSnapshotPayload payload = (CreateSnapshotPayload)objPayload;
+
+            SnapshotVO snapshot = snapshotDao.findById(snapshotInfo.getId());
+
+            snapshot.setLocationType(payload.getLocationType());
+
+            snapshotDao.update(snapshotInfo.getId(), snapshot);
+        }
     }
 
     private boolean canStorageSystemCreateVolumeFromSnapshot(long storagePoolId) {
@@ -519,6 +584,13 @@ public class StorageSystemSnapshotStrategy extends SnapshotStrategyBase {
 
         DataStore dataStore = dataStoreMgr.getDataStore(storagePoolId, DataStoreRole.Primary);
 
+        Snapshot.LocationType locationType = snapshot.getLocationType();
+
+        //If the snapshot exisits on Secondary Storage, we can't delete it.
+        if (SnapshotOperation.DELETE.equals(op) && Snapshot.LocationType.SECONDARY.equals(locationType)) {
+            return StrategyPriority.CANT_HANDLE;
+        }
+
         if (dataStore != null) {
             Map<String, String> mapCapabilities = dataStore.getDriver().getCapabilities();
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/XenserverSnapshotStrategy.java
----------------------------------------------------------------------
diff --git a/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/XenserverSnapshotStrategy.java b/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/XenserverSnapshotStrategy.java
index 2544484..719bea0 100644
--- a/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/XenserverSnapshotStrategy.java
+++ b/engine/storage/snapshot/src/org/apache/cloudstack/storage/snapshot/XenserverSnapshotStrategy.java
@@ -290,7 +290,7 @@ public class XenserverSnapshotStrategy extends SnapshotStrategyBase {
 
     @Override
     public boolean revertSnapshot(SnapshotInfo snapshot) {
-        if (canHandle(snapshot,SnapshotOperation.REVERT) == StrategyPriority.CANT_HANDLE) {
+        if (canHandle(snapshot, SnapshotOperation.REVERT) == StrategyPriority.CANT_HANDLE) {
             throw new CloudRuntimeException("Reverting not supported. Create a template or volume based on the snapshot instead.");
         }
 
@@ -372,7 +372,7 @@ public class XenserverSnapshotStrategy extends SnapshotStrategyBase {
                 }
             }
 
-            snapshot = result.getSnashot();
+            snapshot = result.getSnapshot();
             DataStore primaryStore = snapshot.getDataStore();
 
             SnapshotInfo backupedSnapshot = backupSnapshot(snapshot);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/engine/storage/src/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java
----------------------------------------------------------------------
diff --git a/engine/storage/src/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java b/engine/storage/src/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java
index fea0b77..24bb542 100644
--- a/engine/storage/src/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java
+++ b/engine/storage/src/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java
@@ -16,25 +16,6 @@
 // under the License.
 package org.apache.cloudstack.storage.image.db;
 
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-import javax.naming.ConfigurationException;
-
-import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
-
-import org.apache.cloudstack.engine.subsystem.api.storage.DataObjectInStore;
-import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
-import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine.Event;
-import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine.State;
-import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
-import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
-
 import com.cloud.storage.DataStoreRole;
 import com.cloud.utils.db.DB;
 import com.cloud.utils.db.GenericDaoBase;
@@ -43,6 +24,22 @@ import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.SearchCriteria.Op;
 import com.cloud.utils.db.TransactionLegacy;
 import com.cloud.utils.db.UpdateBuilder;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObjectInStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine.Event;
+import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine.State;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
+
+import javax.naming.ConfigurationException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
 
 @Component
 public class SnapshotDataStoreDaoImpl extends GenericDaoBase<SnapshotDataStoreVO, Long> implements SnapshotDataStoreDao {
@@ -103,6 +100,7 @@ public class SnapshotDataStoreDaoImpl extends GenericDaoBase<SnapshotDataStoreVO
         snapshotSearch = createSearchBuilder();
         snapshotSearch.and("snapshot_id", snapshotSearch.entity().getSnapshotId(), SearchCriteria.Op.EQ);
         snapshotSearch.and("store_role", snapshotSearch.entity().getRole(), SearchCriteria.Op.EQ);
+        snapshotSearch.and("state", snapshotSearch.entity().getState(), SearchCriteria.Op.EQ);
         snapshotSearch.done();
 
         storeSnapshotSearch = createSearchBuilder();
@@ -294,6 +292,7 @@ public class SnapshotDataStoreDaoImpl extends GenericDaoBase<SnapshotDataStoreVO
         SearchCriteria<SnapshotDataStoreVO> sc = snapshotSearch.create();
         sc.setParameters("snapshot_id", snapshotId);
         sc.setParameters("store_role", role);
+        sc.setParameters("state", State.Ready);
         return findOneBy(sc);
     }
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f46651e6/engine/storage/src/org/apache/cloudstack/storage/snapshot/SnapshotEntityImpl.java
----------------------------------------------------------------------
diff --git a/engine/storage/src/org/apache/cloudstack/storage/snapshot/SnapshotEntityImpl.java b/engine/storage/src/org/apache/cloudstack/storage/snapshot/SnapshotEntityImpl.java
index a660f41..3cea3ea 100644
--- a/engine/storage/src/org/apache/cloudstack/storage/snapshot/SnapshotEntityImpl.java
+++ b/engine/storage/src/org/apache/cloudstack/storage/snapshot/SnapshotEntityImpl.java
@@ -185,6 +185,11 @@ public class SnapshotEntityImpl implements SnapshotEntity {
     }
 
     @Override
+    public LocationType getLocationType() {
+        return null;
+    }
+
+    @Override
     public Class<?> getEntityType() {
         return Snapshot.class;
     }