You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2021/10/23 03:32:45 UTC

[airavata-mft] branch develop updated: Simplifying the streaming logic to perform transfer using a single thread

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/develop by this push:
     new 490a953  Simplifying the streaming logic to perform transfer using a single thread
490a953 is described below

commit 490a953256ea411efc7e0ae5f91aa7f15cf71bba
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Fri Oct 22 23:32:36 2021 -0400

    Simplifying the streaming logic to perform transfer using a single thread
---
 .../org/apache/airavata/mft/agent/MFTAgent.java    |  49 ++--
 .../airavata/mft/agent/TransportMediator.java      | 254 +++++++--------------
 .../mft/agent/http/AgentHttpDownloadData.java      | 105 +++++++++
 .../airavata/mft/agent/http/ConnectorParams.java   |  60 -----
 .../mft/agent/http/HttpDownloadRequest.java        |  75 ------
 .../airavata/mft/agent/http/HttpServerHandler.java |  62 +----
 .../mft/agent/http/HttpTransferRequest.java        | 105 ---------
 .../mft/agent/http/HttpTransferRequestsStore.java  |  16 +-
 .../apache/airavata/mft/agent/rpc/RPCParser.java   |  44 ++--
 agent/src/main/resources/application.properties    |   3 +-
 .../airavata/mft/core/ConnectorResolver.java       | 103 ++-------
 .../airavata/mft/core/FileResourceMetadata.java    |   2 +-
 .../org/apache/airavata/mft/core/TransferTask.java |  53 -----
 .../airavata/mft/core/api/ConnectorConfig.java     | 167 ++++++++++++++
 .../airavata/mft/core/api/IncomingConnector.java   |  27 +++
 .../airavata/mft/core/api/OutgoingConnector.java   |  27 +++
 scripts/build.sh                                   |   3 +-
 .../src/main/resources/applicationContext.xml      |   2 +-
 .../mft/transport/scp/LimitInputStream.java        |  94 ++++++++
 .../mft/transport/scp/SCPIncomingConnector.java    | 229 +++++++++++++++++++
 .../mft/transport/scp/SCPMetadataCollector.java    |   2 +-
 .../mft/transport/scp/SCPOutgoingConnector.java    | 223 ++++++++++++++++++
 22 files changed, 1067 insertions(+), 638 deletions(-)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 5ccf25a..bec7945 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -35,9 +35,9 @@ import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
 import org.apache.airavata.mft.agent.rpc.RPCParser;
 import org.apache.airavata.mft.api.service.CallbackEndpoint;
 import org.apache.airavata.mft.api.service.TransferApiRequest;
-import org.apache.airavata.mft.core.ConnectorResolver;
+import org.apache.airavata.mft.core.FileResourceMetadata;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
-import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -172,14 +172,6 @@ public class MFTAgent implements CommandLineRunner {
                             .setPublisher(agentId)
                             .setDescription("Starting the transfer"));
 
-                        Optional<Connector> inConnectorOpt = ConnectorResolver.resolveConnector(request.getSourceType(), "IN");
-                        Connector inConnector = inConnectorOpt.orElseThrow(() -> new Exception("Could not find an in connector for given input"));
-                        inConnector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
-
-                        Optional<Connector> outConnectorOpt = ConnectorResolver.resolveConnector(request.getDestinationType(), "OUT");
-                        Connector outConnector = outConnectorOpt.orElseThrow(() -> new Exception("Could not find an out connector for given input"));
-                        outConnector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
-
                         Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
                         MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
                         srcMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
@@ -188,6 +180,34 @@ public class MFTAgent implements CommandLineRunner {
                         MetadataCollector dstMetadataCollector = dstMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for destination"));
                         dstMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
 
+                        FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
+                                request.getMftAuthorizationToken(),
+                                request.getSourceResourceId(),
+                                request.getSourceToken());
+
+
+                        ConnectorConfig srcCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+                                .withAuthToken(request.getMftAuthorizationToken())
+                                .withResourceServiceHost(resourceServiceHost)
+                                .withResourceServicePort(resourceServicePort)
+                                .withSecretServiceHost(secretServiceHost)
+                                .withSecretServicePort(secretServicePort)
+                                .withTransferId(transferId)
+                                .withResourceId(request.getSourceResourceId())
+                                .withCredentialToken(request.getSourceToken())
+                                .withMetadata(srcMetadata).build();
+
+                        ConnectorConfig dstCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+                                .withAuthToken(request.getMftAuthorizationToken())
+                                .withResourceServiceHost(resourceServiceHost)
+                                .withResourceServicePort(resourceServicePort)
+                                .withSecretServiceHost(secretServiceHost)
+                                .withSecretServicePort(secretServicePort)
+                                .withTransferId(transferId)
+                                .withResourceId(request.getDestinationResourceId())
+                                .withCredentialToken(request.getDestinationToken())
+                                .withMetadata(srcMetadata).build();
+
                         mftConsulClient.submitTransferStateToProcess(transferId, agentId, new TransferState()
                             .setState("STARTED")
                             .setPercentage(0)
@@ -195,13 +215,11 @@ public class MFTAgent implements CommandLineRunner {
                             .setPublisher(agentId)
                             .setDescription("Started the transfer"));
 
-
-                        TransferApiRequest finalRequest = request;
-                        mediator.transfer(transferId, request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector,
+                        mediator.transferSingleThread(transferId, request, srcCC, dstCC,
                                 (id, st) -> {
                                     try {
                                         mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
-                                        handleCallbacks(finalRequest.getCallbackEndpointsList(), id, st);
+
                                     } catch (MFTConsulClientException e) {
                                         logger.error("Failed while updating transfer state", e);
                                     }
@@ -213,8 +231,7 @@ public class MFTAgent implements CommandLineRunner {
                                     } catch (Exception e) {
                                         logger.error("Failed while deleting scheduled path for transfer {}", id);
                                     }
-                                }
-                        );
+                        });
 
                         logger.info("Started the transfer " + transferId);
 
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index b9195b8..6822275 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -20,17 +20,17 @@ package org.apache.airavata.mft.agent;
 import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.api.service.TransferApiRequest;
 import org.apache.airavata.mft.core.*;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.core.api.*;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Optional;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 
 public class TransportMediator {
@@ -40,197 +40,113 @@ public class TransportMediator {
     /*
     Maximum number of transfers handled at a time
      */
-    private int concurrentTransfers = 10;
-    private ExecutorService executor = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 connections per transfer
-    private ExecutorService monitorPool = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 monitors per transfer
+    private final int concurrentTransfers = 10;
+    private final ExecutorService executor = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 connections per transfer
+    private final ExecutorService monitorPool = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 monitors per transfer
 
-    public void destroy() {
-        executor.shutdown();
-    }
-
-    public void transfer(String transferId, TransferApiRequest request, Connector inConnector, Connector outConnector, MetadataCollector srcMetadataCollector,
-                           MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onStatusCallback,
-                           BiConsumer<String, Boolean> exitingCallback) throws Exception {
+    public void transferSingleThread(String transferId,
+                                     TransferApiRequest request,
+                                     ConnectorConfig srcCC,
+                                     ConnectorConfig dstCC,
+                                     BiConsumer<String, TransferState> onStatusCallback,
+                                     BiConsumer<String, Boolean> exitingCallback) {
 
-        FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
-                            request.getMftAuthorizationToken(),
-                            request.getSourceResourceId(),
-                            request.getSourceToken());
+        executor.submit(() -> {
 
-        final long resourceSize = srcMetadata.getResourceSize();
-        logger.debug("Source file size {}. MD5 {}", resourceSize, srcMetadata.getMd5sum());
+            final AtomicBoolean transferInProgress = new AtomicBoolean(true);
 
-        final DoubleStreamingBuffer streamBuffer = new DoubleStreamingBuffer();
-        final ReentrantLock statusLock = new ReentrantLock();
+            try {
 
-        ConnectorContext context = new ConnectorContext();
-        context.setMetadata(srcMetadata);
-        context.setStreamBuffer(streamBuffer);
-        context.setTransferId(transferId);
+                long start = System.currentTimeMillis();
 
-        TransferTask recvTask = new TransferTask(request.getMftAuthorizationToken(), request.getSourceResourceId(),
-                request.getSourceChildResourcePath(), request.getSourceToken(), context, inConnector);
-        TransferTask sendTask = new TransferTask(request.getMftAuthorizationToken(), request.getDestinationResourceId(),
-                request.getDestinationChildResourcePath(), request.getDestinationToken(), context, outConnector);
-        List<Future<Integer>> futureList = new ArrayList<>();
+                onStatusCallback.accept(transferId, new TransferState()
+                        .setPercentage(100)
+                        .setState("RUNNING")
+                        .setUpdateTimeMils(System.currentTimeMillis())
+                        .setDescription("Transfer successfully completed"));
 
-        ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
+                Optional<IncomingConnector> inConnectorOp = ConnectorResolver.resolveIncomingConnector(request.getSourceType());
+                Optional<OutgoingConnector> outConnectorOp = ConnectorResolver.resolveOutgoingConnector(request.getDestinationType());
 
-        long startTime = System.nanoTime();
+                IncomingConnector inConnector = inConnectorOp
+                        .orElseThrow(() -> new Exception("Could not find an in connector for type " + request.getSourceType()));
 
-        futureList.add(completionService.submit(recvTask));
-        futureList.add(completionService.submit(sendTask));
+                OutgoingConnector outConnector = outConnectorOp
+                        .orElseThrow(() -> new Exception("Could not find an out connector for type " + request.getDestinationType()));
 
-        final AtomicBoolean transferInProgress = new AtomicBoolean(true);
-        final AtomicBoolean transferSuccess = new AtomicBoolean(true);
+                inConnector.init(srcCC);
+                outConnector.init(dstCC);
 
+                String srcChild = request.getSourceChildResourcePath();
+                String dstChild = request.getDestinationChildResourcePath();
 
-        // Monitoring the completeness of the transfer
-        Thread monitorThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
+                InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
+                OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
 
-                try {
-                    int futureCnt = futureList.size();
-                    boolean transferErrored = false;
+                long count = 0;
+                final AtomicLong countAtomic = new AtomicLong();
+                countAtomic.set(count);
 
-                    for (int i = 0; i < futureCnt; i++) {
-                        Future<Integer> ft = completionService.take();
-                        futureList.remove(ft);
+                monitorPool.submit(() -> {
+                    while (true) {
                         try {
-                            ft.get();
-                        } catch (Exception e) {
-
-                            logger.error("One task failed with error", e);
-                            transferErrored = true;
-                            statusLock.lock();
-                            onStatusCallback.accept(transferId, new TransferState()
-                                .setPercentage(0)
-                                .setState("FAILED")
-                                .setUpdateTimeMils(System.currentTimeMillis())
-                                .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
-                            transferInProgress.set(false);
-                            transferSuccess.set(false);
-                            statusLock.unlock();
-
-                            for (Future<Integer> f : futureList) {
-                                try {
-                                    Thread.sleep(1000);
-                                } catch (InterruptedException ex) {
-                                    logger.error("Sleep failed", e);
-                                }
-                                f.cancel(true);
-                            }
-                            futureList.clear();
-                        }
-                    }
-
-                    if (!transferErrored) {
-                        Boolean transferred = destMetadataCollector.isAvailable(
-                                request.getMftAuthorizationToken(),
-                                request.getDestinationResourceId(),
-                                request.getDestinationToken());
-
-
-                        if (!transferred) {
-                            logger.error("Transfer completed but resource is not available in destination");
-                            throw new Exception("Transfer completed but resource is not available in destination");
+                            Thread.sleep(2000);
+                        } catch (InterruptedException e) {
+                            // Ignore
                         }
-
-                        FileResourceMetadata destMetadata = destMetadataCollector.getFileResourceMetadata(
-                                request.getMftAuthorizationToken(),
-                                request.getDestinationResourceId(),
-                                request.getDestinationToken());
-
-
-                        boolean doIntegrityVerify = true;
-
-                        if (srcMetadata.getMd5sum() == null) {
-                            logger.warn("MD5 sum is not available for source resource. So this disables integrity verification");
-                            doIntegrityVerify = false;
-                        } else if (destMetadata.getMd5sum() == null) {
-                            logger.warn("MD5 sum is not available for destination resource. So this disables integrity verification");
-                            doIntegrityVerify = false;
+                        if (!transferInProgress.get()) {
+                            logger.info("Status monitor is exiting for transfer {}", transferId);
+                            break;
                         }
+                        double transferPercentage = countAtomic.get() * 100.0/ srcCC.getMetadata().getResourceSize();
+                        logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
+                        onStatusCallback.accept(transferId, new TransferState()
+                                .setPercentage(transferPercentage)
+                                .setState("RUNNING")
+                                .setUpdateTimeMils(System.currentTimeMillis())
+                                .setDescription("Transfer Progress Updated"));
+                    }
+                });
 
-                        if (doIntegrityVerify && !destMetadata.getMd5sum().equals(srcMetadata.getMd5sum())) {
-                            logger.error("Resource integrity violated. MD5 sums are not matching. Source md5 {} destination md5 {}",
-                                    srcMetadata.getMd5sum(), destMetadata.getMd5sum());
-                            throw new Exception("Resource integrity violated. MD5 sums are not matching. Source md5 " + srcMetadata.getMd5sum()
-                                    + " destination md5 " + destMetadata.getMd5sum());
-                        }
+                int n;
+                byte[] buffer = new byte[128 * 1024];
+                for(count = 0L; -1 != (n = inputStream.read(buffer)); count += (long)n) {
+                    outputStream.write(buffer, 0, n);
+                    countAtomic.set(count);
+                }
 
-                        // Check
+                inConnector.complete();
+                outConnector.complete();
 
-                        long endTime = System.nanoTime();
+                long time = (System.currentTimeMillis() - start) / 1000;
 
-                        double time = (endTime - startTime) * 1.0 / 1000000000;
+                logger.info("Transfer {} completed. Time {} S.  Speed {} MB/s", transferId, time,
+                        (srcCC.getMetadata().getResourceSize() * 1.0 / time) / (1024 * 1024));
 
-                        statusLock.lock();
-                        onStatusCallback.accept(transferId, new TransferState()
-                                .setPercentage(100)
-                                .setState("COMPLETED")
-                                .setUpdateTimeMils(System.currentTimeMillis())
-                                .setDescription("Transfer successfully completed"));
-                        transferInProgress.set(false);
-                        transferSuccess.set(true);
-                        statusLock.unlock();
+                onStatusCallback.accept(transferId, new TransferState()
+                        .setPercentage(100)
+                        .setState("COMPLETED")
+                        .setUpdateTimeMils(System.currentTimeMillis())
+                        .setDescription("Transfer successfully completed"));
 
-                        logger.info("Transfer {} completed.  Speed {} MB/s", transferId,
-                                (srcMetadata.getResourceSize() * 1.0 / time) / (1024 * 1024));
-                    }
-                } catch (Exception e) {
-
-                    statusLock.lock();
-                    onStatusCallback.accept(transferId, new TransferState()
-                            .setPercentage(0)
-                            .setState("FAILED")
-                            .setUpdateTimeMils(System.currentTimeMillis())
-                            .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
-                    transferInProgress.set(false);
-                    transferSuccess.set(false);
-                    statusLock.unlock();
-
-                    logger.error("Transfer {} failed", transferId, e);
-                } finally {
-                    inConnector.destroy();
-                    outConnector.destroy();
-                    transferInProgress.set(false);
-                    exitingCallback.accept(transferId,transferSuccess.get());
-                }
-            }
-        });
+                exitingCallback.accept(transferId, true);
+            } catch (Exception e) {
 
-        // Monitoring the status of the transfer
-        Thread progressThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                while (true) {
+                logger.error("Transfer {} failed with error", transferId, e);
 
-                    try {
-                        Thread.sleep(2000);
-                    } catch (InterruptedException e) {
-                        // Ignore
-                    }
-                    statusLock.lock();
-                    if (!transferInProgress.get()){
-                        statusLock.unlock();
-                        logger.info("Status monitor is exiting for transfer {}", transferId);
-                        break;
-                    }
-                    double transferPercentage = streamBuffer.getProcessedBytes() * 100.0/ resourceSize;
-                    logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
-                    onStatusCallback.accept(transferId, new TransferState()
-                            .setPercentage(transferPercentage)
-                            .setState("RUNNING")
-                            .setUpdateTimeMils(System.currentTimeMillis())
-                            .setDescription("Transfer Progress Updated"));
-                    statusLock.unlock();
-                }
+                onStatusCallback.accept(transferId, new TransferState()
+                        .setPercentage(0)
+                        .setState("FAILED")
+                        .setUpdateTimeMils(System.currentTimeMillis())
+                        .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
+            } finally {
+                transferInProgress.set(false);
             }
         });
+    }
 
-        monitorPool.submit(monitorThread);
-        monitorPool.submit(progressThread);
+    public void destroy() {
+        executor.shutdown();
+        monitorPool.shutdown();
     }
 }
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
new file mode 100644
index 0000000..1b8930c
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingConnector;
+
+public class AgentHttpDownloadData {
+    private IncomingConnector incomingConnector;
+    private ConnectorConfig connectorConfig;
+    private String childResourcePath;
+    private long createdTime = System.currentTimeMillis();
+
+    public IncomingConnector getIncomingConnector() {
+        return incomingConnector;
+    }
+
+    public void setIncomingConnector(IncomingConnector incomingConnector) {
+        this.incomingConnector = incomingConnector;
+    }
+
+    public ConnectorConfig getConnectorConfig() {
+        return connectorConfig;
+    }
+
+    public void setConnectorConfig(ConnectorConfig connectorConfig) {
+        this.connectorConfig = connectorConfig;
+    }
+
+    public String getChildResourcePath() {
+        return childResourcePath;
+    }
+
+    public void setChildResourcePath(String childResourcePath) {
+        this.childResourcePath = childResourcePath;
+    }
+
+    public long getCreatedTime() {
+        return createdTime;
+    }
+
+    public void setCreatedTime(long createdTime) {
+        this.createdTime = createdTime;
+    }
+
+
+    public static final class AgentHttpDownloadDataBuilder {
+        private IncomingConnector incomingConnector;
+        private ConnectorConfig connectorConfig;
+        private String childResourcePath;
+        private long createdTime = System.currentTimeMillis();
+
+        private AgentHttpDownloadDataBuilder() {
+        }
+
+        public static AgentHttpDownloadDataBuilder newBuilder() {
+            return new AgentHttpDownloadDataBuilder();
+        }
+
+        public AgentHttpDownloadDataBuilder withIncomingConnector(IncomingConnector incomingConnector) {
+            this.incomingConnector = incomingConnector;
+            return this;
+        }
+
+        public AgentHttpDownloadDataBuilder withConnectorConfig(ConnectorConfig connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public AgentHttpDownloadDataBuilder withChildResourcePath(String childResourcePath) {
+            this.childResourcePath = childResourcePath;
+            return this;
+        }
+
+        public AgentHttpDownloadDataBuilder withCreatedTime(long createdTime) {
+            this.createdTime = createdTime;
+            return this;
+        }
+
+
+        public AgentHttpDownloadData build() {
+            AgentHttpDownloadData agentHttpDownloadData = new AgentHttpDownloadData();
+            agentHttpDownloadData.setIncomingConnector(incomingConnector);
+            agentHttpDownloadData.setConnectorConfig(connectorConfig);
+            agentHttpDownloadData.setChildResourcePath(childResourcePath);
+            agentHttpDownloadData.setCreatedTime(createdTime);
+            return agentHttpDownloadData;
+        }
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java
deleted file mode 100644
index 99a4b38..0000000
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.agent.http;
-
-public class ConnectorParams {
-
-    private String resourceServiceHost, secretServiceHost;
-    private int resourceServicePort, secretServicePort;
-
-    public String getResourceServiceHost() {
-        return resourceServiceHost;
-    }
-
-    public ConnectorParams setResourceServiceHost(String resourceServiceHost) {
-        this.resourceServiceHost = resourceServiceHost;
-        return this;
-    }
-
-    public String getSecretServiceHost() {
-        return secretServiceHost;
-    }
-
-    public ConnectorParams setSecretServiceHost(String secretServiceHost) {
-        this.secretServiceHost = secretServiceHost;
-        return this;
-    }
-
-    public int getResourceServicePort() {
-        return resourceServicePort;
-    }
-
-    public ConnectorParams setResourceServicePort(int resourceServicePort) {
-        this.resourceServicePort = resourceServicePort;
-        return this;
-    }
-
-    public int getSecretServicePort() {
-        return secretServicePort;
-    }
-
-    public ConnectorParams setSecretServicePort(int secretServicePort) {
-        this.secretServicePort = secretServicePort;
-        return this;
-    }
-}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java
deleted file mode 100644
index 2cf56af..0000000
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.agent.http;
-
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
-
-public class HttpDownloadRequest {
-
-    private ConnectorParams connectorParams;
-    private Connector srcConnector;
-    private MetadataCollector srcMetadataCollector;
-    private String srcResourceId;
-    private String srcToken;
-
-    public ConnectorParams getConnectorParams() {
-        return connectorParams;
-    }
-
-    public HttpDownloadRequest setConnectorParams(ConnectorParams connectorParams) {
-        this.connectorParams = connectorParams;
-        return this;
-    }
-
-    public Connector getSrcConnector() {
-        return srcConnector;
-    }
-
-    public HttpDownloadRequest setSrcConnector(Connector srcConnector) {
-        this.srcConnector = srcConnector;
-        return this;
-    }
-
-    public MetadataCollector getSrcMetadataCollector() {
-        return srcMetadataCollector;
-    }
-
-    public HttpDownloadRequest setSrcMetadataCollector(MetadataCollector srcMetadataCollector) {
-        this.srcMetadataCollector = srcMetadataCollector;
-        return this;
-    }
-
-    public String getSrcResourceId() {
-        return srcResourceId;
-    }
-
-    public HttpDownloadRequest setSrcResourceId(String srcResourceId) {
-        this.srcResourceId = srcResourceId;
-        return this;
-    }
-
-    public String getSrcToken() {
-        return srcToken;
-    }
-
-    public HttpDownloadRequest setSrcToken(String srcToken) {
-        this.srcToken = srcToken;
-        return this;
-    }
-}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
index f03a952..87153b4 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
@@ -25,14 +25,12 @@ import io.netty.util.CharsetUtil;
 import org.apache.airavata.mft.common.AuthToken;
 import org.apache.airavata.mft.core.*;
 import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.IncomingConnector;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.activation.MimetypesFileTypeMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.io.InputStream;
 
 import static io.netty.handler.codec.http.HttpMethod.GET;
 import static io.netty.handler.codec.http.HttpResponseStatus.*;
@@ -43,7 +41,6 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
     private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
 
     private final HttpTransferRequestsStore transferRequestsStore;
-    private final ExecutorService executor = Executors.newFixedThreadPool(10);
 
     public HttpServerHandler(HttpTransferRequestsStore transferRequestsStore) {
         this.transferRequestsStore = transferRequestsStore;
@@ -66,48 +63,19 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
             final String uri = request.uri().substring(request.uri().lastIndexOf("/") + 1);
             logger.info("Received download request through url {}", uri);
 
-            HttpTransferRequest httpTransferRequest = transferRequestsStore.getDownloadRequest(uri);
+            AgentHttpDownloadData downloadData = transferRequestsStore.getDownloadRequest(uri);
 
-            if (httpTransferRequest == null) {
+            if (downloadData == null) {
                 logger.error("Couldn't find transfer request for uri {}", uri);
                 sendError(ctx, NOT_FOUND);
                 return;
             }
 
-            Connector connector = httpTransferRequest.getOtherConnector();
-            MetadataCollector metadataCollector = httpTransferRequest.getOtherMetadataCollector();
-
-            ConnectorParams params = httpTransferRequest.getConnectorParams();
-
-            // TODO Load from HTTP Headers
-            AuthToken authToken = httpTransferRequest.getAuthToken();
-
-            connector.init(params.getResourceServiceHost(),
-                    params.getResourceServicePort(), params.getSecretServiceHost(), params.getSecretServicePort());
-
-            metadataCollector.init(params.getResourceServiceHost(), params.getResourceServicePort(),
-                    params.getSecretServiceHost(), params.getSecretServicePort());
-
-            Boolean available = metadataCollector.isAvailable(authToken,
-                    httpTransferRequest.getResourceId(), httpTransferRequest.getCredentialToken());
-
-
-            if (!available) {
-                logger.error("File resource {} is not available", httpTransferRequest.getResourceId());
-                sendError(ctx, NOT_FOUND);
-                return;
-            }
-
-            FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(authToken,
-                    httpTransferRequest.getResourceId(),
-                    httpTransferRequest.getChildResourcePath(),
-                    httpTransferRequest.getCredentialToken());
-
-            long fileLength = fileResourceMetadata.getResourceSize();
+            long fileLength = downloadData.getConnectorConfig().getMetadata().getResourceSize();
 
             HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
             HttpUtil.setContentLength(response, fileLength);
-            setContentTypeHeader(response, fileResourceMetadata.getFriendlyName());
+            setContentTypeHeader(response, downloadData.getConnectorConfig().getMetadata().getFriendlyName());
 
             if (HttpUtil.isKeepAlive(request)) {
                 response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
@@ -120,19 +88,13 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
             ChannelFuture sendFileFuture;
             ChannelFuture lastContentFuture;
 
-            ConnectorContext connectorContext = new ConnectorContext();
-            connectorContext.setStreamBuffer(new DoubleStreamingBuffer());
-            connectorContext.setTransferId(uri);
-            connectorContext.setMetadata(new FileResourceMetadata()); // TODO Resolve
-
-            TransferTask pullTask = new TransferTask(authToken, httpTransferRequest.getResourceId(),
-                    httpTransferRequest.getChildResourcePath(), httpTransferRequest.getCredentialToken(),
-                    connectorContext, connector);
-
-            // TODO aggregate pullStatusFuture and sendFileFuture for keepalive test
-            Future<Integer> pullStatusFuture = executor.submit(pullTask);
+            IncomingConnector incomingConnector = downloadData.getIncomingConnector();
+            incomingConnector.init(downloadData.getConnectorConfig());
+            InputStream inputStream = downloadData.getChildResourcePath().equals("")?
+                    incomingConnector.fetchInputStream() :
+                    incomingConnector.fetchInputStream(downloadData.getChildResourcePath());
 
-            sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(connectorContext.getStreamBuffer().getInputStream())),
+            sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(inputStream)),
                     ctx.newProgressivePromise());
 
             // HttpChunkedInput will write the end marker (LastHttpContent) for us.
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java
deleted file mode 100644
index c6d5192..0000000
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.agent.http;
-
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
-
-public class HttpTransferRequest {
-    private Connector otherConnector;
-    private MetadataCollector otherMetadataCollector;
-    private ConnectorParams connectorParams;
-    private String resourceId;
-    private String childResourcePath;
-    private String credentialToken;
-    private long createdTime = System.currentTimeMillis();
-    private AuthToken authToken;
-
-    public Connector getOtherConnector() {
-        return otherConnector;
-    }
-
-    public HttpTransferRequest setOtherConnector(Connector otherConnector) {
-        this.otherConnector = otherConnector;
-        return this;
-    }
-
-    public MetadataCollector getOtherMetadataCollector() {
-        return otherMetadataCollector;
-    }
-
-    public HttpTransferRequest setOtherMetadataCollector(MetadataCollector otherMetadataCollector) {
-        this.otherMetadataCollector = otherMetadataCollector;
-        return this;
-    }
-
-    public String getResourceId() {
-        return resourceId;
-    }
-
-    public HttpTransferRequest setResourceId(String resourceId) {
-        this.resourceId = resourceId;
-        return this;
-    }
-
-    public String getChildResourcePath() {
-        return childResourcePath;
-    }
-
-    public HttpTransferRequest setChildResourcePath(String childResourcePath) {
-        this.childResourcePath = childResourcePath;
-        return this;
-    }
-
-    public String getCredentialToken() {
-        return credentialToken;
-    }
-
-    public HttpTransferRequest setCredentialToken(String credentialToken) {
-        this.credentialToken = credentialToken;
-        return this;
-    }
-
-    public ConnectorParams getConnectorParams() {
-        return connectorParams;
-    }
-
-    public HttpTransferRequest setConnectorParams(ConnectorParams connectorParams) {
-        this.connectorParams = connectorParams;
-        return this;
-    }
-
-    public long getCreatedTime() {
-        return createdTime;
-    }
-
-    public HttpTransferRequest setCreatedTime(long createdTime) {
-        this.createdTime = createdTime;
-        return this;
-    }
-
-    public AuthToken getAuthToken() {
-        return authToken;
-    }
-
-    public HttpTransferRequest setAuthToken(AuthToken authToken) {
-        this.authToken = authToken;
-        return this;
-    }
-}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
index dd2f17e..56c468b 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
@@ -31,8 +31,8 @@ public class HttpTransferRequestsStore {
 
     private static final Logger logger = LoggerFactory.getLogger(HttpTransferRequestsStore.class);
 
-    final private Map<String, HttpTransferRequest> downloadRequestStore = new ConcurrentHashMap<>();
-    final private Map<String, HttpTransferRequest> uploadRequestStore = new ConcurrentHashMap<>();
+    final private Map<String, AgentHttpDownloadData> downloadRequestStore = new ConcurrentHashMap<>();
+    final private Map<String, AgentHttpDownloadData> uploadRequestStore = new ConcurrentHashMap<>();
 
     final private ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
     private long entryExpiryTimeMS = 300 * 1000;
@@ -56,32 +56,32 @@ public class HttpTransferRequestsStore {
         }, 2, 10, TimeUnit.SECONDS);
     }
 
-    public String addDownloadRequest(HttpTransferRequest request) {
+    public String addDownloadRequest(AgentHttpDownloadData request) {
         String randomUrl = UUID.randomUUID().toString();
         downloadRequestStore.put(randomUrl, request);
         return randomUrl;
     }
 
-    public HttpTransferRequest getDownloadRequest(String url) {
+    public AgentHttpDownloadData getDownloadRequest(String url) {
 
         //TODO  Need to block concurrent calls to same url as connectors are not thread safe
-        HttpTransferRequest request = downloadRequestStore.get(url);
+        AgentHttpDownloadData request = downloadRequestStore.get(url);
         if (request != null) {
             downloadRequestStore.remove(url);
         }
         return request;
     }
 
-    public String addUploadRequest(HttpTransferRequest request) {
+    public String addUploadRequest(AgentHttpDownloadData request) {
         String randomUrl = UUID.randomUUID().toString();
         uploadRequestStore.put(randomUrl, request);
         return randomUrl;
     }
 
-    public HttpTransferRequest getUploadRequest(String url) {
+    public AgentHttpDownloadData getUploadRequest(String url) {
 
         //TODO  Need to block concurrent calls to same url as connectors are not thread safe
-        HttpTransferRequest request = uploadRequestStore.get(url);
+        AgentHttpDownloadData request = uploadRequestStore.get(url);
         if (request != null) {
             uploadRequestStore.remove(url);
         }
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
index f5c47e4..8e1e155 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
@@ -21,8 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.util.JsonFormat;
 import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
 import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
-import org.apache.airavata.mft.agent.http.ConnectorParams;
-import org.apache.airavata.mft.agent.http.HttpTransferRequest;
+import org.apache.airavata.mft.agent.http.AgentHttpDownloadData;
 import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
 import org.apache.airavata.mft.common.AuthToken;
 import org.apache.airavata.mft.core.ConnectorResolver;
@@ -30,6 +29,8 @@ import org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
 import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingConnector;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,22 +155,33 @@ public class RPCParser {
                 mftAuthorizationToken = tokenBuilder.build();
 
                 metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(storeType);
-                Optional<Connector> connectorOp = ConnectorResolver.resolveConnector(storeType, "IN");
+                Optional<IncomingConnector> connectorOp = ConnectorResolver.resolveIncomingConnector(storeType);
 
                 if (metadataCollectorOp.isPresent() && connectorOp.isPresent()) {
-                    HttpTransferRequest transferRequest = new HttpTransferRequest()
-                            .setConnectorParams(new ConnectorParams()
-                                .setResourceServiceHost(resourceServiceHost)
-                                .setResourceServicePort(resourceServicePort)
-                                .setSecretServiceHost(secretServiceHost)
-                                .setSecretServicePort(secretServicePort))
-                            .setResourceId(resourceId)
-                            .setChildResourcePath(childResourcePath)
-                            .setCredentialToken(sourceToken)
-                            .setOtherMetadataCollector(metadataCollectorOp.get())
-                            .setOtherConnector(connectorOp.get())
-                            .setAuthToken(mftAuthorizationToken);
-                    String url = httpTransferRequestsStore.addDownloadRequest(transferRequest);
+
+                    MetadataCollector metadataCollector = metadataCollectorOp.get();
+                    metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+
+                    FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(
+                            mftAuthorizationToken,
+                            resourceId,
+                            childResourcePath,
+                            sourceToken);
+
+                    AgentHttpDownloadData downloadData = AgentHttpDownloadData.AgentHttpDownloadDataBuilder.newBuilder()
+                            .withChildResourcePath(childResourcePath)
+                            .withIncomingConnector(connectorOp.get())
+                            .withConnectorConfig(ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+                                    .withResourceServiceHost(resourceServiceHost)
+                                    .withResourceServicePort(resourceServicePort)
+                                    .withSecretServiceHost(secretServiceHost)
+                                    .withSecretServicePort(secretServicePort)
+                                    .withResourceId(resourceId)
+                                    .withCredentialToken(sourceToken)
+                                    .withAuthToken(mftAuthorizationToken)
+                                    .withMetadata(fileResourceMetadata).build()).build();
+
+                    String url = httpTransferRequestsStore.addDownloadRequest(downloadData);
 
                     return (agentAdvertisedUrl.endsWith("/")? agentAdvertisedUrl : agentAdvertisedUrl + "/") + url;
                 } else {
diff --git a/agent/src/main/resources/application.properties b/agent/src/main/resources/application.properties
index d5949da..4d8d87c 100644
--- a/agent/src/main/resources/application.properties
+++ b/agent/src/main/resources/application.properties
@@ -31,4 +31,5 @@ consul.port=8500
 resource.service.host=localhost
 resource.service.port=7002
 secret.service.host=localhost
-secret.service.port=7003
\ No newline at end of file
+secret.service.port=7003
+agent.advertised.url=http://localhost:3333
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
index 469d19e..e6b5ef1 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
@@ -17,101 +17,42 @@
 
 package org.apache.airavata.mft.core;
 
-import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.IncomingConnector;
+import org.apache.airavata.mft.core.api.OutgoingConnector;
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.Optional;
 
 public final class ConnectorResolver {
-    public static Optional<Connector> resolveConnector(String type, String direction) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
+
+    public static Optional<IncomingConnector> resolveIncomingConnector(String type) throws Exception {
 
         String className = null;
         switch (type) {
             case "SCP":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.scp.SCPReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.scp.SCPSender";
-                        break;
-                }
-                break;
-            case "LOCAL":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.local.LocalReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.local.LocalSender";
-                        break;
-                }
-                break;
-            case "S3":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.s3.S3Receiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.s3.S3Sender";
-                        break;
-                }
-                break;
-            case "BOX":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.box.BoxReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.box.BoxSender";
-                        break;
-                }
-                break;
-            case "AZURE":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.azure.AzureReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.azure.AzureSender";
-                        break;
-                }
+                className = "org.apache.airavata.mft.transport.scp.SCPIncomingConnector";
                 break;
-            case "GCS":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.gcp.GCSReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.gcp.GCSSender";
-                        break;
-                }
-                break;
-            case "DROPBOX":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.dropbox.DropboxReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.dropbox.DropboxSender";
-                        break;
-                }
-                break;
-            case "FTP":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.ftp.FTPReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.ftp.FTPSender";
-                        break;
-                }
+        }
+
+        if (className != null) {
+            Class<?> aClass = Class.forName(className);
+            return Optional.of((IncomingConnector) aClass.getDeclaredConstructor().newInstance());
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    public static Optional<OutgoingConnector> resolveOutgoingConnector(String type) throws Exception {
+
+        String className = null;
+        switch (type) {
+            case "SCP":
+                className = "org.apache.airavata.mft.transport.scp.SCPOutgoingConnector";
                 break;
         }
 
         if (className != null) {
             Class<?> aClass = Class.forName(className);
-            return Optional.of((Connector) aClass.getDeclaredConstructor().newInstance());
+            return Optional.of((OutgoingConnector) aClass.getDeclaredConstructor().newInstance());
         } else {
             return Optional.empty();
         }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java b/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
index d61d743..7033d02 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
@@ -91,7 +91,7 @@ public class FileResourceMetadata {
         private Builder() {
         }
 
-        public static Builder getBuilder() {
+        public static Builder newBuilder() {
             return new Builder();
         }
 
diff --git a/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java b/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
deleted file mode 100644
index 42a0e08..0000000
--- a/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.core;
-
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.api.Connector;
-
-import java.util.concurrent.Callable;
-
-public class TransferTask implements Callable<Integer> {
-
-    private Connector connector;
-    private ConnectorContext context;
-    private String resourceId;
-    private String childResourcePath;
-    private String credentialToken;
-    private AuthToken authToken;
-
-    public TransferTask(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
-                        ConnectorContext context, Connector connector) {
-        this.connector = connector;
-        this.context = context;
-        this.resourceId = resourceId;
-        this.authToken = authToken;
-        this.credentialToken = credentialToken;
-        this.childResourcePath = childResourcePath;
-    }
-
-    @Override
-    public Integer call() throws Exception {
-        if (childResourcePath == null || "".equals(childResourcePath)) {
-            this.connector.startStream(authToken, resourceId, credentialToken, context);
-        } else {
-            this.connector.startStream(authToken, resourceId, childResourcePath, credentialToken, context);
-        }
-        return 0;
-    }
-}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
new file mode 100644
index 0000000..bc754e8
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
@@ -0,0 +1,167 @@
+package org.apache.airavata.mft.core.api;
+
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.core.FileResourceMetadata;
+
+public class ConnectorConfig {
+    private String resourceServiceHost;
+    private int resourceServicePort;
+    private String secretServiceHost;
+    private int secretServicePort;
+    private String resourceId;
+    private String credentialToken;
+    private AuthToken authToken;
+    private String transferId;
+    private FileResourceMetadata metadata;
+
+    public String getResourceServiceHost() {
+        return resourceServiceHost;
+    }
+
+    public void setResourceServiceHost(String resourceServiceHost) {
+        this.resourceServiceHost = resourceServiceHost;
+    }
+
+    public int getResourceServicePort() {
+        return resourceServicePort;
+    }
+
+    public void setResourceServicePort(int resourceServicePort) {
+        this.resourceServicePort = resourceServicePort;
+    }
+
+    public String getSecretServiceHost() {
+        return secretServiceHost;
+    }
+
+    public void setSecretServiceHost(String secretServiceHost) {
+        this.secretServiceHost = secretServiceHost;
+    }
+
+    public int getSecretServicePort() {
+        return secretServicePort;
+    }
+
+    public void setSecretServicePort(int secretServicePort) {
+        this.secretServicePort = secretServicePort;
+    }
+
+    public String getResourceId() {
+        return resourceId;
+    }
+
+    public void setResourceId(String resourceId) {
+        this.resourceId = resourceId;
+    }
+
+    public String getCredentialToken() {
+        return credentialToken;
+    }
+
+    public void setCredentialToken(String credentialToken) {
+        this.credentialToken = credentialToken;
+    }
+
+    public AuthToken getAuthToken() {
+        return authToken;
+    }
+
+    public void setAuthToken(AuthToken authToken) {
+        this.authToken = authToken;
+    }
+
+    public String getTransferId() {
+        return transferId;
+    }
+
+    public void setTransferId(String transferId) {
+        this.transferId = transferId;
+    }
+
+    public FileResourceMetadata getMetadata() {
+        return metadata;
+    }
+
+    public void setMetadata(FileResourceMetadata metadata) {
+        this.metadata = metadata;
+    }
+
+
+    public static final class ConnectorConfigBuilder {
+        private String resourceServiceHost;
+        private int resourceServicePort;
+        private String secretServiceHost;
+        private int secretServicePort;
+        private String resourceId;
+        private String credentialToken;
+        private AuthToken authToken;
+        private String transferId;
+        private FileResourceMetadata metadata;
+
+        private ConnectorConfigBuilder() {
+        }
+
+        public static ConnectorConfigBuilder newBuilder() {
+            return new ConnectorConfigBuilder();
+        }
+
+        public ConnectorConfigBuilder withResourceServiceHost(String resourceServiceHost) {
+            this.resourceServiceHost = resourceServiceHost;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withResourceServicePort(int resourceServicePort) {
+            this.resourceServicePort = resourceServicePort;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withSecretServiceHost(String secretServiceHost) {
+            this.secretServiceHost = secretServiceHost;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withSecretServicePort(int secretServicePort) {
+            this.secretServicePort = secretServicePort;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withResourceId(String resourceId) {
+            this.resourceId = resourceId;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withCredentialToken(String credentialToken) {
+            this.credentialToken = credentialToken;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withAuthToken(AuthToken authToken) {
+            this.authToken = authToken;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withTransferId(String transferId) {
+            this.transferId = transferId;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withMetadata(FileResourceMetadata metadata) {
+            this.metadata = metadata;
+            return this;
+        }
+
+        public ConnectorConfig build() {
+            ConnectorConfig connectorConfig = new ConnectorConfig();
+            connectorConfig.setResourceServiceHost(resourceServiceHost);
+            connectorConfig.setResourceServicePort(resourceServicePort);
+            connectorConfig.setSecretServiceHost(secretServiceHost);
+            connectorConfig.setSecretServicePort(secretServicePort);
+            connectorConfig.setResourceId(resourceId);
+            connectorConfig.setCredentialToken(credentialToken);
+            connectorConfig.setAuthToken(authToken);
+            connectorConfig.setTransferId(transferId);
+            connectorConfig.setMetadata(metadata);
+            return connectorConfig;
+        }
+    }
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingConnector.java
new file mode 100644
index 0000000..b805bf8
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingConnector.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.core.api;
+
+import java.io.InputStream;
+
+/**
+ * Source side of a transfer: a connector that exposes the content of a resource as an
+ * {@link InputStream}.
+ *
+ * <p>Expected call order, judging from the SCP implementation: {@link #init(ConnectorConfig)},
+ * then one of the {@code fetchInputStream} variants, then {@link #complete()} once the caller
+ * has finished reading.
+ */
+public interface IncomingConnector {
+
+    /**
+     * Prepares the connector from the given configuration.
+     *
+     * @param connectorConfig configuration describing the resource, credential and services to use
+     * @throws Exception if the connector can not be initialized
+     */
+    void init(ConnectorConfig connectorConfig) throws Exception;
+
+    /**
+     * Opens a stream over the content of the configured resource.
+     *
+     * @return stream to read the resource content from
+     * @throws Exception if the stream can not be opened
+     */
+    InputStream fetchInputStream() throws Exception;
+
+    /**
+     * Opens a stream over a child of the configured resource.
+     *
+     * @param childPath path of the child to read
+     * @return stream to read the child content from
+     * @throws Exception if the stream can not be opened
+     */
+    InputStream fetchInputStream(String childPath) throws Exception;
+
+    /**
+     * Signals that reading has finished so the connector can finalize the transfer.
+     *
+     * @throws Exception if finalizing fails
+     */
+    void complete() throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingConnector.java
new file mode 100644
index 0000000..9ab495c
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingConnector.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.core.api;
+
+import java.io.OutputStream;
+
+/**
+ * Destination side of a transfer: a connector that exposes a resource as an
+ * {@link OutputStream} to write content into.
+ *
+ * <p>Expected call order, judging from the SCP implementation: {@link #init(ConnectorConfig)},
+ * then one of the {@code fetchOutputStream} variants, then {@link #complete()} once the caller
+ * has finished writing.
+ */
+public interface OutgoingConnector {
+
+    /**
+     * Prepares the connector from the given configuration.
+     *
+     * @param connectorConfig configuration describing the resource, credential and services to use
+     * @throws Exception if the connector can not be initialized
+     */
+    void init(ConnectorConfig connectorConfig) throws Exception;
+
+    /**
+     * Opens a stream that writes to the configured resource.
+     *
+     * @return stream to write the resource content to
+     * @throws Exception if the stream can not be opened
+     */
+    OutputStream fetchOutputStream() throws Exception;
+
+    /**
+     * Opens a stream that writes to a child of the configured resource.
+     *
+     * @param childPath path of the child to write
+     * @return stream to write the child content to
+     * @throws Exception if the stream can not be opened
+     */
+    OutputStream fetchOutputStream(String childPath) throws Exception;
+
+    /**
+     * Signals that writing has finished so the connector can finalize the transfer.
+     *
+     * @throws Exception if finalizing fails
+     */
+    void complete() throws Exception;
+}
diff --git a/scripts/build.sh b/scripts/build.sh
index 5bb68ab..2d5e4af 100755
--- a/scripts/build.sh
+++ b/scripts/build.sh
@@ -19,6 +19,7 @@
 
 cd ../
 mvn clean install
+rm -rf build
 mkdir -p build
 cp agent/target/MFT-Agent-0.01-bin.zip build/
 cp controller/target/MFT-Controller-0.01-bin.zip build/
@@ -30,4 +31,4 @@ unzip -o build/MFT-Agent-0.01-bin.zip -d build/
 unzip -o build/MFT-Controller-0.01-bin.zip -d build/
 unzip -o build/Resource-Service-0.01-bin.zip -d build/
 unzip -o build/Secret-Service-0.01-bin.zip -d build/
-unzip -o build/API-Service-0.01-bin.zip -d build/
\ No newline at end of file
+unzip -o build/API-Service-0.01-bin.zip -d build/
diff --git a/services/resource-service/server/src/main/resources/applicationContext.xml b/services/resource-service/server/src/main/resources/applicationContext.xml
index 2fbd7eb..ea845a3 100644
--- a/services/resource-service/server/src/main/resources/applicationContext.xml
+++ b/services/resource-service/server/src/main/resources/applicationContext.xml
@@ -6,7 +6,7 @@
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">
 
-    <bean id="resourceBackend" class="org.apache.airavata.mft.resource.server.backend.datalake.DatalakeResourceBackend"
+    <bean id="resourceBackend" class="org.apache.airavata.mft.resource.server.backend.file.FileBasedResourceBackend"
           init-method="init" destroy-method="destroy"></bean>
 
 </beans>
\ No newline at end of file
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/LimitInputStream.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/LimitInputStream.java
new file mode 100644
index 0000000..806f7c2
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/LimitInputStream.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.scp;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channel;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An {@link InputStream} wrapper that exposes at most a fixed number of bytes of the wrapped
+ * stream and then reports end-of-stream.
+ *
+ * <p>Closing this stream only marks the wrapper as closed; the wrapped stream is deliberately
+ * left open, because {@code SCPIncomingConnector} keeps using the same channel stream (to read
+ * the trailing scp acknowledgement) after the file content has been consumed.
+ *
+ * <p>The byte counter is a plain field, so an instance is meant to be used by one thread.
+ */
+public class LimitInputStream extends FilterInputStream implements Channel {
+    private final AtomicBoolean open = new AtomicBoolean(true);
+    /** Number of bytes that may still be handed out to the caller. */
+    private long remaining;
+
+    /**
+     * @param in     stream to read from
+     * @param length maximum number of bytes to expose from {@code in}
+     */
+    public LimitInputStream(InputStream in, long length) {
+        super(in);
+        this.remaining = length;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return this.open.get();
+    }
+
+    /**
+     * Reads a single byte unless the limit has been reached.
+     *
+     * @return the byte read, or -1 once the limit or the end of the wrapped stream is reached
+     * @throws IOException if this wrapper has been closed or the wrapped stream fails
+     */
+    @Override
+    public int read() throws IOException {
+        if (!this.isOpen()) {
+            throw new IOException("read() - stream is closed (remaining=" + this.remaining + ")");
+        }
+        if (this.remaining <= 0L) {
+            return -1;
+        }
+        int value = super.read();
+        // Only count bytes that were really delivered; EOF must not consume the budget.
+        if (value >= 0) {
+            --this.remaining;
+        }
+        return value;
+    }
+
+    /**
+     * Reads up to {@code len} bytes, never more than the number of bytes left in the limit.
+     *
+     * @return number of bytes read, 0 when {@code len} is 0, or -1 once the limit or the end of
+     *         the wrapped stream is reached
+     * @throws IOException if this wrapper has been closed or the wrapped stream fails
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (!this.isOpen()) {
+            throw new IOException("read(len=" + len + ") stream is closed (remaining=" + this.remaining + ")");
+        }
+        if (len == 0) {
+            // InputStream contract: a zero-length request reads nothing and returns 0.
+            return 0;
+        }
+        long allowed = Math.min((long) len, this.remaining);
+        if (allowed <= 0L) {
+            return -1;
+        }
+        int read = super.read(b, off, (int) allowed);
+        // A -1 (EOF) result must not be subtracted, otherwise the budget would grow.
+        if (read > 0) {
+            this.remaining -= (long) read;
+        }
+        return read;
+    }
+
+    /**
+     * Skips up to {@code n} bytes, never more than the number of bytes left in the limit.
+     *
+     * @return number of bytes actually skipped
+     * @throws IOException if this wrapper has been closed or the wrapped stream fails
+     */
+    @Override
+    public long skip(long n) throws IOException {
+        if (!this.isOpen()) {
+            throw new IOException("skip(" + n + ") stream is closed (remaining=" + this.remaining + ")");
+        }
+        // Never skip past the limit: bytes after it do not belong to this stream.
+        long allowed = Math.min(n, this.remaining);
+        if (allowed <= 0L) {
+            return 0L;
+        }
+        long skipped = super.skip(allowed);
+        this.remaining -= skipped;
+        return skipped;
+    }
+
+    /**
+     * @return number of bytes readable without blocking, capped by the bytes left in the limit
+     * @throws IOException if this wrapper has been closed or the wrapped stream fails
+     */
+    @Override
+    public int available() throws IOException {
+        if (!this.isOpen()) {
+            throw new IOException("available() stream is closed (remaining=" + this.remaining + ")");
+        }
+        long capped = Math.min((long) super.available(), this.remaining);
+        return (int) Math.max(0L, capped);
+    }
+
+    /**
+     * Marks this wrapper as closed. The wrapped stream is intentionally not closed.
+     */
+    @Override
+    public void close() throws IOException {
+        this.open.set(false);
+    }
+}
+
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
new file mode 100644
index 0000000..7856a3a
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.scp;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingConnector;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * {@link IncomingConnector} that reads a remote file over SSH by running {@code scp -f} on the
+ * remote host through a JSch exec channel and speaking the scp source protocol by hand.
+ *
+ * <p>An instance keeps the session, channel and channel streams of a single transfer in fields,
+ * so it is meant to be used for one transfer by one thread: {@link #init(ConnectorConfig)},
+ * one {@code fetchInputStream} call, then {@link #complete()}.
+ */
+public final class SCPIncomingConnector implements IncomingConnector {
+
+    private static final Logger logger = LoggerFactory.getLogger(SCPIncomingConnector.class);
+
+    private Session session;
+    private GenericResource resource;
+    private Channel channel;
+    private OutputStream out;
+    private InputStream in;
+    private final byte[] buf = new byte[1024];
+
+    /**
+     * Resolves the resource and the SCP secret from the resource and secret services and opens
+     * the SSH session used by the later calls.
+     *
+     * @param cc connector configuration carrying service endpoints, resource id and tokens
+     * @throws Exception if the resource is not SCP storage or the session can not be created
+     */
+    @Override
+    public void init(ConnectorConfig cc) throws Exception {
+
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setResourceId(cc.getResourceId()).build());
+        }
+
+        if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
+            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+        }
+
+        SCPSecret scpSecret;
+
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+            scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setSecretId(cc.getCredentialToken()).build());
+        }
+
+        SCPStorage scpStorage = resource.getScpStorage();
+        logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
+
+        this.session = SCPTransportUtil.createSession(
+                scpSecret.getUser(),
+                scpStorage.getHost(),
+                scpStorage.getPort(),
+                scpSecret.getPrivateKey().getBytes(),
+                scpSecret.getPublicKey().getBytes(),
+                scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
+
+        if (session == null) {
+            logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
+            throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
+        }
+    }
+
+    /**
+     * Opens a stream over the file the configured resource points to.
+     *
+     * @return stream limited to the announced file size, or {@code null} when the remote scp
+     *         does not answer with a file header
+     * @throws Exception if the resource is a directory, is not set, or the scp exchange fails
+     */
+    @Override
+    public InputStream fetchInputStream() throws Exception {
+        String resourcePath = null;
+        switch (resource.getResourceCase()){
+            case FILE:
+                resourcePath = resource.getFile().getResourcePath();
+                break;
+            case DIRECTORY:
+                throw new Exception("A directory path can not be streamed");
+            case RESOURCE_NOT_SET:
+                throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+        }
+
+        return fetchInputStreamJCraft(resourcePath);
+    }
+
+    /**
+     * Opens a stream over a file below the directory the configured resource points to.
+     *
+     * @param childPath full path of the child; must start with the directory's resource path
+     * @return stream limited to the announced file size, or {@code null} when the remote scp
+     *         does not answer with a file header
+     * @throws Exception if the resource is a file, is not set, the child is outside the parent
+     *                   path, or the scp exchange fails
+     */
+    @Override
+    public InputStream fetchInputStream(String childPath) throws Exception {
+
+        String resourcePath = null;
+        switch (resource.getResourceCase()){
+            case FILE:
+                throw new Exception("A child path can not be associated with a file parent");
+            case DIRECTORY:
+                resourcePath = resource.getDirectory().getResourcePath();
+                if (!childPath.startsWith(resourcePath)) {
+                    throw new Exception("Child path " + childPath + " is not in the parent path " + resourcePath);
+                }
+                resourcePath = childPath;
+                break;
+            case RESOURCE_NOT_SET:
+                throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+        }
+
+        return fetchInputStreamJCraft(resourcePath);
+    }
+
+    /**
+     * Starts {@code scp -f} for the given path, parses the {@code C<mode> <size> <name>} header
+     * and returns the channel stream limited to the announced size.
+     */
+    private InputStream fetchInputStreamJCraft(String resourcePath) throws Exception{
+        // NOTE(review): resourcePath is concatenated unquoted into a remote shell command;
+        // paths with spaces or shell metacharacters will not behave as intended.
+        String command = "scp -f " + resourcePath;
+        channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+
+        // get I/O streams for remote scp
+        out = channel.getOutputStream();
+        in = channel.getInputStream();
+
+        channel.connect();
+
+        // tell the remote side we are ready to receive
+        sendZero();
+
+        int response = checkAck(in);
+        if (response != 'C') {
+            // Kept for backward compatibility: callers receive null when no file header arrives.
+            logger.error("Unexpected scp response {} while requesting {}", response, resourcePath);
+            return null;
+        }
+
+        // skip the mode field, e.g. '0644 ' (a single read may deliver fewer bytes)
+        int skipped = 0;
+        while (skipped < 5) {
+            int n = in.read(buf, skipped, 5 - skipped);
+            if (n < 0) {
+                throw new IOException("Unexpected end of stream while reading scp file mode");
+            }
+            skipped += n;
+        }
+
+        // decimal file size terminated by a space
+        long filesize = 0L;
+        while (true) {
+            int digit = readHeaderByte();
+            if (digit == ' ') {
+                break;
+            }
+            if (digit < '0' || digit > '9') {
+                throw new IOException("Malformed file size in scp header");
+            }
+            filesize = filesize * 10L + (long) (digit - '0');
+        }
+
+        // file name terminated by a line feed; guard against overrunning the buffer
+        int nameLength = 0;
+        while (true) {
+            int b = readHeaderByte();
+            if (b == 0x0a) {
+                break;
+            }
+            if (nameLength >= buf.length) {
+                throw new IOException("File name in scp header exceeds " + buf.length + " bytes");
+            }
+            buf[nameLength++] = (byte) b;
+        }
+        String file = new String(buf, 0, nameLength);
+
+        logger.info("file-size={}, file={}", filesize, file);
+
+        // acknowledge the header so the remote side starts sending the content
+        sendZero();
+
+        // the content is exactly filesize bytes; the ack that follows is read in complete()
+        return new LimitInputStream(in, filesize);
+    }
+
+    /**
+     * Reads the trailing acknowledgement of the transfer, confirms it and tears down the channel
+     * and the session. The channel and session are disconnected even when the ack fails.
+     *
+     * @throws Exception if the remote side reports an error
+     */
+    @Override
+    public void complete() throws Exception {
+        try {
+            if (in != null && out != null) {
+                // Read the ack once: reading again for the message would consume another byte.
+                int ack = checkAck(in);
+                if (ack != 0) {
+                    throw new IOException("Error code found in ack " + ack);
+                }
+                sendZero();
+            }
+        } finally {
+            if (channel != null) {
+                channel.disconnect();
+            }
+            if (session != null) {
+                session.disconnect();
+            }
+        }
+    }
+
+    /** Sends the single zero byte the scp protocol uses as a positive acknowledgement. */
+    private void sendZero() throws IOException {
+        buf[0] = 0;
+        out.write(buf, 0, 1);
+        out.flush();
+    }
+
+    /** Reads one header byte, failing instead of looping when the stream ends early. */
+    private int readHeaderByte() throws IOException {
+        int b = in.read();
+        if (b < 0) {
+            throw new IOException("Unexpected end of stream while reading scp header");
+        }
+        return b;
+    }
+
+    /**
+     * Reads one response byte from the remote scp. For the error codes 1 and 2 the message
+     * line that follows is read and logged.
+     *
+     * @return the byte read: 0 for success, 1 for error, 2 for fatal error, -1 at end of
+     *         stream, or any other byte (for example {@code 'C'}) unchanged
+     */
+    private int checkAck(InputStream in) throws IOException {
+        int b = in.read();
+        if (b == 0 || b == -1) {
+            return b;
+        }
+
+        if (b == 1 || b == 2) {
+            StringBuilder sb = new StringBuilder();
+            int c;
+            // stop at end of line, and also at end of stream so a dropped connection can not spin
+            while ((c = in.read()) != -1) {
+                sb.append((char) c);
+                if (c == '\n') {
+                    break;
+                }
+            }
+            logger.error("Check Ack Failure b = {} {}", b, sb);
+        }
+        return b;
+    }
+}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
index a6c11c0..882f8d2 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
@@ -212,7 +212,7 @@ public class SCPMetadataCollector implements MetadataCollector {
                     }
 
                     if (rri.isRegularFile()) {
-                        FileResourceMetadata.Builder childFileBuilder = FileResourceMetadata.Builder.getBuilder()
+                        FileResourceMetadata.Builder childFileBuilder = FileResourceMetadata.Builder.newBuilder()
                                         .withFriendlyName(rri.getName())
                                         .withResourcePath(rri.getPath())
                                         .withCreatedTime(rri.getAttributes().getAtime())
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
new file mode 100644
index 0000000..4df4649
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.scp;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.OutgoingConnector;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * {@link OutgoingConnector} that writes a remote file over SSH by running {@code scp -p -t} on
+ * the remote host through a JSch exec channel and speaking the scp sink protocol by hand.
+ *
+ * <p>An instance keeps the session, channel and channel streams of a single transfer in fields,
+ * so it is meant to be used for one transfer by one thread: {@link #init(ConnectorConfig)},
+ * one {@code fetchOutputStream} call, then {@link #complete()}.
+ */
+public final class SCPOutgoingConnector implements OutgoingConnector {
+
+    private static final Logger logger = LoggerFactory.getLogger(SCPOutgoingConnector.class);
+
+    private GenericResource resource;
+    private Session session;
+    private OutputStream out;
+    private InputStream in;
+    private Channel channel;
+    private ConnectorConfig cc;
+    private final byte[] buf = new byte[1024];
+
+
+    /**
+     * Resolves the resource and the SCP secret from the resource and secret services and opens
+     * the SSH session used by the later calls.
+     *
+     * @param cc connector configuration carrying service endpoints, resource id, tokens and
+     *           the metadata of the file to be written
+     * @throws Exception if the resource is not SCP storage or the session can not be created
+     */
+    @Override
+    public void init(ConnectorConfig cc) throws Exception {
+
+        this.cc = cc;
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setResourceId(cc.getResourceId()).build());
+        }
+
+        if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
+            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+        }
+
+        SCPSecret scpSecret;
+
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+            scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setSecretId(cc.getCredentialToken()).build());
+        }
+
+        SCPStorage scpStorage = resource.getScpStorage();
+        logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
+
+        this.session = SCPTransportUtil.createSession(
+                scpSecret.getUser(),
+                scpStorage.getHost(),
+                scpStorage.getPort(),
+                scpSecret.getPrivateKey().getBytes(),
+                scpSecret.getPublicKey().getBytes(),
+                scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
+
+        if (session == null) {
+            logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
+            throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
+        }
+    }
+
+    /**
+     * Opens a stream that writes to the file the configured resource points to.
+     *
+     * @return the channel output stream, ready to receive the file content
+     * @throws Exception if the resource is a directory, is not set, metadata is missing, or the
+     *                   scp exchange fails
+     */
+    @Override
+    public OutputStream fetchOutputStream() throws Exception {
+        String resourcePath = null;
+        switch (resource.getResourceCase()){
+            case FILE:
+                resourcePath = resource.getFile().getResourcePath();
+                break;
+            case DIRECTORY:
+                throw new Exception("A directory path can not be streamed");
+            case RESOURCE_NOT_SET:
+                throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+        }
+
+        return fetchOutputStreamJCraft(resourcePath, announcedFileSize());
+    }
+
+    /**
+     * Opens a stream that writes to a file below the directory the configured resource points to.
+     *
+     * @param childPath full path of the child; must start with the directory's resource path
+     * @return the channel output stream, ready to receive the file content
+     * @throws Exception if the resource is a file, is not set, the child is outside the parent
+     *                   path, metadata is missing, or the scp exchange fails
+     */
+    @Override
+    public OutputStream fetchOutputStream(String childPath) throws Exception {
+        String resourcePath = null;
+        switch (resource.getResourceCase()){
+            case FILE:
+                throw new Exception("A child path can not be associated with a file parent");
+            case DIRECTORY:
+                resourcePath = resource.getDirectory().getResourcePath();
+                if (!childPath.startsWith(resourcePath)) {
+                    throw new Exception("Child path " + childPath + " is not in the parent path " + resourcePath);
+                }
+                resourcePath = childPath;
+                break;
+            case RESOURCE_NOT_SET:
+                throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+        }
+
+        return fetchOutputStreamJCraft(resourcePath, announcedFileSize());
+    }
+
+    /**
+     * Starts {@code scp -p -t} for the given path, sends the timestamp and the
+     * {@code C0644 <size> <name>} header and returns the channel stream for the content.
+     *
+     * @param resourcePath remote path to write to
+     * @param fileSize     number of content bytes that will be written to the returned stream
+     * @return the channel output stream
+     * @throws Exception if the channel can not be opened or the remote side reports an error
+     */
+    public OutputStream fetchOutputStreamJCraft(String resourcePath, long fileSize) throws Exception {
+        boolean ptimestamp = true;
+
+        // exec 'scp -t rfile' remotely
+        // NOTE(review): resourcePath is concatenated unquoted into a remote shell command;
+        // paths with spaces or shell metacharacters will not behave as intended.
+        String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + resourcePath;
+        channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+
+        // get I/O streams for remote scp
+        out = channel.getOutputStream();
+        in = channel.getInputStream();
+
+        channel.connect();
+
+        expectAck();
+
+        if (ptimestamp) {
+            // The real access time is not available here, so the current time is sent for both
+            // the modification and the access time.
+            long nowSeconds = System.currentTimeMillis() / 1000;
+            command = "T" + nowSeconds + " 0" + " " + nowSeconds + " 0\n";
+            out.write(command.getBytes());
+            out.flush();
+            expectAck();
+        }
+
+        // send "C0644 filesize filename", where filename should not include '/'.
+        // substring(lastIndexOf + 1) also handles "/file" (index 0) and names without any '/'.
+        String fileName = resourcePath.substring(resourcePath.lastIndexOf('/') + 1);
+        command = "C0644 " + fileSize + " " + fileName + "\n";
+        out.write(command.getBytes());
+        out.flush();
+
+        expectAck();
+
+        return out;
+    }
+
+    /**
+     * Sends the end-of-content marker, waits for the remote acknowledgement and tears down the
+     * stream, the channel and the session. The channel and session are disconnected even when
+     * the ack fails.
+     *
+     * @throws Exception if the remote side reports an error
+     */
+    @Override
+    public void complete() throws Exception {
+        try {
+            if (out != null && in != null) {
+                buf[0] = 0;
+                out.write(buf, 0, 1);
+                out.flush();
+
+                expectAck();
+                out.close();
+            }
+        } finally {
+            if (channel != null) {
+                channel.disconnect();
+            }
+            if (session != null) {
+                session.disconnect();
+            }
+        }
+    }
+
+    /**
+     * Reads one response byte from the remote scp. For the error codes 1 and 2 the message
+     * line that follows is read and logged.
+     *
+     * @param in stream to read the response from
+     * @return the byte read: 0 for success, 1 for error, 2 for fatal error, -1 at end of
+     *         stream, or any other byte unchanged
+     * @throws IOException if reading fails
+     */
+    public int checkAck(InputStream in) throws IOException {
+        int b = in.read();
+        if (b == 0 || b == -1) {
+            return b;
+        }
+
+        if (b == 1 || b == 2) {
+            StringBuilder sb = new StringBuilder();
+            int c;
+            // stop at end of line, and also at end of stream so a dropped connection can not spin
+            while ((c = in.read()) != -1) {
+                sb.append((char) c);
+                if (c == '\n') {
+                    break;
+                }
+            }
+            logger.error("Check Ack Failure b = {} {}", b, sb);
+        }
+        return b;
+    }
+
+    /**
+     * Reads a single ack and fails when it is not the success code. The ack is read exactly
+     * once: reading again to build the message would consume another byte from the channel.
+     */
+    private void expectAck() throws IOException {
+        int ack = checkAck(in);
+        if (ack != 0) {
+            throw new IOException("Error code found in ack " + ack);
+        }
+    }
+
+    /** Returns the size announced in the scp header, failing clearly when metadata is absent. */
+    private long announcedFileSize() throws Exception {
+        if (cc.getMetadata() == null) {
+            throw new Exception("File metadata is required to determine the size of resource " + cc.getResourceId());
+        }
+        return cc.getMetadata().getResourceSize();
+    }
+}