You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2022/10/17 01:57:53 UTC

[airavata-mft] branch master updated: Simplifying MFT transfer API. Removing intermediate resource creation for transfer submission

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fd60fc  Simplifying MFT transfer API. Removing intermediate resource creation for transfer submission
9fd60fc is described below

commit 9fd60fc4b48628171353a7be496ffe7cc6060c09
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Sun Oct 16 21:57:38 2022 -0400

    Simplifying MFT transfer API. Removing intermediate resource creation for transfer submission
---
 .../org/apache/airavata/mft/agent/AppConfig.java   |  14 ++
 .../org/apache/airavata/mft/agent/MFTAgent.java    |  29 ++-
 .../airavata/mft/agent/TransportMediator.java      |  22 +-
 .../mft/agent/http/AgentHttpDownloadData.java      |  16 --
 .../airavata/mft/agent/http/HttpServerHandler.java |   4 +-
 .../apache/airavata/mft/agent/rpc/RPCParser.java   |  92 +++-----
 .../airavata/mft/api/client/examples/Example.java  |  20 --
 .../airavata/mft/api/handler/MFTApiHandler.java    |  76 ++-----
 api/stub/src/main/proto/MFTTransferApi.proto       |  47 ++--
 .../line/sub/odata/ODataRemoteAddSubCommand.java   |   8 +-
 .../mft/command/line/sub/s3/S3SubCommand.java      |   3 +-
 .../line/sub/s3/secret/S3SecretSubCommand.java     |  27 ---
 .../sub/s3/storage/S3StorageAddSubCommand.java     |  12 +-
 .../command/line/sub/swift/SwiftAddSubCommand.java |  18 +-
 .../sub/transfer/SubmitTransferSubCommand.java     |  35 +--
 .../airavata/mft/core/api/ConnectorConfig.java     |  48 ++++-
 .../mft/core/api/IncomingStreamingConnector.java   |   1 -
 .../airavata/mft/core/api/MetadataCollector.java   |  55 +----
 .../mft/core/api/OutgoingStreamingConnector.java   |   1 -
 examples/pom.xml                                   |  43 ----
 .../mft/examples/http/DownloadExample.java         |  43 ----
 .../airavata/mft/examples/metadata/SCPExample.java |  71 ------
 .../mft/examples/transfer/LocalExample.java        |  66 ------
 .../airavata/mft/examples/transfer/S3Example.java  |  68 ------
 .../airavata/mft/examples/transfer/SCPExample.java |  68 ------
 examples/src/main/python/.gitignore                |   4 -
 examples/src/main/python/directory_browse.py       |  18 --
 examples/src/main/python/download_http.py          |  19 --
 examples/src/main/python/readme.md                 |  11 -
 examples/src/main/python/requirements.txt          |   3 -
 examples/src/main/resources/logback.xml            |  54 -----
 pom.xml                                            |   1 -
 .../mft/resource/client/StorageServiceClient.java  |   4 +
 .../resource/server/backend/ResourceBackend.java   |   4 +
 .../backend/file/FileBasedResourceBackend.java     |   7 +
 .../server/backend/sql/SQLResourceBackend.java     |  47 ++++
 .../backend/sql/entity/ResolveStorageEntity.java   |  56 +++++
 .../sql/repository/ResolveStorageRepository.java   |   8 +-
 .../handler/StorageCommonServiceHandler.java       |  55 +++++
 .../stub/src/main/proto/common/StorageCommon.proto |  16 +-
 .../transport/azure/AzureMetadataCollector.java    |  83 ++++---
 .../mft/transport/box/BoxMetadataCollector.java    |  69 ++----
 .../dropbox/DropboxMetadataCollector.java          |  68 ++----
 .../mft/transport/ftp/FTPMetadataCollector.java    | 101 ++++-----
 .../mft/transport/gcp/GCSMetadataCollector.java    |  88 +++-----
 .../transport/local/LocalMetadataCollector.java    |  60 +-----
 .../transport/odata/ODataIncomingConnector.java    |  29 +--
 .../transport/odata/ODataMetadataCollector.java    |  60 +++---
 .../mft/transport/s3/S3IncomingConnector.java      |  38 ++--
 .../mft/transport/s3/S3MetadataCollector.java      |  95 ++++----
 .../mft/transport/s3/S3OutgoingConnector.java      |  54 +++--
 .../transport/s3/S3OutgoingStreamingConnector.java |  42 ++--
 .../mft/transport/scp/SCPIncomingConnector.java    |  51 +----
 .../mft/transport/scp/SCPMetadataCollector.java    | 239 +++++----------------
 .../mft/transport/scp/SCPOutgoingConnector.java    |  50 +----
 .../transport/swift/SwiftIncomingConnector.java    |  32 ++-
 .../transport/swift/SwiftMetadataCollector.java    |  73 +++----
 .../transport/swift/SwiftOutgoingConnector.java    |  32 ++-
 58 files changed, 795 insertions(+), 1663 deletions(-)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java b/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
index 49b3ddf..e1ee354 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
@@ -20,6 +20,8 @@ package org.apache.airavata.mft.agent;
 import org.apache.airavata.mft.admin.MFTConsulClient;
 import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
 import org.apache.airavata.mft.agent.rpc.RPCParser;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -32,6 +34,12 @@ public class AppConfig {
     @org.springframework.beans.factory.annotation.Value("${consul.port}")
     Integer consulPort;
 
+    @org.springframework.beans.factory.annotation.Value("${resource.service.host}")
+    private String resourceServiceHost;
+
+    @org.springframework.beans.factory.annotation.Value("${resource.service.port}")
+    private int resourceServicePort;
+
     @Bean
     public MFTConsulClient mftConsulClient() {
         return new MFTConsulClient(consulHost, consulPort);
@@ -46,4 +54,10 @@ public class AppConfig {
     public HttpTransferRequestsStore transferRequestStore() {
         return new HttpTransferRequestsStore();
     }
+
+    @Bean
+    public StorageServiceClient storageServiceClient() {
+        return StorageServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+    }
+
 }
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index a817f91..2bc5ff9 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -39,6 +39,9 @@ import org.apache.airavata.mft.core.FileResourceMetadata;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
 import org.apache.airavata.mft.core.api.ConnectorConfig;
 import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveRequest;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveResponse;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -137,6 +140,9 @@ public class MFTAgent implements CommandLineRunner {
     @Autowired
     private HttpTransferRequestsStore transferRequestsStore;
 
+    @Autowired
+    private StorageServiceClient storageServiceClient;
+
     private final AtomicLong totalRunningTransfers = new AtomicLong(0);
     private final AtomicLong totalPendingTransfers = new AtomicLong(0);
 
@@ -190,17 +196,22 @@ public class MFTAgent implements CommandLineRunner {
                     .setPublisher(agentId)
                     .setDescription("Starting the transfer"));
 
-            Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
+            StorageTypeResolveResponse sourceStorageType = storageServiceClient.common()
+                    .resolveStorageType(StorageTypeResolveRequest.newBuilder()
+                    .setStorageId(request.getSourceStorageId()).build());
+            Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver
+                    .resolveMetadataCollector(sourceStorageType.getStorageType());
             MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
             srcMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
 
-            Optional<MetadataCollector> dstMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getDestinationType());
-            MetadataCollector dstMetadataCollector = dstMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for destination"));
-            dstMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+            StorageTypeResolveResponse destStorageType = storageServiceClient.common()
+                    .resolveStorageType(StorageTypeResolveRequest.newBuilder()
+                    .setStorageId(request.getDestinationStorageId()).build()); // destination's storage, not the source's
 
             FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
                     request.getMftAuthorizationToken(),
-                    request.getSourceResourceId(),
+                    request.getSourcePath(),
+                    request.getSourceStorageId(),
                     request.getSourceToken());
 
 
@@ -211,7 +222,9 @@ public class MFTAgent implements CommandLineRunner {
                     .withSecretServiceHost(secretServiceHost)
                     .withSecretServicePort(secretServicePort)
                     .withTransferId(transferId)
-                    .withResourceId(request.getSourceResourceId())
+                    .withStorageId(request.getSourceStorageId())
+                    .withResourcePath(request.getSourcePath())
+                    .withStorageType(sourceStorageType.getStorageType())
                     .withCredentialToken(request.getSourceToken())
                     .withMetadata(srcMetadata).build();
 
@@ -222,7 +235,9 @@ public class MFTAgent implements CommandLineRunner {
                     .withSecretServiceHost(secretServiceHost)
                     .withSecretServicePort(secretServicePort)
                     .withTransferId(transferId)
-                    .withResourceId(request.getDestinationResourceId())
+                    .withStorageId(request.getDestinationStorageId())
+                    .withResourcePath(request.getDestinationPath())
+                    .withStorageType(destStorageType.getStorageType())
                     .withCredentialToken(request.getDestinationToken())
                     .withMetadata(srcMetadata).build();
 
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index e7480c5..bc524fc 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -75,14 +75,14 @@ public class TransportMediator {
             logger.info("Stating transfer {}", transferId);
 
             Optional<IncomingStreamingConnector> inStreamingConnectorOp = ConnectorResolver
-                    .resolveIncomingStreamingConnector(request.getSourceType());
+                    .resolveIncomingStreamingConnector(srcCC.getStorageType());
             Optional<OutgoingStreamingConnector> outStreamingConnectorOp = ConnectorResolver
-                    .resolveOutgoingStreamingConnector(request.getDestinationType());
+                    .resolveOutgoingStreamingConnector(dstCC.getStorageType());
 
             Optional<IncomingChunkedConnector> inChunkedConnectorOp = ConnectorResolver
-                    .resolveIncomingChunkedConnector(request.getSourceType());
+                    .resolveIncomingChunkedConnector(srcCC.getStorageType());
             Optional<OutgoingChunkedConnector> outChunkedConnectorOp = ConnectorResolver
-                    .resolveOutgoingChunkedConnector(request.getDestinationType());
+                    .resolveOutgoingChunkedConnector(dstCC.getStorageType());
 
 
 
@@ -109,10 +109,10 @@ public class TransportMediator {
                 int chunkIdx = 0;
 
                 IncomingChunkedConnector inConnector = inChunkedConnectorOp
-                        .orElseThrow(() -> new Exception("Could not find an in chunked connector for type " + request.getSourceType()));
+                        .orElseThrow(() -> new Exception("Could not find an in chunked connector for type " + srcCC.getStorageType()));
 
                 OutgoingChunkedConnector outConnector = outChunkedConnectorOp
-                        .orElseThrow(() -> new Exception("Could not find an out chunked connector for type " + request.getDestinationType()));
+                        .orElseThrow(() -> new Exception("Could not find an out chunked connector for type " + dstCC.getStorageType()));
 
                 inConnector.init(srcCC);
                 outConnector.init(dstCC);
@@ -152,20 +152,18 @@ public class TransportMediator {
 
                 logger.info("Starting streaming transfer for transfer {}", transferId);
                 IncomingStreamingConnector inConnector = inStreamingConnectorOp
-                        .orElseThrow(() -> new Exception("Could not find an in streaming connector for type " + request.getSourceType()));
+                        .orElseThrow(() -> new Exception("Could not find an in streaming connector for type " + srcCC.getStorageType()));
 
                 OutgoingStreamingConnector outConnector = outStreamingConnectorOp
-                        .orElseThrow(() -> new Exception("Could not find an out streaming connector for type " + request.getDestinationType()));
+                        .orElseThrow(() -> new Exception("Could not find an out streaming connector for type " + dstCC.getStorageType()));
 
                 inConnector.init(srcCC);
                 outConnector.init(dstCC);
 
                 try {
-                    String srcChild = request.getSourceChildResourcePath();
-                    String dstChild = request.getDestinationChildResourcePath();
 
-                    InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
-                    OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
+                    InputStream inputStream = inConnector.fetchInputStream();
+                    OutputStream outputStream = outConnector.fetchOutputStream();
 
                     long count = 0;
                     final AtomicLong countAtomic = new AtomicLong();
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
index db7a444..19df97e 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
@@ -25,7 +25,6 @@ public class AgentHttpDownloadData {
     private IncomingStreamingConnector incomingStreamingConnector;
     private IncomingChunkedConnector incomingChunkedConnector;
     private ConnectorConfig connectorConfig;
-    private String childResourcePath;
     private long createdTime = System.currentTimeMillis();
 
     public IncomingStreamingConnector getIncomingStreamingConnector() {
@@ -52,14 +51,6 @@ public class AgentHttpDownloadData {
         this.connectorConfig = connectorConfig;
     }
 
-    public String getChildResourcePath() {
-        return childResourcePath;
-    }
-
-    public void setChildResourcePath(String childResourcePath) {
-        this.childResourcePath = childResourcePath;
-    }
-
     public long getCreatedTime() {
         return createdTime;
     }
@@ -73,7 +64,6 @@ public class AgentHttpDownloadData {
         private IncomingStreamingConnector incomingStreamingConnector;
         private IncomingChunkedConnector incomingChunkedConnector;
         private ConnectorConfig connectorConfig;
-        private String childResourcePath;
         private long createdTime = System.currentTimeMillis();
 
         private AgentHttpDownloadDataBuilder() {
@@ -98,11 +88,6 @@ public class AgentHttpDownloadData {
             return this;
         }
 
-        public AgentHttpDownloadDataBuilder withChildResourcePath(String childResourcePath) {
-            this.childResourcePath = childResourcePath;
-            return this;
-        }
-
         public AgentHttpDownloadDataBuilder withCreatedTime(long createdTime) {
             this.createdTime = createdTime;
             return this;
@@ -114,7 +99,6 @@ public class AgentHttpDownloadData {
             agentHttpDownloadData.setIncomingStreamingConnector(incomingStreamingConnector);
             agentHttpDownloadData.setIncomingChunkedConnector(incomingChunkedConnector);
             agentHttpDownloadData.setConnectorConfig(connectorConfig);
-            agentHttpDownloadData.setChildResourcePath(childResourcePath);
             agentHttpDownloadData.setCreatedTime(createdTime);
             return agentHttpDownloadData;
         }
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
index 655eb68..2bc173f 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
@@ -92,9 +92,7 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
 
             IncomingStreamingConnector incomingStreamingConnector = downloadData.getIncomingStreamingConnector();
             incomingStreamingConnector.init(downloadData.getConnectorConfig());
-            InputStream inputStream = downloadData.getChildResourcePath().equals("")?
-                    incomingStreamingConnector.fetchInputStream() :
-                    incomingStreamingConnector.fetchInputStream(downloadData.getChildResourcePath());
+            InputStream inputStream = incomingStreamingConnector.fetchInputStream();
 
             sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(inputStream)),
                     ctx.newProgressivePromise());
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
index 03466d4..f41ecdb 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
@@ -32,6 +32,9 @@ import org.apache.airavata.mft.core.api.ConnectorConfig;
 import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
 import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
 import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveRequest;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -60,6 +63,9 @@ public class RPCParser {
     @Autowired
     private HttpTransferRequestsStore httpTransferRequestsStore;
 
+    @Autowired
+    private StorageServiceClient storageServiceClient;
+
     public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
         // TODO implement using the reflection
         ObjectMapper mapper = new ObjectMapper();
@@ -67,96 +73,67 @@ public class RPCParser {
 
         switch (request.getMethod()) {
             case "getFileResourceMetadata":
-                String resourceId = request.getParameters().get("resourceId");
-                String resourceType = request.getParameters().get("resourceType");
+                String resourcePath = request.getParameters().get("resourcePath");
+                String storageId = request.getParameters().get("storageId");
                 String resourceToken = request.getParameters().get("resourceToken");
 
                 AuthToken.Builder tokenBuilder = AuthToken.newBuilder();
                 JsonFormat.parser().merge(request.getParameters().get("mftAuthorizationToken"), tokenBuilder);
                 AuthToken mftAuthorizationToken = tokenBuilder.build();
 
-                Optional<MetadataCollector> metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
-                if (metadataCollectorOp.isPresent()) {
-                    MetadataCollector metadataCollector = metadataCollectorOp.get();
-                    metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
-                    FileResourceMetadata fileResourceMetadata = metadataCollector
-                            .getFileResourceMetadata(mftAuthorizationToken, resourceId, resourceToken);
-                    return mapper.writeValueAsString(fileResourceMetadata);
-                }
-                break;
+                StorageTypeResolveResponse storageType = storageServiceClient.common()
+                        .resolveStorageType(StorageTypeResolveRequest.newBuilder().setStorageId(storageId).build());
 
-            case "getChildFileResourceMetadata":
-                resourceId = request.getParameters().get("resourceId");
-                resourceType = request.getParameters().get("resourceType");
-                resourceToken = request.getParameters().get("resourceToken");
-                String childPath = request.getParameters().get("childPath");
+                Optional<MetadataCollector> metadataCollectorOp = MetadataCollectorResolver
+                        .resolveMetadataCollector(storageType.getStorageType());
 
-                tokenBuilder = AuthToken.newBuilder();
-                JsonFormat.parser().merge(request.getParameters().get("mftAuthorizationToken"), tokenBuilder);
-                mftAuthorizationToken = tokenBuilder.build();
-
-                metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
                 if (metadataCollectorOp.isPresent()) {
                     MetadataCollector metadataCollector = metadataCollectorOp.get();
                     metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
                     FileResourceMetadata fileResourceMetadata = metadataCollector
-                            .getFileResourceMetadata(mftAuthorizationToken, resourceId, childPath, resourceToken);
+                            .getFileResourceMetadata(mftAuthorizationToken, resourcePath, storageId, resourceToken);
                     return mapper.writeValueAsString(fileResourceMetadata);
                 }
                 break;
 
+
             case "getDirectoryResourceMetadata":
-                resourceId = request.getParameters().get("resourceId");
-                resourceType = request.getParameters().get("resourceType");
+                resourcePath = request.getParameters().get("resourcePath");
+                storageId = request.getParameters().get("storageId");
                 resourceToken = request.getParameters().get("resourceToken");
 
                 tokenBuilder = AuthToken.newBuilder();
                 JsonFormat.parser().merge(request.getParameters().get("mftAuthorizationToken"), tokenBuilder);
                 mftAuthorizationToken = tokenBuilder.build();
 
-                metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
-                if (metadataCollectorOp.isPresent()) {
-                    MetadataCollector metadataCollector = metadataCollectorOp.get();
-                    metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
-                    DirectoryResourceMetadata dirResourceMetadata = metadataCollector
-                            .getDirectoryResourceMetadata(mftAuthorizationToken, resourceId, resourceToken);
-                    return mapper.writeValueAsString(dirResourceMetadata);
-                }
-                break;
+                storageType = storageServiceClient.common()
+                        .resolveStorageType(StorageTypeResolveRequest.newBuilder().setStorageId(storageId).build());
 
-            case "getChildDirectoryResourceMetadata":
-                resourceId = request.getParameters().get("resourceId");
-                resourceType = request.getParameters().get("resourceType");
-                resourceToken = request.getParameters().get("resourceToken");
-                childPath = request.getParameters().get("childPath");
-
-                tokenBuilder = AuthToken.newBuilder();
-                JsonFormat.parser().merge(request.getParameters().get("mftAuthorizationToken"), tokenBuilder);
-                mftAuthorizationToken = tokenBuilder.build();
-
-                metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
+                metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(storageType.getStorageType());
                 if (metadataCollectorOp.isPresent()) {
                     MetadataCollector metadataCollector = metadataCollectorOp.get();
                     metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
                     DirectoryResourceMetadata dirResourceMetadata = metadataCollector
-                            .getDirectoryResourceMetadata(mftAuthorizationToken, resourceId, childPath, resourceToken);
+                            .getDirectoryResourceMetadata(mftAuthorizationToken, resourcePath, storageId, resourceToken);
                     return mapper.writeValueAsString(dirResourceMetadata);
                 }
                 break;
 
             case "submitHttpDownload":
-                resourceId = request.getParameters().get("resourceId");
-                String childResourcePath = request.getParameters().get("childResourcePath");
+                resourcePath = request.getParameters().get("resourcePath");
+                String sourceStorageId = request.getParameters().get("sourceStorageId");
                 String sourceToken = request.getParameters().get("sourceToken");
-                String storeType = request.getParameters().get("storeType");
 
                 tokenBuilder = AuthToken.newBuilder();
                 JsonFormat.parser().merge(request.getParameters().get("mftAuthorizationToken"), tokenBuilder);
                 mftAuthorizationToken = tokenBuilder.build();
 
-                metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(storeType);
-                Optional<IncomingStreamingConnector> connectorStreamingOp = ConnectorResolver.resolveIncomingStreamingConnector(storeType);
-                Optional<IncomingChunkedConnector> connectorChunkedOp = ConnectorResolver.resolveIncomingChunkedConnector(storeType);
+                storageType = storageServiceClient.common()
+                        .resolveStorageType(StorageTypeResolveRequest.newBuilder().setStorageId(sourceStorageId).build());
+
+                metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(storageType.getStorageType());
+                Optional<IncomingStreamingConnector> connectorStreamingOp = ConnectorResolver.resolveIncomingStreamingConnector(storageType.getStorageType());
+                Optional<IncomingChunkedConnector> connectorChunkedOp = ConnectorResolver.resolveIncomingChunkedConnector(storageType.getStorageType());
 
                 if (metadataCollectorOp.isPresent() && (connectorStreamingOp.isPresent() || connectorChunkedOp.isPresent())) {
 
@@ -165,18 +142,19 @@ public class RPCParser {
 
                     FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(
                             mftAuthorizationToken,
-                            resourceId,
-                            childResourcePath,
+                            resourcePath,
+                            sourceStorageId,
                             sourceToken);
 
                     AgentHttpDownloadData.AgentHttpDownloadDataBuilder agentHttpDownloadDataBuilder = AgentHttpDownloadData.AgentHttpDownloadDataBuilder.newBuilder()
-                            .withChildResourcePath(childResourcePath)
                             .withConnectorConfig(ConnectorConfig.ConnectorConfigBuilder.newBuilder()
                                     .withResourceServiceHost(resourceServiceHost)
                                     .withResourceServicePort(resourceServicePort)
                                     .withSecretServiceHost(secretServiceHost)
                                     .withSecretServicePort(secretServicePort)
-                                    .withResourceId(resourceId)
+                                    .withStorageId(sourceStorageId)
+                                    .withStorageType(storageType.getStorageType())
+                                    .withResourcePath(resourcePath)
                                     .withCredentialToken(sourceToken)
                                     .withAuthToken(mftAuthorizationToken)
                                     .withMetadata(fileResourceMetadata).build());
@@ -190,8 +168,8 @@ public class RPCParser {
 
                     return (agentAdvertisedUrl.endsWith("/")? agentAdvertisedUrl : agentAdvertisedUrl + "/") + url;
                 } else {
-                    logger.error("Medata collector or connector is not available for store type {}", storeType);
-                    throw new Exception("Medata collector or connector is not available for store type " + storeType);
+                    logger.error("Medata collector or connector is not available for store type {}", storageType.getStorageType());
+                    throw new Exception("Medata collector or connector is not available for store type " + storageType.getStorageType());
                 }
         }
         logger.error("Unknown method type specified {}", request.getMethod());
diff --git a/api/client/src/main/java/org/apache/airavata/mft/api/client/examples/Example.java b/api/client/src/main/java/org/apache/airavata/mft/api/client/examples/Example.java
deleted file mode 100644
index 52c7b60..0000000
--- a/api/client/src/main/java/org/apache/airavata/mft/api/client/examples/Example.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.airavata.mft.api.client.examples;
-
-import org.apache.airavata.mft.api.client.MFTApiClient;
-import org.apache.airavata.mft.api.service.MFTTransferServiceGrpc;
-import org.apache.airavata.mft.api.service.ResourceAvailabilityRequest;
-
-public class Example {
-
-    public static void main(String a[]) {
-        MFTTransferServiceGrpc.MFTTransferServiceBlockingStub mftClient =  MFTApiClient.MFTApiClientBuilder
-                .newBuilder().build().getTransferClient();
-        mftClient.getResourceAvailability(ResourceAvailabilityRequest.newBuilder()
-                .setResourceId("a")
-                .setResourceToken("b")
-                .setResourceType("SCP")
-                .setResourceBackend("AIRAVATA")
-                .setResourceCredentialBackend("AIRAVATA").build());
-        System.out.println("Hooooo");
-    }
-}
diff --git a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
index 7e5d525..08eabab 100644
--- a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
+++ b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
@@ -102,7 +102,7 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
         try {
             // TODO : Automatically derive agent if the target agent is empty
 
-            logger.info("Processing submit http download for resource {}", request.getSourceResourceId());
+            logger.info("Processing submit http download for resource path {}", request.getResourcePath());
 
             String targetAgent = derriveTargetAgent(request.getTargetAgent());
 
@@ -110,10 +110,9 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
                     .withAgentId(targetAgent)
                     .withMessageId(UUID.randomUUID().toString())
                     .withMethod("submitHttpDownload")
-                    .withParameter("resourceId", request.getSourceResourceId())
-                    .withParameter("childResourcePath", request.getSourceResourceChildPath())
+                    .withParameter("resourcePath", request.getResourcePath())
+                    .withParameter("sourceStorageId", request.getSourceStorageId())
                     .withParameter("sourceToken", request.getSourceToken())
-                    .withParameter("storeType", request.getSourceType())
                     .withParameter("mftAuthorizationToken", JsonFormat.printer().print(request.getMftAuthorizationToken()));
 
             SyncRPCResponse rpcResponse = agentRPCClient.sendSyncRequest(requestBuilder.build());
@@ -128,8 +127,8 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
                     responseObserver.onCompleted();
                     return;
                 case FAIL:
-                    logger.error("Errored while processing the download request to resource {}. Error msg : {}",
-                            request.getSourceResourceId(), rpcResponse.getErrorAsStr());
+                    logger.error("Errored while processing the download request to resource path {}. Error msg : {}",
+                            request.getResourcePath(), rpcResponse.getErrorAsStr());
 
                     responseObserver.onError(Status.INTERNAL
                             .withDescription("Errored while processing the the fetch file metadata response. Error msg : " +
@@ -138,8 +137,8 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
             }
 
         } catch (Exception e) {
-            logger.error("Error while submitting http download request to resource {}",
-                                                request.getSourceResourceId() , e);
+            logger.error("Error while submitting http download request to resource path {}",
+                                                request.getResourcePath() , e);
             responseObserver.onError(Status.INTERNAL
                     .withDescription("Failed to submit http download request. " + e.getMessage())
                     .asException());
@@ -187,27 +186,6 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
         }
     }
 
-    @Override
-    public void getResourceAvailability(ResourceAvailabilityRequest request, StreamObserver<ResourceAvailabilityResponse> responseObserver) {
-        try {
-            Optional<MetadataCollector> metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getResourceType());
-            MetadataCollector metadataCollector = metadataCollectorOp.orElseThrow(
-                    () -> new Exception("Could not find a metadata collector for resource " + request.getResourceId()));
-
-            metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
-            Boolean available = metadataCollector.isAvailable(request.getMftAuthorizationToken(),
-                    request.getResourceId(), request.getResourceToken());
-            responseObserver.onNext(ResourceAvailabilityResponse.newBuilder().setAvailable(available).build());
-            responseObserver.onCompleted();
-
-        } catch (Exception e) {
-            logger.error("Error while checking availability of resource " + request.getResourceId(), e);
-            responseObserver.onError(Status.INTERNAL
-                    .withDescription("Failed to check the availability. " + e.getMessage())
-                    .asException());
-        }
-    }
-
     /**
      * Fetches metadata for a specified file resource.  This has 2 modes
      * 1. Fetch the metadata of the exact file pointed in the resourceId. This assumes resourceId is an id of a file resource
@@ -219,25 +197,19 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
 
         try {
 
-            logger.info("Calling get file resource metadata for resource {}", request.getResourceId());
+            logger.info("Calling get file resource metadata for resource path {}", request.getResourcePath());
 
             String targetAgent = derriveTargetAgent(request.getTargetAgentId());
 
             SyncRPCRequest.SyncRPCRequestBuilder requestBuilder = SyncRPCRequest.SyncRPCRequestBuilder.builder()
                     .withAgentId(targetAgent)
                     .withMessageId(UUID.randomUUID().toString())
-                    .withParameter("resourceId", request.getResourceId())
-                    .withParameter("resourceType", request.getResourceType())
+                    .withParameter("resourcePath", request.getResourcePath())
+                    .withParameter("storageId", request.getStorageId())
                     .withParameter("resourceToken", request.getResourceToken())
                     .withParameter("mftAuthorizationToken", JsonFormat.printer().print(request.getMftAuthorizationToken()));
 
-            if (request.getChildPath().isEmpty()) {
-                requestBuilder.withMethod("getFileResourceMetadata");
-            } else {
-                // If a childPath is specified, look for child directories in the given parent resource id
-                requestBuilder.withMethod("getChildFileResourceMetadata");
-                requestBuilder.withParameter("childPath", request.getChildPath());
-            }
+            requestBuilder.withMethod("getFileResourceMetadata");
 
             SyncRPCResponse rpcResponse = agentRPCClient.sendSyncRequest(requestBuilder.build());
 
@@ -250,15 +222,15 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
                     responseObserver.onCompleted();
                     return;
                 case FAIL:
-                    logger.error("Errored while processing the fetch file metadata response for resource id {}. Error msg : {}",
-                            request.getResourceId(), rpcResponse.getErrorAsStr());
+                    logger.error("Errored while processing the fetch file metadata response for resource path {}. Error msg : {}",
+                            request.getResourcePath(), rpcResponse.getErrorAsStr());
                     responseObserver.onError(Status.INTERNAL
                             .withDescription("Errored while processing the the fetch file metadata response. Error msg : " +
                                     rpcResponse.getErrorAsStr())
                             .asException());
             }
         } catch (Exception e) {
-            logger.error("Error while fetching resource metadata for file resource " + request.getResourceId(), e);
+            logger.error("Error while fetching resource metadata for file resource " + request.getResourcePath(), e);
             responseObserver.onError(Status.INTERNAL
                     .withDescription("Failed to fetch file resource metadata. " + e.getMessage())
                     .asException());
@@ -276,24 +248,18 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
     public void getDirectoryResourceMetadata(FetchResourceMetadataRequest request, StreamObserver<DirectoryMetadataResponse> responseObserver) {
         try {
 
-            logger.info("Calling get directory metadata for resource {}", request.getResourceId());
+            logger.info("Calling get directory metadata for resource path {}", request.getResourcePath());
             String targetAgent = derriveTargetAgent(request.getTargetAgentId());
 
             SyncRPCRequest.SyncRPCRequestBuilder requestBuilder = SyncRPCRequest.SyncRPCRequestBuilder.builder()
                     .withAgentId(targetAgent)
                     .withMessageId(UUID.randomUUID().toString())
-                    .withParameter("resourceId", request.getResourceId())
-                    .withParameter("resourceType", request.getResourceType())
+                    .withParameter("resourcePath", request.getResourcePath())
+                    .withParameter("storageId", request.getStorageId())
                     .withParameter("resourceToken", request.getResourceToken())
                     .withParameter("mftAuthorizationToken", JsonFormat.printer().print(request.getMftAuthorizationToken()));
 
-            if (request.getChildPath().isEmpty()) {
-                requestBuilder.withMethod("getDirectoryResourceMetadata");
-            } else {
-                // If a childPath is specified, look for child directories in the given parent resource id
-                requestBuilder.withMethod("getChildDirectoryResourceMetadata");
-                requestBuilder.withParameter("childPath", request.getChildPath());
-            }
+            requestBuilder.withMethod("getDirectoryResourceMetadata");
 
             SyncRPCResponse rpcResponse = agentRPCClient.sendSyncRequest(requestBuilder.build());
 
@@ -320,15 +286,15 @@ public class MFTApiHandler extends MFTTransferServiceGrpc.MFTTransferServiceImpl
                     responseObserver.onCompleted();
                     return;
                 case FAIL:
-                    logger.error("Errored while processing the fetch directory metadata response for resource id {}. Error msg : {}",
-                            request.getResourceId(), rpcResponse.getErrorAsStr());
+                    logger.error("Errored while processing the fetch directory metadata response for resource path {}. Error msg : {}",
+                            request.getResourcePath(), rpcResponse.getErrorAsStr());
                     responseObserver.onError(Status.INTERNAL
                             .withDescription("Errored while processing the the fetch directory metadata response. Error msg : " +
                                     rpcResponse.getErrorAsStr())
                             .asException());
             }
         } catch (Exception e) {
-            logger.error("Error while fetching directory resource metadata for resource " + request.getResourceId(), e);
+            logger.error("Error while fetching directory resource metadata for resource path {}", request.getResourcePath(), e);
             responseObserver.onError(Status.INTERNAL
                     .withDescription("Failed to fetch directory resource metadata. " + e.getMessage())
                     .asException());
diff --git a/api/stub/src/main/proto/MFTTransferApi.proto b/api/stub/src/main/proto/MFTTransferApi.proto
index 3a3ec44..f62f3a2 100644
--- a/api/stub/src/main/proto/MFTTransferApi.proto
+++ b/api/stub/src/main/proto/MFTTransferApi.proto
@@ -15,18 +15,16 @@ message CallbackEndpoint {
 }
 
 message TransferApiRequest {
-    string sourceResourceId = 1;
-    string sourceChildResourcePath = 2;
-    string sourceType = 3;
-    string sourceToken = 4;
-    string destinationResourceId = 5;
-    string destinationChildResourcePath = 6;
-    string destinationType = 7;
-    string destinationToken = 8;
-    bool affinityTransfer = 9;
-    map<string, int32> targetAgents = 10;
-    org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 11;
-    repeated CallbackEndpoint callbackEndpoints = 12;
+    string sourcePath = 1;
+    string sourceStorageId = 2;
+    string sourceToken = 3;
+    string destinationPath = 4;
+    string destinationStorageId = 5;
+    string destinationToken = 6;
+    bool affinityTransfer = 7;
+    map<string, int32> targetAgents = 8;
+    org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 9;
+    repeated CallbackEndpoint callbackEndpoints = 10;
 }
 
 message TransferApiResponse {
@@ -34,10 +32,9 @@ message TransferApiResponse {
 }
 
 message HttpUploadApiRequest {
-    string destinationResourceId = 1;
-    string destinationResourceChildPath = 2;
+    string destinationStorageId = 1;
+    string resourcePath = 2;
     string destinationToken = 3;
-    string destinationType = 4;
     string targetAgent = 5;
     org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 6;
 }
@@ -48,10 +45,9 @@ message HttpUploadApiResponse {
 }
 
 message HttpDownloadApiRequest {
-    string sourceResourceId = 1;
-    string sourceResourceChildPath = 2;
+    string resourcePath = 1;
+    string sourceStorageId = 2;
     string sourceToken = 3;
-    string sourceType = 4;
     string targetAgent = 5;
     org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 6;
 }
@@ -74,12 +70,9 @@ message TransferStateApiResponse {
 }
 
 message ResourceAvailabilityRequest {
-    string resourceId = 1;
-    string childResourcePath = 2;
-    string resourceType = 3;
+    string storageId = 1;
+    string resourcePath = 2;
     string resourceToken = 4;
-    string resourceBackend = 5;
-    string resourceCredentialBackend = 6;
     org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 7;
 }
 
@@ -111,14 +104,10 @@ message DirectoryMetadataResponse {
 }
 
 message FetchResourceMetadataRequest {
-    string resourceId = 1;
-    string childResourcePath = 2;
-    string resourceType = 3;
+    string resourcePath = 1;
     string resourceToken = 4;
-    string resourceBackend = 5;
-    string resourceCredentialBackend = 6;
+    string storageId = 5;
     string targetAgentId = 7;
-    string childPath = 8; // if the child entities of the parent resource are required, set this field
     org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 9;
 }
 
diff --git a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/odata/ODataRemoteAddSubCommand.java b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/odata/ODataRemoteAddSubCommand.java
index 226d194..acdbc41 100644
--- a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/odata/ODataRemoteAddSubCommand.java
+++ b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/odata/ODataRemoteAddSubCommand.java
@@ -33,16 +33,16 @@ import java.util.concurrent.Callable;
 @CommandLine.Command(name = "add")
 public class ODataRemoteAddSubCommand implements Callable<Integer> {
 
-    @CommandLine.Option(names = {"-n", "--name"}, description = "Storage Name")
+    @CommandLine.Option(names = {"-n", "--name"}, description = "Storage Name", required = true)
     private String remoteName;
 
-    @CommandLine.Option(names = {"-U", "--url"}, description = "Base URL for OData Endpoint")
+    @CommandLine.Option(names = {"-U", "--url"}, description = "Base URL for OData Endpoint", required = true)
     private String baseURL;
 
-    @CommandLine.Option(names = {"-u", "--user"}, description = "User Name")
+    @CommandLine.Option(names = {"-u", "--user"}, description = "User Name", required = true)
     private String userName;
 
-    @CommandLine.Option(names = {"-p", "--password"}, description = "Password")
+    @CommandLine.Option(names = {"-p", "--password"}, description = "Password", required = true)
     private String password;
 
 
diff --git a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/s3/S3SubCommand.java b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/s3/S3SubCommand.java
index d774c30..5340b81 100644
--- a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/s3/S3SubCommand.java
+++ b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/s3/S3SubCommand.java
@@ -1,11 +1,10 @@
 package org.apache.airavata.mft.command.line.sub.s3;
 
-import org.apache.airavata.mft.command.line.sub.s3.secret.S3SecretSubCommand;
 import org.apache.airavata.mft.command.line.sub.s3.storage.S3StorageSubCommand;
 import picocli.CommandLine;
 
 @CommandLine.Command(name = "s3", description = "Manage S3 resources and credentials",
-        subcommands = {S3StorageSubCommand.class, S3SecretSubCommand.class})
+        subcommands = {S3StorageSubCommand.class})
 public class S3SubCommand {
 
 
diff --git a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/s3/secret/S3SecretSubCommand.java b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/s3/secret/S3SecretSubCommand.java
deleted file mode 100644
index 175ff76..0000000
--- a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/s3/secret/S3SecretSubCommand.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.airavata.mft.command.line.sub.s3.secret;
-
-import picocli.CommandLine;
-
-@CommandLine.Command(name = "secret")
-public class S3SecretSubCommand {
-    @CommandLine.Command(name = "add")
-    void addS3Secret() {
-        System.out.println("Adding S3 Secret");
-    }
-
-    @CommandLine.Command(name = "delete")
-    void deleteS3Secret(@CommandLine.Parameters(index = "0") String secretId) {
-        System.out.println("Deleting S3 Secret " + secretId);
-    }
-
-    @CommandLine.Command(name = "list")
-    void listS3Secret() {
-        System.out.println("Listing S3 Resource");
-    }
-
-    @CommandLine.Command(name = "get")
-    void getS3Secret(@CommandLine.Parameters(index = "0") String secretId) {
-        System.out.println("Getting S3 Secret " + secretId);
-    }
-}
-
diff --git a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/s3/storage/S3StorageAddSubCommand.java b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/s3/storage/S3StorageAddSubCommand.java
index e3e24f5..7dcf52c 100644
--- a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/s3/storage/S3StorageAddSubCommand.java
+++ b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/s3/storage/S3StorageAddSubCommand.java
@@ -17,22 +17,22 @@ import java.util.concurrent.Callable;
 @CommandLine.Command(name = "add")
 public class S3StorageAddSubCommand implements Callable<Integer> {
 
-    @CommandLine.Option(names = {"-n", "--name"}, description = "Storage Name")
+    @CommandLine.Option(names = {"-n", "--name"}, description = "Storage Name", required = true)
     private String remoteName;
 
-    @CommandLine.Option(names = {"-b", "--bucket"}, description = "Bucket Name")
+    @CommandLine.Option(names = {"-b", "--bucket"}, description = "Bucket Name", required = true)
     private String bucket;
 
-    @CommandLine.Option(names = {"-r", "--region"}, description = "Region")
+    @CommandLine.Option(names = {"-r", "--region"}, description = "Region", required = true)
     private String region;
 
-    @CommandLine.Option(names = {"-e", "--endpoint"}, description = "S3 API Endpoint")
+    @CommandLine.Option(names = {"-e", "--endpoint"}, description = "S3 API Endpoint. For AWS S3 use https://s3.<REGION>.amazonaws.com", required = true)
     private String endpoint;
 
-    @CommandLine.Option(names = {"-k", "--key"}, description = "Access Key")
+    @CommandLine.Option(names = {"-k", "--key"}, description = "Access Key", required = true)
     private String accessKey;
 
-    @CommandLine.Option(names = {"-s", "--secret"}, description = "Access Secret")
+    @CommandLine.Option(names = {"-s", "--secret"}, description = "Access Secret", required = true)
     private String accessSecret;
 
     @CommandLine.Option(names = {"-t", "--token"}, description = "Session Token", defaultValue = "")
diff --git a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/swift/SwiftAddSubCommand.java b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/swift/SwiftAddSubCommand.java
index 689ab6a..30dbb7f 100644
--- a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/swift/SwiftAddSubCommand.java
+++ b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/swift/SwiftAddSubCommand.java
@@ -17,31 +17,31 @@ import java.util.concurrent.Callable;
 @CommandLine.Command(name = "add")
 public class SwiftAddSubCommand implements Callable<Integer> {
 
-    @CommandLine.Option(names = {"-n", "--name"}, description = "Storage Name")
+    @CommandLine.Option(names = {"-n", "--name"}, description = "Storage Name", required = true)
     private String remoteName;
 
-    @CommandLine.Option(names = {"-c", "--container"}, description = "Swift Container Name")
+    @CommandLine.Option(names = {"-c", "--container"}, description = "Swift Container Name", required = true)
     private String container;
 
-    @CommandLine.Option(names = {"-e", "--endpoint"}, description = "Endpoint Name")
+    @CommandLine.Option(names = {"-e", "--endpoint"}, description = "Endpoint Name", required = true)
     private String endpoint;
 
-    @CommandLine.Option(names = {"-r", "--region"}, description = "Region")
+    @CommandLine.Option(names = {"-r", "--region"}, description = "Region", required = true)
     private String region;
 
-    @CommandLine.Option(names = {"-v", "--keystoneversion"}, description = "Keystone Version")
+    @CommandLine.Option(names = {"-v", "--keystoneversion"}, description = "Keystone Version", required = true)
     private int keystoneVersion;
 
-    @CommandLine.Option(names = {"-u", "--user"}, description = "User Name (Password Credentials")
+    @CommandLine.Option(names = {"-u", "--user"}, description = "User Name (Password Credentials", required = true)
     private String userName;
 
-    @CommandLine.Option(names = {"-p", "--password"}, description = "Password (Password Credentials")
+    @CommandLine.Option(names = {"-p", "--password"}, description = "Password (Password Credentials", required = true)
     private String password;
 
-    @CommandLine.Option(names = {"-pid", "--projectId"}, description = "Project Id (Password Credentials")
+    @CommandLine.Option(names = {"-pid", "--projectId"}, description = "Project Id (Password Credentials", required = true)
     private String projectId;
 
-    @CommandLine.Option(names = {"-d", "--domainId"}, description = "Domain Id (Password Credentials")
+    @CommandLine.Option(names = {"-d", "--domainId"}, description = "Domain Id (Password Credentials", required = true)
     private String domainId;
 
 
diff --git a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/transfer/SubmitTransferSubCommand.java b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/transfer/SubmitTransferSubCommand.java
index db0cf54..7ed9764 100644
--- a/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/transfer/SubmitTransferSubCommand.java
+++ b/command-line/src/main/java/org/apache/airavata/mft/command/line/sub/transfer/SubmitTransferSubCommand.java
@@ -16,25 +16,18 @@ import java.util.concurrent.Callable;
 @CommandLine.Command(name = "submit", description = "Submit a data transfer job")
 public class SubmitTransferSubCommand implements Callable<Integer> {
 
-    @CommandLine.Option(names = {"-st", "--source-type"}, description = "S3, SCP, LOCAL, FTP ...")
-    private String sourceType;
-
-    @CommandLine.Option(names = {"-dt", "--destination-type"}, description = "S3, SCP, LOCAL, FTP ...")
-    private String destinationType;
-
-    @CommandLine.Option(names = {"-s", "--source"}, description = "Source Storage Id")
+    @CommandLine.Option(names = {"-s", "--source"}, description = "Source Storage Id", required = true)
     private String sourceStorageId;
 
-    @CommandLine.Option(names = {"-d", "--destination"}, description = "Destination Storage Id")
+    @CommandLine.Option(names = {"-d", "--destination"}, description = "Destination Storage Id", required = true)
     private String destinationStorageId;
 
-    @CommandLine.Option(names = {"-sp", "--source-path"}, description = "Source Path")
+    @CommandLine.Option(names = {"-sp", "--source-path"}, description = "Source Path", required = true)
     private String sourcePath;
 
-    @CommandLine.Option(names = {"-dp", "--destination-path"}, description = "Destination Path")
+    @CommandLine.Option(names = {"-dp", "--destination-path"}, description = "Destination Path", required = true)
     private String destinationPath;
 
-
     @Override
     public Integer call() throws Exception {
         System.out.println("Transferring data from " + sourceStorageId + " to " + destinationStorageId);
@@ -51,25 +44,13 @@ public class SubmitTransferSubCommand implements Callable<Integer> {
                 .searchStorageSecret(StorageSecretSearchRequest.newBuilder()
                         .setAuthzToken(token).setStorageId(destinationStorageId).build());
 
-        GenericResource sourceResource = mftApiClient.getResourceClient().createGenericResource(GenericResourceCreateRequest.newBuilder()
-                .setAuthzToken(token)
-                .setFile(FileResource.newBuilder().setResourcePath(sourcePath).build())
-                .setStorageId(sourceStorageId)
-                .setStorageType(GenericResourceCreateRequest.StorageType.valueOf(sourceType)).build());
-
-        GenericResource destResource = mftApiClient.getResourceClient().createGenericResource(GenericResourceCreateRequest.newBuilder()
-                .setAuthzToken(token)
-                .setFile(FileResource.newBuilder().setResourcePath(destinationPath).build())
-                .setStorageId(destinationStorageId)
-                .setStorageType(GenericResourceCreateRequest.StorageType.valueOf(destinationType)).build());
-
         TransferApiResponse transferResp = mftApiClient.getTransferClient().submitTransfer(TransferApiRequest.newBuilder()
                 .setSourceToken(sourceSecret.getStorageSecret().getSecretId())
                 .setDestinationToken(destSecret.getStorageSecret().getSecretId())
-                .setDestinationResourceId(destResource.getResourceId())
-                .setSourceResourceId(sourceResource.getResourceId())
-                .setSourceType(sourceType)
-                .setDestinationType(destinationType).build());
+                .setDestinationStorageId(destinationStorageId)
+                .setDestinationPath(destinationPath)
+                .setSourceStorageId(sourceStorageId)
+                .setSourcePath(sourcePath).build());
 
         System.out.println("Submitted Transfer " + transferResp.getTransferId());
         return 0;
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
index bc754e8..bab0166 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
@@ -8,11 +8,13 @@ public class ConnectorConfig {
     private int resourceServicePort;
     private String secretServiceHost;
     private int secretServicePort;
-    private String resourceId;
+    private String storageId;
     private String credentialToken;
     private AuthToken authToken;
     private String transferId;
     private FileResourceMetadata metadata;
+    private String resourcePath;
+    private String storageType;
 
     public String getResourceServiceHost() {
         return resourceServiceHost;
@@ -46,12 +48,20 @@ public class ConnectorConfig {
         this.secretServicePort = secretServicePort;
     }
 
-    public String getResourceId() {
-        return resourceId;
+    public String getStorageId() {
+        return storageId;
     }
 
-    public void setResourceId(String resourceId) {
-        this.resourceId = resourceId;
+    public void setStorageId(String storageId) {
+        this.storageId = storageId;
+    }
+
+    public String getResourcePath() {
+        return resourcePath;
+    }
+
+    public void setResourcePath(String resourcePath) {
+        this.resourcePath = resourcePath;
     }
 
     public String getCredentialToken() {
@@ -86,17 +96,26 @@ public class ConnectorConfig {
         this.metadata = metadata;
     }
 
+    public String getStorageType() {
+        return storageType;
+    }
+
+    public void setStorageType(String storageType) {
+        this.storageType = storageType;
+    }
 
     public static final class ConnectorConfigBuilder {
         private String resourceServiceHost;
         private int resourceServicePort;
         private String secretServiceHost;
         private int secretServicePort;
-        private String resourceId;
+        private String stroageId;
         private String credentialToken;
         private AuthToken authToken;
         private String transferId;
+        private String resourcePath;
         private FileResourceMetadata metadata;
+        private String storageType;
 
         private ConnectorConfigBuilder() {
         }
@@ -125,8 +144,18 @@ public class ConnectorConfig {
             return this;
         }
 
-        public ConnectorConfigBuilder withResourceId(String resourceId) {
-            this.resourceId = resourceId;
+        public ConnectorConfigBuilder withStorageId(String storageId) {
+            this.stroageId = storageId;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withResourcePath(String resourcePath) {
+            this.resourcePath = resourcePath;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withStorageType(String storageType) {
+            this.storageType = storageType;
             return this;
         }
 
@@ -156,7 +185,10 @@ public class ConnectorConfig {
             connectorConfig.setResourceServicePort(resourceServicePort);
             connectorConfig.setSecretServiceHost(secretServiceHost);
             connectorConfig.setSecretServicePort(secretServicePort);
-            connectorConfig.setResourceId(resourceId);
+            connectorConfig.setStorageId(stroageId);
+            connectorConfig.setStorageType(storageType);
+            // Copy the path captured by withResourcePath(); otherwise build() drops it.
+            connectorConfig.setResourcePath(resourcePath);
             connectorConfig.setCredentialToken(credentialToken);
             connectorConfig.setAuthToken(authToken);
             connectorConfig.setTransferId(transferId);
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
index 3bc650a..3baad37 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
@@ -21,5 +21,4 @@ import java.io.InputStream;
 
 public interface IncomingStreamingConnector extends BasicConnector {
     public InputStream fetchInputStream() throws Exception;
-    public InputStream fetchInputStream(String childPath) throws Exception;
 }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/MetadataCollector.java b/core/src/main/java/org/apache/airavata/mft/core/api/MetadataCollector.java
index 183fcfe..3f2b592 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/MetadataCollector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/MetadataCollector.java
@@ -38,75 +38,38 @@ public interface MetadataCollector {
      *
      *
      * @param authZToken
-     * @param resourceId id of the resource
+     * @param resourcePath path of the resource
+     * @param storageId Storage id
      * @param credentialToken credential token for the resource
      * @return an object of {@link FileResourceMetadata}
      * @throws Exception if the resource id is not a File Resource type or the resource can't be fetched from the resource service
      */
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception;
-
-    /*
-     * Fetches a metadata of given File Resource inside a registered directory resource. Target file might be living in
-     * multiple level below the parent directory
-     *
-     * @param parentResourceId parent directory resource id
-     * @param resourcePath     path of the target resource. This should be a child path of the parent resource
-     * @param credentialToken  credential token for the resource
-     * @return an object of {@link FileResourceMetadata}
-     * @throws Exception if the parent resource is not a Directory resource or the target resource is not a File Resource type
-     *                   or the resource can't be fetched from the resource service
-     */
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception;
+    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception;
 
     /**
      * Fetches a metadata of given Directory Resource
      *
      *
      * @param authZToken
-     * @param resourceId      id of the resource
+     * @param resourcePath path of the resource
+     * @param storageId Storage id
      * @param credentialToken credential token for the resource
      * @return an object of {@link DirectoryResourceMetadata}
      * @throws Exception if the resource id is not a Directory Resource type or the resource can't be fetched from the resource service
      */
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception;
-
-    /**
-     * Fetches a metadata of given Directory Resource inside a registered directory resource. Target directory might be living in
-     * multiple level below the parent directory
-     *
-     *
-     * @param authZToken
-     * @param parentResourceId parent directory resource id
-     * @param resourcePath     path of the target resource. This should be a child path of the parent resource
-     * @param credentialToken  credential token for the resource
-     * @return an object of {@link DirectoryResourceMetadata}
-     * @throws Exception if the parent resource is not a Directory resource or the target resource is not a Directory Resource type
-     *                   or the resource can't be fetched from the resource service
-     */
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception;
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception;
 
     /**
      * Check whether the resource is available in the actual storage
      *
      *
      * @param authZToken
-     * @param resourceId      id of the resource
+     * @param resourcePath      path of the resource
+     * @param storageId Storage id
      * @param credentialToken credential token for the resource
      * @return true if the resource is available, false otherwise
      * @throws Exception if the resource details can not be fetched from the resource service
      */
-    public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) throws Exception;
+    public Boolean isAvailable(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception;
 
-    /**
-     * Check whether the resource is available in the actual storage
-     *
-     *
-     * @param authToken
-     * @param parentResourceId id of the storage
-     * @param resourcePath resource path
-     * @param credentialToken credential token for the resource
-     * @return true of the resource is available false otherwise
-     * @throws Exception if the resource details can not be fetched from the resource service
-     */
-    public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception;
 }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
index 1e2633d..d5db03d 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
@@ -21,5 +21,4 @@ import java.io.OutputStream;
 
 public interface OutgoingStreamingConnector extends BasicConnector {
     public OutputStream fetchOutputStream() throws Exception;
-    public OutputStream fetchOutputStream(String childPath) throws Exception;
 }
diff --git a/examples/pom.xml b/examples/pom.xml
deleted file mode 100644
index 28cc03a..0000000
--- a/examples/pom.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>airavata-mft</artifactId>
-        <groupId>org.apache.airavata</groupId>
-        <version>0.01-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>mft-examples</artifactId>
-
-
-    <dependencies>
-        <dependency>
-            <artifactId>mft-api-client</artifactId>
-            <groupId>org.apache.airavata</groupId>
-            <version>0.01-SNAPSHOT</version>
-        </dependency>
-    </dependencies>
-</project>
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java
deleted file mode 100644
index 77b3cb6..0000000
--- a/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.examples.http;
-
-import org.apache.airavata.mft.api.client.MFTApiClient;
-import org.apache.airavata.mft.api.service.HttpDownloadApiRequest;
-import org.apache.airavata.mft.api.service.HttpDownloadApiResponse;
-import org.apache.airavata.mft.api.service.MFTTransferServiceGrpc;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.common.UserTokenAuth;
-
-public class DownloadExample {
-    public static void main(String args[]) {
-        AuthToken mftAuthorizationToken = AuthToken.newBuilder().setUserTokenAuth(UserTokenAuth.newBuilder().setToken("43ff79ac-e4f2-473c-9ea1-04eee9509a53").build()).build();
-
-        MFTTransferServiceGrpc.MFTTransferServiceBlockingStub mftClient = MFTApiClient.MFTApiClientBuilder
-                .newBuilder().build().getTransferClient();
-        HttpDownloadApiResponse httpDownloadApiResponse = mftClient.submitHttpDownload(HttpDownloadApiRequest.newBuilder()
-                .setTargetAgent("agent0")
-                .setSourceResourceId("remote-ssh-resource")
-                .setSourceToken("local-ssh-cred")
-                .setSourceType("SCP")
-                .setMftAuthorizationToken(mftAuthorizationToken)
-                .build());
-
-        System.out.println(httpDownloadApiResponse);
-    }
-}
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/metadata/SCPExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/metadata/SCPExample.java
deleted file mode 100644
index f90a938..0000000
--- a/examples/src/main/java/org/apache/airavata/mft/examples/metadata/SCPExample.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package org.apache.airavata.mft.examples.metadata;
-
-import org.apache.airavata.mft.api.client.MFTApiClient;
-import org.apache.airavata.mft.api.service.DirectoryMetadataResponse;
-import org.apache.airavata.mft.api.service.FetchResourceMetadataRequest;
-import org.apache.airavata.mft.api.service.FileMetadataResponse;
-import org.apache.airavata.mft.api.service.MFTTransferServiceGrpc;
-
-public class SCPExample {
-    public static void main(String args[]) throws Exception {
-        MFTTransferServiceGrpc.MFTTransferServiceBlockingStub client = MFTApiClient.MFTApiClientBuilder
-                .newBuilder().build().getTransferClient();
-
-        // File metadata
-        long startTime = System.currentTimeMillis();
-        FileMetadataResponse fileResourceMetadata = client.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
-                .setResourceId("remote-ssh-resource2")
-                .setResourceType("SCP")
-                .setResourceToken("local-ssh-cred")
-                .setTargetAgentId("agent0")
-                .build());
-        long endTime = System.currentTimeMillis();
-        System.out.println("File metadata response ");
-        System.out.println(fileResourceMetadata);
-        System.out.println("Time for processing : " + (endTime - startTime) + " ms");
-
-        // Directory metadata
-        startTime = System.currentTimeMillis();
-        DirectoryMetadataResponse directoryMetadataResponse = client.getDirectoryResourceMetadata(FetchResourceMetadataRequest.newBuilder()
-                .setResourceId("remote-ssh-dir-resource")
-                .setResourceType("SCP")
-                .setResourceToken("local-ssh-cred")
-                .setTargetAgentId("agent0")
-                .build());
-        endTime = System.currentTimeMillis();
-
-        System.out.println("Directory metadata response ");
-        System.out.println(directoryMetadataResponse);
-        System.out.println("Time for processing : " + (endTime - startTime) + " ms");
-
-        // Child file inside parent directoru
-        startTime = System.currentTimeMillis();
-        fileResourceMetadata = client.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
-                .setResourceId("remote-ssh-dir-resource") // Parent directory resource id
-                .setResourceType("SCP")
-                .setResourceToken("local-ssh-cred")
-                .setTargetAgentId("agent0")
-                .setChildPath("/tmp/10mb.txt")  // Child file path
-                .build());
-        endTime = System.currentTimeMillis();
-        System.out.println("Child file metadata response ");
-        System.out.println(fileResourceMetadata);
-        System.out.println("Time for processing : " + (endTime - startTime) + " ms");
-
-        // Child directory inside parent directoru
-        startTime = System.currentTimeMillis();
-        directoryMetadataResponse = client.getDirectoryResourceMetadata(FetchResourceMetadataRequest.newBuilder()
-                .setResourceId("remote-ssh-dir-resource") // Parent directory resource id
-                .setResourceType("SCP")
-                .setResourceToken("local-ssh-cred")
-                .setTargetAgentId("agent0")
-                .setChildPath("/tmp/hsperfdata_root") // Child directory path
-                .build());
-        endTime = System.currentTimeMillis();
-
-        System.out.println("Child directory metadata response ");
-        System.out.println(directoryMetadataResponse);
-        System.out.println("Time for processing : " + (endTime - startTime) + " ms");
-
-    }
-}
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
deleted file mode 100644
index 4289c58..0000000
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.examples.transfer;
-
-import org.apache.airavata.mft.api.client.MFTApiClient;
-import org.apache.airavata.mft.api.service.*;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.common.UserTokenAuth;
-
-import java.util.Iterator;
-
-public class LocalExample {
-    public static void main(String args[]) throws Exception {
-        MFTTransferServiceGrpc.MFTTransferServiceBlockingStub client = MFTApiClient.MFTApiClientBuilder
-                .newBuilder().build().getTransferClient();
-
-        String sourceResourceId = "remote-ssh-resource";
-        String sourceToken = "local-ssh-cred";
-        String destResource = "local-resource-1";
-        String destToken = "";
-        AuthToken mftAuthorizationToken = AuthToken.newBuilder().setUserTokenAuth(UserTokenAuth.newBuilder().setToken("43ff79ac-e4f2-473c-9ea1-04eee9509a53").build()).build();
-
-        TransferApiRequest request = TransferApiRequest.newBuilder()
-                .setMftAuthorizationToken(mftAuthorizationToken)
-                .setSourceResourceId(sourceResourceId)
-                .setSourceToken(sourceToken)
-                .setSourceType("SCP")
-                .setDestinationResourceId(destResource)
-                .setDestinationToken(destToken)
-                .setDestinationType("LOCAL")
-                .setAffinityTransfer(false).build();
-
-        TransferApiResponse transferApiResponse = client.submitTransfer(request);
-        while(true) {
-
-            try {
-                Iterator<TransferStateApiResponse> transferStates = client.getTransferStates(TransferStateApiRequest.newBuilder().setTransferId(transferApiResponse.getTransferId()).build());
-                System.out.println("Got " + transferStates.next().getState());
-                TransferStateApiResponse transferState = client.getTransferState(TransferStateApiRequest.newBuilder().setTransferId(transferApiResponse.getTransferId()).build());
-                System.out.println("State " + transferState.getState());
-                if ("COMPLETED".equals(transferState.getState()) || "FAILED".equals(transferState.getState())) {
-                    break;
-                }
-
-            } catch (Exception e) {
-                System.out.println(e.getMessage());
-            }
-            Thread.sleep(1000);
-        }
-    }
-}
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
deleted file mode 100644
index 63cd455..0000000
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.examples.transfer;
-
-import org.apache.airavata.mft.api.client.MFTApiClient;
-import org.apache.airavata.mft.api.service.*;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.common.UserTokenAuth;
-
-import java.util.Iterator;
-
-public class S3Example {
-    public static void main(String args[]) throws Exception {
-        MFTTransferServiceGrpc.MFTTransferServiceBlockingStub client = MFTApiClient.MFTApiClientBuilder
-                .newBuilder().build().getTransferClient();
-
-        String sourceResourceId = "remote-ssh-storage";
-        String sourceResourcePath = "/tmp/1mb.txt";
-        String sourceToken = "ssh-cred";
-        String destResourceId = "s3-storage-1";
-        String destResourcePath = "1mb-copy.txt";
-        String destToken = "s3-cred";
-        AuthToken mftAuthorizationToken = AuthToken.newBuilder().setUserTokenAuth(UserTokenAuth.newBuilder().setToken("43ff79ac-e4f2-473c-9ea1-04eee9509a53").build()).build();
-
-        TransferApiRequest request = TransferApiRequest.newBuilder()
-                .setMftAuthorizationToken(mftAuthorizationToken)
-                .setSourceResourceId(sourceResourceId)
-                .setSourceToken(sourceToken)
-                .setSourceType("SCP")
-                .setDestinationResourceId(destResourceId)
-                .setDestinationToken(destToken)
-                .setDestinationType("S3")
-                .setAffinityTransfer(false).build();
-
-        TransferApiResponse transferApiResponse = client.submitTransfer(request);
-        while(true) {
-
-            try {
-                Iterator<TransferStateApiResponse> transferStates = client.getTransferStates(TransferStateApiRequest.newBuilder().setTransferId(transferApiResponse.getTransferId()).build());
-                System.out.println("Got " + transferStates.next().getState());
-                TransferStateApiResponse transferState = client.getTransferState(TransferStateApiRequest.newBuilder().setTransferId(transferApiResponse.getTransferId()).build());
-                System.out.println("State " + transferState.getState());
-                if ("COMPLETED".equals(transferState.getState()) || "FAILED".equals(transferState.getState())) {
-                    break;
-                }
-
-            } catch (Exception e) {
-                System.out.println(e.getMessage());
-            }
-            Thread.sleep(1000);
-        }
-    }
-}
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
deleted file mode 100644
index 59591e3..0000000
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- package org.apache.airavata.mft.examples.transfer;
-
-import org.apache.airavata.mft.api.client.MFTApiClient;
-import org.apache.airavata.mft.api.service.*;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.common.UserTokenAuth;
-
-import java.util.Iterator;
-
-public class SCPExample {
-    public static void main(String args[]) throws Exception {
-        MFTTransferServiceGrpc.MFTTransferServiceBlockingStub client = MFTApiClient.MFTApiClientBuilder
-                .newBuilder().build().getTransferClient();
-
-        String sourceResourceId = "remote-ssh-resource-1";
-        String sourceResourcePath = "/tmp/1mb.txt";
-        String sourceToken = "ssh-cred-1";
-        String destResourceId = "remote-ssh-resource-2";
-        String destToken = "ssh-cred-2";
-        AuthToken mftAuthorizationToken = AuthToken.newBuilder().setUserTokenAuth(UserTokenAuth.newBuilder().setToken("43ff79ac-e4f2-473c-9ea1-04eee9509a53").build()).build();
-
-
-        TransferApiRequest request = TransferApiRequest.newBuilder()
-                .setMftAuthorizationToken(mftAuthorizationToken)
-                .setSourceResourceId(sourceResourceId)
-                .setSourceToken(sourceToken)
-                .setSourceType("SCP")
-                .setDestinationResourceId(destResourceId)
-                .setDestinationToken(destToken)
-                .setDestinationType("SCP")
-                .setAffinityTransfer(false).build();
-
-        TransferApiResponse transferApiResponse = client.submitTransfer(request);
-        while(true) {
-
-            try {
-                Iterator<TransferStateApiResponse> transferStates = client.getTransferStates(TransferStateApiRequest.newBuilder().setTransferId(transferApiResponse.getTransferId()).build());
-                System.out.println("Got " + transferStates.next().getState());
-                TransferStateApiResponse transferState = client.getTransferState(TransferStateApiRequest.newBuilder().setTransferId(transferApiResponse.getTransferId()).build());
-                System.out.println("State " + transferState.getState());
-                if ("COMPLETED".equals(transferState.getState()) || "FAILED".equals(transferState.getState())) {
-                    break;
-                }
-
-            } catch (Exception e) {
-                System.out.println(e.getMessage());
-            }
-            Thread.sleep(1000);
-        }
-    }
-}
diff --git a/examples/src/main/python/.gitignore b/examples/src/main/python/.gitignore
deleted file mode 100644
index 6e2313d..0000000
--- a/examples/src/main/python/.gitignore
+++ /dev/null
@@ -1,4 +0,0 @@
-venv
-MFTApi_pb2.py
-MFTApi_pb2_grpc.py
-google/*
\ No newline at end of file
diff --git a/examples/src/main/python/directory_browse.py b/examples/src/main/python/directory_browse.py
deleted file mode 100644
index d4a648a..0000000
--- a/examples/src/main/python/directory_browse.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import grpc
-import MFTApi_pb2
-import MFTApi_pb2_grpc
-
-channel = grpc.insecure_channel('localhost:7004')
-stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
-
-request = MFTApi_pb2.FetchResourceMetadataRequest(resourceId= "remote-ssh-dir-resource",
-                                        resourceType = "SCP",
-                                        resourceToken = "local-ssh-cred",
-                                        resourceBackend = "FILE",
-                                        resourceCredentialBackend= "FILE",
-                                        targetAgentId = "agent0",
-                                        childPath= "",
-                                        mftAuthorizationToken = "user token")
-
-response = stub.getDirectoryResourceMetadata(request)
-print(response)
\ No newline at end of file
diff --git a/examples/src/main/python/download_http.py b/examples/src/main/python/download_http.py
deleted file mode 100644
index 15af21b..0000000
--- a/examples/src/main/python/download_http.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import grpc
-import MFTApi_pb2
-import MFTApi_pb2_grpc
-
-channel = grpc.insecure_channel('localhost:7004')
-stub = MFTApi_pb2_grpc.MFTApiServiceStub(channel)
-download_request = MFTApi_pb2.HttpDownloadApiRequest(sourceStoreId ="remote-ssh-storage",
-                                  sourcePath= "/tmp/a.txt",
-                                  sourceToken = "local-ssh-cred",
-                                  sourceType= "SCP",
-                                  targetAgent = "agent0",
-                                  mftAuthorizationToken = "")
-
-result = stub.submitHttpDownload(download_request)
-print(result)
-
-## Sample output ##
-# url: "http://localhost:3333/53937f40-d545-4180-967c-ddb193d672d8"
-# targetAgent: "agent0"
\ No newline at end of file
diff --git a/examples/src/main/python/readme.md b/examples/src/main/python/readme.md
deleted file mode 100644
index 4c9a972..0000000
--- a/examples/src/main/python/readme.md
+++ /dev/null
@@ -1,11 +0,0 @@
-python3 -m venv venv
-source venv/bin/activate
-pip install --upgrade pip
-pip install -r requirements.txt
-
-mkdir -p google/api    
-curl https://raw.githubusercontent.com/googleapis/googleapis/master/google/api/annotations.proto > google/api/annotations.proto     
-curl https://raw.githubusercontent.com/googleapis/googleapis/master/google/api/http.proto > google/api/http.proto
-
-python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. --proto_path=../../../../common/mft-common-proto/src/main/proto/ CredCommon.proto
-python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. --proto_path=../../../../api/stub/src/main/proto/ --proto_path=../../../../common/mft-common-proto/src/main/proto/ MFTApi.proto
diff --git a/examples/src/main/python/requirements.txt b/examples/src/main/python/requirements.txt
deleted file mode 100644
index 3128456..0000000
--- a/examples/src/main/python/requirements.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-grpcio==1.34.1
-google-api-python-client==1.12.8
-grpcio-tools==1.34.1
\ No newline at end of file
diff --git a/examples/src/main/resources/logback.xml b/examples/src/main/resources/logback.xml
deleted file mode 100644
index a1e67fe..0000000
--- a/examples/src/main/resources/logback.xml
+++ /dev/null
@@ -1,54 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-
-    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
-        </encoder>
-    </appender>
-
-    <appender name="LOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <File>../logs/airavata.log</File>
-        <Append>true</Append>
-        <encoder>
-            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
-        </encoder>
-        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>../logs/airavata.log.%d{yyyy-MM-dd}</fileNamePattern>
-            <maxHistory>30</maxHistory>
-            <totalSizeCap>1GB</totalSizeCap>
-        </rollingPolicy>
-    </appender>
-
-    <logger name="ch.qos.logback" level="WARN"/>
-    <logger name="org.apache.helix" level="WARN"/>
-    <logger name="org.apache.zookeeper" level="ERROR"/>
-    <logger name="org.apache.airavata" level="INFO"/>
-    <logger name="org.hibernate" level="ERROR"/>
-    <logger name="net.schmizz.sshj" level="WARN"/>
-    <root level="INFO">
-        <appender-ref ref="CONSOLE"/>
-        <appender-ref ref="LOGFILE"/>
-    </root>
-</configuration>
diff --git a/pom.xml b/pom.xml
index b225416..30ae04f 100755
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,6 @@
         <module>transport</module>
         <module>agent</module>
         <module>controller</module>
-        <module>examples</module>
         <module>command-line</module>
     </modules>
 
diff --git a/services/resource-service/client/src/main/java/org/apache/airavata/mft/resource/client/StorageServiceClient.java b/services/resource-service/client/src/main/java/org/apache/airavata/mft/resource/client/StorageServiceClient.java
index 2e31c9d..c98422b 100644
--- a/services/resource-service/client/src/main/java/org/apache/airavata/mft/resource/client/StorageServiceClient.java
+++ b/services/resource-service/client/src/main/java/org/apache/airavata/mft/resource/client/StorageServiceClient.java
@@ -11,6 +11,7 @@ import org.apache.airavata.mft.resource.service.odata.ODataStorageServiceGrpc;
 import org.apache.airavata.mft.resource.service.s3.S3StorageServiceGrpc;
 import org.apache.airavata.mft.resource.service.scp.SCPStorageServiceGrpc;
 import org.apache.airavata.mft.resource.service.swift.SwiftStorageServiceGrpc;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageCommonServiceGrpc;
 import org.apache.airavata.mft.storage.stubs.storagesecret.StorageSecretServiceGrpc;
 
 import java.io.Closeable;
@@ -68,6 +69,9 @@ public class StorageServiceClient implements Closeable {
         return ODataStorageServiceGrpc.newBlockingStub(channel);
     }
 
+    public StorageCommonServiceGrpc.StorageCommonServiceBlockingStub common() {
+        return StorageCommonServiceGrpc.newBlockingStub(channel);
+    }
     @Override
     public void close() throws IOException {
 
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java
index 51a9408..b14a2cf 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java
@@ -27,6 +27,8 @@ import org.apache.airavata.mft.resource.stubs.local.storage.*;
 import org.apache.airavata.mft.resource.stubs.odata.storage.*;
 import org.apache.airavata.mft.resource.stubs.s3.storage.*;
 import org.apache.airavata.mft.resource.stubs.scp.storage.*;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveRequest;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveResponse;
 import org.apache.airavata.mft.resource.stubs.swift.storage.*;
 import org.apache.airavata.mft.storage.stubs.storagesecret.*;
 
@@ -107,4 +109,6 @@ public interface ResourceBackend {
     ODataStorage createODataStorage(ODataStorageCreateRequest request) throws Exception;
     boolean updateODataStorage(ODataStorageUpdateRequest request) throws Exception;
     boolean deleteODataStorage(ODataStorageDeleteRequest request) throws Exception;
+
+    StorageTypeResolveResponse resolveStorageType(StorageTypeResolveRequest request) throws Exception;
 }
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
index 3517d8a..380e25c 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
@@ -28,6 +28,8 @@ import org.apache.airavata.mft.resource.stubs.local.storage.*;
 import org.apache.airavata.mft.resource.stubs.odata.storage.*;
 import org.apache.airavata.mft.resource.stubs.s3.storage.*;
 import org.apache.airavata.mft.resource.stubs.scp.storage.*;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveRequest;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveResponse;
 import org.apache.airavata.mft.resource.stubs.swift.storage.*;
 import org.apache.airavata.mft.storage.stubs.storagesecret.*;
 import org.json.simple.JSONArray;
@@ -616,4 +618,9 @@ public class FileBasedResourceBackend implements ResourceBackend {
     public boolean deleteODataStorage(ODataStorageDeleteRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
     }
+
+    @Override
+    public StorageTypeResolveResponse resolveStorageType(StorageTypeResolveRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
 }
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java
index 26d0e90..29de806 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java
@@ -30,6 +30,8 @@ import org.apache.airavata.mft.resource.stubs.local.storage.*;
 import org.apache.airavata.mft.resource.stubs.odata.storage.*;
 import org.apache.airavata.mft.resource.stubs.s3.storage.*;
 import org.apache.airavata.mft.resource.stubs.scp.storage.*;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveRequest;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveResponse;
 import org.apache.airavata.mft.resource.stubs.swift.storage.*;
 import org.apache.airavata.mft.storage.stubs.storagesecret.*;
 import org.dozer.DozerBeanMapper;
@@ -70,6 +72,9 @@ public class SQLResourceBackend implements ResourceBackend {
     @Autowired
     private ODataStorageRepository odataStorageRepository;
 
+    @Autowired
+    private ResolveStorageRepository resolveStorageRepository;
+
     private DozerBeanMapper mapper = new DozerBeanMapper();
 
     @Override
@@ -256,6 +261,12 @@ public class SQLResourceBackend implements ResourceBackend {
     @Override
     public SCPStorage createSCPStorage(SCPStorageCreateRequest request) {
         SCPStorageEntity savedEntity = scpStorageRepository.save(mapper.map(request, SCPStorageEntity.class));
+
+        ResolveStorageEntity storageTypeEty = new ResolveStorageEntity();
+        storageTypeEty.setStorageId(savedEntity.getStorageId());
+        storageTypeEty.setStorageType(ResolveStorageEntity.StorageType.SCP);
+        resolveStorageRepository.save(storageTypeEty);
+
         return mapper.map(savedEntity, SCPStorage.newBuilder().getClass()).build();
     }
 
@@ -289,6 +300,12 @@ public class SQLResourceBackend implements ResourceBackend {
     @Override
     public LocalStorage createLocalStorage(LocalStorageCreateRequest request) throws Exception {
         LocalStorageEntity savedEntity = localStorageRepository.save(mapper.map(request, LocalStorageEntity.class));
+
+        ResolveStorageEntity storageTypeEty = new ResolveStorageEntity();
+        storageTypeEty.setStorageId(savedEntity.getStorageId());
+        storageTypeEty.setStorageType(ResolveStorageEntity.StorageType.LOCAL);
+        resolveStorageRepository.save(storageTypeEty);
+
         return mapper.map(savedEntity, LocalStorage.newBuilder().getClass()).build();
     }
 
@@ -322,6 +339,12 @@ public class SQLResourceBackend implements ResourceBackend {
     @Override
     public S3Storage createS3Storage(S3StorageCreateRequest request) throws Exception {
         S3StorageEntity savedEntity = s3StorageRepository.save(mapper.map(request, S3StorageEntity.class));
+
+        ResolveStorageEntity storageTypeEty = new ResolveStorageEntity();
+        storageTypeEty.setStorageId(savedEntity.getStorageId());
+        storageTypeEty.setStorageType(ResolveStorageEntity.StorageType.S3);
+        resolveStorageRepository.save(storageTypeEty);
+
         return mapper.map(savedEntity, S3Storage.newBuilder().getClass()).build();
     }
 
@@ -455,6 +478,12 @@ public class SQLResourceBackend implements ResourceBackend {
     @Override
     public FTPStorage createFTPStorage(FTPStorageCreateRequest request) {
         FTPStorageEntity savedEntity = ftpStorageRepository.save(mapper.map(request, FTPStorageEntity.class));
+
+        ResolveStorageEntity storageTypeEty = new ResolveStorageEntity();
+        storageTypeEty.setStorageId(savedEntity.getStorageId());
+        storageTypeEty.setStorageType(ResolveStorageEntity.StorageType.FTP);
+        resolveStorageRepository.save(storageTypeEty);
+
         return mapper.map(savedEntity, FTPStorage.newBuilder().getClass()).build();
     }
 
@@ -488,6 +517,12 @@ public class SQLResourceBackend implements ResourceBackend {
     @Override
     public SwiftStorage createSwiftStorage(SwiftStorageCreateRequest request) throws Exception {
         SwiftStorageEntity savedEntity = swiftStorageRepository.save(mapper.map(request, SwiftStorageEntity.class));
+
+        ResolveStorageEntity storageTypeEty = new ResolveStorageEntity();
+        storageTypeEty.setStorageId(savedEntity.getStorageId());
+        storageTypeEty.setStorageType(ResolveStorageEntity.StorageType.SWIFT);
+        resolveStorageRepository.save(storageTypeEty);
+
         return mapper.map(savedEntity, SwiftStorage.newBuilder().getClass()).build();
     }
 
@@ -521,6 +556,12 @@ public class SQLResourceBackend implements ResourceBackend {
     @Override
     public ODataStorage createODataStorage(ODataStorageCreateRequest request) throws Exception {
         ODataStorageEntity savedEntity = odataStorageRepository.save(mapper.map(request, ODataStorageEntity.class));
+
+        ResolveStorageEntity storageTypeEty = new ResolveStorageEntity();
+        storageTypeEty.setStorageId(savedEntity.getStorageId());
+        storageTypeEty.setStorageType(ResolveStorageEntity.StorageType.ODATA);
+        resolveStorageRepository.save(storageTypeEty);
+
         return mapper.map(savedEntity, ODataStorage.newBuilder().getClass()).build();
     }
 
@@ -536,4 +577,10 @@ public class SQLResourceBackend implements ResourceBackend {
         resourceRepository.deleteByStorageIdAndStorageType(request.getStorageId(), GenericResourceEntity.StorageType.SWIFT);
         return true;
     }
+
+    @Override
+    public StorageTypeResolveResponse resolveStorageType(StorageTypeResolveRequest request) throws Exception {
+        ResolveStorageEntity resolveStorage = resolveStorageRepository.getByStorageId(request.getStorageId());
+        return StorageTypeResolveResponse.newBuilder().setStorageType(resolveStorage.getStorageType().name()).build();
+    }
 }
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/entity/ResolveStorageEntity.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/entity/ResolveStorageEntity.java
new file mode 100644
index 0000000..c4a9a67
--- /dev/null
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/entity/ResolveStorageEntity.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.resource.server.backend.sql.entity;
+
+import org.hibernate.annotations.GenericGenerator;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.Id;
+
+@Entity
+public class ResolveStorageEntity {
+
+    public enum StorageType {
+        S3, SCP, LOCAL, FTP, BOX, DROPBOX, GCS, AZURE, SWIFT, ODATA;
+    }
+
+    @Id
+    @Column(name = "STORAGE_ID")
+    private String storageId;
+
+    @Column(name = "STORAGE_TYPE")
+    private StorageType storageType;
+
+    public String getStorageId() {
+        return storageId;
+    }
+
+    public void setStorageId(String storageId) {
+        this.storageId = storageId;
+    }
+
+    public StorageType getStorageType() {
+        return storageType;
+    }
+
+    public void setStorageType(StorageType storageType) {
+        this.storageType = storageType;
+    }
+}
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/BoxExample.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/repository/ResolveStorageRepository.java
similarity index 67%
rename from examples/src/main/java/org/apache/airavata/mft/examples/transfer/BoxExample.java
rename to services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/repository/ResolveStorageRepository.java
index 0072e6f..c42ad9a 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/BoxExample.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/repository/ResolveStorageRepository.java
@@ -15,7 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.mft.examples.transfer;
+package org.apache.airavata.mft.resource.server.backend.sql.repository;
 
-public class BoxExample {
+import org.apache.airavata.mft.resource.server.backend.sql.entity.ResolveStorageEntity;
+import org.springframework.data.repository.CrudRepository;
+
+public interface ResolveStorageRepository extends CrudRepository<ResolveStorageEntity, String> {
+    public ResolveStorageEntity getByStorageId(String storageID);
 }
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/StorageCommonServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/StorageCommonServiceHandler.java
new file mode 100644
index 0000000..ee6d479
--- /dev/null
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/StorageCommonServiceHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.resource.server.handler;
+
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageListResponse;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageCommonServiceGrpc;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveRequest;
+import org.apache.airavata.mft.resource.stubs.storage.common.StorageTypeResolveResponse;
+import org.lognet.springboot.grpc.GRpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+@GRpcService
+public class StorageCommonServiceHandler extends StorageCommonServiceGrpc.StorageCommonServiceImplBase {
+
+    private static final Logger logger = LoggerFactory.getLogger(StorageCommonServiceHandler.class);
+
+    @Autowired
+    private ResourceBackend backend;
+
+    @Override
+    public void resolveStorageType(StorageTypeResolveRequest request, StreamObserver<StorageTypeResolveResponse> responseObserver) {
+        try {
+            StorageTypeResolveResponse storageTypeResolveResponse = this.backend.resolveStorageType(request);
+            responseObserver.onNext(storageTypeResolveResponse);
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in retrieving storage type for storage id {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in retrieving storage type")
+                    .asRuntimeException());
+        }
+    }
+
+}
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/DriveExample.java b/services/resource-service/stub/src/main/proto/common/StorageCommon.proto
similarity index 68%
rename from examples/src/main/java/org/apache/airavata/mft/examples/transfer/DriveExample.java
rename to services/resource-service/stub/src/main/proto/common/StorageCommon.proto
index 526159d..fe5ebfe 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/DriveExample.java
+++ b/services/resource-service/stub/src/main/proto/common/StorageCommon.proto
@@ -15,7 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.airavata.mft.examples.transfer;
+syntax = "proto3";
 
-public class DriveExample {
+option java_multiple_files = true;
+package org.apache.airavata.mft.resource.stubs.storage.common;
+
+message StorageTypeResolveRequest {
+    string storageId = 1;
+}
+
+message StorageTypeResolveResponse {
+    string storageType = 1;
 }
+
+service StorageCommonService {
+    rpc resolveStorageType (StorageTypeResolveRequest) returns (StorageTypeResolveResponse);
+}
\ No newline at end of file
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
index 3b334db..d037f52 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
@@ -31,11 +31,14 @@ import org.apache.airavata.mft.credential.stubs.azure.AzureSecret;
 import org.apache.airavata.mft.credential.stubs.azure.AzureSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorage;
 import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorageGetRequest;
 import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.resource.stubs.common.GenericResource;
 import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 
@@ -63,23 +66,33 @@ public class AzureMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourcePath,
+                                                        String storageId, String credentialToken) throws Exception {
         checkInitialized();
 
-        if (!isAvailable(authZToken,resourceId, credentialToken)) {
-            throw new Exception("Azure blob can not find for resource id " + resourceId);
+        if (!isAvailable(authZToken, resourcePath, storageId, credentialToken)) {
+            throw new Exception("Azure blob can not find for resource path " + resourcePath);
         }
 
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource azureResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        AzureStorage azureStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        AzureSecret azureSecret = secretClient.azure().getAzureSecret(AzureSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+            azureStorage = storageServiceClient.azure()
+                    .getAzureStorage(AzureStorageGetRequest.newBuilder().setStorageId(storageId).build());
+        }
+
+        AzureSecret azureSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            azureSecret = secretClient.azure().getAzureSecret(AzureSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+        }
 
         BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(azureSecret.getConnectionString()).buildClient();
 
-        BlobClient blobClient = blobServiceClient.getBlobContainerClient(azureResource.getAzureStorage().getContainer())
-                                                .getBlobClient(azureResource.getFile().getResourcePath());
+        BlobClient blobClient = blobServiceClient.getBlobContainerClient(azureStorage.getContainer())
+                                                .getBlobClient(resourcePath);
 
         BlobProperties properties = blobClient.getBlockBlobClient().getProperties();
         FileResourceMetadata metadata = new FileResourceMetadata();
@@ -99,58 +112,38 @@ public class AzureMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");    }
-
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
-    }
 
     @Override
-    public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public Boolean isAvailable(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
         checkInitialized();
 
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource azureResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(resourceId).build());
+        AzureStorage storage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
 
-        return isAvailable(azureResource, credentialToken);
-    }
-
-    @Override
-    public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource genericResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(parentResourceId).build());
-
-        GenericResource azureResource = GenericResource.newBuilder().setFile(FileResource.newBuilder()
-                .setResourcePath(resourcePath).build()).setAzureStorage(genericResource.getAzureStorage()).build();
-        return isAvailable(azureResource, credentialToken);
-    }
+            storage = storageServiceClient.azure()
+                    .getAzureStorage(AzureStorageGetRequest.newBuilder().setStorageId(storageId).build());
+        }
 
-    public Boolean isAvailable(GenericResource azureResource, String credentialToken) throws Exception {
+        AzureSecret azureSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            azureSecret = secretClient.azure().getAzureSecret(AzureSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        AzureSecret azureSecret = secretClient.azure().getAzureSecret(AzureSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
 
         BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(azureSecret.getConnectionString()).buildClient();
-        BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(azureResource.getAzureStorage().getContainer());
+        BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(storage.getContainer());
         boolean containerExists = containerClient.exists();
         if (!containerExists) {
             return false;
         }
-        switch (azureResource.getResourceCase().name()){
-            case ResourceTypes.FILE:
-                return containerClient.getBlobClient(azureResource.getFile().getResourcePath()).exists();
-            case ResourceTypes.DIRECTORY:
-                return containerClient.getBlobClient(azureResource.getDirectory().getResourcePath()).exists();
-        }
-        return false;
+        return containerClient.getBlobClient(resourcePath).exists();
     }
 
+
 }
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
index ca57aa6..cde5542 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
@@ -23,15 +23,9 @@ import com.box.sdk.BoxFile;
 import org.apache.airavata.mft.common.AuthToken;
 import org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
-import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.box.BoxSecret;
 import org.apache.airavata.mft.credential.stubs.box.BoxSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.FileResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 
@@ -59,18 +53,19 @@ public class BoxMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
 
         checkInitialized();
 
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource boxResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        BoxSecret boxSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        BoxSecret boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
 
         BoxAPIConnection api = new BoxAPIConnection(boxSecret.getAccessToken());
-        BoxFile boxFile = new BoxFile(api, boxResource.getFile().getResourcePath());
+        BoxFile boxFile = new BoxFile(api, resourcePath);
         BoxFile.Info boxFileInfo = boxFile.getInfo();
 
         FileResourceMetadata metadata = new FileResourceMetadata();
@@ -86,57 +81,25 @@ public class BoxMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");    }
-
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
-    }
-
-    @Override
-    public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
-
-        checkInitialized();
+    public Boolean isAvailable(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
 
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource boxResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(resourceId).build());
-        return isAvailable(boxResource, credentialToken);
-    }
-
-    @Override
-    public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
         checkInitialized();
 
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource genericResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(parentResourceId).build());
+        BoxSecret boxSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
-        GenericResource boxResource = GenericResource.newBuilder().setFile(FileResource.newBuilder()
-                .setResourcePath(resourcePath).build()).setBoxStorage(genericResource.getBoxStorage()).build();
-
-        return isAvailable(boxResource, credentialToken);
-    }
-
-    private Boolean isAvailable(GenericResource boxResource, String credentialToken) throws Exception {
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        BoxSecret boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
 
         BoxAPIConnection api = new BoxAPIConnection(boxSecret.getAccessToken());
-
-        BoxFile boxFile;
-        switch (boxResource.getResourceCase().name()){
-            case ResourceTypes.FILE:
-                boxFile = new BoxFile(api, boxResource.getFile().getResourcePath());
-            case ResourceTypes.DIRECTORY:
-                boxFile = new BoxFile(api, boxResource.getDirectory().getResourcePath());
-        }
+        BoxFile boxFile = new BoxFile(api, resourcePath);
+        // TODO Fix this. Need to figure out how to check if the file exists in box
         return true;
     }
 }
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
index 9427cee..fe81f7a 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
@@ -23,15 +23,9 @@ import com.dropbox.core.v2.files.FileMetadata;
 import org.apache.airavata.mft.common.AuthToken;
 import org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
-import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecret;
 import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.FileResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 
@@ -59,19 +53,22 @@ public class DropboxMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourcePath, String storageId,
+                                                        String credentialToken) throws Exception {
         checkInitialized();
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource dropboxResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(resourceId).build());
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        DropboxSecret dropboxSecret = secretClient.dropbox().getDropboxSecret(DropboxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        DropboxSecret dropboxSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            dropboxSecret = secretClient.dropbox().getDropboxSecret(DropboxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+        }
 
         DbxRequestConfig config = DbxRequestConfig.newBuilder("mftdropbox/v1").build();
         DbxClientV2 dbxClientV2 = new DbxClientV2(config, dropboxSecret.getAccessToken());
 
         FileResourceMetadata metadata = new FileResourceMetadata();
-        FileMetadata fileMetadata = (FileMetadata) dbxClientV2.files().getMetadata(dropboxResource.getFile().getResourcePath());
+        FileMetadata fileMetadata = (FileMetadata) dbxClientV2.files().getMetadata(resourcePath);
         metadata.setResourceSize(fileMetadata.getSize());
         metadata.setMd5sum(null);
         metadata.setUpdateTime(fileMetadata.getServerModified().getTime());
@@ -80,56 +77,25 @@ public class DropboxMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");    }
-
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
-    }
 
     @Override
-    public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public Boolean isAvailable(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
         checkInitialized();
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource dropboxResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(resourceId).build());
-
-        return isAvailable(dropboxResource, credentialToken);
-    }
-
-    @Override
-    public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource genericResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(parentResourceId).build());
 
-        GenericResource dropboxResource = GenericResource.newBuilder()
-                .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
-                .setDropboxStorage(genericResource.getDropboxStorage()).build();
+        DropboxSecret dropboxSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            dropboxSecret = secretClient.dropbox().getDropboxSecret(DropboxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
-        return isAvailable(dropboxResource, credentialToken);
-    }
-
-    private Boolean isAvailable(GenericResource dropboxResource, String credentialToken) throws Exception {
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        DropboxSecret dropboxSecret = secretClient.dropbox().getDropboxSecret(DropboxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
 
         DbxRequestConfig config = DbxRequestConfig.newBuilder("mftdropbox/v1").build();
         DbxClientV2 dbxClientV2 = new DbxClientV2(config, dropboxSecret.getAccessToken());
 
-        switch (dropboxResource.getResourceCase().name()){
-            case ResourceTypes.FILE:
-                return !dbxClientV2.files().searchV2(dropboxResource.getFile().getResourcePath()).getMatches().isEmpty();
-            case ResourceTypes.DIRECTORY:
-                return !dbxClientV2.files().searchV2(dropboxResource.getDirectory().getResourcePath()).getMatches().isEmpty();
-        }
-        return false;
+        return !dbxClientV2.files().searchV2(resourcePath).getMatches().isEmpty();
     }
 }
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
index e9fbe63..ce1050c 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
@@ -20,15 +20,11 @@ package org.apache.airavata.mft.transport.ftp;
 import org.apache.airavata.mft.common.AuthToken;
 import org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
-import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.ftp.FTPSecret;
 import org.apache.airavata.mft.credential.stubs.ftp.FTPSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.FileResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorage;
 import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
@@ -67,27 +63,37 @@ public class FTPMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) {
+    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourcePath,
+                                                        String storageId, String credentialToken) throws Exception {
 
         checkInitialized();
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource ftpResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(resourceId).build());
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        FTPSecret ftpSecret = secretClient.ftp().getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+        FTPStorage ftpStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
+
+            ftpStorage = storageServiceClient.ftp()
+                    .getFTPStorage(FTPStorageGetRequest.newBuilder().setStorageId(storageId).build());
+        }
+
+        FTPSecret ftpSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            ftpSecret = secretClient.ftp().getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
 
         FileResourceMetadata resourceMetadata = new FileResourceMetadata();
         FTPClient ftpClient = null;
         try {
-            ftpClient = FTPTransportUtil.getFTPClient(ftpResource.getFtpStorage(), ftpSecret);
-            logger.info("Fetching metadata for resource {} in {}", ftpResource.getFile().getResourcePath(), ftpResource.getFtpStorage().getHost());
+            ftpClient = FTPTransportUtil.getFTPClient(ftpStorage, ftpSecret);
+            logger.info("Fetching metadata for resource {} in {}", resourcePath, ftpStorage.getHost());
 
-            FTPFile ftpFile = ftpClient.mlistFile(ftpResource.getFile().getResourcePath());
+            FTPFile ftpFile = ftpClient.mlistFile(resourcePath);
 
             if (ftpFile != null) {
                 resourceMetadata.setResourceSize(ftpFile.getSize());
                 resourceMetadata.setUpdateTime(ftpFile.getTimestamp().getTimeInMillis());
-                if (ftpClient.hasFeature("MD5") && FTPReply.isPositiveCompletion(ftpClient.sendCommand("MD5 " + ftpResource.getFile().getResourcePath()))) {
+                if (ftpClient.hasFeature("MD5") && FTPReply.isPositiveCompletion(ftpClient.sendCommand("MD5 " + resourcePath))) {
                     String[] replies = ftpClient.getReplyStrings();
                     resourceMetadata.setMd5sum(replies[0]);
                 } else {
@@ -95,7 +101,7 @@ public class FTPMetadataCollector implements MetadataCollector {
                 }
             }
         } catch (Exception e) {
-            logger.warn("Failed to fetch md5 for FTP resource {}", resourceId, e);
+            logger.warn("Failed to fetch md5 for FTP resource path {}", resourcePath, e);
         } finally {
             FTPTransportUtil.disconnectFTP(ftpClient);
         }
@@ -104,62 +110,37 @@ public class FTPMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourcePath,
+                                                                  String storageId, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
-    }
-
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
-    }
 
     @Override
-    public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) {
+    public Boolean isAvailable(AuthToken authZToken, String resourcePath, String storageId, String credentialToken)
+            throws Exception{
 
         checkInitialized();
 
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource ftpResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        FTPStorage ftpStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
 
-        return isAvailable(ftpResource, credentialToken);
-    }
-
-    @Override
-    public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-
-        checkInitialized();
-
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource genericResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                            .setResourceId(parentResourceId).build());
-
-        GenericResource ftpResource = GenericResource.newBuilder()
-                .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
-                .setFtpStorage(genericResource.getFtpStorage()).build();
-        return isAvailable(ftpResource, credentialToken);
-    }
-
-    public Boolean isAvailable(GenericResource ftpResource, String credentialToken) {
+            ftpStorage = storageServiceClient.ftp()
+                    .getFTPStorage(FTPStorageGetRequest.newBuilder().setStorageId(storageId).build());
+        }
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        FTPSecret ftpSecret = secretClient.ftp().getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        FTPSecret ftpSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            ftpSecret = secretClient.ftp().getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
 
         FTPClient ftpClient = null;
         try {
-            ftpClient = FTPTransportUtil.getFTPClient(ftpResource.getFtpStorage(), ftpSecret);
-            InputStream inputStream = null;
-
-            switch (ftpResource.getResourceCase().name()) {
-                case ResourceTypes.FILE:
-                    inputStream = ftpClient.retrieveFileStream(ftpResource.getFile().getResourcePath());
-                case ResourceTypes.DIRECTORY:
-                    inputStream = ftpClient.retrieveFileStream(ftpResource.getDirectory().getResourcePath());
-            }
+            ftpClient = FTPTransportUtil.getFTPClient(ftpStorage, ftpSecret);
+            InputStream inputStream = ftpClient.retrieveFileStream(resourcePath);
+
             return !(inputStream == null || ftpClient.getReplyCode() == 550);
         } catch (Exception e) {
             logger.error("FTP client initialization failed ", e);
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
index 157e7e6..1f0e056 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
@@ -28,15 +28,11 @@ import com.google.api.services.storage.model.StorageObject;
 import org.apache.airavata.mft.common.AuthToken;
 import org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
-import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.gcs.GCSSecret;
 import org.apache.airavata.mft.credential.stubs.gcs.GCSSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.FileResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorage;
 import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
@@ -72,13 +68,22 @@ public class GCSMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
         checkInitialized();
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource gcsResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(resourceId).build());
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        GCSSecret gcsSecret = secretClient.gcs().getGCSSecret(GCSSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        GCSStorage gcsStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
+
+            gcsStorage = storageServiceClient.gcs()
+                    .getGCSStorage(GCSStorageGetRequest.newBuilder().setStorageId(storageId).build());
+        }
+
+        GCSSecret gcsSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            gcsSecret = secretClient.gcs().getGCSSecret(GCSSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
 
         HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
         JsonFactory jsonFactory = new JacksonFactory();
@@ -92,8 +97,8 @@ public class GCSMetadataCollector implements MetadataCollector {
         Storage storage = new Storage.Builder(transport, jsonFactory, credential).build();
 
         FileResourceMetadata metadata = new FileResourceMetadata();
-        StorageObject gcsMetadata = storage.objects().get(gcsResource.getGcsStorage().getBucketName(),
-                                                            gcsResource.getFile().getResourcePath()).execute();
+        StorageObject gcsMetadata = storage.objects().get(gcsStorage.getBucketName(), resourcePath).execute();
+
         metadata.setResourceSize(gcsMetadata.getSize().longValue());
         String md5Sum = String.format("%032x", new BigInteger(1, Base64.getDecoder().decode(gcsMetadata.getMd5Hash())));
         metadata.setMd5sum(md5Sum);
@@ -103,47 +108,28 @@ public class GCSMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
-    }
-
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");    }
-
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourcePath,
+                                                                  String storageId, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
     @Override
-    public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public Boolean isAvailable(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
         checkInitialized();
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource gcsResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(resourceId).build());
 
-        return isAvailable(gcsResource, credentialToken);
-    }
-
-    @Override
-    public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        checkInitialized();
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource genericResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(parentResourceId).build());
-
-        GenericResource gcsResource = GenericResource.newBuilder()
-                .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
-                .setGcsStorage(genericResource.getGcsStorage()).build();
+        GCSStorage gcsStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
 
-        return isAvailable(gcsResource, credentialToken);
-    }
-
-    private Boolean isAvailable(GenericResource gcsResource, String credentialToken) throws Exception {
+            gcsStorage = storageServiceClient.gcs()
+                    .getGCSStorage(GCSStorageGetRequest.newBuilder().setStorageId(storageId).build());
+        }
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        GCSSecret gcsSecret = secretClient.gcs().getGCSSecret(GCSSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        GCSSecret gcsSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            gcsSecret = secretClient.gcs().getGCSSecret(GCSSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
 
         HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
         JsonFactory jsonFactory = new JacksonFactory();
@@ -155,14 +141,6 @@ public class GCSMetadataCollector implements MetadataCollector {
         }
 
         Storage storage = new Storage.Builder(transport, jsonFactory, credential).build();
-        switch (gcsResource.getResourceCase().name()){
-            case ResourceTypes.FILE:
-                return !storage.objects().get(gcsResource.getGcsStorage().getBucketName(), gcsResource.getFile().getResourcePath())
-                        .execute().isEmpty();
-            case ResourceTypes.DIRECTORY:
-                return !storage.objects().get(gcsResource.getGcsStorage().getBucketName(), gcsResource.getDirectory().getResourcePath())
-                        .execute().isEmpty();
-        }
-        return false;
+        return !storage.objects().get(gcsStorage.getBucketName(), resourcePath).execute().isEmpty();
     }
 }
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
index 8ea41ac..23aa05c 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
@@ -20,15 +20,7 @@ package org.apache.airavata.mft.transport.local;
 import org.apache.airavata.mft.common.AuthToken;
 import org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
-import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.FileResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.local.storage.LocalStorage;
-import org.apache.airavata.mft.resource.stubs.local.storage.LocalStorageGetRequest;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -61,21 +53,19 @@ public class LocalMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
 
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource localResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(resourceId).build());
-        File resourceFile = new File(localResource.getFile().getResourcePath());
+        File resourceFile = new File(resourcePath);
         if (resourceFile.exists()) {
 
-            BasicFileAttributes basicFileAttributes = Files.readAttributes(Path.of(localResource.getFile().getResourcePath()), BasicFileAttributes.class);
+            BasicFileAttributes basicFileAttributes = Files.readAttributes(Path.of(resourcePath), BasicFileAttributes.class);
             FileResourceMetadata metadata = new FileResourceMetadata();
             metadata.setCreatedTime(basicFileAttributes.creationTime().toMillis());
             metadata.setUpdateTime(basicFileAttributes.lastModifiedTime().toMillis());
             metadata.setResourceSize(basicFileAttributes.size());
 
             MessageDigest digest = MessageDigest.getInstance("MD5");
-            FileInputStream fis = new FileInputStream(localResource.getFile().getResourcePath());
+            FileInputStream fis = new FileInputStream(resourcePath);
             byte[] byteArray = new byte[1024];
             int bytesCount = 0;
             while ((bytesCount = fis.read(byteArray)) != -1) {
@@ -91,50 +81,16 @@ public class LocalMetadataCollector implements MetadataCollector {
 
             return metadata;
         } else {
-            throw new Exception("Resource with id " + resourceId + " in path " + localResource.getFile().getResourcePath() + " does not exist");
+            throw new Exception("Resource with path " + resourcePath +  " does not exist");
         }
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
-    }
-
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");    }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
-    }
-
-    @Override
-    public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource localResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(resourceId).build());
-        return isAvailable(localResource, credentialToken);
-    }
-
-    @Override
-    public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource localResource = resourceClient.get()
-                .getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(parentResourceId).build());
-
-        GenericResource localResource2 = GenericResource.newBuilder()
-                .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
-                .setLocalStorage(localResource.getLocalStorage()).build();
-        return isAvailable(localResource2, credentialToken);
-    }
-
-    public Boolean isAvailable(GenericResource localResource, String credentialToken) throws Exception {
-        switch (localResource.getResourceCase().name()){
-            case ResourceTypes.FILE:
-                return new File(localResource.getFile().getResourcePath()).exists();
-            case ResourceTypes.DIRECTORY:
-                return new File(localResource.getDirectory().getResourcePath()).exists();
-        }
-        return false;
+    public Boolean isAvailable(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
+        return new File(resourcePath).exists();
     }
 }
diff --git a/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java b/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java
index b8415be..55e6fb2 100644
--- a/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java
+++ b/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java
@@ -23,9 +23,12 @@ import org.apache.airavata.mft.credential.stubs.odata.ODataSecret;
 import org.apache.airavata.mft.credential.stubs.odata.ODataSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.common.GenericResource;
 import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
 import org.apache.airavata.mft.resource.stubs.odata.storage.ODataStorage;
+import org.apache.airavata.mft.resource.stubs.odata.storage.ODataStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.apache.http.HttpEntity;
@@ -49,25 +52,19 @@ public class ODataIncomingConnector implements IncomingStreamingConnector {
     private CloseableHttpResponse response;
     CloseableHttpClient client;
 
-    private GenericResource resource;
+    private String resourcePath;
     private ODataStorage odataStorage;
 
     @Override
     public void init(ConnectorConfig cc) throws Exception {
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
                 .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
 
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(cc.getAuthToken())
-                    .setResourceId(cc.getResourceId()).build());
-        }
-
-        if (resource.getStorageCase() != GenericResource.StorageCase.ODATASTORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
-            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+            odataStorage = storageServiceClient.odata()
+                    .getODataStorage(ODataStorageGetRequest.newBuilder().setStorageId(cc.getStorageId()).build());
         }
 
-        odataStorage = resource.getOdataStorage();
+        this.resourcePath = cc.getResourcePath();
 
         try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
                 cc.getSecretServiceHost(), cc.getSecretServicePort())) {
@@ -87,22 +84,16 @@ public class ODataIncomingConnector implements IncomingStreamingConnector {
 
     @Override
     public InputStream fetchInputStream() throws Exception {
-
         HttpGet httpGet = new HttpGet(odataStorage.getBaseUrl() +
-                "/Products('" + resource.getFile().getResourcePath() +"')/$value");
+                "/Products('" + resourcePath +"')/$value");
         response = client.execute(httpGet);
         int statusCode = response.getStatusLine().getStatusCode();
-        logger.info("Received status code {} for resource path {}", statusCode, resource.getFile().getResourcePath());
+        logger.info("Received status code {} for resource path {}", statusCode, resourcePath);
 
         HttpEntity entity = response.getEntity();
         return entity.getContent();
     }
 
-    @Override
-    public InputStream fetchInputStream(String childPath) throws Exception {
-        throw new UnsupportedOperationException("No child path structures available for OData");
-    }
-
     @Override
     public void complete() throws Exception {
         if (response != null) {
diff --git a/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataMetadataCollector.java b/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataMetadataCollector.java
index 63c8cca..a0deb54 100644
--- a/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataMetadataCollector.java
+++ b/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataMetadataCollector.java
@@ -23,11 +23,10 @@ import org.apache.airavata.mft.core.FileResourceMetadata;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.odata.ODataSecret;
 import org.apache.airavata.mft.credential.stubs.odata.ODataSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.odata.storage.ODataStorage;
+import org.apache.airavata.mft.resource.stubs.odata.storage.ODataStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.apache.http.HttpEntity;
@@ -80,55 +79,46 @@ public class ODataMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
-        return findFileResourceMetadata(authZToken, resourceId, credentialToken)
-                .orElseThrow(() -> new Exception("Could not find a file resource entry for resource id " + resourceId));
+    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourcePath, String storageId,
+                                                        String credentialToken) throws Exception {
+        return findFileResourceMetadata(authZToken, resourcePath, storageId, credentialToken)
+                .orElseThrow(() -> new Exception("Could not find a file resource entry for resource path " + resourcePath));
     }
 
-    @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("OData does not have hierarchical structures");
-    }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourcePath,
+                                                                  String storageId, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("OData does not have directory structures");
     }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("OData does not have directory structures");
+    public Boolean isAvailable(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
+        return findFileResourceMetadata(authZToken, resourcePath, storageId, credentialToken).isPresent();
     }
 
-    @Override
-    public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
-        return findFileResourceMetadata(authZToken, resourceId, credentialToken).isPresent();
-    }
+    private Optional<FileResourceMetadata> findFileResourceMetadata(AuthToken authZToken, String resourcePath,
+                                                                    String storageId, String credentialToken) throws Exception {
 
-    @Override
-    public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("OData does not have directory structures");
-    }
-
-    private Optional<FileResourceMetadata> findFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        ODataStorage odataStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
 
-        if (resource.getStorageCase() != GenericResource.StorageCase.ODATASTORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
-            throw new Exception("Invalid storage type specified for resource " + resourceId);
+            odataStorage = storageServiceClient.odata()
+                    .getODataStorage(ODataStorageGetRequest.newBuilder().setStorageId(storageId).build());
         }
 
-        ODataStorage odataStorage = resource.getOdataStorage();
-
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        ODataSecret oDataSecret = secretClient.odata().getODataSecret(
-                ODataSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        ODataSecret oDataSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            oDataSecret = secretClient.odata().getODataSecret(
+                    ODataSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
 
         try (CloseableHttpClient httpClient = getHttpClient(oDataSecret)) {
 
             HttpGet httpGet = new HttpGet(odataStorage.getBaseUrl() +
-                    "/Products('" + resource.getFile().getResourcePath() +"')");
+                    "/Products('" + resourcePath +"')");
 
             try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
                 int statusCode = response.getStatusLine().getStatusCode();
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
index 015470d..0d92054 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
@@ -18,9 +18,12 @@ import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
 import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.common.GenericResource;
 import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
 import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -34,25 +37,20 @@ public class S3IncomingConnector implements IncomingChunkedConnector, IncomingSt
 
     private static final Logger logger = LoggerFactory.getLogger(S3IncomingConnector.class);
 
-    private GenericResource resource;
+    private S3Storage s3Storage;
     private AmazonS3 s3Client;
+    private String resourcePath;
 
     @Override
     public void init(ConnectorConfig cc) throws Exception {
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
                 .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
 
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(cc.getAuthToken())
-                    .setResourceId(cc.getResourceId()).build());
-        }
-
-        if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
-            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+            s3Storage = storageServiceClient.s3()
+                    .getS3Storage(S3StorageGetRequest.newBuilder().setStorageId(cc.getStorageId()).build());
         }
 
-        S3Storage s3Storage = resource.getS3Storage();
+        this.resourcePath = cc.getResourcePath();
 
         S3Secret s3Secret;
 
@@ -83,31 +81,23 @@ public class S3IncomingConnector implements IncomingChunkedConnector, IncomingSt
 
     @Override
     public InputStream fetchInputStream() throws Exception {
-        S3Object s3object = s3Client.getObject(resource.getS3Storage().getBucketName(), resource.getFile().getResourcePath());
-        return s3object.getObjectContent();
-    }
-
-    @Override
-    public InputStream fetchInputStream(String childPath) throws Exception {
-        S3Object s3object = s3Client.getObject(resource.getS3Storage().getBucketName(), childPath);
+        S3Object s3object = s3Client.getObject(s3Storage.getBucketName(), resourcePath);
         return s3object.getObjectContent();
     }
 
     @Override
     public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception {
-        GetObjectRequest rangeObjectRequest = new GetObjectRequest(resource.getS3Storage().getBucketName(),
-                resource.getFile().getResourcePath());
+        GetObjectRequest rangeObjectRequest = new GetObjectRequest(s3Storage.getBucketName(), resourcePath);
         rangeObjectRequest.setRange(startByte, endByte - 1);
         ObjectMetadata objectMetadata = s3Client.getObject(rangeObjectRequest, new File(downloadFile));
-        logger.debug("Downloaded S3 chunk to path {} for resource id {}", downloadFile, resource.getResourceId());
+        logger.debug("Downloaded S3 chunk to path {} for resource id {}", downloadFile, resourcePath);
     }
 
     @Override
     public InputStream downloadChunk(int chunkId, long startByte, long endByte) throws Exception {
-        GetObjectRequest rangeObjectRequest = new GetObjectRequest(resource.getS3Storage().getBucketName(),
-                resource.getFile().getResourcePath());
+        GetObjectRequest rangeObjectRequest = new GetObjectRequest(s3Storage.getBucketName(), resourcePath);
         rangeObjectRequest.setRange(startByte, endByte - 1);
-        logger.debug("Fetching input stream for chunk {} in resource {}", chunkId, resource.getResourceId());
+        logger.debug("Fetching input stream for chunk {} in resource path {}", chunkId, resourcePath);
         S3Object object = s3Client.getObject(rangeObjectRequest);
         return object.getObjectContent();
     }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
index 80d946e..71c625d 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
@@ -28,15 +28,11 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import org.apache.airavata.mft.common.AuthToken;
 import org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
-import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
 import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.FileResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
 import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
@@ -66,14 +62,23 @@ public class S3MetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
 
         checkInitialized();
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource s3Resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(resourceId).build());
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        S3Storage s3Storage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
+
+            s3Storage = storageServiceClient.s3()
+                    .getS3Storage(S3StorageGetRequest.newBuilder().setStorageId(storageId).build());
+        }
+
+        S3Secret s3Secret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
 
         AWSCredentials awsCreds;
         if (s3Secret.getSessionToken() == null || s3Secret.getSessionToken().equals("")) {
@@ -86,13 +91,13 @@ public class S3MetadataCollector implements MetadataCollector {
 
         AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                 .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
-                        s3Resource.getS3Storage().getEndpoint(),
-                        s3Resource.getS3Storage().getRegion()))
+                        s3Storage.getEndpoint(),
+                        s3Storage.getRegion()))
                 .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                 .build();
 
         FileResourceMetadata metadata = new FileResourceMetadata();
-        ObjectMetadata s3Metadata = s3Client.getObjectMetadata(s3Resource.getS3Storage().getBucketName(), s3Resource.getFile().getResourcePath());
+        ObjectMetadata s3Metadata = s3Client.getObjectMetadata(s3Storage.getBucketName(), resourcePath);
         metadata.setResourceSize(s3Metadata.getContentLength());
         metadata.setMd5sum(s3Metadata.getETag());
         metadata.setUpdateTime(s3Metadata.getLastModified().getTime());
@@ -101,65 +106,39 @@ public class S3MetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
-    }
-
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourcePath, String storageId,
+                                                                  String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");    }
 
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
-    }
 
     @Override
-    public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
-
-        checkInitialized();
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource s3Resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(resourceId).build());
-
-        return isAvailable(s3Resource, credentialToken);
-    }
+    public Boolean isAvailable(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
 
-    @Override
-    public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
         checkInitialized();
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource genericResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(parentResourceId).build());
-
-        GenericResource s3Resource = GenericResource.newBuilder()
-                .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
-                .setS3Storage(genericResource.getS3Storage()).build();
 
-        return isAvailable(s3Resource, credentialToken);
-    }
-
-    private Boolean isAvailable(GenericResource s3Resource, String credentialToken) throws Exception {
+        S3Storage s3Storage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+            s3Storage = storageServiceClient.s3()
+                    .getS3Storage(S3StorageGetRequest.newBuilder().setStorageId(storageId).build());
+        }
 
+        S3Secret s3Secret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
         BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
 
         AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                 .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
-                        s3Resource.getS3Storage().getEndpoint(),
-                        s3Resource.getS3Storage().getRegion()))
+                        s3Storage.getEndpoint(),
+                        s3Storage.getRegion()))
                 .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-                .withRegion(s3Resource.getS3Storage().getRegion())
+                .withRegion(s3Storage.getRegion())
                 .build();
 
-        switch (s3Resource.getResourceCase().name()){
-            case ResourceTypes.FILE:
-                return s3Client.doesObjectExist(s3Resource.getS3Storage().getBucketName(), s3Resource.getFile().getResourcePath());
-            case ResourceTypes.DIRECTORY:
-                return s3Client.doesObjectExist(s3Resource.getS3Storage().getBucketName(), s3Resource.getDirectory().getResourcePath());
-        }
-        return false;
+        return s3Client.doesObjectExist(s3Storage.getBucketName(), resourcePath);
     }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
index 01a2298..1b89ca2 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -12,11 +12,10 @@ import org.apache.airavata.mft.core.api.ConnectorConfig;
 import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;
 import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
 import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -24,7 +23,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -33,28 +31,24 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
 
     private static final Logger logger = LoggerFactory.getLogger(S3OutgoingConnector.class);
 
-    private GenericResource resource;
+    private S3Storage s3Storage;
     private AmazonS3 s3Client;
+    private String resourcePath;
 
     InitiateMultipartUploadResult initResponse;
     List<PartETag> partETags = Collections.synchronizedList(new ArrayList<>());
 
     @Override
     public void init(ConnectorConfig cc) throws Exception {
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
-                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
 
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(cc.getAuthToken())
-                    .setResourceId(cc.getResourceId()).build());
-        }
+        this.resourcePath = cc.getResourcePath();
 
-        if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
-            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
-        }
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
 
-        S3Storage s3Storage = resource.getS3Storage();
+            s3Storage = storageServiceClient.s3()
+                    .getS3Storage(S3StorageGetRequest.newBuilder().setStorageId(cc.getStorageId()).build());
+        }
 
         S3Secret s3Secret;
 
@@ -81,19 +75,19 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
                     .build();
         }
 
-        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(resource.getS3Storage().getBucketName(),
-                resource.getFile().getResourcePath());
+        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(s3Storage.getBucketName(),
+                resourcePath);
         initResponse = s3Client.initiateMultipartUpload(initRequest);
         logger.info("Initialized multipart upload for file {} in bucket {}",
-                resource.getFile().getResourcePath(), resource.getS3Storage().getBucketName());
+                resourcePath, s3Storage.getBucketName());
     }
 
     @Override
     public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception {
         File file = new File(uploadFile);
         UploadPartRequest uploadRequest = new UploadPartRequest()
-                .withBucketName(resource.getS3Storage().getBucketName())
-                .withKey(resource.getFile().getResourcePath())
+                .withBucketName(s3Storage.getBucketName())
+                .withKey(resourcePath)
                 .withUploadId(initResponse.getUploadId())
                 .withPartNumber(chunkId + 1)
                 .withFileOffset(0)
@@ -102,14 +96,14 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
 
         UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
         this.partETags.add(uploadResult.getPartETag());
-        logger.debug("Uploaded S3 chunk to path {} for resource id {}", uploadFile, resource.getResourceId());
+        logger.debug("Uploaded S3 chunk to path {} for resource path {}", uploadFile, resourcePath);
     }
 
     @Override
     public void uploadChunk(int chunkId, long startByte, long endByte, InputStream inputStream) throws Exception {
         UploadPartRequest uploadRequest = new UploadPartRequest()
-                .withBucketName(resource.getS3Storage().getBucketName())
-                .withKey(resource.getFile().getResourcePath())
+                .withBucketName(s3Storage.getBucketName())
+                .withKey(resourcePath)
                 .withUploadId(initResponse.getUploadId())
                 .withPartNumber(chunkId + 1)
                 .withFileOffset(0)
@@ -119,16 +113,16 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
         UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
         inputStream.close();
         this.partETags.add(uploadResult.getPartETag());
-        logger.debug("Uploaded S3 chunk {} for resource id {} using stream", chunkId, resource.getResourceId());
+        logger.debug("Uploaded S3 chunk {} for resource path {} using stream", chunkId, resourcePath);
     }
 
     @Override
     public void complete() throws Exception {
-        CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(resource.getS3Storage().getBucketName(),
-                resource.getFile().getResourcePath(), initResponse.getUploadId(), partETags);
+        CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(s3Storage.getBucketName(),
+                resourcePath, initResponse.getUploadId(), partETags);
         s3Client.completeMultipartUpload(compRequest);
-        logger.info("Completing the upload for file {} in bucket {}", resource.getFile().getResourcePath(),
-                resource.getS3Storage().getBucketName());
+        logger.info("Completing the upload for file {} in bucket {}", resourcePath,
+                s3Storage.getBucketName());
     }
 
     @Override
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java
index fa20539..34123a3 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java
@@ -27,9 +27,12 @@ import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
 import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.common.GenericResource;
 import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
 import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -51,26 +54,22 @@ public class S3OutgoingStreamingConnector implements OutgoingStreamingConnector
 
     private static final Logger logger = LoggerFactory.getLogger(S3OutgoingStreamingConnector.class);
 
-    private GenericResource resource;
     private S3OutputStream s3OutputStream;
     private S3ClientMultipartUpload s3;
+    private String resourcePath;
+    private S3Storage s3Storage;
 
     @Override
     public void init(ConnectorConfig cc) throws Exception {
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
-                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
 
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(cc.getAuthToken())
-                    .setResourceId(cc.getResourceId()).build());
-        }
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
 
-        if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
-            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+            s3Storage = storageServiceClient.s3()
+                    .getS3Storage(S3StorageGetRequest.newBuilder().setStorageId(cc.getStorageId()).build());
         }
 
-        S3Storage s3Storage = resource.getS3Storage();
+        this.resourcePath = cc.getResourcePath();
 
         S3Secret s3Secret;
 
@@ -118,28 +117,13 @@ public class S3OutgoingStreamingConnector implements OutgoingStreamingConnector
         this.s3OutputStream = S3OutputStream.builder()
                 .s3(s3)
                 .uploadRequest(MultipartUploadRequest.builder()
-                        .bucket(resource.getS3Storage().getBucketName())
-                        .key(resource.getFile().getResourcePath()).build())
+                        .bucket(s3Storage.getBucketName())
+                        .key(resourcePath).build())
                 .autoComplete(false)
                 .build();
 
         logger.info("Initialized multipart upload for file {} in bucket {}",
-                resource.getFile().getResourcePath(), resource.getS3Storage().getBucketName());
-        return this.s3OutputStream;
-    }
-
-    @Override
-    public OutputStream fetchOutputStream(String childPath) throws Exception {
-        this.s3OutputStream = S3OutputStream.builder()
-                .s3(s3)
-                .uploadRequest(MultipartUploadRequest.builder()
-                        .bucket(resource.getS3Storage().getBucketName())
-                        .key(resource.getFile().getResourcePath() + "/" + childPath).build())
-                .autoComplete(false)
-                .build();
-
-        logger.info("Initialized multipart upload for file {} child path {} in bucket {}",
-                resource.getFile().getResourcePath(),  childPath, resource.getS3Storage().getBucketName());
+                resourcePath, s3Storage.getBucketName());
         return this.s3OutputStream;
     }
 }
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
index 1ec6cd1..1013acc 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
@@ -26,9 +26,12 @@ import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.common.GenericResource;
 import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
 import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -43,27 +46,24 @@ public final class SCPIncomingConnector implements IncomingStreamingConnector {
     private static final Logger logger = LoggerFactory.getLogger(SCPIncomingConnector.class);
 
     private Session session;
-    private GenericResource resource;
     private Channel channel;
     private OutputStream out;
     private InputStream in;
     private final byte[] buf = new byte[1024];
+    private String resourcePath;
 
     @Override
     public void init(ConnectorConfig cc) throws Exception {
 
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+        SCPStorage scpStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
                 .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
 
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(cc.getAuthToken())
-                    .setResourceId(cc.getResourceId()).build());
+            scpStorage = storageServiceClient.scp()
+                    .getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(cc.getStorageId()).build());
         }
 
-        if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
-            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
-        }
+        this.resourcePath = cc.getResourcePath();
 
         SCPSecret scpSecret;
 
@@ -75,7 +75,6 @@ public final class SCPIncomingConnector implements IncomingStreamingConnector {
                     .setSecretId(cc.getCredentialToken()).build());
         }
 
-        SCPStorage scpStorage = resource.getScpStorage();
         logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
 
         this.session = SCPTransportUtil.createSession(
@@ -98,38 +97,6 @@ public final class SCPIncomingConnector implements IncomingStreamingConnector {
 
     @Override
     public InputStream fetchInputStream() throws Exception {
-        String resourcePath = null;
-        switch (resource.getResourceCase()){
-            case FILE:
-                resourcePath = resource.getFile().getResourcePath();
-                break;
-            case DIRECTORY:
-                throw new Exception("A directory path can not be streamed");
-            case RESOURCE_NOT_SET:
-                throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
-        }
-
-        return fetchInputStreamJCraft(escapeSpecialChars(resourcePath));
-    }
-
-    @Override
-    public InputStream fetchInputStream(String childPath) throws Exception {
-
-        String resourcePath = null;
-        switch (resource.getResourceCase()){
-            case FILE:
-                throw new Exception("A child path can not be associated with a file parent");
-            case DIRECTORY:
-                resourcePath = resource.getDirectory().getResourcePath();
-                if (!childPath.startsWith(resourcePath)) {
-                    throw new Exception("Child path " + childPath + " is not in the parent path " + resourcePath);
-                }
-                resourcePath = childPath;
-                break;
-            case RESOURCE_NOT_SET:
-                throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
-        }
-
         return fetchInputStreamJCraft(escapeSpecialChars(resourcePath));
     }
 
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
index 882f8d2..47930fc 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
@@ -31,16 +31,13 @@ import net.schmizz.sshj.userauth.password.Resource;
 import org.apache.airavata.mft.common.AuthToken;
 import org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
-import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.DirectoryResource;
-import org.apache.airavata.mft.resource.stubs.common.FileResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.apache.commons.io.IOUtils;
@@ -77,26 +74,28 @@ public class SCPMetadataCollector implements MetadataCollector {
         }
     }
 
-    private FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, GenericResource scpResource, SCPSecret scpSecret) throws Exception {
-        try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
+    private FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourcePath,
+                                                         SCPStorage scpStorage, SCPSecret scpSecret) throws Exception {
 
-            logger.info("Fetching metadata for resource {} in {}", scpResource.getFile().getResourcePath(), scpResource.getScpStorage().getHost());
+        try (SSHClient sshClient = getSSHClient(scpStorage, scpSecret)) {
+
+            logger.info("Fetching metadata for resource {} in {}", resourcePath, scpStorage.getHost());
 
             try (SFTPClient sftpClient = sshClient.newSFTPClient()) {
-                FileAttributes lstat = sftpClient.lstat(scpResource.getFile().getResourcePath());
+                FileAttributes lstat = sftpClient.lstat(resourcePath);
                 sftpClient.close();
 
                 FileResourceMetadata metadata = new FileResourceMetadata();
                 metadata.setResourceSize(lstat.getSize());
                 metadata.setCreatedTime(lstat.getAtime());
                 metadata.setUpdateTime(lstat.getMtime());
-                metadata.setFriendlyName(new File(scpResource.getFile().getResourcePath()).getName());
-                metadata.setResourcePath(scpResource.getFile().getResourcePath());
+                metadata.setFriendlyName(new File(resourcePath).getName());
+                metadata.setResourcePath(resourcePath);
 
                 try {
                     // TODO calculate md5 using the binary based on the OS platform. Eg: MacOS has md5. Linux has md5sum
                     // This only works for linux SCP resources. Improve to work in mac and windows resources
-                    Session.Command md5Command = sshClient.startSession().exec("md5sum " + scpResource.getFile().getResourcePath());
+                    Session.Command md5Command = sshClient.startSession().exec("md5sum " + resourcePath);
                     StringWriter outWriter = new StringWriter();
                     StringWriter errorWriter = new StringWriter();
 
@@ -110,92 +109,44 @@ public class SCPMetadataCollector implements MetadataCollector {
                         logger.warn("MD5 fetch error out {}", errorWriter.toString());
                     }
                 } catch (Exception e) {
-                    logger.warn("Failed to fetch md5 for SCP resource {}", scpResource.getResourceId(), e);
+                    logger.warn("Failed to fetch md5 for SCP resource with path {}", resourcePath, e);
                 }
                 return metadata;
             }
         }
     }
 
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourcePath, String storageId,
+                                                        String credentialToken) throws Exception {
 
         checkInitialized();
 
-        GenericResource scpResource;
-        SCPSecret scpSecret;
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
-            scpResource = resourceClient.get()
-                    .getGenericResource(GenericResourceGetRequest.newBuilder()
-                            .setAuthzToken(authZToken).setResourceId(resourceId).build());
-        }
+        SCPStorage scpStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
 
-        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
-            scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
-                    .setAuthzToken(authZToken).setSecretId(credentialToken).build());
+            scpStorage = storageServiceClient.scp()
+                    .getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
         }
 
-        return getFileResourceMetadata(authZToken,scpResource, scpSecret);
-    }
-
-    @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String childResourcePath, String credentialToken) throws Exception {
-
-        GenericResource resource;
         SCPSecret scpSecret;
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(authZToken)
-                    .setResourceId(parentResourceId).build());
-        }
 
         try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
             scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
                     .setAuthzToken(authZToken).setSecretId(credentialToken).build());
         }
 
-        boolean isChildPath = false;
-        if (childResourcePath != null && !"".equals(childResourcePath)) {
-            isChildPath = true;
-        }
-
-        String resourcePath = null;
-        switch (resource.getResourceCase()) {
-            case FILE:
-                if (isChildPath) {
-                    throw new Exception("A child path can not be associated with a file parent");
-                }
-                resourcePath = resource.getFile().getResourcePath();
-                break;
-            case DIRECTORY:
-                resourcePath = resource.getDirectory().getResourcePath();
-                if (isChildPath) {
-                    if (!childResourcePath.startsWith(resourcePath)) {
-                        throw new Exception("Child path " + childResourcePath + " is not in the parent path " + resourcePath);
-                    }
-                    resourcePath = childResourcePath;
-                }
-
-                break;
-            case RESOURCE_NOT_SET:
-                throw new Exception("Resource was not set in resource with id " + parentResourceId);
-        }
-
-        GenericResource scpResource2 = GenericResource.newBuilder()
-                .setFile(FileResource.newBuilder()
-                        .setResourcePath(resourcePath).build())
-                .setScpStorage(resource.getScpStorage()).build();
-
-        return getFileResourceMetadata(authZToken, scpResource2, scpSecret);
+        return getFileResourceMetadata(authZToken, resourcePath, scpStorage, scpSecret);
     }
 
-    private DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, GenericResource scpResource, SCPSecret scpSecret) throws Exception {
-        try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
+    private DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourcePath, SCPStorage scpStorage, SCPSecret scpSecret) throws Exception {
+        try (SSHClient sshClient = getSSHClient(scpStorage, scpSecret)) {
 
-            logger.info("Fetching metadata for resource {} in {}", scpResource.getFile().getResourcePath(), scpResource.getScpStorage().getHost());
+            logger.info("Fetching metadata for resource {} in {}", resourcePath, scpStorage.getHost());
 
             try (SFTPClient sftpClient = sshClient.newSFTPClient()) {
-                List<RemoteResourceInfo> lsOut = sftpClient.ls(scpResource.getDirectory().getResourcePath());
-                FileAttributes lsStat = sftpClient.lstat(scpResource.getDirectory().getResourcePath());
+                List<RemoteResourceInfo> lsOut = sftpClient.ls(resourcePath);
+                FileAttributes lsStat = sftpClient.lstat(resourcePath);
                 sftpClient.close();
 
                 DirectoryResourceMetadata.Builder dirMetadataBuilder = DirectoryResourceMetadata.Builder.getBuilder()
@@ -223,8 +174,8 @@ public class SCPMetadataCollector implements MetadataCollector {
                     }
                 }
 
-                dirMetadataBuilder = dirMetadataBuilder.withFriendlyName(new File(scpResource.getDirectory().getResourcePath()).getName())
-                        .withResourcePath(scpResource.getDirectory().getResourcePath())
+                dirMetadataBuilder = dirMetadataBuilder.withFriendlyName(new File(resourcePath).getName())
+                        .withResourcePath(resourcePath)
                         .withCreatedTime(lsStat.getAtime())
                         .withUpdateTime(lsStat.getMtime());
                 return dirMetadataBuilder.build();
@@ -233,148 +184,56 @@ public class SCPMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourcePath,
+                                                                  String storageId, String credentialToken) throws Exception {
 
-        GenericResource resource;
-        SCPSecret scpSecret;
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(authZToken)
-                    .setResourceId(resourceId).build());
-        }
+        SCPStorage scpStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
 
-        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
-            scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
-                    .setAuthzToken(authZToken).setSecretId(credentialToken).build());
+            scpStorage = storageServiceClient.scp()
+                    .getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
         }
 
-        return getDirectoryResourceMetadata(authZToken, resource, scpSecret);
-    }
-
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String childResourcePath, String credentialToken) throws Exception {
-
-        GenericResource resource;
         SCPSecret scpSecret;
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(authZToken).setResourceId(parentResourceId).build());
-        }
 
         try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
             scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
                     .setAuthzToken(authZToken).setSecretId(credentialToken).build());
         }
 
-        boolean isChildPath = false;
-        if (childResourcePath != null && !"".equals(childResourcePath)) {
-            isChildPath = true;
-        }
-
-        String resourcePath = null;
-
-        switch (resource.getResourceCase()){
-            case FILE:
-                if (isChildPath){
-                    throw new Exception("A child path can not be associated with a file parent");
-                }
-                resourcePath = resource.getFile().getResourcePath();
-                break;
-            case DIRECTORY:
-                resourcePath = resource.getDirectory().getResourcePath();
-                if (isChildPath) {
-                    if (!childResourcePath.startsWith(resourcePath)) {
-                        throw new Exception("Child path " + childResourcePath + " is not in the parent path " + resourcePath);
-                    }
-                    resourcePath = childResourcePath;
-                }
-
-                break;
-            case RESOURCE_NOT_SET:
-                throw new Exception("Resource was not set in resource with id " + parentResourceId);
-        }
-
-        GenericResource scpResource = GenericResource.newBuilder()
-                .setDirectory(DirectoryResource.newBuilder().setResourcePath(resourcePath).build())
-                .setScpStorage(resource.getScpStorage()).build();
-
-        return getDirectoryResourceMetadata(authZToken,scpResource, scpSecret);
+        return getDirectoryResourceMetadata(authZToken, resourcePath, scpStorage, scpSecret);
     }
 
     @Override
-    public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public Boolean isAvailable(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
 
         checkInitialized();
 
-        GenericResource resource;
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(authZToken).setResourceId(resourceId).build());
-        }
-
-        return isAvailable(authZToken, resource, credentialToken);
-    }
-
-    @Override
-    public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        checkInitialized();
+        SCPStorage scpStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
 
-        GenericResource resource;
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(authToken).setResourceId(parentResourceId).build());
+            scpStorage = storageServiceClient.scp()
+                    .getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
         }
 
-        validateParent(resource, resourcePath);
-
-        GenericResource targetScpResource = GenericResource.newBuilder()
-                .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
-                .setScpStorage(resource.getScpStorage()).build();
-
-        return isAvailable(authToken, targetScpResource, credentialToken);
-    }
-
-    public Boolean isAvailable(AuthToken authToken, GenericResource scpResource, String credentialToken) throws Exception {
-
-
         SCPSecret scpSecret;
 
         try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
             scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
-                    .setAuthzToken(authToken).setSecretId(credentialToken).build());
+                    .setAuthzToken(authZToken).setSecretId(credentialToken).build());
         }
 
-        try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
-            logger.info("Checking the availability of file {}", scpResource.getFile().getResourcePath());
+        try (SSHClient sshClient = getSSHClient(scpStorage, scpSecret)) {
+            logger.info("Checking the availability of file {}", resourcePath);
             try (SFTPClient sftpClient = sshClient.newSFTPClient()) {
-                switch (scpResource.getResourceCase().name()){
-                    case ResourceTypes.FILE:
-                        return sftpClient.statExistence(scpResource.getFile().getResourcePath()) != null;
-                    case ResourceTypes.DIRECTORY:
-                        return sftpClient.statExistence(scpResource.getDirectory().getResourcePath()) != null;
-                }
-                return false;
+                return sftpClient.statExistence(resourcePath) != null;
             }
         }
     }
 
-    private void validateParent(GenericResource parentSCPResource, String resourcePath) throws Exception {
-        if (!ResourceTypes.DIRECTORY.equals(parentSCPResource.getResourceCase().name())) {
-            logger.error("Parent resource " + parentSCPResource.getResourceId() + " is not a DIRECTORY type");
-            throw new Exception("Parent resource " + parentSCPResource.getResourceId() + " is not a DIRECTORY type");
-        }
-
-        String parentDir = parentSCPResource.getDirectory().getResourcePath();
-        parentDir = parentDir.endsWith(File.separator) ? parentDir : parentDir + File.separator;
-        if (!resourcePath.startsWith(parentDir)) {
-            logger.error("Given resource path " + resourcePath + " is not a part of the parent resource path "
-                    + parentSCPResource.getDirectory().getResourcePath());
-            throw new Exception("Given resource path " + resourcePath + " is not a part of the parent resource path "
-                    + parentSCPResource.getDirectory().getResourcePath());
-        }
-    }
-
-    private SSHClient getSSHClient(GenericResource scpResource, SCPSecret scpSecret) throws IOException {
+    private SSHClient getSSHClient(SCPStorage scpStorage, SCPSecret scpSecret) throws IOException {
 
         SSHClient sshClient = new SSHClient();
 
@@ -408,7 +267,7 @@ public class SCPMetadataCollector implements MetadataCollector {
             }
         }));
 
-        sshClient.connect(scpResource.getScpStorage().getHost(), scpResource.getScpStorage().getPort());
+        sshClient.connect(scpStorage.getHost(), scpStorage.getPort());
         sshClient.auth(scpSecret.getUser(), am);
 
         return sshClient;
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
index 48044aa..a00b82e 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
@@ -26,9 +26,12 @@ import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.common.GenericResource;
 import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
 import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -42,7 +45,6 @@ public final class SCPOutgoingConnector implements OutgoingStreamingConnector {
 
     private static final Logger logger = LoggerFactory.getLogger(SCPOutgoingConnector.class);
 
-    private GenericResource resource;
     private Session session;
     private OutputStream out;
     private InputStream in;
@@ -55,17 +57,12 @@ public final class SCPOutgoingConnector implements OutgoingStreamingConnector {
     public void init(ConnectorConfig cc) throws Exception {
 
         this.cc = cc;
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+        SCPStorage scpStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
                 .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+            scpStorage = storageServiceClient.scp()
+                    .getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(cc.getStorageId()).build());
 
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(cc.getAuthToken())
-                    .setResourceId(cc.getResourceId()).build());
-        }
-
-        if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
-            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
         }
 
         SCPSecret scpSecret;
@@ -78,7 +75,6 @@ public final class SCPOutgoingConnector implements OutgoingStreamingConnector {
                     .setSecretId(cc.getCredentialToken()).build());
         }
 
-        SCPStorage scpStorage = resource.getScpStorage();
         logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
 
         this.session = SCPTransportUtil.createSession(
@@ -101,38 +97,8 @@ public final class SCPOutgoingConnector implements OutgoingStreamingConnector {
 
     @Override
     public OutputStream fetchOutputStream() throws Exception {
-        String resourcePath = null;
-        switch (resource.getResourceCase()){
-            case FILE:
-                resourcePath = resource.getFile().getResourcePath();
-                break;
-            case DIRECTORY:
-                throw new Exception("A directory path can not be streamed");
-            case RESOURCE_NOT_SET:
-                throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
-        }
-
-        return fetchOutputStreamJCraft(escapeSpecialChars(resourcePath), cc.getMetadata().getResourceSize());
-    }
-
-    @Override
-    public OutputStream fetchOutputStream(String childPath) throws Exception {
-        String resourcePath = null;
-        switch (resource.getResourceCase()){
-            case FILE:
-                throw new Exception("A child path can not be associated with a file parent");
-            case DIRECTORY:
-                resourcePath = resource.getDirectory().getResourcePath();
-                if (!childPath.startsWith(resourcePath)) {
-                    throw new Exception("Child path " + childPath + " is not in the parent path " + resourcePath);
-                }
-                resourcePath = childPath;
-                break;
-            case RESOURCE_NOT_SET:
-                throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
-        }
 
-        return fetchOutputStreamJCraft(escapeSpecialChars(resourcePath), cc.getMetadata().getResourceSize());
+        return fetchOutputStreamJCraft(escapeSpecialChars(cc.getResourcePath()), cc.getMetadata().getResourceSize());
     }
 
     public OutputStream fetchOutputStreamJCraft(String resourcePath, long fileSize) throws Exception {
diff --git a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java
index 7c195b7..34db1b7 100644
--- a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java
+++ b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java
@@ -21,15 +21,12 @@ import org.apache.airavata.mft.core.api.ConnectorConfig;
 import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
 import org.apache.airavata.mft.credential.stubs.swift.SwiftSecret;
 import org.apache.airavata.mft.credential.stubs.swift.SwiftSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.swift.storage.SwiftStorage;
+import org.apache.airavata.mft.resource.stubs.swift.storage.SwiftStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.apache.commons.io.IOUtils;
 import org.jclouds.ContextBuilder;
 import org.jclouds.http.options.GetOptions;
 import org.jclouds.openstack.keystone.auth.config.CredentialTypes;
@@ -49,26 +46,23 @@ public class SwiftIncomingConnector implements IncomingChunkedConnector {
 
     private static final Logger logger = LoggerFactory.getLogger(SwiftIncomingConnector.class);
 
-    private GenericResource resource;
     private SwiftApi swiftApi;
     private ObjectApi objectApi;
+    private String resourcePath;
 
     @Override
     public void init(ConnectorConfig cc) throws Exception {
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
-                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
 
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(cc.getAuthToken())
-                    .setResourceId(cc.getResourceId()).build());
-        }
+        SwiftStorage swiftStorage;
+
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
 
-        if (resource.getStorageCase() != GenericResource.StorageCase.SWIFTSTORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
-            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+            swiftStorage = storageServiceClient.swift()
+                    .getSwiftStorage(SwiftStorageGetRequest.newBuilder().setStorageId(cc.getStorageId()).build());
         }
 
-        SwiftStorage swiftStorage = resource.getSwiftStorage();
+        this.resourcePath = cc.getResourcePath(); // object name later passed to objectApi.get(); read from the connector config
 
         SwiftSecret swiftSecret;
 
@@ -126,7 +120,7 @@ public class SwiftIncomingConnector implements IncomingChunkedConnector {
     @Override
     public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception {
         SwiftObject swiftObject = objectApi.get(
-                resource.getFile().getResourcePath(),
+                resourcePath,
                 GetOptions.Builder.range(startByte, endByte));
 
         InputStream inputStream = swiftObject.getPayload().openStream();
@@ -145,7 +139,7 @@ public class SwiftIncomingConnector implements IncomingChunkedConnector {
     public InputStream downloadChunk(int chunkId, long startByte, long endByte) throws Exception {
 
         SwiftObject swiftObject = objectApi.get(
-                resource.getFile().getResourcePath(),
+                resourcePath,
                 GetOptions.Builder.range(startByte, endByte));
 
         return swiftObject.getPayload().openStream();
diff --git a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftMetadataCollector.java b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftMetadataCollector.java
index 2200b2b..5d4210f 100644
--- a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftMetadataCollector.java
+++ b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftMetadataCollector.java
@@ -23,11 +23,10 @@ import org.apache.airavata.mft.core.FileResourceMetadata;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.swift.SwiftSecret;
 import org.apache.airavata.mft.credential.stubs.swift.SwiftSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.swift.storage.SwiftStorage;
+import org.apache.airavata.mft.resource.stubs.swift.storage.SwiftStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.jclouds.ContextBuilder;
@@ -92,20 +91,28 @@ public class SwiftMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
         checkInitialized();
 
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource swiftResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        SwiftStorage swiftStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        SwiftSecret swiftSecret = secretClient.swift().getSwiftSecret(SwiftSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+            swiftStorage = storageServiceClient.swift()
+                    .getSwiftStorage(SwiftStorageGetRequest.newBuilder().setStorageId(storageId).build());
+        }
+
+        SwiftSecret swiftSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            swiftSecret = secretClient.swift().getSwiftSecret(SwiftSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
 
-        SwiftApi swiftApi = getSwiftApi(swiftResource.getSwiftStorage(), swiftSecret);
+        SwiftApi swiftApi = getSwiftApi(swiftStorage, swiftSecret);
 
-        ObjectApi objectApi = swiftApi.getObjectApi(swiftResource.getSwiftStorage().getRegion(), swiftResource.getSwiftStorage().getContainer());
+        ObjectApi objectApi = swiftApi.getObjectApi(swiftStorage.getRegion(), swiftStorage.getContainer());
 
-        SwiftObject swiftObject = objectApi.get(swiftResource.getFile().getResourcePath());
+        SwiftObject swiftObject = objectApi.get(resourcePath);
 
         FileResourceMetadata metadata = new FileResourceMetadata();
         metadata.setResourceSize(swiftObject.getPayload().getContentMetadata().getContentLength());
@@ -116,41 +123,35 @@ public class SwiftMetadataCollector implements MetadataCollector {
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
-    }
-
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
-    }
-
-    @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourcePath,
+                                                                  String storageId, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
     @Override
-    public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
+    public Boolean isAvailable(AuthToken authZToken, String resourcePath, String storageId, String credentialToken) throws Exception {
         checkInitialized();
 
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource swiftResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        SwiftStorage swiftStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(resourceServiceHost, resourceServicePort)) {
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        SwiftSecret swiftSecret = secretClient.swift().getSwiftSecret(SwiftSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+            swiftStorage = storageServiceClient.swift()
+                    .getSwiftStorage(SwiftStorageGetRequest.newBuilder().setStorageId(storageId).build());
+        }
 
-        SwiftApi swiftApi = getSwiftApi(swiftResource.getSwiftStorage(), swiftSecret);
+        SwiftSecret swiftSecret;
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                secretServiceHost, secretServicePort)) {
+            swiftSecret = secretClient.swift().getSwiftSecret(SwiftSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        }
 
-        ObjectApi objectApi = swiftApi.getObjectApi(swiftResource.getSwiftStorage().getRegion(), swiftResource.getSwiftStorage().getContainer());
+        SwiftApi swiftApi = getSwiftApi(swiftStorage, swiftSecret);
 
-        SwiftObject swiftObject = objectApi.get(swiftResource.getFile().getResourcePath());
+        ObjectApi objectApi = swiftApi.getObjectApi(swiftStorage.getRegion(), swiftStorage.getContainer());
 
-        return swiftObject != null;
-    }
+        SwiftObject swiftObject = objectApi.get(resourcePath);
 
-    @Override
-    public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
-        throw new UnsupportedOperationException("Method not implemented");
+        return swiftObject != null;
     }
 }
diff --git a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java
index 38fd79a..565ab30 100644
--- a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java
+++ b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java
@@ -21,11 +21,10 @@ import org.apache.airavata.mft.core.api.ConnectorConfig;
 import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;
 import org.apache.airavata.mft.credential.stubs.swift.SwiftSecret;
 import org.apache.airavata.mft.credential.stubs.swift.SwiftSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.swift.storage.SwiftStorage;
+import org.apache.airavata.mft.resource.stubs.swift.storage.SwiftStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.jclouds.ContextBuilder;
@@ -36,7 +35,6 @@ import org.jclouds.openstack.swift.v1.SwiftApi;
 import org.jclouds.openstack.swift.v1.domain.Segment;
 import org.jclouds.openstack.swift.v1.features.ObjectApi;
 import org.jclouds.openstack.swift.v1.features.StaticLargeObjectApi;
-import org.jclouds.openstack.swift.v1.options.PutOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,10 +47,10 @@ public class SwiftOutgoingConnector implements OutgoingChunkedConnector {
 
     private static final Logger logger = LoggerFactory.getLogger(SwiftOutgoingConnector.class);
 
-    private GenericResource resource;
     private SwiftApi swiftApi;
     private ObjectApi objectApi;
     private StaticLargeObjectApi staticLargeObjectApi;
+    private String resourcePath;
 
     private final Map<Integer, Segment> segmentMap = new ConcurrentHashMap();
 
@@ -60,20 +58,16 @@ public class SwiftOutgoingConnector implements OutgoingChunkedConnector {
 
     @Override
     public void init(ConnectorConfig cc) throws Exception {
-        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
-                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
 
-            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                    .setAuthzToken(cc.getAuthToken())
-                    .setResourceId(cc.getResourceId()).build());
-        }
+        SwiftStorage swiftStorage;
+        try (StorageServiceClient storageServiceClient = StorageServiceClientBuilder
+                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+            swiftStorage = storageServiceClient.swift()
+                    .getSwiftStorage(SwiftStorageGetRequest.newBuilder().setStorageId(cc.getStorageId()).build());
 
-        if (resource.getStorageCase() != GenericResource.StorageCase.SWIFTSTORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
-            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
         }
 
-        SwiftStorage swiftStorage = resource.getSwiftStorage();
+        this.resourcePath = cc.getResourcePath();
 
         SwiftSecret swiftSecret;
 
@@ -124,7 +118,7 @@ public class SwiftOutgoingConnector implements OutgoingChunkedConnector {
             segments.add(segmentMap.get(id));
         }
 
-        String etag = staticLargeObjectApi.replaceManifest(resource.getFile().getResourcePath(),
+        String etag = staticLargeObjectApi.replaceManifest(resourcePath,
                 segments, new HashMap<>());
 
         if (swiftApi != null) {
@@ -145,9 +139,9 @@ public class SwiftOutgoingConnector implements OutgoingChunkedConnector {
 
     @Override
     public void uploadChunk(int chunkId, long startByte, long endByte, InputStream inputStream) throws Exception {
-        String etag = objectApi.put(resource.getFile().getResourcePath() + chunkId, new InputStreamPayload(inputStream));
+        String etag = objectApi.put(resourcePath + chunkId, new InputStreamPayload(inputStream));
         Segment segment = Segment.builder().etag(etag)
-                .path(resource.getFile().getResourcePath() + chunkId)
+                .path(resourcePath + chunkId)
                 .sizeBytes(endByte - startByte).build();
         segmentMap.put(chunkId, segment);
     }