You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ro...@apache.org on 2020/09/17 04:42:34 UTC

[cloudstack] branch master updated: server: Secondary Storage Usage Improvements (#4053)

This is an automated email from the ASF dual-hosted git repository.

rohit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/master by this push:
     new b464fe4  server: Secondary Storage Usage Improvements (#4053)
b464fe4 is described below

commit b464fe41c67812145731c7c6c3edebb94bc47487
Author: Pearl Dsilva <pe...@gmail.com>
AuthorDate: Thu Sep 17 10:12:10 2020 +0530

    server: Secondary Storage Usage Improvements (#4053)
    
    This feature enables the following:
    - Balanced migration of data objects from a source image store to one or more destination image stores
    - Complete migration of data from a source image store to the destination image store(s)
    - Setting an image store to read-only
    - Viewing the download progress of templates across all data stores
    Related Primate PR: apache/cloudstack-primate#326
---
 api/src/main/java/com/cloud/event/EventTypes.java  |   6 +
 .../java/com/cloud/storage/StorageService.java     |   2 +
 api/src/main/java/com/cloud/storage/Volume.java    |   2 +
 .../org/apache/cloudstack/api/ApiConstants.java    |   5 +
 .../java/org/apache/cloudstack/api/BaseCmd.java    |   3 +
 .../command/admin/storage/ListImageStoresCmd.java  |  10 +-
 .../storage/MigrateSecondaryStorageDataCmd.java    | 115 ++++++
 ...mageStoresCmd.java => UpdateImageStoreCmd.java} |  85 ++--
 .../api/response/ImageStoreResponse.java           |  16 +-
 .../cloudstack/api/response/MigrationResponse.java |  73 ++++
 .../cloudstack/api/response/TemplateResponse.java  |   9 +
 .../cloudstack/storage/ImageStoreService.java      |  14 +-
 .../service/StorageOrchestrationService.java}      |  12 +-
 .../subsystem/api/storage/DataStoreManager.java    |   6 +
 .../subsystem/api/storage/EndPointSelector.java    |   2 +
 .../api/storage/ObjectInDataStoreStateMachine.java |   7 +-
 ...pshotInfo.java => SecondaryStorageService.java} |  41 +-
 .../engine/subsystem/api/storage/SnapshotInfo.java |   4 +
 .../java/com/cloud/storage/StorageManager.java     |   6 +
 .../engine/orchestration/DataMigrationUtility.java | 258 ++++++++++++
 .../engine/orchestration/StorageOrchestrator.java  | 451 +++++++++++++++++++++
 .../spring-engine-orchestration-core-context.xml   |   5 +
 .../src/main/java/com/cloud/host/dao/HostDao.java  |   4 +
 .../main/java/com/cloud/host/dao/HostDaoImpl.java  |  22 +
 .../com/cloud/secstorage/CommandExecLogDao.java    |   1 +
 .../cloud/secstorage/CommandExecLogDaoImpl.java    |  16 +-
 .../storage/datastore/db/ImageStoreDao.java        |   2 +-
 .../storage/datastore/db/ImageStoreDaoImpl.java    |   5 +-
 .../storage/datastore/db/ImageStoreVO.java         |  11 +
 .../storage/datastore/db/SnapshotDataStoreDao.java |   5 +
 .../storage/datastore/db/TemplateDataStoreDao.java |   2 +
 .../resources/META-INF/db/schema-41310to41400.sql  |  27 ++
 .../storage/motion/DataMotionServiceImpl.java      |  11 +-
 .../storage/image/SecondaryStorageServiceImpl.java | 203 ++++++++++
 .../storage/image/TemplateDataFactoryImpl.java     |  11 +-
 .../storage/image/TemplateServiceImpl.java         |   2 +-
 .../manager/ImageStoreProviderManagerImpl.java     |  66 ++-
 .../storage/image/store/TemplateObject.java        |   4 +-
 .../spring-engine-storage-image-core-context.xml   |   6 +-
 .../storage/snapshot/SnapshotObject.java           |  23 +-
 .../storage/datastore/DataStoreManagerImpl.java    |  26 +-
 .../datastore/ObjectInDataStoreManagerImpl.java    |   8 +
 .../storage/endpoint/DefaultEndPointSelector.java  |  23 +-
 .../storage/image/BaseImageStoreDriverImpl.java    | 139 ++++++-
 .../image/datastore/ImageStoreProviderManager.java |   4 +
 .../storage/image/db/SnapshotDataStoreDaoImpl.java |  28 +-
 .../storage/image/db/TemplateDataStoreDaoImpl.java |  10 +
 .../cloudstack/storage/volume/VolumeObject.java    |   9 +-
 .../framework/jobs/dao/AsyncJobDaoImpl.java        |   4 +-
 .../cloudstack/metrics/PrometheusExporterImpl.java |   2 +-
 pom.xml                                            |   1 +
 server/pom.xml                                     |   5 +
 .../java/com/cloud/api/query/QueryManagerImpl.java |  11 +-
 .../cloud/api/query/dao/ImageStoreJoinDaoImpl.java |   1 +
 .../cloud/api/query/dao/TemplateJoinDaoImpl.java   |  27 +-
 .../com/cloud/api/query/vo/ImageStoreJoinVO.java   |   7 +
 .../configuration/ConfigurationManagerImpl.java    |   2 +-
 .../com/cloud/server/ManagementServerImpl.java     |   4 +
 .../com/cloud/storage/ImageStoreServiceImpl.java   | 163 ++++++++
 .../java/com/cloud/storage/StorageManagerImpl.java |  18 +-
 .../com/cloud/storage/VolumeApiServiceImpl.java    |  34 +-
 .../cloud/storage/download/DownloadListener.java   |   2 +-
 .../secondary/SecondaryStorageVmManager.java       |   4 +
 .../cloud/template/HypervisorTemplateAdapter.java  |  44 +-
 .../com/cloud/template/TemplateManagerImpl.java    |   2 +-
 .../diagnostics/DiagnosticsServiceImpl.java        |   2 +-
 .../core/spring-server-core-managers-context.xml   |   2 +
 .../configuration/ConfigurationManagerTest.java    |  16 +-
 .../PremiumSecondaryStorageManagerImpl.java        |  87 +++-
 .../SecondaryStorageManagerImpl.java               |  11 +-
 .../resource/NfsSecondaryStorageResource.java      | 100 ++++-
 test/integration/smoke/test_secondary_storage.py   | 173 ++++++++
 test/integration/smoke/test_templates.py           |  34 ++
 tools/apidoc/gen_toc.py                            |   1 +
 74 files changed, 2351 insertions(+), 206 deletions(-)

diff --git a/api/src/main/java/com/cloud/event/EventTypes.java b/api/src/main/java/com/cloud/event/EventTypes.java
index d147192..d723f56 100644
--- a/api/src/main/java/com/cloud/event/EventTypes.java
+++ b/api/src/main/java/com/cloud/event/EventTypes.java
@@ -70,6 +70,7 @@ import com.cloud.projects.Project;
 import com.cloud.server.ResourceTag;
 import com.cloud.storage.GuestOS;
 import com.cloud.storage.GuestOSHypervisor;
+import com.cloud.storage.ImageStore;
 import com.cloud.storage.Snapshot;
 import com.cloud.storage.StoragePool;
 import com.cloud.storage.Volume;
@@ -239,6 +240,7 @@ public class EventTypes {
     public static final String EVENT_TEMPLATE_EXTRACT = "TEMPLATE.EXTRACT";
     public static final String EVENT_TEMPLATE_UPLOAD = "TEMPLATE.UPLOAD";
     public static final String EVENT_TEMPLATE_CLEANUP = "TEMPLATE.CLEANUP";
+    public static final String EVENT_FILE_MIGRATE = "FILE.MIGRATE";
 
     // Volume Events
     public static final String EVENT_VOLUME_CREATE = "VOLUME.CREATE";
@@ -329,6 +331,8 @@ public class EventTypes {
     public static final String EVENT_STORAGE_IP_RANGE_DELETE = "STORAGE.IP.RANGE.DELETE";
     public static final String EVENT_STORAGE_IP_RANGE_UPDATE = "STORAGE.IP.RANGE.UPDATE";
 
+    public static final String EVENT_IMAGE_STORE_DATA_MIGRATE = "IMAGE.STORE.MIGRATE.DATA";
+
     // Configuration Table
     public static final String EVENT_CONFIGURATION_VALUE_EDIT = "CONFIGURATION.VALUE.EDIT";
 
@@ -1021,6 +1025,8 @@ public class EventTypes {
         entityEventDetails.put(EVENT_POD_ROLLING_MAINTENANCE, PodResponse.class);
         entityEventDetails.put(EVENT_CLUSTER_ROLLING_MAINTENANCE, ClusterResponse.class);
         entityEventDetails.put(EVENT_HOST_ROLLING_MAINTENANCE, HostResponse.class);
+
+        entityEventDetails.put(EVENT_IMAGE_STORE_DATA_MIGRATE, ImageStore.class);
     }
 
     public static String getEntityForEvent(String eventName) {
diff --git a/api/src/main/java/com/cloud/storage/StorageService.java b/api/src/main/java/com/cloud/storage/StorageService.java
index aebbbcd..207fc8f 100644
--- a/api/src/main/java/com/cloud/storage/StorageService.java
+++ b/api/src/main/java/com/cloud/storage/StorageService.java
@@ -102,4 +102,6 @@ public interface StorageService {
      */
     ImageStore migrateToObjectStore(String name, String url, String providerName, Map<String, String> details) throws DiscoveryException;
 
+    ImageStore updateImageStoreStatus(Long id, Boolean readonly);
+
 }
diff --git a/api/src/main/java/com/cloud/storage/Volume.java b/api/src/main/java/com/cloud/storage/Volume.java
index dde9d60..5fd78ef 100644
--- a/api/src/main/java/com/cloud/storage/Volume.java
+++ b/api/src/main/java/com/cloud/storage/Volume.java
@@ -84,6 +84,8 @@ public interface Volume extends ControlledEntity, Identity, InternalIdentity, Ba
             s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Resizing, Event.OperationFailed, Ready, null));
             s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Allocated, Event.UploadRequested, UploadOp, null));
             s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Uploaded, Event.CopyRequested, Copying, null));
+            s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Ready, Event.OperationSucceeded, Ready, null));
+            s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Ready, Event.OperationFailed, Ready, null));
             s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Copying, Event.OperationSucceeded, Ready, Arrays.asList(new StateMachine2.Transition.Impact[]{StateMachine2.Transition.Impact.USAGE})));
             s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Copying, Event.OperationFailed, Uploaded, null));
             s_fsm.addTransition(new StateMachine2.Transition<State, Event>(UploadOp, Event.DestroyRequested, Destroy, null));
diff --git a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
index df28455..88f083b 100644
--- a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
+++ b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
@@ -114,6 +114,7 @@ public class ApiConstants {
     public static final String DISK_IO_WRITE = "diskiowrite";
     public static final String DISK_IO_PSTOTAL = "diskiopstotal";
     public static final String DISK_SIZE = "disksize";
+    public static final String DOWNLOAD_DETAILS = "downloaddetails";
     public static final String UTILIZATION = "utilization";
     public static final String DRIVER = "driver";
     public static final String ROOT_DISK_SIZE = "rootdisksize";
@@ -235,6 +236,7 @@ public class ApiConstants {
     public static final String MAX_MEMORY = "maxmemory";
     public static final String MIN_CPU_NUMBER = "mincpunumber";
     public static final String MIN_MEMORY = "minmemory";
+    public static final String MIGRATION_TYPE = "migrationtype";
     public static final String MEMORY = "memory";
     public static final String MODE = "mode";
     public static final String KEEPALIVE_ENABLED = "keepaliveenabled";
@@ -355,6 +357,7 @@ public class ApiConstants {
     public static final String TARGET_IQN = "targetiqn";
     public static final String TEMPLATE_FILTER = "templatefilter";
     public static final String TEMPLATE_ID = "templateid";
+    public static final String TEMPLATE_IDS = "templateids";
     public static final String TEMPLATE_NAME = "templatename";
     public static final String ISO_ID = "isoid";
     public static final String TIMEOUT = "timeout";
@@ -789,6 +792,8 @@ public class ApiConstants {
     public static final String EXITCODE = "exitcode";
     public static final String TARGET_ID = "targetid";
     public static final String FILES = "files";
+    public static final String SRC_POOL = "srcpool";
+    public static final String DEST_POOLS = "destpools";
     public static final String VOLUME_IDS = "volumeids";
 
     public static final String ROUTER_ID = "routerid";
diff --git a/api/src/main/java/org/apache/cloudstack/api/BaseCmd.java b/api/src/main/java/org/apache/cloudstack/api/BaseCmd.java
index 5b4c7aa..c897aad 100644
--- a/api/src/main/java/org/apache/cloudstack/api/BaseCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/BaseCmd.java
@@ -40,6 +40,7 @@ import org.apache.cloudstack.network.element.InternalLoadBalancerElementService;
 import org.apache.cloudstack.network.lb.ApplicationLoadBalancerService;
 import org.apache.cloudstack.network.lb.InternalLoadBalancerVMService;
 import org.apache.cloudstack.query.QueryService;
+import org.apache.cloudstack.storage.ImageStoreService;
 import org.apache.cloudstack.usage.UsageService;
 import org.apache.log4j.Logger;
 
@@ -131,6 +132,8 @@ public abstract class BaseCmd {
     @Inject
     public TemplateApiService _templateService;
     @Inject
+    public ImageStoreService _imageStoreService;
+    @Inject
     public SecurityGroupService _securityGroupService;
     @Inject
     public SnapshotApiService _snapshotService;
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListImageStoresCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListImageStoresCmd.java
index 8c37c78..4f7cf81 100644
--- a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListImageStoresCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListImageStoresCmd.java
@@ -16,8 +16,6 @@
 // under the License.
 package org.apache.cloudstack.api.command.admin.storage;
 
-import org.apache.log4j.Logger;
-
 import org.apache.cloudstack.api.APICommand;
 import org.apache.cloudstack.api.ApiConstants;
 import org.apache.cloudstack.api.BaseListCmd;
@@ -25,6 +23,7 @@ import org.apache.cloudstack.api.Parameter;
 import org.apache.cloudstack.api.response.ImageStoreResponse;
 import org.apache.cloudstack.api.response.ListResponse;
 import org.apache.cloudstack.api.response.ZoneResponse;
+import org.apache.log4j.Logger;
 
 @APICommand(name = "listImageStores", description = "Lists image stores.", responseObject = ImageStoreResponse.class, since = "4.2.0",
         requestHasSensitiveInfo = false, responseHasSensitiveInfo = false)
@@ -52,6 +51,9 @@ public class ListImageStoresCmd extends BaseListCmd {
     @Parameter(name = ApiConstants.ID, type = CommandType.UUID, entityType = ImageStoreResponse.class, description = "the ID of the storage pool")
     private Long id;
 
+    @Parameter(name = ApiConstants.READ_ONLY, type = CommandType.BOOLEAN, entityType = ImageStoreResponse.class, description = "read-only status of the image store", since = "4.15.0")
+    private Boolean readonly;
+
     /////////////////////////////////////////////////////
     /////////////////// Accessors ///////////////////////
     /////////////////////////////////////////////////////
@@ -80,6 +82,10 @@ public class ListImageStoresCmd extends BaseListCmd {
         this.provider = provider;
     }
 
+    public Boolean getReadonly() {
+        return readonly;
+    }
+
     /////////////////////////////////////////////////////
     /////////////// API Implementation///////////////////
     /////////////////////////////////////////////////////
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/MigrateSecondaryStorageDataCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/MigrateSecondaryStorageDataCmd.java
new file mode 100644
index 0000000..9abbecf
--- /dev/null
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/MigrateSecondaryStorageDataCmd.java
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.api.command.admin.storage;
+
+import java.util.List;
+
+import org.apache.cloudstack.acl.RoleType;
+import org.apache.cloudstack.api.APICommand;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.BaseAsyncCmd;
+import org.apache.cloudstack.api.Parameter;
+import org.apache.cloudstack.api.response.ImageStoreResponse;
+import org.apache.cloudstack.api.response.MigrationResponse;
+import org.apache.cloudstack.context.CallContext;
+import org.apache.log4j.Logger;
+
+import com.cloud.event.EventTypes;
+
+@APICommand(name = MigrateSecondaryStorageDataCmd.APINAME,
+        description = "migrates data objects from one secondary storage to destination image store(s)",
+        responseObject = MigrationResponse.class,
+        requestHasSensitiveInfo = false,
+        responseHasSensitiveInfo = false,
+        since = "4.15.0",
+        authorized = {RoleType.Admin})
+public class MigrateSecondaryStorageDataCmd extends BaseAsyncCmd {
+
+    public static final Logger LOGGER = Logger.getLogger(MigrateSecondaryStorageDataCmd.class.getName());
+
+    public static final String APINAME = "migrateSecondaryStorageData";
+
+    /////////////////////////////////////////////////////
+    //////////////// API parameters /////////////////////
+    /////////////////////////////////////////////////////
+
+    @Parameter(name = ApiConstants.SRC_POOL,
+            type = CommandType.UUID,
+            entityType = ImageStoreResponse.class,
+            description = "id of the image store from where the data is to be migrated",
+    required = true)
+    private Long id;
+
+    @Parameter(name = ApiConstants.DEST_POOLS,
+    type = CommandType.LIST,
+    collectionType = CommandType.UUID,
+    entityType = ImageStoreResponse.class,
+    description = "id(s) of the destination secondary storage pool(s) to which the templates are to be migrated",
+    required = true)
+    private List<Long> migrateTo;
+
+    @Parameter(name = ApiConstants.MIGRATION_TYPE,
+    type = CommandType.STRING,
+    description = "Balance: if you want data to be distributed evenly among the destination stores, " +
+            "Complete: If you want to migrate the entire data from source image store to the destination store(s). Default: Complete")
+    private String migrationType;
+
+    /////////////////////////////////////////////////////
+    /////////////////// Accessors ///////////////////////
+    /////////////////////////////////////////////////////
+
+    public Long getId() {
+        return id;
+    }
+
+    public List<Long> getMigrateTo() {
+        return migrateTo;
+    }
+
+    public String getMigrationType() {
+        return migrationType;
+    }
+
+    @Override
+    public String getEventType() {
+        return EventTypes.EVENT_IMAGE_STORE_DATA_MIGRATE;
+    }
+
+    @Override
+    public String getEventDescription() {
+        return "Attempting to migrate files/data objects ";
+    }
+
+    @Override
+    public void execute()  {
+        MigrationResponse response = _imageStoreService.migrateData(this);
+        response.setObjectName("imagestore");
+        this.setResponseObject(response);
+        CallContext.current().setEventDetails(response.getMessage());
+    }
+
+    @Override
+    public String getCommandName() {
+        return APINAME.toLowerCase() + BaseAsyncCmd.RESPONSE_SUFFIX;
+    }
+
+    @Override
+    public long getEntityOwnerId() {
+        return CallContext.current().getCallingAccountId();
+    }
+}
\ No newline at end of file
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListImageStoresCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/UpdateImageStoreCmd.java
similarity index 52%
copy from api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListImageStoresCmd.java
copy to api/src/main/java/org/apache/cloudstack/api/command/admin/storage/UpdateImageStoreCmd.java
index 8c37c78..d7dca93 100644
--- a/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListImageStoresCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/command/admin/storage/UpdateImageStoreCmd.java
@@ -14,70 +14,47 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.cloudstack.api.command.admin.storage;
 
-import org.apache.log4j.Logger;
+package org.apache.cloudstack.api.command.admin.storage;
 
 import org.apache.cloudstack.api.APICommand;
 import org.apache.cloudstack.api.ApiConstants;
-import org.apache.cloudstack.api.BaseListCmd;
+import org.apache.cloudstack.api.ApiErrorCode;
+import org.apache.cloudstack.api.BaseCmd;
 import org.apache.cloudstack.api.Parameter;
+import org.apache.cloudstack.api.ServerApiException;
 import org.apache.cloudstack.api.response.ImageStoreResponse;
-import org.apache.cloudstack.api.response.ListResponse;
-import org.apache.cloudstack.api.response.ZoneResponse;
+import org.apache.cloudstack.context.CallContext;
+import org.apache.log4j.Logger;
 
-@APICommand(name = "listImageStores", description = "Lists image stores.", responseObject = ImageStoreResponse.class, since = "4.2.0",
-        requestHasSensitiveInfo = false, responseHasSensitiveInfo = false)
-public class ListImageStoresCmd extends BaseListCmd {
-    public static final Logger s_logger = Logger.getLogger(ListImageStoresCmd.class.getName());
+import com.cloud.storage.ImageStore;
 
-    private static final String s_name = "listimagestoresresponse";
+@APICommand(name = UpdateImageStoreCmd.APINAME, description = "Updates image store read-only status", responseObject = ImageStoreResponse.class, entityType = {ImageStore.class},
+        requestHasSensitiveInfo = false, responseHasSensitiveInfo = false, since = "4.15.0")
+public class UpdateImageStoreCmd extends BaseCmd {
+    private static final Logger LOG = Logger.getLogger(UpdateImageStoreCmd.class.getName());
+    public static final String APINAME = "updateImageStore";
 
     /////////////////////////////////////////////////////
     //////////////// API parameters /////////////////////
     /////////////////////////////////////////////////////
-
-    @Parameter(name = ApiConstants.NAME, type = CommandType.STRING, description = "the name of the image store")
-    private String storeName;
-
-    @Parameter(name = ApiConstants.PROTOCOL, type = CommandType.STRING, description = "the image store protocol")
-    private String protocol;
-
-    @Parameter(name = ApiConstants.PROVIDER, type = CommandType.STRING, description = "the image store provider")
-    private String provider;
-
-    @Parameter(name = ApiConstants.ZONE_ID, type = CommandType.UUID, entityType = ZoneResponse.class, description = "the Zone ID for the image store")
-    private Long zoneId;
-
-    @Parameter(name = ApiConstants.ID, type = CommandType.UUID, entityType = ImageStoreResponse.class, description = "the ID of the storage pool")
+    @Parameter(name = ApiConstants.ID, type = CommandType.UUID, entityType = ImageStoreResponse.class, required = true, description = "Image Store UUID")
     private Long id;
 
+    @Parameter(name = ApiConstants.READ_ONLY, type = CommandType.BOOLEAN, required = true, description = "If set to true, it designates the corresponding image store to read-only, " +
+            "hence not considering them during storage migration")
+    private Boolean readonly;
+
     /////////////////////////////////////////////////////
     /////////////////// Accessors ///////////////////////
     /////////////////////////////////////////////////////
 
-    public Long getZoneId() {
-        return zoneId;
-    }
-
-    public String getStoreName() {
-        return storeName;
-    }
-
-    public String getProtocol() {
-        return protocol;
-    }
-
     public Long getId() {
         return id;
     }
 
-    public String getProvider() {
-        return provider;
-    }
-
-    public void setProvider(String provider) {
-        this.provider = provider;
+    public Boolean getReadonly() {
+        return readonly;
     }
 
     /////////////////////////////////////////////////////
@@ -85,14 +62,28 @@ public class ListImageStoresCmd extends BaseListCmd {
     /////////////////////////////////////////////////////
 
     @Override
+    public void execute() {
+        ImageStore result = _storageService.updateImageStoreStatus(getId(), getReadonly());
+        ImageStoreResponse storeResponse = null;
+        if (result != null) {
+            storeResponse = _responseGenerator.createImageStoreResponse(result);
+            storeResponse.setResponseName(getCommandName()+"response");
+            storeResponse.setObjectName("imagestore");
+            setResponseObject(storeResponse);
+        } else {
+            throw new ServerApiException(ApiErrorCode.INTERNAL_ERROR, "Failed to update Image store status");
+        }
+
+    }
+
+    @Override
     public String getCommandName() {
-        return s_name;
+        return APINAME;
     }
 
+
     @Override
-    public void execute() {
-        ListResponse<ImageStoreResponse> response = _queryService.searchForImageStores(this);
-        response.setResponseName(getCommandName());
-        this.setResponseObject(response);
+    public long getEntityOwnerId() {
+        return CallContext.current().getCallingAccountId();
     }
 }
diff --git a/api/src/main/java/org/apache/cloudstack/api/response/ImageStoreResponse.java b/api/src/main/java/org/apache/cloudstack/api/response/ImageStoreResponse.java
index fba9b2d..190181e 100644
--- a/api/src/main/java/org/apache/cloudstack/api/response/ImageStoreResponse.java
+++ b/api/src/main/java/org/apache/cloudstack/api/response/ImageStoreResponse.java
@@ -16,8 +16,6 @@
 // under the License.
 package org.apache.cloudstack.api.response;
 
-import com.google.gson.annotations.SerializedName;
-
 import org.apache.cloudstack.api.ApiConstants;
 import org.apache.cloudstack.api.BaseResponse;
 import org.apache.cloudstack.api.EntityReference;
@@ -25,6 +23,7 @@ import org.apache.cloudstack.api.EntityReference;
 import com.cloud.serializer.Param;
 import com.cloud.storage.ImageStore;
 import com.cloud.storage.ScopeType;
+import com.google.gson.annotations.SerializedName;
 
 @EntityReference(value = ImageStore.class)
 public class ImageStoreResponse extends BaseResponse {
@@ -60,6 +59,10 @@ public class ImageStoreResponse extends BaseResponse {
     @Param(description = "the scope of the image store")
     private ScopeType scope;
 
+    @SerializedName("readonly")
+    @Param(description = "defines if store is read-only")
+    private Boolean readonly;
+
     @SerializedName("disksizetotal")
     @Param(description = "the total disk size of the host")
     private Long diskSizeTotal;
@@ -140,6 +143,12 @@ public class ImageStoreResponse extends BaseResponse {
         this.protocol = protocol;
     }
 
+    public Boolean getReadonly() {
+        return readonly;
+    }
+
+    public void setReadonly(Boolean readonly) { this.readonly = readonly; }
+
     public void setDiskSizeTotal(Long diskSizeTotal) {
         this.diskSizeTotal = diskSizeTotal;
     }
@@ -147,5 +156,4 @@ public class ImageStoreResponse extends BaseResponse {
     public void setDiskSizeUsed(Long diskSizeUsed) {
         this.diskSizeUsed = diskSizeUsed;
     }
-
-}
+}
\ No newline at end of file
diff --git a/api/src/main/java/org/apache/cloudstack/api/response/MigrationResponse.java b/api/src/main/java/org/apache/cloudstack/api/response/MigrationResponse.java
new file mode 100644
index 0000000..c67b1d2
--- /dev/null
+++ b/api/src/main/java/org/apache/cloudstack/api/response/MigrationResponse.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.api.response;
+
+import org.apache.cloudstack.api.BaseResponse;
+import org.apache.cloudstack.api.EntityReference;
+
+import com.cloud.serializer.Param;
+import com.cloud.storage.ImageStore;
+import com.google.gson.annotations.SerializedName;
+
+@EntityReference(value = ImageStore.class)
+public class MigrationResponse extends BaseResponse {
+    @SerializedName("message")
+    @Param(description = "Response message from migration of secondary storage data objects")
+    private String message;
+
+    @SerializedName("migrationtype")
+    @Param(description = "Type of migration requested for")
+    private String migrationType;
+
+    @SerializedName("success")
+    @Param(description = "true if operation is executed successfully")
+    private boolean success;
+
+    MigrationResponse() {
+    }
+
+    public MigrationResponse(String message, String migrationType, boolean success) {
+        this.message = message;
+        this.migrationType = migrationType;
+        this.success = success;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public String getMigrationType() {
+        return migrationType;
+    }
+
+    public void setMigrationType(String migrationType) {
+        this.migrationType = migrationType;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+}
diff --git a/api/src/main/java/org/apache/cloudstack/api/response/TemplateResponse.java b/api/src/main/java/org/apache/cloudstack/api/response/TemplateResponse.java
index 81fc2f3..094fe2a 100644
--- a/api/src/main/java/org/apache/cloudstack/api/response/TemplateResponse.java
+++ b/api/src/main/java/org/apache/cloudstack/api/response/TemplateResponse.java
@@ -18,6 +18,7 @@ package org.apache.cloudstack.api.response;
 
 import java.util.Date;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -173,6 +174,10 @@ public class TemplateResponse extends BaseResponseWithTagInformation implements
     @Param(description = "additional key/value details tied with template")
     private Map details;
 
+    @SerializedName(ApiConstants.DOWNLOAD_DETAILS)
+    @Param(description = "Lists the download progress of a template across all secondary storages")
+    private List<Map<String, String>> downloadDetails;
+
     @SerializedName(ApiConstants.BITS)
     @Param(description = "the processor bit size", since = "4.10")
     private int bits;
@@ -255,6 +260,10 @@ public class TemplateResponse extends BaseResponseWithTagInformation implements
         this.isPublic = isPublic;
     }
 
+    public void setDownloadProgress(List<Map<String, String>> downloadDetails) {
+        this.downloadDetails = downloadDetails;
+    }
+
     public void setCreated(Date created) {
         this.created = created;
     }
diff --git a/engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDao.java b/api/src/main/java/org/apache/cloudstack/storage/ImageStoreService.java
similarity index 68%
copy from engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDao.java
copy to api/src/main/java/org/apache/cloudstack/storage/ImageStoreService.java
index fb57563..b8f14ad 100644
--- a/engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDao.java
+++ b/api/src/main/java/org/apache/cloudstack/storage/ImageStoreService.java
@@ -14,12 +14,16 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package com.cloud.secstorage;
 
-import java.util.Date;
+package org.apache.cloudstack.storage;
 
-import com.cloud.utils.db.GenericDao;
+import org.apache.cloudstack.api.command.admin.storage.MigrateSecondaryStorageDataCmd;
+import org.apache.cloudstack.api.response.MigrationResponse;
 
-public interface CommandExecLogDao extends GenericDao<CommandExecLogVO, Long> {
-    public void expungeExpiredRecords(Date cutTime);
+/**
+ * API-level service for migrating data between image stores.
+ */
+public interface ImageStoreService {
+
+    /**
+     * Strategies a migration request can ask for.
+     */
+    public static enum MigrationPolicy {
+        BALANCE, COMPLETE
+    }
+    /**
+     * Runs the migration described by the given command.
+     *
+     * @param cmd the migrate-secondary-storage-data command carrying the request parameters
+     * @return the response describing the outcome of the migration
+     */
+    MigrationResponse migrateData(MigrateSecondaryStorageDataCmd cmd);
 }
diff --git a/engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDao.java b/engine/api/src/main/java/org/apache/cloudstack/engine/orchestration/service/StorageOrchestrationService.java
similarity index 67%
copy from engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDao.java
copy to engine/api/src/main/java/org/apache/cloudstack/engine/orchestration/service/StorageOrchestrationService.java
index fb57563..7bf845d 100644
--- a/engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDao.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/orchestration/service/StorageOrchestrationService.java
@@ -14,12 +14,14 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package com.cloud.secstorage;
 
-import java.util.Date;
+package org.apache.cloudstack.engine.orchestration.service;
 
-import com.cloud.utils.db.GenericDao;
+import java.util.List;
 
-public interface CommandExecLogDao extends GenericDao<CommandExecLogVO, Long> {
-    public void expungeExpiredRecords(Date cutTime);
+import org.apache.cloudstack.api.response.MigrationResponse;
+import org.apache.cloudstack.storage.ImageStoreService.MigrationPolicy;
+
+/**
+ * Orchestration-level contract for migrating data between image stores.
+ */
+public interface StorageOrchestrationService {
+    /**
+     * Migrates data from a source image store to one or more destination image stores.
+     *
+     * @param srcDataStoreId id of the image store to migrate data from
+     * @param destDatastores ids of the candidate destination image stores
+     * @param migrationPolicy the migration strategy to apply
+     * @return the response describing the outcome of the migration
+     */
+    MigrationResponse migrateData(Long srcDataStoreId, List<Long> destDatastores, MigrationPolicy migrationPolicy);
 }
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/DataStoreManager.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/DataStoreManager.java
index ad5b162..80e3ce1 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/DataStoreManager.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/DataStoreManager.java
@@ -33,10 +33,16 @@ public interface DataStoreManager {
 
     List<DataStore> getImageStoresByScope(ZoneScope scope);
 
+    List<DataStore> getImageStoresByScopeExcludingReadOnly(ZoneScope scope);
+
     DataStore getRandomImageStore(long zoneId);
 
+    DataStore getRandomUsableImageStore(long zoneId);
+
     DataStore getImageStoreWithFreeCapacity(long zoneId);
 
+    DataStore getImageStoreWithFreeCapacity(List<DataStore> imageStores);
+
     List<DataStore> listImageStoresWithFreeCapacity(long zoneId);
 
     List<DataStore> getImageCacheStores(Scope scope);
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/EndPointSelector.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/EndPointSelector.java
index 0613a11..ec27250 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/EndPointSelector.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/EndPointSelector.java
@@ -33,6 +33,8 @@ public interface EndPointSelector {
 
     List<EndPoint> selectAll(DataStore store);
 
+    List<EndPoint> findAllEndpointsForScope(DataStore store);
+
     EndPoint select(Scope scope, Long storeId);
 
     EndPoint select(DataStore store, String downloadUrl);
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/ObjectInDataStoreStateMachine.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/ObjectInDataStoreStateMachine.java
index 204cab0..611d124 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/ObjectInDataStoreStateMachine.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/ObjectInDataStoreStateMachine.java
@@ -29,6 +29,7 @@ public interface ObjectInDataStoreStateMachine extends StateObject<ObjectInDataS
         Ready("Template downloading is accomplished"),
         Copying("The object is being coping"),
         Migrating("The object is being migrated"),
+        Migrated("The object has been migrated"),
         Destroying("Template is destroying"),
         Destroyed("Template is destroyed"),
         Failed("Failed to download template");
@@ -49,12 +50,16 @@ public interface ObjectInDataStoreStateMachine extends StateObject<ObjectInDataS
         DestroyRequested,
         OperationSuccessed,
         OperationFailed,
+        CopyRequested,
         CopyingRequested,
         MigrationRequested,
+        MigrationSucceeded,
+        MigrationFailed,
         MigrationCopyRequested,
         MigrationCopySucceeded,
         MigrationCopyFailed,
         ResizeRequested,
-        ExpungeRequested
+        ExpungeRequested,
+        MigrateDataRequested
     }
 }
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotInfo.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SecondaryStorageService.java
similarity index 54%
copy from engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotInfo.java
copy to engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SecondaryStorageService.java
index ef72afc..07828fd 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotInfo.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SecondaryStorageService.java
@@ -14,35 +14,30 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.cloudstack.engine.subsystem.api.storage;
-
-import com.cloud.storage.Snapshot;
-import com.cloud.utils.exception.CloudRuntimeException;
-
-public interface SnapshotInfo extends DataObject, Snapshot {
-    SnapshotInfo getParent();
-
-    String getPath();
 
-    SnapshotInfo getChild();
-
-    VolumeInfo getBaseVolume();
-
-    void addPayload(Object data);
-
-    Object getPayload();
+package org.apache.cloudstack.engine.subsystem.api.storage;
 
-    void setFullBackup(Boolean fullBackup);
+import java.util.List;
+import java.util.Map;
 
-    Boolean getFullBackup();
+import org.apache.cloudstack.framework.async.AsyncCallFuture;
+import org.apache.cloudstack.storage.command.CommandResult;
 
-    Long getDataCenterId();
+import com.cloud.utils.Pair;
 
-    ObjectInDataStoreStateMachine.State getStatus();
+public interface SecondaryStorageService {
+    class DataObjectResult extends CommandResult {
+        private final DataObject data;
 
-    boolean isRevertable();
+        public DataObjectResult(DataObject data) {
+            super();
+            this.data = data;
+        }
 
-    long getPhysicalSize();
+        public DataObject getData() {
+            return this.data;
+        }
 
-    void markBackedUp() throws CloudRuntimeException;
+    }
+    AsyncCallFuture<DataObjectResult> migrateData(DataObject srcDataObject, DataStore srcDatastore, DataStore destDatastore, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChain);
 }
diff --git a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotInfo.java b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotInfo.java
index ef72afc..58a82ac 100644
--- a/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotInfo.java
+++ b/engine/api/src/main/java/org/apache/cloudstack/engine/subsystem/api/storage/SnapshotInfo.java
@@ -16,6 +16,8 @@
 // under the License.
 package org.apache.cloudstack.engine.subsystem.api.storage;
 
+import java.util.List;
+
 import com.cloud.storage.Snapshot;
 import com.cloud.utils.exception.CloudRuntimeException;
 
@@ -26,6 +28,8 @@ public interface SnapshotInfo extends DataObject, Snapshot {
 
     SnapshotInfo getChild();
 
+    List<SnapshotInfo> getChildren();
+
     VolumeInfo getBaseVolume();
 
     void addPayload(Object data);
diff --git a/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java b/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
index 62a241b..0f52206 100644
--- a/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
+++ b/engine/components-api/src/main/java/com/cloud/storage/StorageManager.java
@@ -112,6 +112,12 @@ public interface StorageManager extends StorageService {
     ConfigKey<Integer> PRIMARY_STORAGE_DOWNLOAD_WAIT = new ConfigKey<Integer>("Storage", Integer.class, "primary.storage.download.wait", "10800",
             "In second, timeout for download template to primary storage", false);
 
+    ConfigKey<Integer>  SecStorageMaxMigrateSessions = new ConfigKey<Integer>("Advanced", Integer.class, "secstorage.max.migrate.sessions", "2",
+            "The max number of concurrent copy command execution sessions that an SSVM can handle", true, ConfigKey.Scope.Global);
+
+    ConfigKey<Integer> MaxDataMigrationWaitTime = new ConfigKey<Integer>("Advanced", Integer.class, "max.data.migration.wait.time", "15",
+            "Maximum wait time for a data migration task before spawning a new SSVM", false, ConfigKey.Scope.Global);
+
     /**
      * Returns a comma separated list of tags for the specified storage pool
      * @param poolId
diff --git a/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java b/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java
new file mode 100644
index 0000000..86b7350
--- /dev/null
+++ b/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.engine.orchestration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.storage.ImageStoreService;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.SecondaryStorageVmVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
+
+public class DataMigrationUtility {
+    @Inject
+    SecondaryStorageVmDao secStorageVmDao;
+    @Inject
+    TemplateDataStoreDao templateDataStoreDao;
+    @Inject
+    SnapshotDataStoreDao snapshotDataStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+    @Inject
+    VMTemplateDao templateDao;
+    @Inject
+    VolumeDataFactory volumeFactory;
+    @Inject
+    TemplateDataFactory templateFactory;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    SnapshotDao snapshotDao;
+
+    /**
+     *  This function verifies if the given image store contains data objects that are not in any of the following states:
+     *  "Ready" "Allocated", "Destroying", "Destroyed", "Failed". If this is the case, and if the migration policy is complete,
+     *  the migration is terminated.
+     */
+    private boolean filesReadyToMigrate(Long srcDataStoreId) {
+        String[] validStates = new String[]{"Ready", "Allocated", "Destroying", "Destroyed", "Failed"};
+        boolean isReady = true;
+        List<TemplateDataStoreVO> templates = templateDataStoreDao.listByStoreId(srcDataStoreId);
+        for (TemplateDataStoreVO template : templates) {
+            isReady &= (Arrays.asList(validStates).contains(template.getState().toString()));
+        }
+        List<SnapshotDataStoreVO> snapshots = snapshotDataStoreDao.listByStoreId(srcDataStoreId, DataStoreRole.Image);
+        for (SnapshotDataStoreVO snapshot : snapshots) {
+            isReady &= (Arrays.asList(validStates).contains(snapshot.getState().toString()));
+        }
+        List<VolumeDataStoreVO> volumes = volumeDataStoreDao.listByStoreId(srcDataStoreId);
+        for (VolumeDataStoreVO volume : volumes) {
+            isReady &= (Arrays.asList(validStates).contains(volume.getState().toString()));
+        }
+        return isReady;
+    }
+
+    /**
+     * Aborts a COMPLETE migration up front when the source store still holds objects in a transitional state.
+     * Any other policy passes through unchecked.
+     *
+     * @param policy the requested migration policy
+     * @param srcDataStoreId id of the source image store
+     * @throws CloudRuntimeException if the policy is COMPLETE and the store is not ready to be migrated
+     */
+    protected void checkIfCompleteMigrationPossible(ImageStoreService.MigrationPolicy policy, Long srcDataStoreId) {
+        if (policy != ImageStoreService.MigrationPolicy.COMPLETE) {
+            return;
+        }
+        if (filesReadyToMigrate(srcDataStoreId)) {
+            return;
+        }
+        throw new CloudRuntimeException("Complete migration failed as there are data objects which are not Ready - i.e, they may be in Migrating, creating, copying, etc. states");
+    }
+
+    /**
+     * Returns the size to account for when migrating the given object. For a snapshot that has an entry
+     * with a non-null chain in {@code snapshotChain}, this is the cumulative size stored in that entry;
+     * otherwise it is the object's own size.
+     *
+     * @param file the data object being migrated
+     * @param snapshotChain chain list and cumulative size, keyed by parent snapshot
+     * @return the cumulative chain size for chained snapshots, else {@code file.getSize()}
+     */
+    protected Long getFileSize(DataObject file, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChain) {
+        if (file instanceof SnapshotInfo) {
+            Pair<List<SnapshotInfo>, Long> chain = snapshotChain.get(file);
+            // A snapshot without an entry in the map falls back to its own size instead of failing on a null pair
+            if (chain != null && chain.first() != null) {
+                return chain.second();
+            }
+        }
+        return file.getSize();
+    }
+
+    /**
+     * Sorts the datastores in decreasing order of their free capacities, so as to make
+     * an informed decision of picking the datastore with maximum free capacity for migration.
+     * The ordering is driven by the first element of each capacity pair.
+     *
+     * @param storageCapacities capacity pairs keyed by datastore id
+     * @return the datastore ids, largest first element first
+     */
+    protected List<Long> sortDataStores(Map<Long, Pair<Long, Long>> storageCapacities) {
+        List<Map.Entry<Long, Pair<Long, Long>>> entries =
+                new LinkedList<Map.Entry<Long, Pair<Long, Long>>>((storageCapacities.entrySet()));
+
+        Collections.sort(entries, new Comparator<Map.Entry<Long, Pair<Long, Long>>>() {
+            @Override
+            public int compare(Map.Entry<Long, Pair<Long, Long>> e1, Map.Entry<Long, Pair<Long, Long>> e2) {
+                // Long.compare yields 0 for equal capacities, which the Comparator contract requires;
+                // operands are swapped to obtain descending order
+                return Long.compare(e2.getValue().first(), e1.getValue().first());
+            }
+        });
+        // An insertion-ordered map preserves the sorted sequence of datastore ids
+        HashMap<Long, Pair<Long, Long>> ordered = new LinkedHashMap<>();
+        for (Map.Entry<Long, Pair<Long, Long>> entry : entries) {
+            ordered.put(entry.getKey(), entry.getValue());
+        }
+
+        return new ArrayList<>(ordered.keySet());
+    }
+
+    /**
+     * Collects the templates, snapshots (with their chains) and volumes of the source store that are
+     * eligible for migration and returns them ordered by {@link #sortFilesOnSize}.
+     *
+     * @param srcDataStore the image store to read from
+     * @param snapshotChains output map, filled with the chain information of each parent snapshot
+     * @return the sorted list of candidate data objects
+     */
+    protected List<DataObject> getSortedValidSourcesList(DataStore srcDataStore, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        List<DataObject> candidates = new ArrayList<>(getAllReadyTemplates(srcDataStore));
+        candidates.addAll(getAllReadySnapshotsAndChains(srcDataStore, snapshotChains));
+        candidates.addAll(getAllReadyVolumes(srcDataStore));
+        return sortFilesOnSize(candidates, snapshotChains);
+    }
+
+    /**
+     * Sorts the given list in place, largest object first. A snapshot is weighed by the cumulative
+     * size recorded for it in {@code snapshotChains}; every other object by its own size.
+     *
+     * @param files the data objects to sort (modified in place)
+     * @param snapshotChains chain list and cumulative size, keyed by parent snapshot
+     * @return the same list instance, sorted in descending size order
+     */
+    protected List<DataObject> sortFilesOnSize(List<DataObject> files, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        Collections.sort(files, new Comparator<DataObject>() {
+            @Override
+            public int compare(DataObject o1, DataObject o2) {
+                Long size1 = o1.getSize();
+                Long size2 = o2.getSize();
+                if (o1 instanceof SnapshotInfo) {
+                    size1 = snapshotChains.get(o1).second();
+                }
+                if (o2 instanceof SnapshotInfo) {
+                    size2 = snapshotChains.get(o2).second();
+                }
+                // Long.compare returns 0 for equal sizes, as the Comparator contract requires;
+                // operands are swapped to obtain descending order
+                return Long.compare(size2, size1);
+            }
+        });
+        return files;
+    }
+
+    /**
+     * Returns the templates of the source store that are in the Ready state and are not public templates.
+     * Entries whose template record cannot be found are skipped.
+     *
+     * @param srcDataStore the image store to read from
+     * @return the template data objects eligible for migration
+     */
+    protected List<DataObject> getAllReadyTemplates(DataStore srcDataStore) {
+        List<DataObject> files = new LinkedList<>();
+        List<TemplateDataStoreVO> templates = templateDataStoreDao.listByStoreId(srcDataStore.getId());
+        for (TemplateDataStoreVO template : templates) {
+            // Filter on state first so the template lookup is only done for candidates
+            if (template.getState() != ObjectInDataStoreStateMachine.State.Ready) {
+                continue;
+            }
+            VMTemplateVO templateVO = templateDao.findById(template.getTemplateId());
+            // The lookup may yield no record; skip such entries instead of failing on a null reference
+            if (templateVO == null || templateVO.isPublicTemplate()) {
+                continue;
+            }
+            files.add(templateFactory.getTemplate(template.getTemplateId(), srcDataStore));
+        }
+        return files;
+    }
+
+    /**
+     * Returns the snapshots of the source store that are in the Ready state and have no parent
+     * (parent snapshot id 0). For each of them, {@code snapshotChains} receives the list made of the
+     * snapshot followed by all of its descendants, together with the cumulative size of that list - this
+     * is done to ensure that all the snapshots in a chain are migrated to the same datastore.
+     * Entries whose snapshot record cannot be found are skipped.
+     *
+     * @param srcDataStore the image store to read from
+     * @param snapshotChains output map, filled with chain list and cumulative size per parent snapshot
+     * @return the parent snapshots eligible for migration
+     */
+    protected List<DataObject> getAllReadySnapshotsAndChains(DataStore srcDataStore, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        List<SnapshotInfo> files = new LinkedList<>();
+        List<SnapshotDataStoreVO> snapshots = snapshotDataStoreDao.listByStoreId(srcDataStore.getId(), DataStoreRole.Image);
+        for (SnapshotDataStoreVO snapshot : snapshots) {
+            // Filter first so the snapshot lookup is only done for candidates
+            if (snapshot.getState() != ObjectInDataStoreStateMachine.State.Ready || snapshot.getParentSnapshotId() != 0) {
+                continue;
+            }
+            SnapshotVO snapshotVO = snapshotDao.findById(snapshot.getSnapshotId());
+            // The lookup may yield no record; skip such entries instead of failing on a null reference
+            if (snapshotVO == null) {
+                continue;
+            }
+            SnapshotInfo snap = snapshotFactory.getSnapshot(snapshotVO.getSnapshotId(), DataStoreRole.Image);
+            files.add(snap);
+        }
+
+        for (SnapshotInfo parent : files) {
+            List<SnapshotInfo> chain = new ArrayList<>();
+            chain.add(parent);
+            // The list grows while it is walked by index, so children of children are visited as well
+            for (int i = 0; i < chain.size(); i++) {
+                SnapshotInfo child = chain.get(i);
+                List<SnapshotInfo> children = child.getChildren();
+                if (children != null) {
+                    chain.addAll(children);
+                }
+            }
+            snapshotChains.put(parent, new Pair<List<SnapshotInfo>, Long>(chain, getSizeForChain(chain)));
+        }
+
+        return (List<DataObject>) (List<?>) files;
+    }
+
+    /**
+     * Adds up the sizes of all snapshots in the given chain.
+     *
+     * @param chain the snapshots to total
+     * @return the sum of the individual snapshot sizes, 0 for an empty chain
+     */
+    protected Long getSizeForChain(List<SnapshotInfo> chain) {
+        long total = 0L;
+        for (SnapshotInfo link : chain) {
+            total += link.getSize();
+        }
+        return total;
+    }
+
+
+    /**
+     * Returns the volumes of the source store that are in the Ready state.
+     *
+     * @param srcDataStore the image store to read from
+     * @return the volume data objects eligible for migration
+     */
+    protected List<DataObject> getAllReadyVolumes(DataStore srcDataStore) {
+        List<DataObject> readyVolumes = new LinkedList<>();
+        for (VolumeDataStoreVO volumeRef : volumeDataStoreDao.listByStoreId(srcDataStore.getId())) {
+            if (volumeRef.getState() != ObjectInDataStoreStateMachine.State.Ready) {
+                continue;
+            }
+            readyVolumes.add(volumeFactory.getVolume(volumeRef.getVolumeId(), srcDataStore));
+        }
+        return readyVolumes;
+    }
+
+    /**
+     * Counts the active SSVMs in the scope of the given datastore, so that the thread pool size can
+     * follow the number of SSVMs. An SSVM counts as active when it is in the Running or Migrating
+     * state and a host named {@code s-<vm id>-VM} exists with status Up.
+     *
+     * @param dataStore the datastore whose scope id selects the SSVMs to look at
+     * @return the number of active SSVMs
+     */
+    protected int activeSSVMCount(DataStore dataStore) {
+        long datacenterId = dataStore.getScope().getScopeId();
+        List<SecondaryStorageVmVO> candidates =
+                secStorageVmDao.getSecStorageVmListInStates(null, datacenterId, VirtualMachine.State.Running, VirtualMachine.State.Migrating);
+        int upCount = 0;
+        for (SecondaryStorageVmVO ssvm : candidates) {
+            HostVO agentHost = hostDao.findByName("s-"+ssvm.getId()+"-VM");
+            if (agentHost != null && agentHost.getState() == Status.Up) {
+                upCount++;
+            }
+        }
+        return upCount;
+    }
+}
diff --git a/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/StorageOrchestrator.java b/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/StorageOrchestrator.java
new file mode 100644
index 0000000..85b182a
--- /dev/null
+++ b/engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/StorageOrchestrator.java
@@ -0,0 +1,451 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.engine.orchestration;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.cloudstack.api.response.MigrationResponse;
+import org.apache.cloudstack.engine.orchestration.service.StorageOrchestrationService;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.SecondaryStorageService;
+import org.apache.cloudstack.engine.subsystem.api.storage.SecondaryStorageService.DataObjectResult;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.framework.async.AsyncCallFuture;
+import org.apache.cloudstack.framework.config.ConfigKey;
+import org.apache.cloudstack.framework.config.Configurable;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.storage.ImageStoreService.MigrationPolicy;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.commons.math3.stat.descriptive.moment.Mean;
+import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
+import org.apache.log4j.Logger;
+
+import com.cloud.server.StatsCollector;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.StorageManager;
+import com.cloud.storage.StorageService;
+import com.cloud.storage.StorageStats;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+public class StorageOrchestrator extends ManagerBase implements StorageOrchestrationService, Configurable {
+
+    private static final Logger s_logger = Logger.getLogger(StorageOrchestrator.class);
+    @Inject
+    SnapshotDataStoreDao snapshotDataStoreDao;
+    @Inject
+    SnapshotDao snapshotDao;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    DataStoreManager dataStoreManager;
+    @Inject
+    StatsCollector statsCollector;
+    @Inject
+    public StorageService storageService;
+    @Inject
+    ConfigurationDao configDao;
+    @Inject
+    private SecondaryStorageService secStgSrv;
+    @Inject
+    TemplateDataStoreDao templateDataStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+    @Inject
+    DataMigrationUtility migrationHelper;
+
+    ConfigKey<Double> ImageStoreImbalanceThreshold = new ConfigKey<>("Advanced", Double.class,
+            "image.store.imbalance.threshold",
+            "0.3",
+            "The storage imbalance threshold that is compared with the standard deviation percentage for a storage utilization metric. " +
+                    "The value is a percentage in decimal format.",
+            true, ConfigKey.Scope.Global);
+
+    Integer numConcurrentCopyTasksPerSSVM = 2;
+    private double imageStoreCapacityThreshold = 0.90;
+
+    /**
+     * Names the component that owns this class's config keys: the fully qualified name of
+     * {@link StorageOrchestrationService}.
+     */
+    @Override
+    public String getConfigComponentName() {
+        return StorageOrchestrationService.class.getName();
+    }
+
+    /**
+     * Lists the config keys contributed by this class; currently only the image store imbalance threshold.
+     */
+    @Override
+    public ConfigKey<?>[] getConfigKeys() {
+        return new ConfigKey<?>[]{ImageStoreImbalanceThreshold};
+    }
+
+    /**
+     * Bounded queue whose {@code offer} blocks until space is available instead of returning
+     * {@code false} when the queue is full.
+     */
+    static class MigrateBlockingQueue<T> extends ArrayBlockingQueue<T> {
+
+        MigrateBlockingQueue(int size) {
+            super(size);
+        }
+
+        /**
+         * Inserts the task, waiting for free space if the queue is full.
+         *
+         * @param task the element to enqueue
+         * @return always {@code true}
+         * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
+         */
+        @Override
+        public boolean offer(T task) {
+            try {
+                this.put(task);
+            } catch (InterruptedException e) {
+                // Restore the interrupt status so code further up the stack can still observe it
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+            return true;
+        }
+    }
+
+    /**
+     * Reads the number of concurrent copy tasks per SSVM from
+     * {@code StorageManager.SecStorageMaxMigrateSessions}.
+     * NOTE(review): the value is captured here only, so later changes to the setting are
+     * presumably not picked up by this field - confirm.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+        numConcurrentCopyTasksPerSSVM = StorageManager.SecStorageMaxMigrateSessions.value();
+        return true;
+    }
+
+    /**
+     * Migrates data objects from the source image store to the given destination image stores.
+     *
+     * The candidates come from {@code migrationHelper.getSortedValidSourcesList}. For each candidate the
+     * capacities are refreshed, the stores are re-ordered with {@code migrationHelper.sortDataStores} and the
+     * first store of that ordering is taken as destination. The loop ends when the candidates are exhausted,
+     * when the source itself heads the ordering under the BALANCE policy, or when a COMPLETE migration has
+     * to give up on a file.
+     *
+     * @param srcDataStoreId id of the image store to migrate data from
+     * @param destDatastores ids of the candidate destination image stores
+     * @param migrationPolicy BALANCE or COMPLETE; for COMPLETE the source store is switched to read-only first
+     * @return an early response when there is nothing to migrate or the stores are already balanced,
+     *         otherwise the response assembled by {@code handleResponse}
+     */
+    @Override
+    public MigrationResponse migrateData(Long srcDataStoreId, List<Long> destDatastores, MigrationPolicy migrationPolicy) {
+        List<DataObject> files = new LinkedList<>();
+        boolean success = true;
+        String message = null;
+
+        // Throws for a COMPLETE migration when the source still holds objects in a transitional state
+        migrationHelper.checkIfCompleteMigrationPossible(migrationPolicy, srcDataStoreId);
+        DataStore srcDatastore = dataStoreManager.getDataStore(srcDataStoreId, DataStoreRole.Image);
+        Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains = new HashMap<>();
+        files = migrationHelper.getSortedValidSourcesList(srcDatastore, snapshotChains);
+
+        if (files.isEmpty()) {
+            return new MigrationResponse("No files in Image store "+srcDatastore.getId()+ " to migrate", migrationPolicy.toString(), true);
+        }
+        // Register every destination and the source with placeholder pairs;
+        // presumably getStorageCapacities below fills in the real values - confirm
+        Map<Long, Pair<Long, Long>> storageCapacities = new Hashtable<>();
+        for (Long storeId : destDatastores) {
+            storageCapacities.put(storeId, new Pair<>(null, null));
+        }
+        storageCapacities.put(srcDataStoreId, new Pair<>(null, null));
+        if (migrationPolicy == MigrationPolicy.COMPLETE) {
+            s_logger.debug("Setting source image store "+srcDatastore.getId()+ " to read-only");
+            storageService.updateImageStoreStatus(srcDataStoreId, true);
+        }
+
+        storageCapacities = getStorageCapacities(storageCapacities, srcDataStoreId);
+        double meanstddev = getStandardDeviation(storageCapacities);
+        double threshold = ImageStoreImbalanceThreshold.value();
+        MigrationResponse response = null;
+        // NOTE(review): the executor is created before the early return below and is not shut down on that path
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(numConcurrentCopyTasksPerSSVM , numConcurrentCopyTasksPerSSVM, 30,
+                TimeUnit.MINUTES, new MigrateBlockingQueue<>(numConcurrentCopyTasksPerSSVM));
+        Date start = new Date();
+        if (meanstddev < threshold && migrationPolicy == MigrationPolicy.BALANCE) {
+            s_logger.debug("mean std deviation of the image stores is below threshold, no migration required");
+            response = new MigrationResponse("Migration not required as system seems balanced", migrationPolicy.toString(), true);
+            return response;
+        }
+
+        List<Future<AsyncCallFuture<DataObjectResult>>> futures = new ArrayList<>();
+        while (true) {
+            // Take the next candidate off the head of the list; null once the list is exhausted
+            DataObject chosenFileForMigration = null;
+            if (files.size() > 0) {
+                chosenFileForMigration = files.remove(0);
+            }
+
+            // Re-evaluate capacities each round and pick the head of the ordering as destination
+            storageCapacities = getStorageCapacities(storageCapacities, srcDataStoreId);
+            List<Long> orderedDS = migrationHelper.sortDataStores(storageCapacities);
+            Long destDatastoreId = orderedDS.get(0);
+
+            // Stop when nothing is left, no destination exists, or (BALANCE) the source heads the ordering
+            if (chosenFileForMigration == null || destDatastoreId == null || (destDatastoreId == srcDatastore.getId() && migrationPolicy == MigrationPolicy.BALANCE) ) {
+                Pair<String, Boolean> result = migrateCompleted(destDatastoreId, srcDatastore, files, migrationPolicy);
+                message = result.first();
+                success = result.second();
+                break;
+            }
+
+            // A COMPLETE migration must move data off the source, so use the next store in the ordering
+            if (migrationPolicy == MigrationPolicy.COMPLETE && destDatastoreId == srcDatastore.getId()) {
+                destDatastoreId = orderedDS.get(1);
+            }
+
+            // The file was already removed from the list above, so it is simply skipped
+            if (chosenFileForMigration.getSize() > storageCapacities.get(destDatastoreId).first()) {
+                s_logger.debug("file: " + chosenFileForMigration.getId() + " too large to be migrated to " + destDatastoreId);
+                continue;
+            }
+
+            if (shouldMigrate(chosenFileForMigration, srcDatastore.getId(), destDatastoreId, storageCapacities, snapshotChains, migrationPolicy)) {
+                storageCapacities = migrateAway(chosenFileForMigration, storageCapacities, snapshotChains, srcDatastore, destDatastoreId, executor, futures);
+            } else {
+                // BALANCE moves on to the next file; COMPLETE treats a refused file as a failure
+                if (migrationPolicy == MigrationPolicy.BALANCE) {
+                    continue;
+                }
+                message = "Complete migration failed. Please set the source Image store to read-write mode if you want to continue using it";
+                success = false;
+                break;
+            }
+        }
+        Date end = new Date();
+        handleSnapshotMigration(srcDataStoreId, start, end, migrationPolicy, futures, storageCapacities, executor);
+        return handleResponse(futures, migrationPolicy, message, success);
+    }
+
+    /**
+     * Builds the outcome that is reported once the migration loop stops selecting files.
+     *
+     * <p>When the given destination is the source store itself and files are still pending, a BALANCE run is
+     * reported as a successful best-effort balance, while any other policy is reported as an incomplete
+     * (failed) migration. In every other case, including a {@code null} destination, the run is reported as completed.
+     *
+     * @param destDatastoreId ID of the image store currently chosen as destination; may be {@code null}
+     * @param srcDatastore image store the data is being migrated from
+     * @param files data objects that have not been picked for migration yet
+     * @param migrationPolicy policy of the current migration run
+     * @return pair of (status message, success flag)
+     */
+    protected Pair<String, Boolean> migrateCompleted(Long destDatastoreId, DataStore srcDatastore, List<DataObject> files, MigrationPolicy migrationPolicy) {
+        // Check for null first: unboxing a null destination ID for the comparison would throw a NullPointerException.
+        boolean destinationIsSource = destDatastoreId != null && destDatastoreId.longValue() == srcDatastore.getId();
+        if (!destinationIsSource || files.isEmpty()) {
+            return new Pair<String, Boolean>("Migration completed", true);
+        }
+
+        if (migrationPolicy == MigrationPolicy.BALANCE) {
+            s_logger.debug("Migration completed : data stores have been balanced ");
+            String message = "Seems like source datastore has more free capacity than the destination(s). " +
+                    "Image stores have been attempted to be balanced";
+            return new Pair<String, Boolean>(message, true);
+        }
+
+        String message = "Files not completely migrated from " + srcDatastore.getId() + ". Datastore (source): " + srcDatastore.getId() + " has equal or more free space than destination." +
+                " If you want to continue using the Image Store, please change the read-only status using 'update imagestore' command";
+        return new Pair<String, Boolean>(message, false);
+    }
+
+    /**
+     * Submits the migration of one data object to the executor and returns the capacities expected after the move.
+     *
+     * <p>Before submitting, the executor's core and maximum pool sizes are raised to
+     * (active SSVM count * {@code numConcurrentCopyTasksPerSSVM}) if that exceeds the current core pool size.
+     *
+     * @param chosenFileForMigration data object to migrate
+     * @param storageCapacities map of image store ID to a pair of (free capacity, total capacity)
+     * @param snapshotChains snapshot chain details; attached to the task only when the object is a snapshot
+     * @param srcDatastore image store the object is migrated from
+     * @param destDatastoreId ID of the image store the object is migrated to
+     * @param executor pool that runs the migration tasks
+     * @param futures list that collects the future of every submitted task
+     * @return copy of {@code storageCapacities} adjusted as if the object had already been moved
+     */
+    protected Map<Long, Pair<Long, Long>> migrateAway(DataObject chosenFileForMigration, Map<Long, Pair<Long, Long>> storageCapacities,
+                               Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains, DataStore srcDatastore, Long destDatastoreId, ThreadPoolExecutor executor,
+    List<Future<AsyncCallFuture<DataObjectResult>>> futures) {
+        Long sizeToMove = migrationHelper.getFileSize(chosenFileForMigration, snapshotChains);
+        Map<Long, Pair<Long, Long>> projectedCapacities = assumeMigrate(storageCapacities, srcDatastore.getId(), destDatastoreId, sizeToMove);
+
+        // The pool only ever grows here; the maximum is raised before the core size.
+        long activeSsvms = migrationHelper.activeSSVMCount(srcDatastore);
+        long desiredThreads = activeSsvms * numConcurrentCopyTasksPerSSVM;
+        if (desiredThreads > executor.getCorePoolSize()) {
+            int poolSize = (int) desiredThreads;
+            executor.setMaximumPoolSize(poolSize);
+            executor.setCorePoolSize(poolSize);
+        }
+
+        MigrateDataTask migrationTask = new MigrateDataTask(chosenFileForMigration, srcDatastore, dataStoreManager.getDataStore(destDatastoreId, DataStoreRole.Image));
+        if (chosenFileForMigration instanceof SnapshotInfo) {
+            migrationTask.setSnapshotChains(snapshotChains);
+        }
+        futures.add(executor.submit(migrationTask));
+        s_logger.debug("Migration of file  " + chosenFileForMigration.getId() + " is initiated");
+        return projectedCapacities;
+    }
+
+
+
+    /**
+     * Waits for the submitted migration tasks, counts the successful ones and builds the response.
+     *
+     * <p>A task that fails with an {@link ExecutionException} is logged and skipped. If the waiting thread is
+     * interrupted, its interrupt flag is restored and the remaining tasks are no longer waited for.
+     *
+     * @param futures futures of the submitted migration tasks
+     * @param migrationPolicy policy of the current migration run
+     * @param message status message built so far; the success count is appended to it
+     * @param success overall success flag to report
+     * @return response carrying the message, the policy name and the success flag
+     */
+    private MigrationResponse handleResponse(List<Future<AsyncCallFuture<DataObjectResult>>> futures, MigrationPolicy migrationPolicy, String message, boolean success) {
+        int successCount = 0;
+        for (Future<AsyncCallFuture<DataObjectResult>> future : futures) {
+            try {
+                AsyncCallFuture<DataObjectResult> res = future.get();
+                if (res.get().isSuccess()) {
+                    successCount++;
+                }
+            } catch (InterruptedException e) {
+                s_logger.warn("Interrupted while waiting for migration results", e);
+                // Restore the flag so callers can see the interruption; any further blocking wait would fail right away.
+                Thread.currentThread().interrupt();
+                break;
+            } catch (ExecutionException e) {
+                s_logger.warn("Failed to get result", e);
+            }
+        }
+        message += ". successful migrations: "+successCount;
+        return new MigrationResponse(message, migrationPolicy.toString(), success);
+    }
+
+    /**
+     * Submits migration tasks for the snapshot store entries returned by
+     * {@code snapshotDataStoreDao.findSnapshots(srcDataStoreId, start, end)}.
+     *
+     * <p>A snapshot with a parent is sent to the parent's image store when that differs from its own store.
+     * A snapshot without a parent is migrated only under the COMPLETE policy, to the first store returned by
+     * {@code sortDataStores} (or the second one when the first is the source store).
+     *
+     * @param srcDataStoreId ID of the source image store
+     * @param start start of the time window passed to the snapshot lookup
+     * @param end end of the time window passed to the snapshot lookup
+     * @param policy policy of the current migration run
+     * @param futures list that collects the future of every submitted task
+     * @param storageCapacities map of image store ID to a pair of (free capacity, total capacity)
+     * @param executor pool that runs the migration tasks
+     */
+    private void handleSnapshotMigration(Long srcDataStoreId, Date start, Date end, MigrationPolicy policy,
+                                          List<Future<AsyncCallFuture<DataObjectResult>>> futures, Map<Long, Pair<Long, Long>> storageCapacities, ThreadPoolExecutor executor) {
+        DataStore srcDatastore = dataStoreManager.getDataStore(srcDataStoreId, DataStoreRole.Image);
+        for (SnapshotDataStoreVO snapshotRef : snapshotDataStoreDao.findSnapshots(srcDataStoreId, start, end)) {
+            SnapshotVO snapshotVO = snapshotDao.findById(snapshotRef.getSnapshotId());
+            SnapshotInfo snapshotInfo = snapshotFactory.getSnapshot(snapshotVO.getSnapshotId(), DataStoreRole.Image);
+            SnapshotInfo parentSnapshot = snapshotInfo.getParent();
+
+            if (parentSnapshot != null) {
+                // Child snapshot: move it to the parent's store unless it already lives there.
+                DataStore parentDS = dataStoreManager.getDataStore(parentSnapshot.getDataStore().getId(), DataStoreRole.Image);
+                if (parentDS.getId() != snapshotInfo.getDataStore().getId()) {
+                    futures.add(executor.submit(new MigrateDataTask(snapshotInfo, srcDatastore, parentDS)));
+                }
+                continue;
+            }
+
+            if (policy != MigrationPolicy.COMPLETE) {
+                continue;
+            }
+
+            // Parentless snapshot under COMPLETE: pick the first sorted store that is not the source.
+            List<Long> sortedStoreIds = migrationHelper.sortDataStores(storageCapacities);
+            Long targetStoreId = sortedStoreIds.get(0);
+            if (targetStoreId.equals(srcDataStoreId)) {
+                targetStoreId = sortedStoreIds.get(1);
+            }
+            DataStore targetStore = dataStoreManager.getDataStore(targetStoreId, DataStoreRole.Image);
+            futures.add(executor.submit(new MigrateDataTask(snapshotInfo, srcDatastore, targetStore)));
+        }
+    }
+
+    /**
+     * Rebuilds the capacity map from the latest statistics of every store in {@code storageCapacities}.
+     *
+     * <p>The measured values replace the tracked ones when the tracked pair is missing or incomplete, when the
+     * store is the source store, or when the measured free capacity is lower than the tracked one. Otherwise
+     * the tracked pair is kept.
+     *
+     * @param storageCapacities map of image store ID to a pair of (free capacity, total capacity) tracked so far
+     * @param srcDataStoreId ID of the source image store
+     * @return new map with the same keys and the refreshed pairs
+     * @throws CloudRuntimeException if the stats collector has no statistics for one of the stores
+     */
+    private Map<Long, Pair<Long, Long>> getStorageCapacities(Map<Long, Pair<Long, Long>> storageCapacities, Long srcDataStoreId) {
+        Map<Long, Pair<Long, Long>> refreshed = new Hashtable<>();
+        for (Long storeId : storageCapacities.keySet()) {
+            StorageStats stats = statsCollector.getStorageStats(storeId);
+            if (stats == null) {
+                throw new CloudRuntimeException("Stats Collector hasn't yet collected metrics from the Image store, kindly try again later");
+            }
+
+            long totalCapacity = stats.getCapacityBytes();
+            Long freeCapacity = totalCapacity - stats.getByteUsed();
+            Pair<Long, Long> tracked = storageCapacities.get(storeId);
+            boolean trackedIsComplete = tracked != null && tracked.first() != null && tracked.second() != null;
+
+            if (!trackedIsComplete || storeId.equals(srcDataStoreId) || freeCapacity < tracked.first()) {
+                refreshed.put(storeId, new Pair<>(freeCapacity, totalCapacity));
+            } else {
+                refreshed.put(storeId, tracked);
+            }
+        }
+        return refreshed;
+    }
+
+
+    /**
+     * Measures how unevenly free space is spread across the given image stores.
+     *
+     * <p>For every store the ratio free capacity / total capacity is taken; the result is the standard
+     * deviation of those ratios divided by their mean.
+     *
+     * @param storageCapacities Map comprising the metrics(free and total capacities) of the images stores considered
+     * @return standard deviation of the free-capacity ratios divided by their mean
+     */
+    private double getStandardDeviation(Map<Long, Pair<Long, Long>> storageCapacities) {
+        double[] freeRatios = new double[storageCapacities.size()];
+        int index = 0;
+        for (Pair<Long, Long> capacity : storageCapacities.values()) {
+            freeRatios[index++] = (double) capacity.first() / capacity.second();
+        }
+        double meanFreeRatio = calculateStorageMean(freeRatios);
+        double deviation = calculateStorageStandardDeviation(freeRatios, meanFreeRatio);
+        return deviation / meanFreeRatio;
+    }
+
+    /**
+     * Projects the capacities that would result from moving a data object, without touching the input map.
+     *
+     * <p>The source store's free capacity is increased by {@code fileSize} and the destination store's free
+     * capacity is decreased by the same amount; total capacities are left as they are.
+     *
+     * @param storageCapacities Map comprising the metrics(free and total capacities) of the images stores considered
+     * @param srcDsId source image store ID from where data is to be migrated
+     * @param destDsId destination image store ID to where data is to be migrated
+     * @param fileSize size of the data object to be migrated so as to recompute the storage metrics
+     * @return a new map - Key: Datastore ID ;  Value: Pair<Free Capacity , Total Capacity>
+     */
+    private Map<Long, Pair<Long, Long>> assumeMigrate(Map<Long, Pair<Long, Long>> storageCapacities, Long srcDsId, Long destDsId, Long fileSize) {
+        Pair<Long, Long> sourceMetrics = storageCapacities.get(srcDsId);
+        Pair<Long, Long> destinationMetrics = storageCapacities.get(destDsId);
+
+        Map<Long, Pair<Long, Long>> projected = new Hashtable<>(storageCapacities);
+        projected.put(srcDsId, new Pair<>(sourceMetrics.first() + fileSize, sourceMetrics.second()));
+        projected.put(destDsId, new Pair<>(destinationMetrics.first() - fileSize, destinationMetrics.second()));
+        return projected;
+    }
+
+    /**
+     * Decides whether the chosen file should be migrated to the given destination image store.
+     *  - For Balanced migration - the value returned by getStandardDeviation is computed for the current capacities
+     *      and for the capacities assumed after the migration; the migration is rejected if that value would increase
+     *  - For Complete migration - the migration is allowed only if storageCapacityBelowThreshold accepts the
+     *      destination image store
+     * @param chosenFile file for migration
+     * @param srcDatastoreId source image store ID from where data is to be migrated
+     * @param destDatastoreId destination image store ID to where data is to be migrated
+     * @param storageCapacities Map comprising the metrics(free and total capacities) of the images stores considered
+     * @param snapshotChains Map containing details of chain of snapshots and their cumulative size
+     * @param migrationPolicy determines whether a "Balance" or "Complete" migration operation is to be performed
+     * @return true if the file should be migrated, false otherwise
+     */
+    private boolean shouldMigrate(DataObject chosenFile, Long srcDatastoreId, Long destDatastoreId, Map<Long, Pair<Long, Long>> storageCapacities,
+                                  Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains, MigrationPolicy migrationPolicy) {
+
+        if (migrationPolicy == MigrationPolicy.BALANCE) {
+            double meanStdDevCurrent = getStandardDeviation(storageCapacities);
+
+            // Simulate the move and measure the imbalance again
+            Long fileSize = migrationHelper.getFileSize(chosenFile, snapshotChains);
+            Map<Long, Pair<Long, Long>> proposedCapacities = assumeMigrate(storageCapacities, srcDatastoreId, destDatastoreId, fileSize);
+            double meanStdDevAfter = getStandardDeviation(proposedCapacities);
+
+            if (meanStdDevAfter > meanStdDevCurrent) {
+                s_logger.debug("migrating the file doesn't prove to be beneficial, skipping migration");
+                return false;
+            }
+
+            // NOTE(review): both paths below return true, so the imbalance threshold and the destination capacity
+            // check currently have no influence on the result for Balanced migration - confirm whether that is intended.
+            Double threshold = ImageStoreImbalanceThreshold.value();
+            if (meanStdDevCurrent > threshold && storageCapacityBelowThreshold(storageCapacities, destDatastoreId)) {
+                return true;
+            }
+            return  true;
+        } else {
+            // Any other policy: only the destination capacity check decides
+            if (storageCapacityBelowThreshold(storageCapacities, destDatastoreId)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether the destination image store is still filled no more than the capacity threshold allows.
+     *
+     * <p>The map stores (free capacity, total capacity) per store, so the used share is derived as
+     * (total - free) / total before it is compared with {@code imageStoreCapacityThreshold}. Comparing the
+     * free share instead would accept full stores and reject empty ones.
+     *
+     * @param storageCapacities map of image store ID to a pair of (free capacity, total capacity)
+     * @param destStoreId ID of the destination image store
+     * @return true if the store is known and its used share does not exceed the threshold, false otherwise
+     */
+    private boolean storageCapacityBelowThreshold(Map<Long, Pair<Long, Long>> storageCapacities, Long destStoreId) {
+        Pair<Long, Long> imageStoreCapacity = storageCapacities.get(destStoreId);
+        if (imageStoreCapacity != null) {
+            long freeCapacity = imageStoreCapacity.first();
+            long totalCapacity = imageStoreCapacity.second();
+            double usedRatio = (totalCapacity - freeCapacity) / (totalCapacity * 1.0);
+            if (usedRatio <= imageStoreCapacityThreshold) {
+                s_logger.debug("image store: " + destStoreId + " has sufficient capacity to proceed with migration of file");
+                return true;
+            }
+        }
+        s_logger.debug("Image store capacity threshold exceeded, migration not possible");
+        return false;
+    }
+
+    /**
+     * Evaluates a {@code Mean} statistic over the given values.
+     *
+     * @param storageMetrics values to average
+     * @return result of {@code Mean.evaluate} for the values
+     */
+    private double calculateStorageMean(double[] storageMetrics) {
+        Mean meanStatistic = new Mean();
+        return meanStatistic.evaluate(storageMetrics);
+    }
+
+    /**
+     * Evaluates a {@code StandardDeviation} statistic, constructed with {@code false}, over the given values.
+     *
+     * @param metricValues values whose spread is measured
+     * @param mean precomputed mean of {@code metricValues}
+     * @return result of {@code StandardDeviation.evaluate} for the values and mean
+     */
+    private double calculateStorageStandardDeviation(double[] metricValues, double mean) {
+        return new StandardDeviation(false).evaluate(metricValues, mean);
+    }
+
+    /**
+     * Task that asks the secondary storage service to migrate one data object between two image stores.
+     */
+    private class MigrateDataTask implements Callable<AsyncCallFuture<DataObjectResult>> {
+        private final DataObject file;
+        private final DataStore srcDataStore;
+        private final DataStore destDataStore;
+        // Optional: stays null unless setSnapshotChains is called before the task runs.
+        private Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChain;
+
+        public MigrateDataTask(DataObject file, DataStore srcDataStore, DataStore destDataStore) {
+            this.file = file;
+            this.srcDataStore = srcDataStore;
+            this.destDataStore = destDataStore;
+        }
+
+        public DataObject getFile() {
+            return file;
+        }
+
+        public Map<DataObject, Pair<List<SnapshotInfo>, Long>> getSnapshotChain() {
+            return snapshotChain;
+        }
+
+        public void setSnapshotChains(Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChain) {
+            this.snapshotChain = snapshotChain;
+        }
+
+        /**
+         * Delegates to {@code secStgSrv.migrateData} with the file, both stores and the snapshot chain (if any).
+         */
+        @Override
+        public AsyncCallFuture<DataObjectResult> call() throws Exception {
+            return secStgSrv.migrateData(file, srcDataStore, destDataStore, snapshotChain);
+        }
+    }
+}
diff --git a/engine/orchestration/src/main/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml b/engine/orchestration/src/main/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
index 3ded395..66335a6 100644
--- a/engine/orchestration/src/main/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
+++ b/engine/orchestration/src/main/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
@@ -44,6 +44,11 @@
             value="#{storagePoolAllocatorsRegistry.registered}" />
     </bean>
 
+    <bean id="storageOrchestrator"
+          class="org.apache.cloudstack.engine.orchestration.StorageOrchestrator"/>
+    <bean id="dataMigrationHelper"
+          class="org.apache.cloudstack.engine.orchestration.DataMigrationUtility"/>
+
     <bean id="clusteredVirtualMachineManagerImpl" class="com.cloud.vm.ClusteredVirtualMachineManagerImpl">
         <property name="hostAllocators" value="#{hostAllocatorsRegistry.registered}" />
         <property name="storagePoolAllocators"
diff --git a/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java b/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java
index 88e8eac..3d76c8b 100644
--- a/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java
+++ b/engine/schema/src/main/java/com/cloud/host/dao/HostDao.java
@@ -39,6 +39,8 @@ public interface HostDao extends GenericDao<HostVO, Long>, StateDao<Status, Stat
 
     Integer countAllByType(final Host.Type type);
 
+    /**
+     * Counts the hosts of the given type in the given zone.
+     *
+     * @param zoneId ID of the zone (data center) to count in
+     * @param type host type to count
+     * @return number of matching hosts
+     */
+    Integer countAllByTypeInZone(long zoneId, final Host.Type type);
+
     /**
      * Mark all hosts associated with a certain management server
      * as disconnected.
@@ -121,4 +123,6 @@ public interface HostDao extends GenericDao<HostVO, Long>, StateDao<Status, Stat
     List<HostVO> listByHostCapability(Host.Type type, Long clusterId, Long podId, long dcId, String hostCapabilty);
 
     List<HostVO> listByClusterAndHypervisorType(long clusterId, HypervisorType hypervisorType);
+
+    /**
+     * Looks up a single host by its name.
+     *
+     * @param name host name to search for
+     * @return a host with that name
+     */
+    HostVO findByName(String name);
 }
diff --git a/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java b/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java
index d28357d..75304c1 100644
--- a/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java
+++ b/engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java
@@ -107,6 +107,7 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
     protected SearchBuilder<HostVO> UnmanagedApplianceSearch;
     protected SearchBuilder<HostVO> MaintenanceCountSearch;
     protected SearchBuilder<HostVO> HostTypeCountSearch;
+    protected SearchBuilder<HostVO> HostTypeZoneCountSearch;
     protected SearchBuilder<HostVO> ClusterStatusSearch;
     protected SearchBuilder<HostVO> TypeNameZoneSearch;
     protected SearchBuilder<HostVO> AvailHypevisorInZone;
@@ -167,6 +168,12 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
         HostTypeCountSearch.and("removed", HostTypeCountSearch.entity().getRemoved(), SearchCriteria.Op.NULL);
         HostTypeCountSearch.done();
 
+        HostTypeZoneCountSearch = createSearchBuilder();
+        HostTypeZoneCountSearch.and("type", HostTypeZoneCountSearch.entity().getType(), SearchCriteria.Op.EQ);
+        HostTypeZoneCountSearch.and("dc", HostTypeZoneCountSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
+        HostTypeZoneCountSearch.and("removed", HostTypeZoneCountSearch.entity().getRemoved(), SearchCriteria.Op.NULL);
+        HostTypeZoneCountSearch.done();
+
         TypePodDcStatusSearch = createSearchBuilder();
         HostVO entity = TypePodDcStatusSearch.entity();
         TypePodDcStatusSearch.and("type", entity.getType(), SearchCriteria.Op.EQ);
@@ -448,6 +455,14 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
     }
 
     @Override
+    public Integer countAllByTypeInZone(long zoneId, Type type) {
+        SearchCriteria<HostVO> sc = HostTypeCountSearch.create();
+        sc.setParameters("type", type);
+        sc.setParameters("dc", zoneId);
+        return getCount(sc);
+    }
+
+    @Override
     public List<HostVO> listByDataCenterId(long id) {
         SearchCriteria<HostVO> sc = DcSearch.create();
         sc.setParameters("dc", id);
@@ -1261,6 +1276,13 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
         return listBy(sc);
     }
 
+    /**
+     * Looks up a single host by name through the {@code NameSearch} criteria.
+     *
+     * @param name host name to search for
+     * @return the result of {@code findOneBy} for that name
+     */
+    @Override
+    public HostVO findByName(String name) {
+        SearchCriteria<HostVO> nameCriteria = NameSearch.create();
+        nameCriteria.setParameters("name", name);
+        return findOneBy(nameCriteria);
+    }
+
     private ResultSet executeSqlGetResultsetForMethodFindHostInZoneToExecuteCommand(HypervisorType hypervisorType, long zoneId, TransactionLegacy tx, String sql) throws SQLException {
         PreparedStatement pstmt = tx.prepareAutoCloseStatement(sql);
         pstmt.setString(1, Objects.toString(hypervisorType));
diff --git a/engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDao.java b/engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDao.java
index fb57563..98fc8c8 100644
--- a/engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDao.java
+++ b/engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDao.java
@@ -22,4 +22,5 @@ import com.cloud.utils.db.GenericDao;
 
 public interface CommandExecLogDao extends GenericDao<CommandExecLogVO, Long> {
     public void expungeExpiredRecords(Date cutTime);
+    /**
+     * Counts the logged copy commands of the given SSVM.
+     *
+     * @param id host ID of the SSVM
+     * @return number of matching command log entries
+     */
+    public Integer getCopyCmdCountForSSVM(Long id);
 }
diff --git a/engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDaoImpl.java b/engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDaoImpl.java
index ac438b0..f89a1bb 100644
--- a/engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDaoImpl.java
+++ b/engine/schema/src/main/java/com/cloud/secstorage/CommandExecLogDaoImpl.java
@@ -17,7 +17,7 @@
 package com.cloud.secstorage;
 
 import java.util.Date;
-
+import java.util.List;
 
 import org.springframework.stereotype.Component;
 
@@ -30,11 +30,16 @@ import com.cloud.utils.db.SearchCriteria.Op;
 public class CommandExecLogDaoImpl extends GenericDaoBase<CommandExecLogVO, Long> implements CommandExecLogDao {
 
     protected final SearchBuilder<CommandExecLogVO> ExpungeSearch;
+    protected final SearchBuilder<CommandExecLogVO> CommandSearch;
 
     public CommandExecLogDaoImpl() {
         ExpungeSearch = createSearchBuilder();
         ExpungeSearch.and("created", ExpungeSearch.entity().getCreated(), Op.LT);
         ExpungeSearch.done();
+
+        CommandSearch = createSearchBuilder();
+        CommandSearch.and("host_id", CommandSearch.entity().getHostId(), Op.EQ);
+        CommandSearch.and("command_name", CommandSearch.entity().getCommandName(), Op.EQ);
     }
 
     @Override
@@ -43,4 +48,13 @@ public class CommandExecLogDaoImpl extends GenericDaoBase<CommandExecLogVO, Long
         sc.setParameters("created", cutTime);
         expunge(sc);
     }
+
+    /**
+     * Counts the command log entries whose host ID equals {@code id} and whose command name is "CopyCommand".
+     *
+     * @param id host ID to match
+     * @return number of matching entries
+     */
+    @Override
+    public Integer getCopyCmdCountForSSVM(Long id) {
+        SearchCriteria<CommandExecLogVO> copyCmdCriteria = CommandSearch.create();
+        copyCmdCriteria.setParameters("host_id", id);
+        copyCmdCriteria.setParameters("command_name", "CopyCommand");
+        List<CommandExecLogVO> matchingEntries = customSearch(copyCmdCriteria, null);
+        return matchingEntries.size();
+    }
 }
diff --git a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/ImageStoreDao.java b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/ImageStoreDao.java
index 1861b21..84cba70 100644
--- a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/ImageStoreDao.java
+++ b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/ImageStoreDao.java
@@ -29,7 +29,7 @@ public interface ImageStoreDao extends GenericDao<ImageStoreVO, Long> {
 
     List<ImageStoreVO> findByProvider(String provider);
 
-    List<ImageStoreVO> findByScope(ZoneScope scope);
+    /**
+     * Lists the image stores matching the given zone scope.
+     *
+     * @param scope zone scope to search in
+     * @param readonly when non-null, only stores whose read-only flag equals this value are returned;
+     *                 when null, the read-only flag is not used as a filter
+     * @return matching image stores
+     */
+    List<ImageStoreVO> findByZone(ZoneScope scope, Boolean readonly);
 
     List<ImageStoreVO> findRegionImageStores();
 
diff --git a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/ImageStoreDaoImpl.java b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/ImageStoreDaoImpl.java
index 38124ea..6ecac5e 100644
--- a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/ImageStoreDaoImpl.java
+++ b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/ImageStoreDaoImpl.java
@@ -77,9 +77,12 @@ public class ImageStoreDaoImpl extends GenericDaoBase<ImageStoreVO, Long> implem
     }
 
     @Override
-    public List<ImageStoreVO> findByScope(ZoneScope scope) {
+    public List<ImageStoreVO> findByZone(ZoneScope scope, Boolean readonly) {
         SearchCriteria<ImageStoreVO> sc = createSearchCriteria();
         sc.addAnd("role", SearchCriteria.Op.EQ, DataStoreRole.Image);
+        if (readonly != null) {
+            sc.addAnd("readonly", SearchCriteria.Op.EQ, readonly);
+        }
         if (scope.getScopeId() != null) {
             SearchCriteria<ImageStoreVO> scc = createSearchCriteria();
             scc.addOr("scope", SearchCriteria.Op.EQ, ScopeType.REGION);
diff --git a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/ImageStoreVO.java b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/ImageStoreVO.java
index 2c70677..d245827 100644
--- a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/ImageStoreVO.java
+++ b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/ImageStoreVO.java
@@ -74,6 +74,9 @@ public class ImageStoreVO implements ImageStore {
     @Enumerated(value = EnumType.STRING)
     private DataStoreRole role;
 
+    @Column(name = "readonly")
+    private boolean readonly = false;
+
     @Column(name = "parent")
     private String parent;
 
@@ -165,6 +168,14 @@ public class ImageStoreVO implements ImageStore {
         return created;
     }
 
+    /**
+     * Sets the read-only flag of this image store (persisted in the "readonly" column).
+     *
+     * @param readonly new value of the flag
+     */
+    public void setReadonly(boolean readonly) {
+        this.readonly = readonly;
+    }
+
+    /**
+     * Returns the read-only flag of this image store; the field defaults to false.
+     *
+     * @return current value of the flag
+     */
+    public boolean isReadonly() {
+        return readonly;
+    }
+
     public void setCreated(Date created) {
         this.created = created;
     }
diff --git a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/SnapshotDataStoreDao.java b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/SnapshotDataStoreDao.java
index 91ea07c..3263cbb 100644
--- a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/SnapshotDataStoreDao.java
+++ b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/SnapshotDataStoreDao.java
@@ -16,6 +16,7 @@
 // under the License.
 package org.apache.cloudstack.storage.datastore.db;
 
+import java.util.Date;
 import java.util.List;
 
 import org.apache.cloudstack.engine.subsystem.api.storage.DataObjectInStore;
@@ -42,6 +43,8 @@ StateDao<ObjectInDataStoreStateMachine.State, ObjectInDataStoreStateMachine.Even
 
     SnapshotDataStoreVO findBySnapshot(long snapshotId, DataStoreRole role);
 
+    SnapshotDataStoreVO findBySourceSnapshot(long snapshotId, DataStoreRole role);
+
     List<SnapshotDataStoreVO> listDestroyed(long storeId);
 
     List<SnapshotDataStoreVO> findBySnapshotId(long snapshotId);
@@ -72,5 +75,7 @@ StateDao<ObjectInDataStoreStateMachine.State, ObjectInDataStoreStateMachine.Even
 
     List<SnapshotDataStoreVO> listByState(ObjectInDataStoreStateMachine.State... states);
 
+    /**
+     * Finds snapshot store entries of the given store for the given time window.
+     * NOTE(review): which timestamp is compared with start/end, and whether the bounds are inclusive, is
+     * decided by the implementation - confirm there.
+     *
+     * @param storeId ID of the data store to search
+     * @param start start of the time window
+     * @param end end of the time window
+     * @return matching snapshot store entries
+     */
+    List<SnapshotDataStoreVO> findSnapshots(Long storeId, Date start, Date end);
+
     SnapshotDataStoreVO findDestroyedReferenceBySnapshot(long snapshotId, DataStoreRole role);
 }
diff --git a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/TemplateDataStoreDao.java b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/TemplateDataStoreDao.java
index a6e609e..fc695f4 100644
--- a/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/TemplateDataStoreDao.java
+++ b/engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/TemplateDataStoreDao.java
@@ -66,6 +66,8 @@ public interface TemplateDataStoreDao extends GenericDao<TemplateDataStoreVO, Lo
 
     List<TemplateDataStoreVO> listByTemplate(long templateId);
 
+    List<TemplateDataStoreVO> listByTemplateNotBypassed(long templateId);
+
     TemplateDataStoreVO findByTemplateZoneReady(long templateId, Long zoneId);
 
     void duplicateCacheRecordsOnRegionStore(long storeId);
diff --git a/engine/schema/src/main/resources/META-INF/db/schema-41310to41400.sql b/engine/schema/src/main/resources/META-INF/db/schema-41310to41400.sql
index baa7bcf..a1a2e74 100644
--- a/engine/schema/src/main/resources/META-INF/db/schema-41310to41400.sql
+++ b/engine/schema/src/main/resources/META-INF/db/schema-41310to41400.sql
@@ -53,6 +53,33 @@ ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `backup_offering_id` bigint unsigne
 ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `backup_external_id` varchar(255) DEFAULT NULL COMMENT 'ID of external backup job or container if any';
 ALTER TABLE `cloud`.`vm_instance` ADD COLUMN `backup_volumes` text DEFAULT NULL COMMENT 'details of backedup volumes';
 
+-- Add the read-only flag to image stores; the column defaults to false
+ALTER TABLE `cloud`.`image_store` ADD COLUMN `readonly` boolean DEFAULT false COMMENT 'defines status of image store';
+
+-- Redefine image_store_view so that it exposes image_store.readonly
+ALTER VIEW `cloud`.`image_store_view` AS
+    select
+        image_store.id,
+        image_store.uuid,
+        image_store.name,
+        image_store.image_provider_name,
+        image_store.protocol,
+        image_store.url,
+        image_store.scope,
+        image_store.role,
+        image_store.readonly,
+        image_store.removed,
+        data_center.id data_center_id,
+        data_center.uuid data_center_uuid,
+        data_center.name data_center_name,
+        image_store_details.name detail_name,
+        image_store_details.value detail_value
+    from
+        `cloud`.`image_store`
+            left join
+        `cloud`.`data_center` ON image_store.data_center_id = data_center.id
+            left join
+        `cloud`.`image_store_details` ON image_store_details.store_id = image_store.id;
+
+
 CREATE TABLE IF NOT EXISTS `cloud`.`backups` (
   `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
   `uuid` varchar(40) NOT NULL UNIQUE,
diff --git a/engine/storage/datamotion/src/main/java/org/apache/cloudstack/storage/motion/DataMotionServiceImpl.java b/engine/storage/datamotion/src/main/java/org/apache/cloudstack/storage/motion/DataMotionServiceImpl.java
index c2724e6..ac6c855 100644
--- a/engine/storage/datamotion/src/main/java/org/apache/cloudstack/storage/motion/DataMotionServiceImpl.java
+++ b/engine/storage/datamotion/src/main/java/org/apache/cloudstack/storage/motion/DataMotionServiceImpl.java
@@ -25,12 +25,6 @@ import java.util.Map;
 
 import javax.inject.Inject;
 
-import com.cloud.storage.Volume;
-import com.cloud.storage.VolumeVO;
-import com.cloud.storage.dao.VolumeDao;
-import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
-
 import org.apache.cloudstack.engine.subsystem.api.storage.CopyCommandResult;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataMotionService;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataMotionStrategy;
@@ -39,9 +33,14 @@ import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.StorageStrategyFactory;
 import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
 import org.apache.cloudstack.framework.async.AsyncCompletionCallback;
+import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
 
 import com.cloud.agent.api.to.VirtualMachineTO;
 import com.cloud.host.Host;
+import com.cloud.storage.Volume;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.VolumeDao;
 import com.cloud.utils.StringUtils;
 import com.cloud.utils.exception.CloudRuntimeException;
 
diff --git a/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/SecondaryStorageServiceImpl.java b/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/SecondaryStorageServiceImpl.java
new file mode 100644
index 0000000..5a9c4a9
--- /dev/null
+++ b/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/SecondaryStorageServiceImpl.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.image;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.CopyCommandResult;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataMotionService;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.SecondaryStorageService;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
+import org.apache.cloudstack.framework.async.AsyncCallFuture;
+import org.apache.cloudstack.framework.async.AsyncCallbackDispatcher;
+import org.apache.cloudstack.framework.async.AsyncCompletionCallback;
+import org.apache.cloudstack.framework.async.AsyncRpcContext;
+import org.apache.cloudstack.storage.command.CopyCmdAnswer;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+import org.apache.log4j.Logger;
+
+import com.cloud.secstorage.CommandExecLogDao;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.utils.Pair;
+
+public class SecondaryStorageServiceImpl implements SecondaryStorageService {
+
+    private static final Logger s_logger = Logger.getLogger(SecondaryStorageServiceImpl.class);
+
+    @Inject
+    DataMotionService motionSrv;
+    @Inject
+    CommandExecLogDao _cmdExecLogDao;
+    @Inject
+    TemplateDataStoreDao templateStoreDao;
+    @Inject
+    SnapshotDataStoreDao snapshotStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+
+    /** Async context for one migration: the source object, its destination counterpart and the future to complete. */
+    private static class MigrateDataContext<T> extends AsyncRpcContext<T> {
+        final DataObject srcData;
+        final DataObject destData;
+        final AsyncCallFuture<DataObjectResult> future;
+        /**
+         * @param callback completion callback passed to {@link AsyncRpcContext}; {@code destStore} is accepted for call-site compatibility but is not stored
+         */
+        public MigrateDataContext(AsyncCompletionCallback<T> callback, AsyncCallFuture<DataObjectResult> future, DataObject srcData, DataObject destData, DataStore destStore) {
+            super(callback);
+            this.srcData = srcData;
+            this.destData = destData;
+            this.future = future;
+        }
+    }
+
+    /** Migrates {@code srcDataObject} - or, for a snapshot that is a key of {@code snapshotChain}, each snapshot of its chain - to {@code destDatastore}; on an exception the last created destination entry is deleted, the source receives OperationFailed and the future is completed with the error. */
+    @Override
+    public AsyncCallFuture<DataObjectResult> migrateData(DataObject srcDataObject, DataStore srcDatastore, DataStore destDatastore, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChain) {
+        final AsyncCallFuture<DataObjectResult> future = new AsyncCallFuture<DataObjectResult>();
+        final DataObjectResult outcome = new DataObjectResult(srcDataObject);
+        DataObject target = null; // most recently created destination entry; deleted again if a later step throws
+        try {
+            if (!(srcDataObject instanceof SnapshotInfo) || snapshotChain == null || !snapshotChain.containsKey(srcDataObject)) {
+                if (srcDataObject instanceof TemplateInfo) {
+                    s_logger.debug("Checking if template present at destination");
+                    TemplateDataStoreVO existingRef = templateStoreDao.findByStoreTemplate(destDatastore.getId(), srcDataObject.getId());
+                    if (existingRef != null) {
+                        String msg = "Template already exists in destination store";
+                        s_logger.debug(msg);
+                        outcome.setResult(msg);
+                        outcome.setSuccess(true);
+                        future.complete(outcome);
+                        return future;
+                    }
+                }
+                target = destDatastore.create(srcDataObject);
+                srcDataObject.processEvent(ObjectInDataStoreStateMachine.Event.MigrateDataRequested);
+                target.processEvent(ObjectInDataStoreStateMachine.Event.MigrateDataRequested);
+                migrateJob(future, srcDataObject, target, destDatastore);
+            } else {
+                for (SnapshotInfo chainMember : snapshotChain.get(srcDataObject).first()) {
+                    target = destDatastore.create(chainMember);
+                    chainMember.processEvent(ObjectInDataStoreStateMachine.Event.MigrateDataRequested);
+                    target.processEvent(ObjectInDataStoreStateMachine.Event.MigrateDataRequested);
+                    migrateJob(future, chainMember, target, destDatastore);
+                }
+            }
+        } catch (Exception e) {
+            s_logger.debug("Failed to copy Data", e);
+            if (target != null) {
+                target.getDataStore().delete(target);
+            }
+            if (srcDataObject instanceof VolumeInfo) {
+                ((VolumeInfo) srcDataObject).processEventOnly(ObjectInDataStoreStateMachine.Event.OperationFailed);
+            } else {
+                srcDataObject.processEvent(ObjectInDataStoreStateMachine.Event.OperationFailed);
+            }
+            outcome.setResult(e.toString());
+            future.complete(outcome);
+        }
+        return future;
+    }
+
+    protected void migrateJob(AsyncCallFuture<DataObjectResult> future, DataObject srcDataObject, DataObject destDataObject, DataStore destDatastore) throws ExecutionException, InterruptedException { // wires one src->dest copy to migrateDataCallBack and starts it
+        MigrateDataContext<DataObjectResult> context = new MigrateDataContext<DataObjectResult>(null, future, srcDataObject, destDataObject, destDatastore); // no completion callback (null); the future is completed by migrateDataCallBack
+        AsyncCallbackDispatcher<SecondaryStorageServiceImpl, CopyCommandResult> caller = AsyncCallbackDispatcher.create(this);
+        caller.setCallback(caller.getTarget().migrateDataCallBack(null, null)).setContext(context); // presumably the null-argument call only designates the callback method on the dispatcher's target - confirm
+        motionSrv.copyAsync(srcDataObject, destDataObject, caller);
+    }
+
+    /**
+     * Callback function to handle state change of source and destination data objects based on the success or failure of the migrate task. The command execution log entry is removed when its id is available, and the future is always completed so a waiting caller cannot hang.
+     */
+    protected Void migrateDataCallBack(AsyncCallbackDispatcher<SecondaryStorageServiceImpl, CopyCommandResult> callback, MigrateDataContext<DataObjectResult> context) throws ExecutionException, InterruptedException {
+        DataObject srcData = context.srcData;
+        DataObject destData = context.destData;
+        CopyCommandResult result = callback.getResult();
+        AsyncCallFuture<DataObjectResult> future = context.future;
+        DataObjectResult res = new DataObjectResult(srcData);
+        CopyCmdAnswer answer = null;
+        try {
+            answer = (CopyCmdAnswer) result.getAnswer(); // inside the try so a missing or unexpected answer still completes the future
+            if (!answer.getResult()) {
+                s_logger.warn("Migration failed for "+srcData.getUuid());
+                res.setResult(result.getResult());
+                if (!(srcData instanceof VolumeInfo)) {
+                    srcData.processEvent(ObjectInDataStoreStateMachine.Event.OperationFailed);
+                    destData.processEvent(ObjectInDataStoreStateMachine.Event.MigrationFailed);
+                    destData.processEvent(ObjectInDataStoreStateMachine.Event.DestroyRequested);
+                } else {
+                    ((VolumeInfo)srcData).processEventOnly(ObjectInDataStoreStateMachine.Event.OperationFailed);
+                    ((VolumeInfo)destData).processEventOnly(ObjectInDataStoreStateMachine.Event.MigrationFailed);
+                    ((VolumeInfo)destData).processEventOnly(ObjectInDataStoreStateMachine.Event.DestroyRequested);
+                }
+                destData.getDataStore().delete(destData); // destData was already dereferenced above, so a null check here would be dead code
+            } else {
+                if (destData instanceof VolumeInfo) {
+                    ((VolumeInfo) destData).processEventOnly(ObjectInDataStoreStateMachine.Event.OperationSuccessed, answer);
+                } else {
+                    destData.processEvent(ObjectInDataStoreStateMachine.Event.OperationSuccessed, answer);
+                }
+                if (destData instanceof SnapshotInfo) {
+                    SnapshotDataStoreVO snapshotStore = snapshotStoreDao.findBySourceSnapshot(srcData.getId(), DataStoreRole.Image);
+                    SnapshotDataStoreVO destSnapshotStore = snapshotStoreDao.findBySnapshot(srcData.getId(), DataStoreRole.Image);
+                    destSnapshotStore.setPhysicalSize(snapshotStore.getPhysicalSize());
+                    snapshotStoreDao.update(destSnapshotStore.getId(), destSnapshotStore);
+                }
+                if (destData instanceof VolumeInfo) {
+                    VolumeDataStoreVO srcVolume = volumeDataStoreDao.findByStoreVolume(srcData.getDataStore().getId(), srcData.getId());
+                    VolumeDataStoreVO destVolume = volumeDataStoreDao.findByStoreVolume(destData.getDataStore().getId(), destData.getId());
+                    destVolume.setPhysicalSize(srcVolume.getPhysicalSize());
+                    volumeDataStoreDao.update(destVolume.getId(), destVolume);
+                }
+                s_logger.debug("Deleting source data");
+                srcData.getDataStore().delete(srcData);
+                s_logger.debug("Successfully migrated "+srcData.getUuid());
+            }
+        } catch (Exception e) {
+            s_logger.error("Failed to process migrate data callback", e);
+            res.setResult(e.toString());
+        } finally {
+            try {
+                if (answer != null && answer.getContextParam("cmd") != null) { // parseLong(null) would throw and mask the real outcome
+                    _cmdExecLogDao.expunge(Long.parseLong(answer.getContextParam("cmd")));
+                }
+            } finally {
+                future.complete(res); // runs on every path, including a failing expunge
+            }
+        }
+        return null;
+    }
+
+}
+
+
diff --git a/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/TemplateDataFactoryImpl.java b/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/TemplateDataFactoryImpl.java
index 8343a74..043af9a 100644
--- a/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/TemplateDataFactoryImpl.java
+++ b/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/TemplateDataFactoryImpl.java
@@ -18,21 +18,17 @@
  */
 package org.apache.cloudstack.storage.image;
 
-import com.cloud.host.HostVO;
-import com.cloud.host.dao.HostDao;
 import java.util.ArrayList;
 import java.util.List;
 
 import javax.inject.Inject;
 
-import com.cloud.hypervisor.Hypervisor;
-import com.cloud.utils.exception.CloudRuntimeException;
 import org.apache.cloudstack.direct.download.DirectDownloadManager;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
 import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
 import org.apache.cloudstack.engine.subsystem.api.storage.TemplateInfo;
-import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
@@ -41,11 +37,15 @@ import org.apache.cloudstack.storage.image.store.TemplateObject;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
 
+import com.cloud.host.HostVO;
+import com.cloud.host.dao.HostDao;
+import com.cloud.hypervisor.Hypervisor;
 import com.cloud.storage.DataStoreRole;
 import com.cloud.storage.VMTemplateStoragePoolVO;
 import com.cloud.storage.VMTemplateVO;
 import com.cloud.storage.dao.VMTemplateDao;
 import com.cloud.storage.dao.VMTemplatePoolDao;
+import com.cloud.utils.exception.CloudRuntimeException;
 
 @Component
 public class TemplateDataFactoryImpl implements TemplateDataFactory {
@@ -230,5 +230,4 @@ public class TemplateDataFactoryImpl implements TemplateDataFactory {
         VMTemplateVO templateVO = imageDataDao.findById(templateId);
         return templateVO.isDirectDownload();
     }
-
 }
diff --git a/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/TemplateServiceImpl.java b/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/TemplateServiceImpl.java
index ee05630..00bc7e4 100644
--- a/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/TemplateServiceImpl.java
+++ b/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/TemplateServiceImpl.java
@@ -253,7 +253,7 @@ public class TemplateServiceImpl implements TemplateService {
     @Override
     public void handleSysTemplateDownload(HypervisorType hostHyper, Long dcId) {
         Set<VMTemplateVO> toBeDownloaded = new HashSet<VMTemplateVO>();
-        List<DataStore> stores = _storeMgr.getImageStoresByScope(new ZoneScope(dcId));
+        List<DataStore> stores = _storeMgr.getImageStoresByScopeExcludingReadOnly(new ZoneScope(dcId));
         if (stores == null || stores.isEmpty()) {
             return;
         }
diff --git a/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/manager/ImageStoreProviderManagerImpl.java b/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/manager/ImageStoreProviderManagerImpl.java
index 80e5b38..1ca155c 100644
--- a/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/manager/ImageStoreProviderManagerImpl.java
+++ b/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/manager/ImageStoreProviderManagerImpl.java
@@ -33,6 +33,9 @@ import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreProviderManag
 import org.apache.cloudstack.engine.subsystem.api.storage.ImageStoreProvider;
 import org.apache.cloudstack.engine.subsystem.api.storage.Scope;
 import org.apache.cloudstack.engine.subsystem.api.storage.ZoneScope;
+import org.apache.cloudstack.framework.config.ConfigKey;
+import org.apache.cloudstack.framework.config.Configurable;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.storage.datastore.db.ImageStoreDao;
 import org.apache.cloudstack.storage.datastore.db.ImageStoreVO;
 import org.apache.cloudstack.storage.image.ImageStoreDriver;
@@ -47,7 +50,7 @@ import com.cloud.storage.ScopeType;
 import com.cloud.storage.dao.VMTemplateDao;
 
 @Component
-public class ImageStoreProviderManagerImpl implements ImageStoreProviderManager {
+public class ImageStoreProviderManagerImpl implements ImageStoreProviderManager, Configurable {
     private static final Logger s_logger = Logger.getLogger(ImageStoreProviderManagerImpl.class);
     @Inject
     ImageStoreDao dataStoreDao;
@@ -57,8 +60,14 @@ public class ImageStoreProviderManagerImpl implements ImageStoreProviderManager
     DataStoreProviderManager providerManager;
     @Inject
     StatsCollector _statsCollector;
+    @Inject
+    ConfigurationDao configDao;
+
     Map<String, ImageStoreDriver> driverMaps;
 
+    static final ConfigKey<String> ImageStoreAllocationAlgorithm = new ConfigKey<String>("Advanced", String.class, "image.store.allocation.algorithm", "firstfitleastconsumed",
+            "'firstfitleastconsumed','random' : Order in which image stores within a zone will be considered for allocation", true, ConfigKey.Scope.Global); // read by listImageStoresByScopeExcludingReadOnly to order its result
+
     @PostConstruct
     public void config() {
         driverMaps = new HashMap<String, ImageStoreDriver>();
@@ -110,7 +119,7 @@ public class ImageStoreProviderManagerImpl implements ImageStoreProviderManager
 
     @Override
     public List<DataStore> listImageStoresByScope(ZoneScope scope) {
-        List<ImageStoreVO> stores = dataStoreDao.findByScope(scope);
+        List<ImageStoreVO> stores = dataStoreDao.findByZone(scope, null);
         List<DataStore> imageStores = new ArrayList<DataStore>();
         for (ImageStoreVO store : stores) {
             imageStores.add(getImageStore(store.getId()));
@@ -119,6 +128,24 @@ public class ImageStoreProviderManagerImpl implements ImageStoreProviderManager
     }
 
     @Override
+    public List<DataStore> listImageStoresByScopeExcludingReadOnly(ZoneScope scope) { // stores from findByZone(scope, FALSE), ordered per image.store.allocation.algorithm
+        String allocationAlgorithm = ImageStoreAllocationAlgorithm.value();
+        List<ImageStoreVO> stores = dataStoreDao.findByZone(scope, Boolean.FALSE);
+        List<DataStore> imageStores = new ArrayList<DataStore>();
+        for (ImageStoreVO store : stores) {
+            imageStores.add(getImageStore(store.getId()));
+        }
+        if ("random".equals(allocationAlgorithm)) { // constant-first equals keeps a null setting from throwing
+            Collections.shuffle(imageStores);
+            return imageStores;
+        }
+        if (!"firstfitleastconsumed".equals(allocationAlgorithm)) { // unrecognised value: used to return null, hiding every store from callers
+            s_logger.warn("Unknown image.store.allocation.algorithm value '" + allocationAlgorithm + "', falling back to firstfitleastconsumed");
+        }
+        return orderImageStoresOnFreeCapacity(imageStores);
+    }
+
+    @Override
     public List<DataStore> listImageStoreByProvider(String provider) {
         List<ImageStoreVO> stores = dataStoreDao.findByProvider(provider);
         List<DataStore> imageStores = new ArrayList<DataStore>();
@@ -179,6 +206,31 @@ public class ImageStoreProviderManagerImpl implements ImageStoreProviderManager
     }
 
     @Override
+    public List<DataStore> orderImageStoresOnFreeCapacity(List<DataStore> imageStores) {
+        // Orders the given list in place by ascending free capacity (only when it holds more than one
+        // store) and returns, in that order, the stores the stats collector reports as having enough
+        // capacity.
+        // NOTE(review): ascending order puts the store with the least free space first - confirm that this
+        // is the ordering intended for the "firstfitleastconsumed" algorithm.
+        if (imageStores.size() > 1) {
+            imageStores.sort((left, right) -> Long.compare(
+                    _statsCollector.imageStoreCurrentFreeCapacity(left),
+                    _statsCollector.imageStoreCurrentFreeCapacity(right)));
+        }
+
+        // A single-element list needs no sorting, so it goes through the same filter as the sorted case.
+        List<DataStore> eligible = new ArrayList<>();
+        for (DataStore candidate : imageStores) {
+            if (!_statsCollector.imageStoreHasEnoughCapacity(candidate)) {
+                continue;
+            }
+            eligible.add(candidate);
+        }
+
+        return eligible;
+    }
+
+    @Override
     public List<DataStore> listImageStoresWithFreeCapacity(List<DataStore> imageStores) {
         List<DataStore> stores = new ArrayList<>();
         for (DataStore imageStore : imageStores) {
@@ -195,4 +247,14 @@ public class ImageStoreProviderManagerImpl implements ImageStoreProviderManager
         }
         return stores;
     }
+
+    @Override
+    public String getConfigComponentName() {
+        return ImageStoreProviderManager.class.getSimpleName(); // simple name of the interface, not of this implementation class
+    }
+
+    @Override
+    public ConfigKey<?>[] getConfigKeys() {
+        return new ConfigKey<?>[] { ImageStoreAllocationAlgorithm }; // the only key this component returns
+    }
 }
diff --git a/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/store/TemplateObject.java b/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/store/TemplateObject.java
index 25f27a2..86030f2 100644
--- a/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/store/TemplateObject.java
+++ b/engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/store/TemplateObject.java
@@ -208,7 +208,9 @@ public class TemplateObject implements TemplateInfo {
                     CopyCmdAnswer cpyAnswer = (CopyCmdAnswer)answer;
                     TemplateObjectTO newTemplate = (TemplateObjectTO)cpyAnswer.getNewData();
                     TemplateDataStoreVO templateStoreRef = templateStoreDao.findByStoreTemplate(getDataStore().getId(), getId());
-                    templateStoreRef.setInstallPath(newTemplate.getPath());
+                    if (newTemplate.getPath() != null) {
+                        templateStoreRef.setInstallPath(newTemplate.getPath());
+                    }
                     templateStoreRef.setDownloadPercent(100);
                     templateStoreRef.setDownloadState(Status.DOWNLOADED);
                     templateStoreRef.setSize(newTemplate.getSize());
diff --git a/engine/storage/image/src/main/resources/META-INF/cloudstack/core/spring-engine-storage-image-core-context.xml b/engine/storage/image/src/main/resources/META-INF/cloudstack/core/spring-engine-storage-image-core-context.xml
index 5c7b05b..805af26 100644
--- a/engine/storage/image/src/main/resources/META-INF/cloudstack/core/spring-engine-storage-image-core-context.xml
+++ b/engine/storage/image/src/main/resources/META-INF/cloudstack/core/spring-engine-storage-image-core-context.xml
@@ -34,6 +34,10 @@
     <bean id="templateDataFactoryImpl"
         class="org.apache.cloudstack.storage.image.TemplateDataFactoryImpl" />
 
+    <bean id="secondaryStorageServiceImpl"
+          class="org.apache.cloudstack.storage.image.SecondaryStorageServiceImpl"
+          depends-on="dataMotionServiceImpl, objectInDataStoreManagerImpl" />
+
     <bean id="imageStoreHelper"
         class="org.apache.cloudstack.storage.image.datastore.ImageStoreHelper" />
     <bean id="imageFormatHelper"
@@ -41,5 +45,5 @@
 
     <bean id="imageStoreProviderMgr"
         class="org.apache.cloudstack.storage.image.manager.ImageStoreProviderManagerImpl" />
-    
+
 </beans>
diff --git a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/SnapshotObject.java b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/SnapshotObject.java
index 65d6fa5..f107343 100644
--- a/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/SnapshotObject.java
+++ b/engine/storage/snapshot/src/main/java/org/apache/cloudstack/storage/snapshot/SnapshotObject.java
@@ -18,12 +18,12 @@
  */
 package org.apache.cloudstack.storage.snapshot;
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 
 import javax.inject.Inject;
 
-import org.apache.log4j.Logger;
-
 import org.apache.cloudstack.engine.subsystem.api.storage.DataObjectInStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
@@ -40,6 +40,7 @@ import org.apache.cloudstack.storage.datastore.ObjectInDataStoreManager;
 import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
 import org.apache.cloudstack.storage.to.SnapshotObjectTO;
+import org.apache.log4j.Logger;
 
 import com.cloud.agent.api.Answer;
 import com.cloud.agent.api.to.DataObjectType;
@@ -130,6 +131,24 @@ public class SnapshotObject implements SnapshotInfo {
     }
 
     @Override
+    public List<SnapshotInfo> getChildren() { // snapshots whose entry on this object's store names this snapshot as parent
+        QueryBuilder<SnapshotDataStoreVO> query = QueryBuilder.create(SnapshotDataStoreVO.class);
+        query.and(query.entity().getDataStoreId(), Op.EQ, store.getId());
+        query.and(query.entity().getRole(), Op.EQ, store.getRole());
+        query.and(query.entity().getState(), Op.NIN, State.Destroying, State.Destroyed, State.Error);
+        query.and(query.entity().getParentSnapshotId(), Op.EQ, getId());
+        List<SnapshotDataStoreVO> childRefs = query.list();
+        List<SnapshotInfo> children = new ArrayList<>();
+        if (childRefs == null) {
+            return children; // no rows: empty list rather than null
+        }
+        for (SnapshotDataStoreVO childRef : childRefs) {
+            children.add(snapshotFactory.getSnapshot(childRef.getSnapshotId(), DataStoreRole.Image));
+        }
+        return children;
+    }
+
+    @Override
     public boolean isRevertable() {
         SnapshotStrategy snapshotStrategy = storageStrategyFactory.getSnapshotStrategy(snapshot, SnapshotOperation.REVERT);
         if (snapshotStrategy != null) {
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/datastore/DataStoreManagerImpl.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/datastore/DataStoreManagerImpl.java
index 51421e4..ff6c4fb 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/datastore/DataStoreManagerImpl.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/datastore/DataStoreManagerImpl.java
@@ -73,6 +73,11 @@ public class DataStoreManagerImpl implements DataStoreManager {
     }
 
     @Override
+    public List<DataStore> getImageStoresByScopeExcludingReadOnly(ZoneScope scope) {
+        return imageDataStoreMgr.listImageStoresByScopeExcludingReadOnly(scope); // pure delegation; the result is passed through unchanged
+    }
+
+    @Override
     public DataStore getRandomImageStore(long zoneId) {
         List<DataStore> stores = getImageStoresByScope(new ZoneScope(zoneId));
         if (stores == null || stores.size() == 0) {
@@ -82,8 +87,17 @@ public class DataStoreManagerImpl implements DataStoreManager {
     }
 
     @Override
+    public DataStore getRandomUsableImageStore(long zoneId) {
+        // Candidates are the zone's image stores excluding read-only ones; the actual pick is delegated
+        // to imageDataStoreMgr.getRandomImageStore. Returns null when there is no candidate.
+        List<DataStore> candidates = getImageStoresByScopeExcludingReadOnly(new ZoneScope(zoneId));
+        boolean noCandidate = candidates == null || candidates.isEmpty();
+        return noCandidate ? null : imageDataStoreMgr.getRandomImageStore(candidates);
+    }
+
+    @Override
     public DataStore getImageStoreWithFreeCapacity(long zoneId) {
-        List<DataStore> stores = getImageStoresByScope(new ZoneScope(zoneId));
+        List<DataStore> stores = getImageStoresByScopeExcludingReadOnly(new ZoneScope(zoneId));
         if (stores == null || stores.size() == 0) {
             return null;
         }
@@ -91,8 +105,16 @@ public class DataStoreManagerImpl implements DataStoreManager {
     }
 
     @Override
+    public DataStore getImageStoreWithFreeCapacity(List<DataStore> imageStores) {
+        if (imageStores == null || imageStores.isEmpty()) { // null-safe like the zone-based lookups in this class: no candidates, no store
+            return null;
+        }
+        return imageDataStoreMgr.getImageStoreWithFreeCapacity(imageStores);
+    }
+
+    @Override
     public List<DataStore> listImageStoresWithFreeCapacity(long zoneId) {
-        List<DataStore> stores = getImageStoresByScope(new ZoneScope(zoneId));
+        List<DataStore> stores = getImageStoresByScopeExcludingReadOnly(new ZoneScope(zoneId));
         if (stores == null || stores.size() == 0) {
             return null;
         }
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/datastore/ObjectInDataStoreManagerImpl.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/datastore/ObjectInDataStoreManagerImpl.java
index 062e89a..ff8112c 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/datastore/ObjectInDataStoreManagerImpl.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/datastore/ObjectInDataStoreManagerImpl.java
@@ -104,6 +104,14 @@ public class ObjectInDataStoreManagerImpl implements ObjectInDataStoreManager {
         // TODO: further investigate why an extra event is sent when it is
         // alreay Ready for DownloadListener
         stateMachines.addTransition(State.Ready, Event.OperationSuccessed, State.Ready);
+        // State transitions for data object migration
+        stateMachines.addTransition(State.Ready, Event.MigrateDataRequested, State.Migrating);
+        stateMachines.addTransition(State.Ready, Event.CopyRequested, State.Copying);
+        stateMachines.addTransition(State.Allocated, Event.MigrateDataRequested, State.Migrating);
+        stateMachines.addTransition(State.Migrating, Event.MigrationFailed, State.Failed);
+        stateMachines.addTransition(State.Migrating, Event.MigrationSucceeded, State.Destroyed);
+        stateMachines.addTransition(State.Migrating, Event.OperationSuccessed, State.Ready);
+        stateMachines.addTransition(State.Migrating, Event.OperationFailed, State.Ready);
     }
 
     @Override
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/endpoint/DefaultEndPointSelector.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/endpoint/DefaultEndPointSelector.java
index 6e8bdaf..09b4b1a 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/endpoint/DefaultEndPointSelector.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/endpoint/DefaultEndPointSelector.java
@@ -264,6 +264,27 @@ public class DefaultEndPointSelector implements EndPointSelector {
         return RemoteHostEndPoint.getHypervisorHostEndPoint(host);
     }
 
+    @Override
+    public List<EndPoint> findAllEndpointsForScope(DataStore store) {
+        // A zone-scoped store restricts the host lookup to that zone's id; for any other scope dcId
+        // stays null and is handed to the host query unchanged.
+        Scope scope = store.getScope();
+        Long dcId = scope.getScopeType() == ScopeType.ZONE ? scope.getScopeId() : null;
+
+        List<HostVO> ssvmHosts = listUpAndConnectingSecondaryStorageVmHost(dcId);
+        if (ssvmHosts == null || ssvmHosts.isEmpty()) {
+            return null; // callers must handle null when no secondary storage VM host was found
+        }
+
+        // Every host returned by the lookup is wrapped as an endpoint, not just a single one.
+        List<EndPoint> endPoints = new ArrayList<EndPoint>();
+        for (HostVO ssvmHost : ssvmHosts) {
+            endPoints.add(RemoteHostEndPoint.getHypervisorHostEndPoint(ssvmHost));
+        }
+
+        return endPoints;
+    }
+
     private List<HostVO> listUpAndConnectingSecondaryStorageVmHost(Long dcId) {
         QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class);
         if (dcId != null) {
@@ -333,7 +354,7 @@ public class DefaultEndPointSelector implements EndPointSelector {
         }
     }
 
-    private EndPoint getEndPointFromHostId(Long hostId) {
+    public EndPoint getEndPointFromHostId(Long hostId) { // NOTE(review): the looked-up host is passed on unchecked - confirm callers never supply an unknown hostId
         HostVO host = hostDao.findById(hostId);
         return RemoteHostEndPoint.getHypervisorHostEndPoint(host);
     }
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/image/BaseImageStoreDriverImpl.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/image/BaseImageStoreDriverImpl.java
index dec9b76..965c332 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/image/BaseImageStoreDriverImpl.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/image/BaseImageStoreDriverImpl.java
@@ -20,21 +20,18 @@ package org.apache.cloudstack.storage.image;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
-import com.cloud.agent.api.storage.OVFPropertyTO;
-import com.cloud.storage.Upload;
-import com.cloud.storage.dao.TemplateOVFPropertiesDao;
-import com.cloud.storage.TemplateOVFPropertyVO;
-import com.cloud.utils.crypt.DBEncryptionUtil;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.log4j.Logger;
-
 import org.apache.cloudstack.engine.subsystem.api.storage.CopyCommandResult;
 import org.apache.cloudstack.engine.subsystem.api.storage.CreateCmdResult;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
@@ -47,24 +44,42 @@ import org.apache.cloudstack.framework.async.AsyncCompletionCallback;
 import org.apache.cloudstack.framework.async.AsyncRpcContext;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.storage.command.CommandResult;
+import org.apache.cloudstack.storage.command.CopyCommand;
 import org.apache.cloudstack.storage.command.DeleteCommand;
 import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
 import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
 import org.apache.cloudstack.storage.endpoint.DefaultEndPointSelector;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.log4j.Logger;
 
+import com.cloud.agent.AgentManager;
 import com.cloud.agent.api.Answer;
 import com.cloud.agent.api.storage.CreateDatadiskTemplateCommand;
 import com.cloud.agent.api.storage.DownloadAnswer;
 import com.cloud.agent.api.storage.GetDatadisksAnswer;
 import com.cloud.agent.api.storage.GetDatadisksCommand;
+import com.cloud.agent.api.storage.OVFPropertyTO;
 import com.cloud.agent.api.to.DataObjectType;
 import com.cloud.agent.api.to.DataTO;
+import com.cloud.agent.api.to.DatadiskTO;
+import com.cloud.agent.api.to.NfsTO;
 import com.cloud.alert.AlertManager;
+import com.cloud.configuration.Config;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.host.dao.HostDao;
+import com.cloud.secstorage.CommandExecLogDao;
+import com.cloud.secstorage.CommandExecLogVO;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.StorageManager;
+import com.cloud.storage.TemplateOVFPropertyVO;
+import com.cloud.storage.Upload;
 import com.cloud.storage.VMTemplateStorageResourceAssoc;
 import com.cloud.storage.VMTemplateVO;
 import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.TemplateOVFPropertiesDao;
 import com.cloud.storage.dao.VMTemplateDao;
 import com.cloud.storage.dao.VMTemplateDetailsDao;
 import com.cloud.storage.dao.VMTemplateZoneDao;
@@ -72,9 +87,12 @@ import com.cloud.storage.dao.VolumeDao;
 import com.cloud.storage.download.DownloadMonitor;
 import com.cloud.user.ResourceLimitService;
 import com.cloud.user.dao.AccountDao;
-import com.cloud.agent.api.to.DatadiskTO;
-import com.cloud.utils.net.Proxy;
+import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.crypt.DBEncryptionUtil;
+import com.cloud.utils.db.TransactionLegacy;
 import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.net.Proxy;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
 
 public abstract class BaseImageStoreDriverImpl implements ImageStoreDriver {
     private static final Logger s_logger = Logger.getLogger(BaseImageStoreDriverImpl.class);
@@ -106,6 +124,16 @@ public abstract class BaseImageStoreDriverImpl implements ImageStoreDriver {
     ResourceLimitService _resourceLimitMgr;
     @Inject
     TemplateOVFPropertiesDao templateOvfPropertiesDao;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    CommandExecLogDao _cmdExecLogDao;
+    @Inject
+    StorageManager storageMgr;
+    @Inject
+    protected SecondaryStorageVmDao _secStorageVmDao;
+    @Inject
+    AgentManager agentMgr;
 
     protected String _proxy = null;
 
@@ -333,10 +361,77 @@ public abstract class BaseImageStoreDriverImpl implements ImageStoreDriver {
 
     @Override
     public void copyAsync(DataObject srcdata, DataObject destData, AsyncCompletionCallback<CopyCommandResult> callback) {
+        // Sends a CopyCommand for a same-kind template/snapshot/volume pair to an endpoint found for the source store, then completes the callback.
+        if (!canCopy(srcdata, destData)) {
+            return;
+        }
+        final DataObjectType kind = srcdata.getType();
+        final boolean migratable = kind == DataObjectType.TEMPLATE || kind == DataObjectType.SNAPSHOT || kind == DataObjectType.VOLUME;
+        if (!migratable || kind != destData.getType()) {
+            return;
+        }
+
+        // The value read from configuration is treated as minutes and converted to milliseconds for the command.
+        final int timeoutMinutes = NumbersUtil.parseInt(configDao.getValue(Config.SecStorageCmdExecutionTimeMax.key()), 30);
+        final CopyCommand copyCmd = new CopyCommand(srcdata.getTO(), destData.getTO(), timeoutMinutes * 60 * 1000, true);
+
+        final List<EndPoint> candidates = _epSelector.findAllEndpointsForScope(srcdata.getDataStore());
+        final Answer outcome;
+        if (candidates != null && !candidates.isEmpty()) {
+            // the helper chooses which candidate receives the command
+            outcome = sendToLeastBusyEndpoint(candidates, copyCmd);
+        } else {
+            final String errMsg = "No remote endpoint to send command, check if host or ssvm is down?";
+            s_logger.error(errMsg);
+            outcome = new Answer(copyCmd, false, errMsg);
+        }
+        callback.complete(new CopyCommandResult("", outcome));
+    }
+
+    /**
+     * Picks an endpoint from {@code eps}, persists a CommandExecLogVO row for the command, sends it and returns the answer
+     * tagged with that row's id ("cmd" context parameter); throws CloudRuntimeException if the agent is unavailable or times out.
+     */
+    private Answer sendToLeastBusyEndpoint(List<EndPoint> eps, CopyCommand cmd) {
+        EndPoint endPoint;
+        Long epId = ssvmWithLeastMigrateJobs();
+        if (epId == null) {
+            Collections.shuffle(eps);
+            endPoint = eps.get(0);
+        } else {
+            // NOTE(review): this excludes the host reported as least loaded and only falls back to it when no other endpoint is left - confirm intent.
+            List<EndPoint> remainingEps = eps.stream().filter(ep -> ep.getId() != epId).collect(Collectors.toList());
+            if (remainingEps.isEmpty()) {
+                endPoint = _defaultEpSelector.getEndPointFromHostId(epId);
+            } else {
+                Collections.shuffle(remainingEps);
+                endPoint = remainingEps.get(0);
+            }
+        }
+        CommandExecLogVO execLog = new CommandExecLogVO(endPoint.getId(), _secStorageVmDao.findByInstanceName(hostDao.findById(endPoint.getId()).getName()).getId(), "DataMigrationCommand", 1);
+        Long cmdExecId = _cmdExecLogDao.persist(execLog).getId();
+        try {
+            Answer answer = agentMgr.send(endPoint.getId(), cmd);
+            answer.setContextParam("cmd", cmdExecId.toString());
+            return answer;
+        } catch (AgentUnavailableException | OperationTimedoutException e) {
+            s_logger.debug("Failed to send command, due to Agent:" + endPoint.getId() + ", " + e.toString());
+            // Same message as before, but the original exception is kept as the cause so its stack trace is not lost.
+            throw new CloudRuntimeException("Failed to send command, due to Agent:" + endPoint.getId() + ", " + e.toString(), e);
+        }
     }
 
     @Override
     public boolean canCopy(DataObject srcData, DataObject destData) {
+        // Handled here: same-kind template/snapshot/volume copies between two Image-role stores whose transfer objects are NfsTO.
+        DataStore from = srcData.getDataStore();
+        DataStore to = destData.getDataStore();
+        boolean nfsToNfs = from.getTO() instanceof NfsTO && to.getTO() instanceof NfsTO;
+        boolean imageRoles = nfsToNfs && from.getRole() == DataStoreRole.Image && to.getRole() == DataStoreRole.Image;
+        DataObjectType kind = srcData.getType();
+        if (imageRoles && kind == destData.getType() && (kind == DataObjectType.TEMPLATE || kind == DataObjectType.SNAPSHOT || kind == DataObjectType.VOLUME)) {
+            return true;
+        }
         return false;
     }
 
@@ -399,4 +494,28 @@ public abstract class BaseImageStoreDriverImpl implements ImageStoreDriver {
         callback.complete(result);
         return null;
     }
+
+    private Integer getCopyCmdsCountToSpecificSSVM(Long ssvmId) { // thin delegate to CommandExecLogDao
+        return _cmdExecLogDao.getCopyCmdCountForSSVM(ssvmId); // NOTE(review): no call site is visible in this part of the patch - confirm it is used
+    }
+
+    /** Returns the host_id with the fewest rows in cmd_exec_log, or null when the table has no rows or the query fails. */
+    private Long ssvmWithLeastMigrateJobs() {
+        s_logger.debug("Picking ssvm from the pool with least commands running on it");
+        String query = "select host_id, count(*) from cmd_exec_log group by host_id order by 2 limit 1;";
+        TransactionLegacy txn = TransactionLegacy.currentTxn();
+        Long epId = null;
+        try {
+            PreparedStatement pstmt = txn.prepareAutoCloseStatement(query);
+            try (ResultSet rs = pstmt.executeQuery()) {
+                // getFetchSize() is only the driver's fetch-size hint, not a row count; next() is the reliable "is there a row" test.
+                if (rs.next()) {
+                    epId = rs.getLong(1);
+                }
+            }
+        } catch (SQLException e) {
+            s_logger.debug("SQLException caught", e);
+        }
+        return epId;
+    }
 }
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/image/datastore/ImageStoreProviderManager.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/image/datastore/ImageStoreProviderManager.java
index 01f2100..7e2f720 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/image/datastore/ImageStoreProviderManager.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/image/datastore/ImageStoreProviderManager.java
@@ -36,6 +36,8 @@ public interface ImageStoreProviderManager {
 
     List<DataStore> listImageStoresByScope(ZoneScope scope);
 
+    List<DataStore> listImageStoresByScopeExcludingReadOnly(ZoneScope scope);
+
     List<DataStore> listImageStoreByProvider(String provider);
 
     List<DataStore> listImageCacheStores(Scope scope);
@@ -76,4 +78,6 @@ public interface ImageStoreProviderManager {
      * @return            the list of DataStore which have free capacity
      */
     List<DataStore> listImageStoresWithFreeCapacity(List<DataStore> imageStores);
+
+    List<DataStore> orderImageStoresOnFreeCapacity(List<DataStore> imageStores);
 }
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java
index 151b9ba..4e5210d 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/image/db/SnapshotDataStoreDaoImpl.java
@@ -27,8 +27,6 @@ import java.util.Map;
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
-import com.cloud.storage.DataStoreRole;
-import com.cloud.storage.SnapshotVO;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataObjectInStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
 import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine.Event;
@@ -39,6 +37,8 @@ import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
 
 import com.cloud.hypervisor.Hypervisor;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.SnapshotVO;
 import com.cloud.storage.dao.SnapshotDao;
 import com.cloud.utils.db.DB;
 import com.cloud.utils.db.Filter;
@@ -65,6 +65,7 @@ public class SnapshotDataStoreDaoImpl extends GenericDaoBase<SnapshotDataStoreVO
     private SearchBuilder<SnapshotDataStoreVO> stateSearch;
     private SearchBuilder<SnapshotDataStoreVO> parentSnapshotSearch;
     private SearchBuilder<SnapshotVO> snapshotVOSearch;
+    private SearchBuilder<SnapshotDataStoreVO> snapshotCreatedSearch;
 
     public static ArrayList<Hypervisor.HypervisorType> hypervisorsSupportingSnapshotsChaining = new ArrayList<Hypervisor.HypervisorType>();
 
@@ -158,6 +159,11 @@ public class SnapshotDataStoreDaoImpl extends GenericDaoBase<SnapshotDataStoreVO
         snapshotVOSearch.and("volume_id", snapshotVOSearch.entity().getVolumeId(), SearchCriteria.Op.EQ);
         snapshotVOSearch.done();
 
+        snapshotCreatedSearch = createSearchBuilder();
+        snapshotCreatedSearch.and("store_id", snapshotCreatedSearch.entity().getDataStoreId(), Op.EQ);
+        snapshotCreatedSearch.and("created",  snapshotCreatedSearch.entity().getCreated(), Op.BETWEEN);
+        snapshotCreatedSearch.done();
+
         return true;
     }
 
@@ -335,6 +341,15 @@ public class SnapshotDataStoreDaoImpl extends GenericDaoBase<SnapshotDataStoreVO
     }
 
     @Override
+    public SnapshotDataStoreVO findBySourceSnapshot(long snapshotId, DataStoreRole role) {
+        final SearchCriteria<SnapshotDataStoreVO> migrating = snapshotSearch.create();
+        migrating.setParameters("state", State.Migrating); // restricts the lookup to an entry in Migrating state
+        migrating.setParameters("store_role", role);
+        migrating.setParameters("snapshot_id", snapshotId);
+        return findOneBy(migrating);
+    }
+
+    @Override
     public List<SnapshotDataStoreVO> listAllByVolumeAndDataStore(long volumeId, DataStoreRole role) {
         SearchCriteria<SnapshotDataStoreVO> sc = volumeSearch.create();
         sc.setParameters("volume_id", volumeId);
@@ -462,6 +477,17 @@ public class SnapshotDataStoreDaoImpl extends GenericDaoBase<SnapshotDataStoreVO
     }
 
     @Override
+    public List<SnapshotDataStoreVO> findSnapshots(Long storeId, Date start, Date end) {
+        // The "created" BETWEEN bounds are only set when both dates are given; otherwise only store_id is set.
+        SearchCriteria<SnapshotDataStoreVO> sc = snapshotCreatedSearch.create();
+        sc.setParameters("store_id", storeId);
+        if (start != null && end != null) {
+            sc.setParameters("created", start, end);
+        }
+        return search(sc, null);
+    }
+
+    @Override
     public SnapshotDataStoreVO findDestroyedReferenceBySnapshot(long snapshotId, DataStoreRole role) {
         SearchCriteria<SnapshotDataStoreVO> sc = snapshotSearch.create();
         sc.setParameters("snapshot_id", snapshotId);
diff --git a/engine/storage/src/main/java/org/apache/cloudstack/storage/image/db/TemplateDataStoreDaoImpl.java b/engine/storage/src/main/java/org/apache/cloudstack/storage/image/db/TemplateDataStoreDaoImpl.java
index 2372e84..5a0e4ee 100644
--- a/engine/storage/src/main/java/org/apache/cloudstack/storage/image/db/TemplateDataStoreDaoImpl.java
+++ b/engine/storage/src/main/java/org/apache/cloudstack/storage/image/db/TemplateDataStoreDaoImpl.java
@@ -97,6 +97,7 @@ public class TemplateDataStoreDaoImpl extends GenericDaoBase<TemplateDataStoreVO
 
         templateSearch = createSearchBuilder();
         templateSearch.and("template_id", templateSearch.entity().getTemplateId(), SearchCriteria.Op.EQ);
+        templateSearch.and("download_state", templateSearch.entity().getDownloadState(), SearchCriteria.Op.NEQ);
         templateSearch.and("destroyed", templateSearch.entity().getDestroyed(), SearchCriteria.Op.EQ);
         templateSearch.done();
 
@@ -419,6 +420,15 @@ public class TemplateDataStoreDaoImpl extends GenericDaoBase<TemplateDataStoreVO
     }
 
     @Override
+    public List<TemplateDataStoreVO> listByTemplateNotBypassed(long templateId) {
+        final SearchCriteria<TemplateDataStoreVO> notBypassed = templateSearch.create();
+        notBypassed.setParameters("destroyed", false);
+        notBypassed.setParameters("download_state", Status.BYPASSED); // bound with NEQ in templateSearch, so BYPASSED rows are excluded
+        notBypassed.setParameters("template_id", templateId);
+        return search(notBypassed, null);
+    }
+
+    @Override
     public TemplateDataStoreVO findByTemplateZone(long templateId, Long zoneId, DataStoreRole role) {
         // get all elgible image stores
         List<DataStore> imgStores = null;
diff --git a/engine/storage/volume/src/main/java/org/apache/cloudstack/storage/volume/VolumeObject.java b/engine/storage/volume/src/main/java/org/apache/cloudstack/storage/volume/VolumeObject.java
index 690a112..76e59d8 100644
--- a/engine/storage/volume/src/main/java/org/apache/cloudstack/storage/volume/VolumeObject.java
+++ b/engine/storage/volume/src/main/java/org/apache/cloudstack/storage/volume/VolumeObject.java
@@ -20,9 +20,6 @@ import java.util.Date;
 
 import javax.inject.Inject;
 
-import com.cloud.storage.MigrationOptions;
-import org.apache.log4j.Logger;
-
 import org.apache.cloudstack.engine.subsystem.api.storage.DataObjectInStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
@@ -33,6 +30,7 @@ import org.apache.cloudstack.storage.datastore.ObjectInDataStoreManager;
 import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
 import org.apache.cloudstack.storage.to.VolumeObjectTO;
+import org.apache.log4j.Logger;
 
 import com.cloud.agent.api.Answer;
 import com.cloud.agent.api.storage.DownloadAnswer;
@@ -42,6 +40,7 @@ import com.cloud.hypervisor.Hypervisor.HypervisorType;
 import com.cloud.offering.DiskOffering.DiskCacheMode;
 import com.cloud.storage.DataStoreRole;
 import com.cloud.storage.DiskOfferingVO;
+import com.cloud.storage.MigrationOptions;
 import com.cloud.storage.Storage.ImageFormat;
 import com.cloud.storage.Storage.ProvisioningType;
 import com.cloud.storage.Volume;
@@ -392,7 +391,9 @@ public class VolumeObject implements VolumeInfo {
                 if (event == ObjectInDataStoreStateMachine.Event.CreateOnlyRequested) {
                     volEvent = Volume.Event.UploadRequested;
                 } else if (event == ObjectInDataStoreStateMachine.Event.MigrationRequested) {
-                    volEvent = Volume.Event.CopyRequested;
+                    volEvent = Event.CopyRequested;
+                } else if (event == ObjectInDataStoreStateMachine.Event.MigrateDataRequested) {
+                    return;
                 }
             } else {
                 if (event == ObjectInDataStoreStateMachine.Event.CreateRequested || event == ObjectInDataStoreStateMachine.Event.CreateOnlyRequested) {
diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
index 6ca698b..7dd7343 100644
--- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
+++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
@@ -242,7 +242,9 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
         SearchCriteria<Long> sc = asyncJobTypeSearch.create();
         sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
         sc.setParameters("job_cmd", (Object[])cmds);
-        sc.setParameters("job_info", "%" + havingInfo + "%");
+        if (havingInfo != null) {
+            sc.setParameters("job_info", "%" + havingInfo + "%");
+        }
         List<Long> results = customSearch(sc, null);
         return results.get(0);
     }
diff --git a/plugins/integrations/prometheus/src/main/java/org/apache/cloudstack/metrics/PrometheusExporterImpl.java b/plugins/integrations/prometheus/src/main/java/org/apache/cloudstack/metrics/PrometheusExporterImpl.java
index 92c128b..7a1fb0c 100644
--- a/plugins/integrations/prometheus/src/main/java/org/apache/cloudstack/metrics/PrometheusExporterImpl.java
+++ b/plugins/integrations/prometheus/src/main/java/org/apache/cloudstack/metrics/PrometheusExporterImpl.java
@@ -220,7 +220,7 @@ public class PrometheusExporterImpl extends ManagerBase implements PrometheusExp
             metricsList.add(new ItemPool(zoneName, zoneUuid, poolName, poolPath, "primary", poolFactor, TOTAL, totalCapacity));
         }
 
-        for (final ImageStore imageStore : imageStoreDao.findByScope(new ZoneScope(dcId))) {
+        for (final ImageStore imageStore : imageStoreDao.findByZone(new ZoneScope(dcId), null)) {
             final StorageStats stats = ApiDBUtils.getSecondaryStorageStatistics(imageStore.getId());
             metricsList.add(new ItemPool(zoneName, zoneUuid, imageStore.getName(), imageStore.getUrl(), "secondary", null, USED, stats != null ? stats.getByteUsed() : 0));
             metricsList.add(new ItemPool(zoneName, zoneUuid, imageStore.getName(), imageStore.getUrl(), "secondary", null, TOTAL, stats != null ? stats.getCapacityBytes() : 0));
diff --git a/pom.xml b/pom.xml
index 262960c..c040d96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,7 @@
         <cs.discovery.version>0.5</cs.discovery.version>
         <cs.lang.version>2.6</cs.lang.version>
         <cs.pool.version>2.7.0</cs.pool.version>
+        <cs.commons-math3.version>3.6.1</cs.commons-math3.version>
 
         <!-- Testing versions -->
         <!-- do not forget to also upgrade hamcrest library with junit -->
diff --git a/server/pom.xml b/server/pom.xml
index 7cd0dd1..e3d98df 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -90,6 +90,11 @@
             <artifactId>commons-codec</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>${cs.commons-math3.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.cloudstack</groupId>
             <artifactId>cloud-utils</artifactId>
             <version>${project.version}</version>
diff --git a/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java b/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
index f13b795..154a293 100644
--- a/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
+++ b/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
@@ -118,6 +118,7 @@ import org.apache.cloudstack.framework.config.Configurable;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.query.QueryService;
 import org.apache.cloudstack.resourcedetail.dao.DiskOfferingDetailsDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.log4j.Logger;
 import org.springframework.stereotype.Component;
@@ -414,10 +415,14 @@ public class QueryManagerImpl extends MutualExclusiveIdsManagerBase implements Q
     private RouterHealthCheckResultDao routerHealthCheckResultDao;
 
     @Inject
+    private TemplateDataStoreDao templateDataStoreDao;
+
+    @Inject
     private ProjectInvitationDao projectInvitationDao;
 
     @Inject
     private UserDao userDao;
+
     /*
      * (non-Javadoc)
      *
@@ -2531,6 +2536,7 @@ public class QueryManagerImpl extends MutualExclusiveIdsManagerBase implements Q
         Object keyword = cmd.getKeyword();
         Long startIndex = cmd.getStartIndex();
         Long pageSize = cmd.getPageSizeVal();
+        Boolean readonly = cmd.getReadonly();
 
         Filter searchFilter = new Filter(ImageStoreJoinVO.class, "id", Boolean.TRUE, startIndex, pageSize);
 
@@ -2543,6 +2549,7 @@ public class QueryManagerImpl extends MutualExclusiveIdsManagerBase implements Q
         sb.and("protocol", sb.entity().getProtocol(), SearchCriteria.Op.EQ);
         sb.and("provider", sb.entity().getProviderName(), SearchCriteria.Op.EQ);
         sb.and("role", sb.entity().getRole(), SearchCriteria.Op.EQ);
+        sb.and("readonly", sb.entity().isReadonly(), Op.EQ);
 
         SearchCriteria<ImageStoreJoinVO> sc = sb.create();
         sc.setParameters("role", DataStoreRole.Image);
@@ -2571,7 +2578,9 @@ public class QueryManagerImpl extends MutualExclusiveIdsManagerBase implements Q
         if (protocol != null) {
             sc.setParameters("protocol", protocol);
         }
-
+        if (readonly != null) {
+            sc.setParameters("readonly", readonly);
+        }
         // search Store details by ids
         Pair<List<ImageStoreJoinVO>, Integer> uniqueStorePair = _imageStoreJoinDao.searchAndCount(sc, searchFilter);
         Integer count = uniqueStorePair.second();
diff --git a/server/src/main/java/com/cloud/api/query/dao/ImageStoreJoinDaoImpl.java b/server/src/main/java/com/cloud/api/query/dao/ImageStoreJoinDaoImpl.java
index 2389b57..b91398d 100644
--- a/server/src/main/java/com/cloud/api/query/dao/ImageStoreJoinDaoImpl.java
+++ b/server/src/main/java/com/cloud/api/query/dao/ImageStoreJoinDaoImpl.java
@@ -67,6 +67,7 @@ public class ImageStoreJoinDaoImpl extends GenericDaoBase<ImageStoreJoinVO, Long
         osResponse.setName(ids.getName());
         osResponse.setProviderName(ids.getProviderName());
         osResponse.setProtocol(ids.getProtocol());
+        osResponse.setReadonly(ids.isReadonly());
         String url = ids.getUrl();
         //if store is type cifs, remove the password
         if(ids.getProtocol().equals("cifs".toString())) {
diff --git a/server/src/main/java/com/cloud/api/query/dao/TemplateJoinDaoImpl.java b/server/src/main/java/com/cloud/api/query/dao/TemplateJoinDaoImpl.java
index 61a1017..398a63d 100644
--- a/server/src/main/java/com/cloud/api/query/dao/TemplateJoinDaoImpl.java
+++ b/server/src/main/java/com/cloud/api/query/dao/TemplateJoinDaoImpl.java
@@ -25,10 +25,6 @@ import java.util.Set;
 
 import javax.inject.Inject;
 
-import org.apache.cloudstack.utils.security.DigestHelper;
-import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
-
 import org.apache.cloudstack.api.ResponseObject.ResponseView;
 import org.apache.cloudstack.api.response.ChildTemplateResponse;
 import org.apache.cloudstack.api.response.TemplateResponse;
@@ -36,6 +32,12 @@ import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
 import org.apache.cloudstack.engine.subsystem.api.storage.TemplateState;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.storage.datastore.db.ImageStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.utils.security.DigestHelper;
+import org.apache.log4j.Logger;
+import org.springframework.stereotype.Component;
 
 import com.cloud.api.ApiDBUtils;
 import com.cloud.api.ApiResponseHelper;
@@ -70,6 +72,10 @@ public class TemplateJoinDaoImpl extends GenericDaoBaseWithTagInformation<Templa
     @Inject
     private VMTemplateDao _vmTemplateDao;
     @Inject
+    private TemplateDataStoreDao _templateStoreDao;
+    @Inject
+    private ImageStoreDao dataStoreDao;
+    @Inject
     private VMTemplateDetailsDao _templateDetailsDao;
 
     private final SearchBuilder<TemplateJoinVO> tmpltIdPairSearch;
@@ -108,6 +114,7 @@ public class TemplateJoinDaoImpl extends GenericDaoBaseWithTagInformation<Templa
         activeTmpltSearch.and("store_id", activeTmpltSearch.entity().getDataStoreId(), SearchCriteria.Op.EQ);
         activeTmpltSearch.and("type", activeTmpltSearch.entity().getTemplateType(), SearchCriteria.Op.EQ);
         activeTmpltSearch.and("templateState", activeTmpltSearch.entity().getTemplateState(), SearchCriteria.Op.EQ);
+        activeTmpltSearch.and("public", activeTmpltSearch.entity().isPublicTemplate(), SearchCriteria.Op.EQ);
         activeTmpltSearch.done();
 
         // select distinct pair (template_id, zone_id)
@@ -141,7 +148,18 @@ public class TemplateJoinDaoImpl extends GenericDaoBaseWithTagInformation<Templa
 
     @Override
     public TemplateResponse newTemplateResponse(ResponseView view, TemplateJoinVO template) {
+        List<TemplateDataStoreVO> templatesInStore = _templateStoreDao.listByTemplateNotBypassed(template.getId());
+        List<Map<String, String>> downloadProgressDetails = new ArrayList();
+        HashMap<String, String> downloadDetailInImageStores = null;
+        for (TemplateDataStoreVO templateInStore : templatesInStore) {
+            downloadDetailInImageStores = new HashMap<>();
+            downloadDetailInImageStores.put("datastore", dataStoreDao.findById(templateInStore.getDataStoreId()).getName());
+            downloadDetailInImageStores.put("downloadPercent", Integer.toString(templateInStore.getDownloadPercent()));
+            downloadDetailInImageStores.put("downloadState", (templateInStore.getDownloadState() != null ? templateInStore.getDownloadState().toString() : ""));
+            downloadProgressDetails.add(downloadDetailInImageStores);
+        }
         TemplateResponse templateResponse = new TemplateResponse();
+        templateResponse.setDownloadProgress(downloadProgressDetails);
         templateResponse.setId(template.getUuid());
         templateResponse.setName(template.getName());
         templateResponse.setDisplayText(template.getDisplayText());
@@ -478,6 +496,7 @@ public class TemplateJoinDaoImpl extends GenericDaoBaseWithTagInformation<Templa
         sc.setParameters("store_id", storeId);
         sc.setParameters("type", TemplateType.USER);
         sc.setParameters("templateState", VirtualMachineTemplate.State.Active);
+        sc.setParameters("public", Boolean.FALSE);
         return searchIncludingRemoved(sc, null, null, false);
     }
 
diff --git a/server/src/main/java/com/cloud/api/query/vo/ImageStoreJoinVO.java b/server/src/main/java/com/cloud/api/query/vo/ImageStoreJoinVO.java
index 244f89e..bcc73cb 100644
--- a/server/src/main/java/com/cloud/api/query/vo/ImageStoreJoinVO.java
+++ b/server/src/main/java/com/cloud/api/query/vo/ImageStoreJoinVO.java
@@ -67,6 +67,9 @@ public class ImageStoreJoinVO extends BaseViewVO implements InternalIdentity, Id
     @Enumerated(value = EnumType.STRING)
     private DataStoreRole role;
 
+    @Column(name = "readonly")
+    private boolean readonly = false;
+
     @Column(name = "data_center_id")
     private long zoneId;
 
@@ -128,4 +131,8 @@ public class ImageStoreJoinVO extends BaseViewVO implements InternalIdentity, Id
     public Date getRemoved() {
         return removed;
     }
+
+    public boolean isReadonly() { // value mapped from the "readonly" column
+        return readonly;
+    }
 }
diff --git a/server/src/main/java/com/cloud/configuration/ConfigurationManagerImpl.java b/server/src/main/java/com/cloud/configuration/ConfigurationManagerImpl.java
index 7e9c9d3..f95cc65 100755
--- a/server/src/main/java/com/cloud/configuration/ConfigurationManagerImpl.java
+++ b/server/src/main/java/com/cloud/configuration/ConfigurationManagerImpl.java
@@ -1708,7 +1708,7 @@ public class ConfigurationManagerImpl extends ManagerBase implements Configurati
         }
 
         //check if there are any secondary stores attached to the zone
-        if(!_imageStoreDao.findByScope(new ZoneScope(zoneId)).isEmpty()) {
+        if(!_imageStoreDao.findByZone(new ZoneScope(zoneId), null).isEmpty()) {
             throw new CloudRuntimeException(errorMsg + "there are Secondary storages in this zone");
         }
 
diff --git a/server/src/main/java/com/cloud/server/ManagementServerImpl.java b/server/src/main/java/com/cloud/server/ManagementServerImpl.java
index 32fd5b4..80b54c0 100644
--- a/server/src/main/java/com/cloud/server/ManagementServerImpl.java
+++ b/server/src/main/java/com/cloud/server/ManagementServerImpl.java
@@ -195,8 +195,10 @@ import org.apache.cloudstack.api.command.admin.storage.ListSecondaryStagingStore
 import org.apache.cloudstack.api.command.admin.storage.ListStoragePoolsCmd;
 import org.apache.cloudstack.api.command.admin.storage.ListStorageProvidersCmd;
 import org.apache.cloudstack.api.command.admin.storage.ListStorageTagsCmd;
+import org.apache.cloudstack.api.command.admin.storage.MigrateSecondaryStorageDataCmd;
 import org.apache.cloudstack.api.command.admin.storage.PreparePrimaryStorageForMaintenanceCmd;
 import org.apache.cloudstack.api.command.admin.storage.UpdateCloudToUseObjectStoreCmd;
+import org.apache.cloudstack.api.command.admin.storage.UpdateImageStoreCmd;
 import org.apache.cloudstack.api.command.admin.storage.UpdateStoragePoolCmd;
 import org.apache.cloudstack.api.command.admin.swift.AddSwiftCmd;
 import org.apache.cloudstack.api.command.admin.swift.ListSwiftsCmd;
@@ -2765,6 +2767,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
         cmdList.add(FindStoragePoolsForMigrationCmd.class);
         cmdList.add(PreparePrimaryStorageForMaintenanceCmd.class);
         cmdList.add(UpdateStoragePoolCmd.class);
+        cmdList.add(UpdateImageStoreCmd.class);
         cmdList.add(DestroySystemVmCmd.class);
         cmdList.add(ListSystemVMsCmd.class);
         cmdList.add(MigrateSystemVMCmd.class);
@@ -3158,6 +3161,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
         cmdList.add(ListTemplateOVFProperties.class);
         cmdList.add(GetRouterHealthCheckResultsCmd.class);
         cmdList.add(StartRollingMaintenanceCmd.class);
+        cmdList.add(MigrateSecondaryStorageDataCmd.class);
 
         // Out-of-band management APIs for admins
         cmdList.add(EnableOutOfBandManagementForHostCmd.class);
diff --git a/server/src/main/java/com/cloud/storage/ImageStoreServiceImpl.java b/server/src/main/java/com/cloud/storage/ImageStoreServiceImpl.java
new file mode 100644
index 0000000..43f9cd4
--- /dev/null
+++ b/server/src/main/java/com/cloud/storage/ImageStoreServiceImpl.java
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.cloudstack.api.command.admin.storage.MigrateSecondaryStorageDataCmd;
+import org.apache.cloudstack.api.response.MigrationResponse;
+import org.apache.cloudstack.context.CallContext;
+import org.apache.cloudstack.engine.orchestration.service.StorageOrchestrationService;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreProvider;
+import org.apache.cloudstack.framework.config.ConfigKey;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.storage.ImageStoreService;
+import org.apache.cloudstack.storage.datastore.db.ImageStoreDao;
+import org.apache.cloudstack.storage.datastore.db.ImageStoreVO;
+import org.apache.commons.lang3.EnumUtils;
+import org.apache.log4j.Logger;
+
+import com.cloud.event.ActionEvent;
+import com.cloud.event.EventTypes;
+import com.cloud.exception.InvalidParameterValueException;
+import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.exception.CloudRuntimeException;
+
+public class ImageStoreServiceImpl extends ManagerBase implements ImageStoreService {
+
+    private static final Logger s_logger = Logger.getLogger(ImageStoreServiceImpl.class);
+    @Inject
+    ImageStoreDao imageStoreDao;
+    @Inject
+    private AsyncJobManager jobMgr;
+    @Inject
+    private StorageOrchestrationService stgService;
+
+    ConfigKey<Double> ImageStoreImbalanceThreshold = new ConfigKey<>("Advanced", Double.class,
+            "image.store.imbalance.threshold",
+            "0.3",
+            "The storage imbalance threshold that is compared with the standard deviation percentage for a storage utilization metric. " +
+                    "The value is a percentage in decimal format.",
+            true, ConfigKey.Scope.Global);
+
+
+    public Integer numConcurrentCopyTasksPerSSVM = null;
+
+
+
+    @Override
+    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+        return true;
+    }
+
+    @Override
+    @ActionEvent(eventType = EventTypes.EVENT_IMAGE_STORE_DATA_MIGRATE, eventDescription = "migrating Image store data", async = true)
+    public MigrationResponse migrateData(MigrateSecondaryStorageDataCmd cmd) {
+        Long srcImgStoreId = cmd.getId();
+        ImageStoreVO srcImageVO = imageStoreDao.findById(srcImgStoreId);
+        List<Long> destImgStoreIds = cmd.getMigrateTo();
+        List<String> imagestores = new ArrayList<String>();
+        String migrationType = cmd.getMigrationType();
+
+        // default policy is complete
+        MigrationPolicy policy = MigrationPolicy.COMPLETE;
+
+        if (migrationType != null) {
+            if (!EnumUtils.isValidEnum(MigrationPolicy.class, migrationType.toUpperCase())) {
+                throw new CloudRuntimeException("Not a valid migration policy");
+            }
+            policy = MigrationPolicy.valueOf(migrationType.toUpperCase());
+        }
+
+        String message = null;
+
+        if (srcImageVO == null) {
+            throw new CloudRuntimeException("Cannot find secondary storage with id: " + srcImgStoreId);
+        }
+
+        Long srcStoreDcId = srcImageVO.getDataCenterId();
+        imagestores.add(srcImageVO.getName());
+        if (srcImageVO.getRole() != DataStoreRole.Image) {
+            throw new CloudRuntimeException("Secondary storage is not of Image Role");
+        }
+
+        if (!srcImageVO.getProviderName().equals(DataStoreProvider.NFS_IMAGE)) {
+            throw new InvalidParameterValueException("Migration of datastore objects is supported only for NFS based image stores");
+        }
+
+        if (destImgStoreIds.contains(srcImgStoreId)) {
+            s_logger.debug("One of the destination stores is the same as the source image store ... Ignoring it...");
+            destImgStoreIds.remove(srcImgStoreId);
+        }
+
+        // Validate all the Ids correspond to valid Image stores
+        List<Long> destDatastores = new ArrayList<>();
+        for (Long id : destImgStoreIds) {
+            ImageStoreVO store = imageStoreDao.findById(id);
+            if (store == null) {
+                s_logger.warn("Secondary storage with id: " + id + "is not found. Skipping it...");
+                continue;
+            }
+            if (store.isReadonly()) {
+                s_logger.warn("Secondary storage: "+ id + " cannot be considered for migration as has read-only permission, Skipping it... ");
+                continue;
+            }
+
+            if (!store.getProviderName().equals(DataStoreProvider.NFS_IMAGE)) {
+                s_logger.warn("Destination image store : " + store.getName() + " not NFS based. Store not suitable for migration!");
+                continue;
+            }
+
+            if (srcStoreDcId != null && store.getDataCenterId() != null && !srcStoreDcId.equals(store.getDataCenterId())) {
+                s_logger.warn("Source and destination stores are not in the same zone. Skipping destination store: " + store.getName());
+                continue;
+            }
+
+            destDatastores.add(id);
+            imagestores.add(store.getName());
+        }
+
+        if (destDatastores.size() < 1) {
+            throw new CloudRuntimeException("No destination valid store(s) available to migrate. Could" +
+                    "be due to invalid store ID(s) or store(s) are read-only. Terminating Migration of data");
+        }
+
+        if (isMigrateJobRunning()){
+            message = "A migrate job is in progress, please try again later...";
+            return new MigrationResponse(message, policy.toString(), false);
+        }
+
+        CallContext.current().setEventDetails("Migrating files/data objects " + "from : " + imagestores.get(0) + " to: " + imagestores.subList(1, imagestores.size()));
+        return  stgService.migrateData(srcImgStoreId, destDatastores, policy);
+    }
+
+
+    // Ensures that only one migrate job may occur at a time, in order to reduce load
+    private boolean isMigrateJobRunning() {
+        long count = jobMgr.countPendingJobs(null, MigrateSecondaryStorageDataCmd.class.getName());
+        if (count > 1) {
+            return true;
+        }
+        return false;
+    }
+}
diff --git a/server/src/main/java/com/cloud/storage/StorageManagerImpl.java b/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
index f327b6a..c59a26d 100644
--- a/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
+++ b/server/src/main/java/com/cloud/storage/StorageManagerImpl.java
@@ -1368,7 +1368,7 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         // so here we don't need to issue DeleteCommand to resource anymore, only need to remove db entry.
         try {
             // Cleanup templates in template_store_ref
-            List<DataStore> imageStores = _dataStoreMgr.getImageStoresByScope(new ZoneScope(null));
+            List<DataStore> imageStores = _dataStoreMgr.getImageStoresByScopeExcludingReadOnly(new ZoneScope(null));
             for (DataStore store : imageStores) {
                 try {
                     long storeId = store.getId();
@@ -2157,6 +2157,18 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
         return discoverImageStore(name, url, providerName, null, details);
     }
 
+    /** Stores the given read-only flag on the image store with the given id and returns that store. */
+    @Override
+    public ImageStore updateImageStoreStatus(Long id, Boolean readonly) {
+        final ImageStoreVO store = _imageStoreDao.findById(id);
+        if (store == null) {
+            throw new IllegalArgumentException("Unable to find image store with ID: " + id);
+        }
+        store.setReadonly(readonly);
+        _imageStoreDao.update(id, store);
+        return store;
+    }
+
     private void duplicateCacheStoreRecordsToRegionStore(long storeId) {
         _templateStoreDao.duplicateCacheRecordsOnRegionStore(storeId);
         _snapshotStoreDao.duplicateCacheRecordsOnRegionStore(storeId);
@@ -2511,7 +2523,9 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
                 KvmStorageOnlineMigrationWait,
                 KvmAutoConvergence,
                 MaxNumberOfManagedClusteredFileSystems,
-                PRIMARY_STORAGE_DOWNLOAD_WAIT
+                PRIMARY_STORAGE_DOWNLOAD_WAIT,
+                SecStorageMaxMigrateSessions,
+                MaxDataMigrationWaitTime
         };
     }
 
diff --git a/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java
index dc90c37..b5c33eb 100644
--- a/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java
+++ b/server/src/main/java/com/cloud/storage/VolumeApiServiceImpl.java
@@ -2788,24 +2788,26 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
 
         // Copy volume from primary to secondary storage
         VolumeInfo srcVol = volFactory.getVolume(volumeId);
-        AsyncCallFuture<VolumeApiResult> cvAnswer = volService.copyVolume(srcVol, secStore);
-        // Check if you got a valid answer.
+        VolumeInfo destVol = volFactory.getVolume(volumeId, DataStoreRole.Image);
         VolumeApiResult cvResult = null;
-        try {
-            cvResult = cvAnswer.get();
-        } catch (InterruptedException e1) {
-            s_logger.debug("failed copy volume", e1);
-            throw new CloudRuntimeException("Failed to copy volume", e1);
-        } catch (ExecutionException e1) {
-            s_logger.debug("failed copy volume", e1);
-            throw new CloudRuntimeException("Failed to copy volume", e1);
-        }
-        if (cvResult == null || cvResult.isFailed()) {
-            String errorString = "Failed to copy the volume from the source primary storage pool to secondary storage.";
-            throw new CloudRuntimeException(errorString);
+        if (destVol == null) {
+            AsyncCallFuture<VolumeApiResult> cvAnswer = volService.copyVolume(srcVol, secStore);
+            // Check if you got a valid answer.
+            try {
+                cvResult = cvAnswer.get();
+            } catch (InterruptedException e1) {
+                s_logger.debug("failed copy volume", e1);
+                throw new CloudRuntimeException("Failed to copy volume", e1);
+            } catch (ExecutionException e1) {
+                s_logger.debug("failed copy volume", e1);
+                throw new CloudRuntimeException("Failed to copy volume", e1);
+            }
+            if (cvResult == null || cvResult.isFailed()) {
+                String errorString = "Failed to copy the volume from the source primary storage pool to secondary storage.";
+                throw new CloudRuntimeException(errorString);
+            }
         }
-
-        VolumeInfo vol = cvResult.getVolume();
+        VolumeInfo vol = cvResult != null ? cvResult.getVolume() : destVol;
 
         String extractUrl = secStore.createEntityExtractUrl(vol.getPath(), vol.getFormat(), vol);
         VolumeDataStoreVO volumeStoreRef = _volumeStoreDao.findByVolume(volumeId);
diff --git a/server/src/main/java/com/cloud/storage/download/DownloadListener.java b/server/src/main/java/com/cloud/storage/download/DownloadListener.java
index 51f9d42..25dffb3 100644
--- a/server/src/main/java/com/cloud/storage/download/DownloadListener.java
+++ b/server/src/main/java/com/cloud/storage/download/DownloadListener.java
@@ -297,7 +297,7 @@ public class DownloadListener implements Listener {
         }*/
         else if (cmd instanceof StartupSecondaryStorageCommand) {
             try{
-                List<DataStore> imageStores = _storeMgr.getImageStoresByScope(new ZoneScope(agent.getDataCenterId()));
+                List<DataStore> imageStores = _storeMgr.getImageStoresByScopeExcludingReadOnly(new ZoneScope(agent.getDataCenterId()));
                 for (DataStore store : imageStores) {
                     _volumeSrv.handleVolumeSync(store);
                     _imageSrv.handleTemplateSync(store);
diff --git a/server/src/main/java/com/cloud/storage/secondary/SecondaryStorageVmManager.java b/server/src/main/java/com/cloud/storage/secondary/SecondaryStorageVmManager.java
index 5c50d46..b57b443 100644
--- a/server/src/main/java/com/cloud/storage/secondary/SecondaryStorageVmManager.java
+++ b/server/src/main/java/com/cloud/storage/secondary/SecondaryStorageVmManager.java
@@ -23,6 +23,7 @@ import com.cloud.agent.api.StartupCommand;
 import com.cloud.host.HostVO;
 import com.cloud.utils.Pair;
 import com.cloud.utils.component.Manager;
+import com.cloud.vm.SecondaryStorageVm;
 import com.cloud.vm.SecondaryStorageVmVO;
 
 public interface SecondaryStorageVmManager extends Manager {
@@ -31,6 +32,7 @@ public interface SecondaryStorageVmManager extends Manager {
     public static final int DEFAULT_SS_VM_CPUMHZ = 500;             // 500 MHz
     public static final int DEFAULT_SS_VM_MTUSIZE = 1500;
     public static final int DEFAULT_SS_VM_CAPACITY = 50;            // max command execution session per SSVM
+    public static final int DEFAULT_MIGRATE_SS_VM_CAPACITY = 2;     // number of concurrent migrate operations to happen per SSVM
     public static final int DEFAULT_STANDBY_CAPACITY = 10;          // standy capacity to reserve per zone
 
     public static final String ALERT_SUBJECT = "secondarystoragevm-alert";
@@ -56,4 +58,6 @@ public interface SecondaryStorageVmManager extends Manager {
     public List<HostVO> listUpAndConnectingSecondaryStorageVmHost(Long dcId);
 
     public HostVO pickSsvmHost(HostVO ssHost);
+
+    void allocCapacity(long dataCenterId, SecondaryStorageVm.Role role);
 }
diff --git a/server/src/main/java/com/cloud/template/HypervisorTemplateAdapter.java b/server/src/main/java/com/cloud/template/HypervisorTemplateAdapter.java
index 85c4a77..80ca469 100644
--- a/server/src/main/java/com/cloud/template/HypervisorTemplateAdapter.java
+++ b/server/src/main/java/com/cloud/template/HypervisorTemplateAdapter.java
@@ -16,10 +16,6 @@
 // under the License.
 package com.cloud.template;
 
-import com.cloud.agent.api.Answer;
-import com.cloud.host.HostVO;
-import com.cloud.hypervisor.Hypervisor;
-import com.cloud.resource.ResourceManager;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -29,26 +25,18 @@ import java.util.concurrent.ExecutionException;
 
 import javax.inject.Inject;
 
-import com.cloud.configuration.Config;
-import com.cloud.utils.db.Transaction;
-import com.cloud.utils.db.TransactionCallback;
-import com.cloud.utils.db.TransactionStatus;
 import org.apache.cloudstack.agent.directdownload.CheckUrlAnswer;
 import org.apache.cloudstack.agent.directdownload.CheckUrlCommand;
-import org.apache.cloudstack.api.command.user.iso.GetUploadParamsForIsoCmd;
-import org.apache.cloudstack.api.command.user.template.GetUploadParamsForTemplateCmd;
-import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
-import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
-import org.apache.cloudstack.storage.command.TemplateOrVolumePostUploadCommand;
-import org.apache.cloudstack.utils.security.DigestHelper;
-import org.apache.log4j.Logger;
-import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
 import org.apache.cloudstack.api.command.user.iso.DeleteIsoCmd;
+import org.apache.cloudstack.api.command.user.iso.GetUploadParamsForIsoCmd;
 import org.apache.cloudstack.api.command.user.iso.RegisterIsoCmd;
 import org.apache.cloudstack.api.command.user.template.DeleteTemplateCmd;
+import org.apache.cloudstack.api.command.user.template.GetUploadParamsForTemplateCmd;
 import org.apache.cloudstack.api.command.user.template.RegisterTemplateCmd;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
 import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector;
 import org.apache.cloudstack.engine.subsystem.api.storage.Scope;
 import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
@@ -62,11 +50,17 @@ import org.apache.cloudstack.framework.async.AsyncCompletionCallback;
 import org.apache.cloudstack.framework.async.AsyncRpcContext;
 import org.apache.cloudstack.framework.messagebus.MessageBus;
 import org.apache.cloudstack.framework.messagebus.PublishScope;
+import org.apache.cloudstack.storage.command.TemplateOrVolumePostUploadCommand;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
 import org.apache.cloudstack.storage.image.datastore.ImageStoreEntity;
+import org.apache.cloudstack.utils.security.DigestHelper;
+import org.apache.log4j.Logger;
 
 import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.Answer;
 import com.cloud.alert.AlertManager;
+import com.cloud.configuration.Config;
 import com.cloud.configuration.Resource.ResourceType;
 import com.cloud.dc.DataCenterVO;
 import com.cloud.dc.dao.DataCenterDao;
@@ -74,11 +68,11 @@ import com.cloud.event.EventTypes;
 import com.cloud.event.UsageEventUtils;
 import com.cloud.exception.InvalidParameterValueException;
 import com.cloud.exception.ResourceAllocationException;
+import com.cloud.host.HostVO;
+import com.cloud.hypervisor.Hypervisor;
 import com.cloud.org.Grouping;
+import com.cloud.resource.ResourceManager;
 import com.cloud.server.StatsCollector;
-import com.cloud.template.VirtualMachineTemplate.State;
-import com.cloud.user.Account;
-import com.cloud.utils.Pair;
 import com.cloud.storage.ScopeType;
 import com.cloud.storage.Storage.ImageFormat;
 import com.cloud.storage.Storage.TemplateType;
@@ -89,9 +83,15 @@ import com.cloud.storage.VMTemplateZoneVO;
 import com.cloud.storage.dao.VMTemplateDao;
 import com.cloud.storage.dao.VMTemplateZoneDao;
 import com.cloud.storage.download.DownloadMonitor;
+import com.cloud.template.VirtualMachineTemplate.State;
+import com.cloud.user.Account;
+import com.cloud.utils.Pair;
 import com.cloud.utils.UriUtils;
 import com.cloud.utils.db.DB;
 import com.cloud.utils.db.EntityManager;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallback;
+import com.cloud.utils.db.TransactionStatus;
 import com.cloud.utils.exception.CloudRuntimeException;
 
 public class HypervisorTemplateAdapter extends TemplateAdapterBase {
@@ -241,14 +241,12 @@ public class HypervisorTemplateAdapter extends TemplateAdapterBase {
 
     private void createTemplateWithinZone(Long zId, TemplateProfile profile, VMTemplateVO template) {
         // find all eligible image stores for this zone scope
-        List<DataStore> imageStores = storeMgr.getImageStoresByScope(new ZoneScope(zId));
+        List<DataStore> imageStores = storeMgr.getImageStoresByScopeExcludingReadOnly(new ZoneScope(zId));
         if (imageStores == null || imageStores.size() == 0) {
             throw new CloudRuntimeException("Unable to find image store to download template " + profile.getTemplate());
         }
 
         Set<Long> zoneSet = new HashSet<Long>();
-        Collections.shuffle(imageStores);
-        // For private templates choose a random store. TODO - Have a better algorithm based on size, no. of objects, load etc.
         for (DataStore imageStore : imageStores) {
             // skip data stores for a disabled zone
             Long zoneId = imageStore.getScope().getScopeId();
@@ -308,7 +306,7 @@ public class HypervisorTemplateAdapter extends TemplateAdapterBase {
                     zoneId = profile.getZoneIdList().get(0);
 
                 // find all eligible image stores for this zone scope
-                List<DataStore> imageStores = storeMgr.getImageStoresByScope(new ZoneScope(zoneId));
+                List<DataStore> imageStores = storeMgr.getImageStoresByScopeExcludingReadOnly(new ZoneScope(zoneId));
                 if (imageStores == null || imageStores.size() == 0) {
                     throw new CloudRuntimeException("Unable to find image store to download template " + profile.getTemplate());
                 }
diff --git a/server/src/main/java/com/cloud/template/TemplateManagerImpl.java b/server/src/main/java/com/cloud/template/TemplateManagerImpl.java
index 749f272..86eabe4 100755
--- a/server/src/main/java/com/cloud/template/TemplateManagerImpl.java
+++ b/server/src/main/java/com/cloud/template/TemplateManagerImpl.java
@@ -754,7 +754,7 @@ public class TemplateManagerImpl extends ManagerBase implements TemplateManager,
         long tmpltId = template.getId();
         long dstZoneId = dstZone.getId();
         // find all eligible image stores for the destination zone
-        List<DataStore> dstSecStores = _dataStoreMgr.getImageStoresByScope(new ZoneScope(dstZoneId));
+        List<DataStore> dstSecStores = _dataStoreMgr.getImageStoresByScopeExcludingReadOnly(new ZoneScope(dstZoneId));
         if (dstSecStores == null || dstSecStores.isEmpty()) {
             throw new StorageUnavailableException("Destination zone is not ready, no image store associated", DataCenter.class, dstZone.getId());
         }
diff --git a/server/src/main/java/org/apache/cloudstack/diagnostics/DiagnosticsServiceImpl.java b/server/src/main/java/org/apache/cloudstack/diagnostics/DiagnosticsServiceImpl.java
index 49ad215..0184a44 100644
--- a/server/src/main/java/org/apache/cloudstack/diagnostics/DiagnosticsServiceImpl.java
+++ b/server/src/main/java/org/apache/cloudstack/diagnostics/DiagnosticsServiceImpl.java
@@ -372,7 +372,7 @@ public class DiagnosticsServiceImpl extends ManagerBase implements PluggableServ
      * @return a valid secondary storage with less than DiskQuotaPercentageThreshold set by global config
      */
     private DataStore getImageStore(Long zoneId) {
-        List<DataStore> stores = storeMgr.getImageStoresByScope(new ZoneScope(zoneId));
+        List<DataStore> stores = storeMgr.getImageStoresByScopeExcludingReadOnly(new ZoneScope(zoneId));
         if (CollectionUtils.isEmpty(stores)) {
             throw new CloudRuntimeException("No Secondary storage found in Zone with Id: " + zoneId);
         }
diff --git a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
index 91522f3..672f7c0 100644
--- a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
+++ b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
@@ -262,6 +262,8 @@
 
     <bean id="vMSnapshotManagerImpl" class="com.cloud.vm.snapshot.VMSnapshotManagerImpl" />
 
+    <bean id="imageStoreServiceImpl" class="com.cloud.storage.ImageStoreServiceImpl" />
+
     <bean id="AffinityGroupServiceImpl"
         class="org.apache.cloudstack.affinity.AffinityGroupServiceImpl">
         <property name="affinityGroupProcessors"
diff --git a/server/src/test/java/com/cloud/configuration/ConfigurationManagerTest.java b/server/src/test/java/com/cloud/configuration/ConfigurationManagerTest.java
index e695329..8c11f77 100644
--- a/server/src/test/java/com/cloud/configuration/ConfigurationManagerTest.java
+++ b/server/src/test/java/com/cloud/configuration/ConfigurationManagerTest.java
@@ -694,7 +694,7 @@ public class ConfigurationManagerTest {
         Mockito.when(_vmInstanceDao.listByZoneId(anyLong())).thenReturn(new ArrayList<VMInstanceVO>());
         Mockito.when(_volumeDao.findByDc(anyLong())).thenReturn(new ArrayList<VolumeVO>());
         Mockito.when(_physicalNetworkDao.listByZone(anyLong())).thenReturn(new ArrayList<PhysicalNetworkVO>());
-        Mockito.when(_imageStoreDao.findByScope(any(ZoneScope.class))).thenReturn(new ArrayList<ImageStoreVO>());
+        Mockito.when(_imageStoreDao.findByZone(any(ZoneScope.class), anyBoolean())).thenReturn(new ArrayList<ImageStoreVO>());
 
         configurationMgr.checkIfZoneIsDeletable(new Random().nextLong());
     }
@@ -712,7 +712,7 @@ public class ConfigurationManagerTest {
         Mockito.when(_vmInstanceDao.listByZoneId(anyLong())).thenReturn(new ArrayList<VMInstanceVO>());
         Mockito.when(_volumeDao.findByDc(anyLong())).thenReturn(new ArrayList<VolumeVO>());
         Mockito.when(_physicalNetworkDao.listByZone(anyLong())).thenReturn(new ArrayList<PhysicalNetworkVO>());
-        Mockito.when(_imageStoreDao.findByScope(any(ZoneScope.class))).thenReturn(new ArrayList<ImageStoreVO>());
+        Mockito.when(_imageStoreDao.findByZone(any(ZoneScope.class), anyBoolean())).thenReturn(new ArrayList<ImageStoreVO>());
 
         configurationMgr.checkIfZoneIsDeletable(new Random().nextLong());
     }
@@ -730,7 +730,7 @@ public class ConfigurationManagerTest {
         Mockito.when(_vmInstanceDao.listByZoneId(anyLong())).thenReturn(new ArrayList<VMInstanceVO>());
         Mockito.when(_volumeDao.findByDc(anyLong())).thenReturn(new ArrayList<VolumeVO>());
         Mockito.when(_physicalNetworkDao.listByZone(anyLong())).thenReturn(new ArrayList<PhysicalNetworkVO>());
-        Mockito.when(_imageStoreDao.findByScope(any(ZoneScope.class))).thenReturn(new ArrayList<ImageStoreVO>());
+        Mockito.when(_imageStoreDao.findByZone(any(ZoneScope.class), anyBoolean())).thenReturn(new ArrayList<ImageStoreVO>());
 
         configurationMgr.checkIfZoneIsDeletable(new Random().nextLong());
     }
@@ -744,7 +744,7 @@ public class ConfigurationManagerTest {
         Mockito.when(_vmInstanceDao.listByZoneId(anyLong())).thenReturn(new ArrayList<VMInstanceVO>());
         Mockito.when(_volumeDao.findByDc(anyLong())).thenReturn(new ArrayList<VolumeVO>());
         Mockito.when(_physicalNetworkDao.listByZone(anyLong())).thenReturn(new ArrayList<PhysicalNetworkVO>());
-        Mockito.when(_imageStoreDao.findByScope(any(ZoneScope.class))).thenReturn(new ArrayList<ImageStoreVO>());
+        Mockito.when(_imageStoreDao.findByZone(any(ZoneScope.class), anyBoolean())).thenReturn(new ArrayList<ImageStoreVO>());
 
         configurationMgr.checkIfZoneIsDeletable(new Random().nextLong());
     }
@@ -758,7 +758,7 @@ public class ConfigurationManagerTest {
         Mockito.when(_vmInstanceDao.listByZoneId(anyLong())).thenReturn(new ArrayList<VMInstanceVO>());
         Mockito.when(_volumeDao.findByDc(anyLong())).thenReturn(new ArrayList<VolumeVO>());
         Mockito.when(_physicalNetworkDao.listByZone(anyLong())).thenReturn(new ArrayList<PhysicalNetworkVO>());
-        Mockito.when(_imageStoreDao.findByScope(any(ZoneScope.class))).thenReturn(new ArrayList<ImageStoreVO>());
+        Mockito.when(_imageStoreDao.findByZone(any(ZoneScope.class), anyBoolean())).thenReturn(new ArrayList<ImageStoreVO>());
 
         configurationMgr.checkIfZoneIsDeletable(new Random().nextLong());
     }
@@ -776,7 +776,7 @@ public class ConfigurationManagerTest {
         Mockito.when(_vmInstanceDao.listByZoneId(anyLong())).thenReturn(arrayList);
         Mockito.when(_volumeDao.findByDc(anyLong())).thenReturn(new ArrayList<VolumeVO>());
         Mockito.when(_physicalNetworkDao.listByZone(anyLong())).thenReturn(new ArrayList<PhysicalNetworkVO>());
-        Mockito.when(_imageStoreDao.findByScope(any(ZoneScope.class))).thenReturn(new ArrayList<ImageStoreVO>());
+        Mockito.when(_imageStoreDao.findByZone(any(ZoneScope.class), anyBoolean())).thenReturn(new ArrayList<ImageStoreVO>());
 
         configurationMgr.checkIfZoneIsDeletable(new Random().nextLong());
     }
@@ -794,7 +794,7 @@ public class ConfigurationManagerTest {
         Mockito.when(_vmInstanceDao.listByZoneId(anyLong())).thenReturn(new ArrayList<VMInstanceVO>());
         Mockito.when(_volumeDao.findByDc(anyLong())).thenReturn(arrayList);
         Mockito.when(_physicalNetworkDao.listByZone(anyLong())).thenReturn(new ArrayList<PhysicalNetworkVO>());
-        Mockito.when(_imageStoreDao.findByScope(any(ZoneScope.class))).thenReturn(new ArrayList<ImageStoreVO>());
+        Mockito.when(_imageStoreDao.findByZone(any(ZoneScope.class), anyBoolean())).thenReturn(new ArrayList<ImageStoreVO>());
 
         configurationMgr.checkIfZoneIsDeletable(new Random().nextLong());
     }
@@ -812,7 +812,7 @@ public class ConfigurationManagerTest {
         Mockito.when(_vmInstanceDao.listByZoneId(anyLong())).thenReturn(new ArrayList<VMInstanceVO>());
         Mockito.when(_volumeDao.findByDc(anyLong())).thenReturn(new ArrayList<VolumeVO>());
         Mockito.when(_physicalNetworkDao.listByZone(anyLong())).thenReturn(arrayList);
-        Mockito.when(_imageStoreDao.findByScope(any(ZoneScope.class))).thenReturn(new ArrayList<ImageStoreVO>());
+        Mockito.when(_imageStoreDao.findByZone(any(ZoneScope.class), anyBoolean())).thenReturn(new ArrayList<ImageStoreVO>());
 
         configurationMgr.checkIfZoneIsDeletable(new Random().nextLong());
     }
diff --git a/services/secondary-storage/controller/src/main/java/org/apache/cloudstack/secondarystorage/PremiumSecondaryStorageManagerImpl.java b/services/secondary-storage/controller/src/main/java/org/apache/cloudstack/secondarystorage/PremiumSecondaryStorageManagerImpl.java
index ecfc67e..d21ec61 100644
--- a/services/secondary-storage/controller/src/main/java/org/apache/cloudstack/secondarystorage/PremiumSecondaryStorageManagerImpl.java
+++ b/services/secondary-storage/controller/src/main/java/org/apache/cloudstack/secondarystorage/PremiumSecondaryStorageManagerImpl.java
@@ -16,6 +16,8 @@
 // under the License.
 package org.apache.cloudstack.secondarystorage;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -27,16 +29,19 @@ import org.apache.log4j.Logger;
 
 import com.cloud.agent.api.Command;
 import com.cloud.configuration.Config;
+import com.cloud.host.Host;
 import com.cloud.host.HostVO;
 import com.cloud.host.Status;
 import com.cloud.host.dao.HostDao;
 import com.cloud.resource.ResourceManager;
 import com.cloud.secstorage.CommandExecLogDao;
 import com.cloud.secstorage.CommandExecLogVO;
+import com.cloud.storage.StorageManager;
 import com.cloud.storage.secondary.SecondaryStorageVmManager;
 import com.cloud.utils.DateUtil;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.Pair;
+import com.cloud.utils.db.Filter;
 import com.cloud.utils.db.JoinBuilder.JoinType;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
@@ -51,8 +56,13 @@ public class PremiumSecondaryStorageManagerImpl extends SecondaryStorageManagerI
     private static final Logger s_logger = Logger.getLogger(PremiumSecondaryStorageManagerImpl.class);
 
     private int _capacityPerSSVM = SecondaryStorageVmManager.DEFAULT_SS_VM_CAPACITY;
+    private int migrateCapPerSSVM = DEFAULT_MIGRATE_SS_VM_CAPACITY;
     private int _standbyCapacity = SecondaryStorageVmManager.DEFAULT_STANDBY_CAPACITY;
     private int _maxExecutionTimeMs = 1800000;
+    private int maxDataMigrationWaitTime = 900000;
+    long currentTime = DateUtil.currentGMTTime().getTime();
+    long nextSpawnTime = currentTime + maxDataMigrationWaitTime;
+    private List<SecondaryStorageVmVO> migrationSSVMS = new ArrayList<>();
 
     @Inject
     SecondaryStorageVmDao _secStorageVmDao;
@@ -63,6 +73,7 @@ public class PremiumSecondaryStorageManagerImpl extends SecondaryStorageManagerI
     @Inject
     ResourceManager _resourceMgr;
     protected SearchBuilder<CommandExecLogVO> activeCommandSearch;
+    protected SearchBuilder<CommandExecLogVO> activeCopyCommandSearch;
     protected SearchBuilder<HostVO> hostSearch;
 
     @Override
@@ -75,16 +86,27 @@ public class PremiumSecondaryStorageManagerImpl extends SecondaryStorageManagerI
         int nMaxExecutionMinutes = NumbersUtil.parseInt(_configDao.getValue(Config.SecStorageCmdExecutionTimeMax.key()), 30);
         _maxExecutionTimeMs = nMaxExecutionMinutes * 60 * 1000;
 
+        migrateCapPerSSVM = StorageManager.SecStorageMaxMigrateSessions.value();
+        int nMaxDataMigrationWaitTime = StorageManager.MaxDataMigrationWaitTime.value();
+        maxDataMigrationWaitTime = nMaxDataMigrationWaitTime * 60 * 1000;
+        nextSpawnTime = currentTime + maxDataMigrationWaitTime;
+
         hostSearch = _hostDao.createSearchBuilder();
         hostSearch.and("dc", hostSearch.entity().getDataCenterId(), Op.EQ);
         hostSearch.and("status", hostSearch.entity().getStatus(), Op.EQ);
 
         activeCommandSearch = _cmdExecLogDao.createSearchBuilder();
         activeCommandSearch.and("created", activeCommandSearch.entity().getCreated(), Op.GTEQ);
-        activeCommandSearch.join("hostSearch", hostSearch, activeCommandSearch.entity().getInstanceId(), hostSearch.entity().getId(), JoinType.INNER);
+        activeCommandSearch.join("hostSearch", hostSearch, activeCommandSearch.entity().getHostId(), hostSearch.entity().getId(), JoinType.INNER);
+
+        activeCopyCommandSearch = _cmdExecLogDao.createSearchBuilder();
+        activeCopyCommandSearch.and("created", activeCopyCommandSearch.entity().getCreated(), Op.GTEQ);
+        activeCopyCommandSearch.and("command_name", activeCopyCommandSearch.entity().getCommandName(), Op.EQ);
+        activeCopyCommandSearch.join("hostSearch", hostSearch, activeCopyCommandSearch.entity().getHostId(), hostSearch.entity().getId(), JoinType.INNER);
 
         hostSearch.done();
         activeCommandSearch.done();
+        activeCopyCommandSearch.done();
         return true;
     }
 
@@ -96,7 +118,6 @@ public class PremiumSecondaryStorageManagerImpl extends SecondaryStorageManagerI
         }
 
         Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _maxExecutionTimeMs);
-
         _cmdExecLogDao.expungeExpiredRecords(cutTime);
 
         boolean suspendAutoLoading = !reserveStandbyCapacity();
@@ -134,19 +155,54 @@ public class PremiumSecondaryStorageManagerImpl extends SecondaryStorageManagerI
                 return new Pair<AfterScanAction, Object>(AfterScanAction.nop, null);
             }
 
-            alreadyRunning = _secStorageVmDao.getSecStorageVmListInStates(null, dataCenterId, State.Running, State.Migrating, State.Starting);
 
+            alreadyRunning = _secStorageVmDao.getSecStorageVmListInStates(null, dataCenterId, State.Running, State.Migrating, State.Starting);
             List<CommandExecLogVO> activeCmds = findActiveCommands(dataCenterId, cutTime);
-            if (alreadyRunning.size() * _capacityPerSSVM - activeCmds.size() < _standbyCapacity) {
-                s_logger.info("secondary storage command execution standby capactiy low (running VMs: " + alreadyRunning.size() + ", active cmds: " + activeCmds.size() +
-                        "), starting a new one");
-                return new Pair<AfterScanAction, Object>(AfterScanAction.expand, SecondaryStorageVm.Role.commandExecutor);
-            }
+            List<CommandExecLogVO> copyCmdsInPipeline = findAllActiveCopyCommands(dataCenterId, cutTime);
+            return scaleSSVMOnLoad(alreadyRunning, activeCmds, copyCmdsInPipeline, dataCenterId);
+
         }
+        return new Pair<AfterScanAction, Object>(AfterScanAction.nop, null);
+    }
+
+    // Expands when standby capacity is low or queued migration copies have waited too long; otherwise tries a scale-down.
+    private Pair<AfterScanAction, Object> scaleSSVMOnLoad(List<SecondaryStorageVmVO> alreadyRunning, List<CommandExecLogVO> activeCmds,
+                                                    List<CommandExecLogVO> copyCmdsInPipeline, long dataCenterId) {
+        int maxSsvms = Math.min(_hostDao.countAllByTypeInZone(dataCenterId, Host.Type.Routing), MaxNumberOfSsvmsForMigration.value());
+        // halfLimit is 0 when no SSVM runs or the per-SSVM migrate cap is 0; the guard below keeps get(halfLimit - 1) in range.
+        int halfLimit = Math.round((float) (alreadyRunning.size() * migrateCapPerSSVM) / 2);
+        currentTime = DateUtil.currentGMTTime().getTime();
+        if (alreadyRunning.size() * _capacityPerSSVM - activeCmds.size() < _standbyCapacity) {
+            s_logger.info("secondary storage command execution standby capactiy low (running VMs: " + alreadyRunning.size() + ", active cmds: " + activeCmds.size() +
+                    "), starting a new one");
+            return new Pair<AfterScanAction, Object>(AfterScanAction.expand, SecondaryStorageVm.Role.commandExecutor);
+        } else if (halfLimit > 0 && copyCmdsInPipeline.size() >= halfLimit &&
+                ((Math.abs(currentTime - copyCmdsInPipeline.get(halfLimit - 1).getCreated().getTime()) > maxDataMigrationWaitTime )) &&
+                (currentTime > nextSpawnTime) &&  alreadyRunning.size() <=  maxSsvms) {
+            nextSpawnTime = currentTime + maxDataMigrationWaitTime;
+            s_logger.debug("scaling SSVM to handle migration tasks");
+            return new Pair<AfterScanAction, Object>(AfterScanAction.expand, SecondaryStorageVm.Role.commandExecutor);
 
+        }
+        scaleDownSSVMOnLoad(alreadyRunning, activeCmds, copyCmdsInPipeline);
         return new Pair<AfterScanAction, Object>(AfterScanAction.nop, null);
     }
 
+    // Destroys at most one SSVM per call, and only when copy and command load leave ample spare capacity.
+    private void scaleDownSSVMOnLoad(List<SecondaryStorageVmVO> alreadyRunning, List<CommandExecLogVO> activeCmds,
+                               List<CommandExecLogVO> copyCmdsInPipeline)  {
+        int halfLimit = Math.round((float) (alreadyRunning.size() * migrateCapPerSSVM) / 2);
+        int spareSlots = alreadyRunning.size() * _capacityPerSSVM - activeCmds.size();
+        if (alreadyRunning.size() > 1 && copyCmdsInPipeline.size() < halfLimit && spareSlots > (_standbyCapacity + 5)) {
+            // Walk the list back-to-front (reversed in place) and pick the first idle candidate.
+            Collections.reverse(alreadyRunning);
+            alreadyRunning.stream()
+                    .filter(ssvm -> copyCmdsInPipeline.isEmpty() && ssvm.getRole() != SecondaryStorageVm.Role.templateProcessor)
+                    .filter(ssvm -> activeCmds.stream().noneMatch(cmd -> cmd.getInstanceId() == ssvm.getId()))
+                    .findFirst().ifPresent(ssvm -> destroySecStorageVm(ssvm.getId()));
+        }
+    }
+
     @Override
     public Pair<HostVO, SecondaryStorageVmVO> assignSecStorageVm(long zoneId, Command cmd) {
 
@@ -159,26 +215,33 @@ public class PremiumSecondaryStorageManagerImpl extends SecondaryStorageManagerI
             if (host != null && host.getStatus() == Status.Up)
                 return new Pair<HostVO, SecondaryStorageVmVO>(host, secStorageVm);
         }
-
         return null;
     }
 
     private List<CommandExecLogVO> findActiveCommands(long dcId, Date cutTime) {
         SearchCriteria<CommandExecLogVO> sc = activeCommandSearch.create();
-
         sc.setParameters("created", cutTime);
         sc.setJoinParameters("hostSearch", "dc", dcId);
         sc.setJoinParameters("hostSearch", "status", Status.Up);
-
+        // Run the search once: command logs created at/after cutTime on Up hosts of zone dcId.
         return _cmdExecLogDao.search(sc, null);
     }
 
+    private List<CommandExecLogVO> findAllActiveCopyCommands(long dcId, Date cutTime) {
+        // DataMigrationCommand logs created at/after cutTime on Up hosts of the zone, ordered by "created".
+        SearchCriteria<CommandExecLogVO> criteria = activeCopyCommandSearch.create();
+        criteria.setParameters("command_name", "DataMigrationCommand");
+        criteria.setParameters("created", cutTime);
+        criteria.setJoinParameters("hostSearch", "status", Status.Up);
+        criteria.setJoinParameters("hostSearch", "dc", dcId);
+        return _cmdExecLogDao.search(criteria, new Filter(CommandExecLogVO.class, "created", true, null, null));
+    }
+
     private boolean reserveStandbyCapacity() {
         String value = _configDao.getValue(Config.SystemVMAutoReserveCapacity.key());
         if (value != null && value.equalsIgnoreCase("true")) {
             return true;
         }
-
         return false;
     }
 }
diff --git a/services/secondary-storage/controller/src/main/java/org/apache/cloudstack/secondarystorage/SecondaryStorageManagerImpl.java b/services/secondary-storage/controller/src/main/java/org/apache/cloudstack/secondarystorage/SecondaryStorageManagerImpl.java
index efa43cb..5ee60eb 100644
--- a/services/secondary-storage/controller/src/main/java/org/apache/cloudstack/secondarystorage/SecondaryStorageManagerImpl.java
+++ b/services/secondary-storage/controller/src/main/java/org/apache/cloudstack/secondarystorage/SecondaryStorageManagerImpl.java
@@ -266,6 +266,9 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
     static final ConfigKey<String> NTPServerConfig = new ConfigKey<String>(String.class, "ntp.server.list", "Advanced", null,
             "Comma separated list of NTP servers to configure in Secondary storage VM", false, ConfigKey.Scope.Global, null);
 
+    static final ConfigKey<Integer> MaxNumberOfSsvmsForMigration = new ConfigKey<Integer>("Advanced", Integer.class, "max.ssvm.count", "5",
+            "Number of additional SSVMs to handle migration of data objects concurrently", true, ConfigKey.Scope.Global);
+
     public SecondaryStorageManagerImpl() {
     }
 
@@ -720,7 +723,7 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
         return null;
     }
 
-    private void allocCapacity(long dataCenterId, SecondaryStorageVm.Role role) {
+    public void allocCapacity(long dataCenterId, SecondaryStorageVm.Role role) {
         if (s_logger.isTraceEnabled()) {
             s_logger.trace("Allocate secondary storage vm standby capacity for data center : " + dataCenterId);
         }
@@ -822,7 +825,7 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
                 return false;
             }
 
-            List<DataStore> stores = _dataStoreMgr.getImageStoresByScope(new ZoneScope(dataCenterId));
+            List<DataStore> stores = _dataStoreMgr.getImageStoresByScopeExcludingReadOnly(new ZoneScope(dataCenterId));
             if (stores.size() < 1) {
                 s_logger.debug("No image store added  in zone " + dataCenterId + ", wait until it is ready to launch secondary storage vm");
                 return false;
@@ -1374,7 +1377,7 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
             _secStorageVmDao.getSecStorageVmListInStates(SecondaryStorageVm.Role.templateProcessor, dataCenterId, State.Running, State.Migrating, State.Starting,
                 State.Stopped, State.Stopping);
         int vmSize = (ssVms == null) ? 0 : ssVms.size();
-        List<DataStore> ssStores = _dataStoreMgr.getImageStoresByScope(new ZoneScope(dataCenterId));
+        List<DataStore> ssStores = _dataStoreMgr.getImageStoresByScopeExcludingReadOnly(new ZoneScope(dataCenterId));
         int storeSize = (ssStores == null) ? 0 : ssStores.size();
         if (storeSize > vmSize) {
             s_logger.info("No secondary storage vms found in datacenter id=" + dataCenterId + ", starting a new one");
@@ -1516,7 +1519,7 @@ public class SecondaryStorageManagerImpl extends ManagerBase implements Secondar
 
     @Override
     public ConfigKey<?>[] getConfigKeys() {
-        return new ConfigKey<?>[] {NTPServerConfig};
+        return new ConfigKey<?>[] {NTPServerConfig, MaxNumberOfSsvmsForMigration};
     }
 
 }
diff --git a/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java b/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java
index 7e25296..0832ab6 100644
--- a/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java
+++ b/services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java
@@ -54,6 +54,7 @@ import java.util.UUID;
 import javax.naming.ConfigurationException;
 
 import org.apache.cloudstack.framework.security.keystore.KeystoreManager;
+import org.apache.cloudstack.storage.NfsMountManagerImpl.PathParser;
 import org.apache.cloudstack.storage.command.CopyCmdAnswer;
 import org.apache.cloudstack.storage.command.CopyCommand;
 import org.apache.cloudstack.storage.command.DeleteCommand;
@@ -67,7 +68,6 @@ import org.apache.cloudstack.storage.configdrive.ConfigDrive;
 import org.apache.cloudstack.storage.configdrive.ConfigDriveBuilder;
 import org.apache.cloudstack.storage.template.DownloadManager;
 import org.apache.cloudstack.storage.template.DownloadManagerImpl;
-import org.apache.cloudstack.storage.NfsMountManagerImpl.PathParser;
 import org.apache.cloudstack.storage.template.UploadEntity;
 import org.apache.cloudstack.storage.template.UploadManager;
 import org.apache.cloudstack.storage.template.UploadManagerImpl;
@@ -1050,6 +1050,10 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
         DataStoreTO srcDataStore = srcData.getDataStore();
         DataStoreTO destDataStore = destData.getDataStore();
 
+        if (DataStoreRole.Image == srcDataStore.getRole()  && DataStoreRole.Image == destDataStore.getRole()) {
+            return copyFromNfsToNfs(cmd);
+        }
+
         if (srcData.getObjectType() == DataObjectType.SNAPSHOT && destData.getObjectType() == DataObjectType.TEMPLATE) {
             return createTemplateFromSnapshot(cmd);
         }
@@ -1254,7 +1258,6 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
     }
 
     protected File findFile(String path) {
-
         File srcFile = _storage.getFile(path);
         if (!srcFile.exists()) {
             srcFile = _storage.getFile(path + ".qcow2");
@@ -1275,6 +1278,87 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
         return srcFile;
     }
 
+    /**
+     * Copies a data object from one NFS image store to another. Both stores are resolved to local
+     * directories through getDir() and the data is then copied with FileUtils.
+     *
+     * <p>Template and volume objects are copied directory-to-directory. Any other object is copied as a
+     * single file, or via the parent directory of the source path when that path is not a regular file.
+     *
+     * @param cmd copy command; the data stores of its source and destination TOs are cast to NfsTO
+     * @return a CopyCmdAnswer wrapping the new destination TO (null unless the destination is a
+     *         template, volume or snapshot), or wrapping an error message if the copy failed
+     */
+    protected Answer copyFromNfsToNfs(CopyCommand cmd) {
+        final DataTO srcData = cmd.getSrcTO();
+        final DataTO destData = cmd.getDestTO();
+        final NfsTO srcStore = (NfsTO) srcData.getDataStore();
+        final NfsTO destStore = (NfsTO) destData.getDataStore();
+        try {
+            // new File(...) never yields null, so the former null check on srcFile was dead code and is gone.
+            File srcFile = new File(getDir(srcStore.getUrl(), _nfsVersion), srcData.getPath());
+            File destFile = new File(getDir(destStore.getUrl(), _nfsVersion), destData.getPath());
+            ImageFormat format = getTemplateFormat(srcFile.getName());
+
+            try {
+                if (srcData instanceof TemplateObjectTO || srcData instanceof VolumeObjectTO) {
+                    // A source that is a file, or merely has a dot in its name, is copied via its parent directory.
+                    boolean srcLooksLikeFile = srcFile.isFile() || srcFile.getName().contains(".");
+                    File srcDir = srcLooksLikeFile ? new File(srcFile.getParent()) : srcFile;
+                    File destDir = destFile.isFile() ? new File(destFile.getParent()) : destFile;
+                    FileUtils.copyDirectory(srcDir, destDir);
+                } else {
+                    // Other objects: copy into a destination entry named after the source.
+                    destFile = new File(destFile, srcFile.getName());
+                    if (srcFile.isFile()) {
+                        FileUtils.copyFile(srcFile, destFile);
+                    } else {
+                        // for vmware
+                        srcFile = new File(srcFile.getParent());
+                        FileUtils.copyDirectory(srcFile, destFile);
+                    }
+                }
+            } catch (IOException e) {
+                // Log the cause with its stack trace; the answer text is unchanged for callers.
+                String msg = "Failed to copy file to destination";
+                s_logger.error(msg, e);
+                return new CopyCmdAnswer(msg);
+            }
+
+            // Describe the copied object back to the caller.
+            DataTO retObj = null;
+            if (destData.getObjectType() == DataObjectType.TEMPLATE) {
+                TemplateObjectTO newTemplate = new TemplateObjectTO();
+                newTemplate.setPath(destData.getPath() + File.separator + srcFile.getName());
+                newTemplate.setSize(getVirtualSize(srcFile, format));
+                newTemplate.setPhysicalSize(srcFile.length());
+                newTemplate.setFormat(format);
+                retObj = newTemplate;
+            } else if (destData.getObjectType() == DataObjectType.VOLUME) {
+                VolumeObjectTO newVol = new VolumeObjectTO();
+                if (srcFile.isFile()) {
+                    newVol.setPath(destData.getPath() + File.separator + srcFile.getName());
+                } else {
+                    newVol.setPath(destData.getPath());
+                }
+                newVol.setSize(getVirtualSize(srcFile, format));
+                retObj = newVol;
+            } else if (destData.getObjectType() == DataObjectType.SNAPSHOT) {
+                SnapshotObjectTO newSnapshot = new SnapshotObjectTO();
+                if (srcFile.isFile()) {
+                    newSnapshot.setPath(destData.getPath() + File.separator + destFile.getName());
+                } else {
+                    newSnapshot.setPath(destData.getPath() + File.separator + destFile.getName() + File.separator + destFile.getName());
+                }
+                retObj = newSnapshot;
+            }
+            return new CopyCmdAnswer(retObj);
+        } catch (Exception e) {
+            s_logger.error("failed to copy file" + srcData.getPath(), e);
+            return new CopyCmdAnswer("failed to copy file" + srcData.getPath() + e.toString());
+        }
+    }
+
     protected Answer copyFromNfsToS3(CopyCommand cmd) {
         final DataTO srcData = cmd.getSrcTO();
         final DataTO destData = cmd.getDestTO();
@@ -2433,6 +2517,18 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
 
     }
 
+    // Resolves secUrl through mountUri and returns the resulting directory under _parent; failures keep their cause.
+    private String getDir(String secUrl, String nfsVersion) {
+        try {
+            String dir = mountUri(new URI(secUrl), nfsVersion);
+            return _parent + "/" + dir;
+        } catch (Exception e) {
+            String msg = "getDir for " + secUrl + " failed due to " + e.toString();
+            s_logger.error(msg, e);
+            throw new CloudRuntimeException(msg, e);
+        }
+    }
+
     @Override
     synchronized public String getRootDir(String secUrl, String nfsVersion) {
         if (!_inSystemVM) {
diff --git a/test/integration/smoke/test_secondary_storage.py b/test/integration/smoke/test_secondary_storage.py
index b80b3e6..baa0f98 100644
--- a/test/integration/smoke/test_secondary_storage.py
+++ b/test/integration/smoke/test_secondary_storage.py
@@ -24,6 +24,8 @@ from marvin.lib.utils import *
 from marvin.lib.base import *
 from marvin.lib.common import *
 from nose.plugins.attrib import attr
+from marvin.cloudstackAPI import (listImageStores)
+from marvin.cloudstackAPI import (updateImageStore)
 
 #Import System modules
 import time
@@ -224,3 +226,174 @@ class TestSecStorageServices(cloudstackTestCase):
                         True,
                         "Builtin template is not ready %s in zone %s"%(template.status, zid)
                     )
+
+    @attr(tags = ["advanced", "advancedns", "smoke", "basic", "eip", "sg"], required_hardware="false")
+    def test_03_check_read_only_flag(self):
+        """Test the secondary storage read-only flag
+        """
+
+        # Validate the following
+        # It is possible to enable/disable the read-only flag on a secondary storage and filter by it
+        # 1. Make the first secondary storage read-only and verify its state has been changed
+        # 2. Search for the read-only storages and make sure ours is in the list
+        # 3. Make it read/write again and verify it has been set properly
+
+        first_storage_id = self.list_secondary_storages(self.apiclient)[0]['id']
+        # Step 1
+        self.update_secondary_storage(self.apiclient, first_storage_id, True)
+        try:
+            updated_storage = self.list_secondary_storages(self.apiclient, first_storage_id)[0]
+            self.assertEqual(
+                updated_storage['readonly'],
+                True,
+                "Check if the secondary storage status has been set to read-only"
+            )
+
+            # Step 2
+            readonly_storages = self.list_secondary_storages(self.apiclient, readonly=True)
+            self.assertEqual(
+                isinstance(readonly_storages, list),
+                True,
+                "Check list response returns a valid list"
+            )
+            result = any(d['id'] == first_storage_id for d in readonly_storages)
+            self.assertEqual(
+                result,
+                True,
+                "Check if we are able to list storages by their read-only status"
+            )
+        finally:
+            # Step 3 - always restore read/write, even when a check above failed
+            self.update_secondary_storage(self.apiclient, first_storage_id, False)
+        updated_storage = self.list_secondary_storages(self.apiclient, first_storage_id)[0]
+        self.assertEqual(
+            updated_storage['readonly'],
+            False,
+            "Check if the secondary storage status has been set back to read-write"
+        )
+
+    @attr(tags = ["advanced", "advancedns", "smoke", "basic", "eip", "sg"], required_hardware="false")
+    def test_04_migrate_to_read_only_storage(self):
+        """Test migrations to a read-only secondary storage
+        """
+
+        # Validate the following
+        # It is not possible to migrate a storage to a read-only one
+        # NOTE: This test requires more than one secondary storage in the system
+        # 1. Make the first storage read-only
+        # 2. Try complete migration from the second to the first storage - it should fail
+        # 3. Try balanced migration from the second to the first storage - it should fail
+        # 4. Make the first storage read-write again
+
+        storages = self.list_secondary_storages(self.apiclient)
+        if (len(storages)) < 2:
+            self.skipTest(
+                "This test requires more than one secondary storage")
+
+        # Reuse the listing fetched above instead of querying the API again for each storage
+        first_storage_id = storages[0]['id']
+        second_storage_id = storages[1]['id']
+
+        # Set the first storage to read-only
+        self.update_secondary_storage(self.apiclient, first_storage_id, True)
+
+        # The finally clause restores read-write even when an assertion below fails
+        try:
+            # Try complete migration from second to the first storage
+            success = False
+            try:
+                self.migrate_secondary_storage(self.apiclient, second_storage_id, first_storage_id, "complete")
+            except Exception as ex:
+                if re.search(r"No destination valid store\(s\) available to migrate.", str(ex)):
+                    success = True
+                else:
+                    self.debug("Secondary storage complete migration to a read-only one\
+                            did not fail appropriately. Error was actually : " + str(ex))
+
+            self.assertEqual(success, True, "Check if a complete migration to a read-only storage one fails appropriately")
+
+            # Try balanced migration from second to the first storage
+            success = False
+            try:
+                self.migrate_secondary_storage(self.apiclient, second_storage_id, first_storage_id, "balance")
+            except Exception as ex:
+                if re.search(r"No destination valid store\(s\) available to migrate.", str(ex)):
+                    success = True
+                else:
+                    self.debug("Secondary storage balanced migration to a read-only one\
+                            did not fail appropriately. Error was actually : " + str(ex))
+
+            self.assertEqual(success, True, "Check if a balanced migration to a read-only storage one fails appropriately")
+
+        finally:
+            # Set the first storage back to read-write
+            self.update_secondary_storage(self.apiclient, first_storage_id, False)
+
+    @attr(tags = ["advanced", "advancedns", "smoke", "basic", "eip", "sg"], required_hardware="false")
+    def test_05_migrate_to_less_free_space(self):
+        """Test migrations when the destination storage has less space
+        """
+
+        # Validate the following
+        # Migration to a secondary storage with less space should be refused
+        # NOTE: This test requires more than one secondary storage in the system
+        # 1. Try complete migration from the storage with more free space - migration should be refused
+
+        storages = self.list_secondary_storages(self.apiclient)
+        if (len(storages)) < 2:
+            self.skipTest(
+                "This test requires more than one secondary storage")
+
+        first_storage = storages[0]
+        first_storage_disksizeused = first_storage['disksizeused']
+        first_storage_disksizetotal = first_storage['disksizetotal']
+        second_storage = storages[1]
+        second_storage_disksizeused = second_storage['disksizeused']
+        second_storage_disksizetotal = second_storage['disksizetotal']
+
+        first_storage_freespace = first_storage_disksizetotal - first_storage_disksizeused
+        second_storage_freespace = second_storage_disksizetotal - second_storage_disksizeused
+
+        if first_storage_freespace == second_storage_freespace:
+            self.skipTest(
+                "This test requires two secondary storages with different free space")
+
+        # Setting the storage with more free space as source storage
+        if first_storage_freespace > second_storage_freespace:
+            src_storage = first_storage['id']
+            dst_storage = second_storage['id']
+        else:
+            src_storage = second_storage['id']
+            dst_storage = first_storage['id']
+
+        response = self.migrate_secondary_storage(self.apiclient, src_storage, dst_storage, "complete")
+
+        success = False
+        if re.search("has equal or more free space than destination", str(response)):
+            success = True
+        else:
+            self.debug("Secondary storage complete migration to a storage \
+            with less space was not refused. Here is the command output : " + str(response))
+
+        self.assertEqual(success, True, "Secondary storage complete migration to a storage\
+                        with less space was properly refused.")
+
+    def list_secondary_storages(self, apiclient, id=None, readonly=None):
+        """List image stores, passing id and readonly through as listImageStores filters"""
+        list_cmd = listImageStores.listImageStoresCmd()
+        list_cmd.id, list_cmd.readonly = id, readonly
+        return apiclient.listImageStores(list_cmd)
+
+    def update_secondary_storage(self, apiclient, id, readonly):
+        """Send updateImageStore for the given id with the given readonly value"""
+        update_cmd = updateImageStore.updateImageStoreCmd()
+        update_cmd.id, update_cmd.readonly = id, readonly
+        apiclient.updateImageStore(update_cmd)
+
+    def migrate_secondary_storage(self, apiclient, first_id, second_id, type):
+        """Send migrateSecondaryStorageData from first_id (source) to second_id and return the response"""
+        migrate_cmd = migrateSecondaryStorageData.migrateSecondaryStorageDataCmd()
+        migrate_cmd.migrationtype = type
+        migrate_cmd.destpools = second_id
+        migrate_cmd.srcpool = first_id
+        return apiclient.migrateSecondaryStorageData(migrate_cmd)
diff --git a/test/integration/smoke/test_templates.py b/test/integration/smoke/test_templates.py
index 9e9dd9f..ae34f76 100644
--- a/test/integration/smoke/test_templates.py
+++ b/test/integration/smoke/test_templates.py
@@ -961,6 +961,40 @@ class TestTemplates(cloudstackTestCase):
                         )
         return
 
+    @attr(tags = ["advanced", "advancedns", "smoke", "basic", "sg"], required_hardware="false")
+    def test_09_list_templates_download_details(self):
+        """Test if list templates returns download details"""
+
+        # Validate the following
+        # 1. ListTemplates API has been extended to support viewing the download details - progress, download states and datastore
+
+        # List templates for the test user's account and domain with templatefilter 'all'
+        list_filters = {
+            'templatefilter': 'all',
+            'account': self.user.name,
+            'domainid': self.user.domainid,
+        }
+        templates = Template.list(self.apiclient, **list_filters)
+
+        # The response must be a list ...
+        response_is_list = isinstance(templates, list)
+        self.assertEqual(
+            response_is_list,
+            True,
+            "Check list response returns a valid list"
+        )
+
+        # ... and it must not be empty
+        template_count = len(templates)
+        self.assertNotEqual(template_count, 0, "Check template available in List Templates")
+
+        # Every listed template must carry at least one download detail entry
+        for listed_template in templates:
+            detail_count = len(listed_template.downloaddetails)
+            self.assertNotEqual(detail_count, 0, "Not all templates have download details")
+
+        return
+
 class TestCopyAndDeleteTemplatesAcrossZones(cloudstackTestCase):
 
     @classmethod
diff --git a/tools/apidoc/gen_toc.py b/tools/apidoc/gen_toc.py
index ef98b13..ca1c44f 100644
--- a/tools/apidoc/gen_toc.py
+++ b/tools/apidoc/gen_toc.py
@@ -155,6 +155,7 @@ known_categories = {
     'createSecondaryStagingStore': 'Image Store',
     'deleteSecondaryStagingStore': 'Image Store',
     'listSecondaryStagingStores': 'Image Store',
+    'updateImageStore': 'Image Store',
     'InternalLoadBalancer': 'Internal LB',
 	'DeploymentPlanners': 'Configuration',
 	'ObjectStore': 'Image Store',