You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/09/16 04:44:35 UTC

[airavata] branch master updated: Enabling streaming in file transfers to avoid using temporary storage (Copied from staging)

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/master by this push:
     new f0809fb  Enabling streaming in file transfers to avoid using temporary storage (Copied from staging)
f0809fb is described below

commit f0809fb3348fd5d2433c2e53f31e02edc32c4dac
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Sep 16 00:44:26 2019 -0400

    Enabling streaming in file transfers to avoid using temporary storage (Copied from staging)
---
 .../participant/airavata-server.properties.j2      |  12 +-
 .../apache/airavata/agents/api/AgentAdaptor.java   |  14 +-
 ...orageResourceAdaptor.java => FileMetadata.java} |  34 ++-
 .../agents/api/StorageResourceAdaptor.java         |  20 +-
 .../airavata/agents/streaming/TransferResult.java  |  64 ++++++
 .../agents/streaming/VirtualInputStream.java       |  82 ++++++++
 .../agents/streaming/VirtualOutputStream.java      |  54 +++++
 .../agents/streaming/VirtualStreamProducer.java    |  55 +++++
 .../airavata/helix/agent/ssh/SshAgentAdaptor.java  |  21 +-
 .../agent/storage/StorageResourceAdaptorImpl.java  |   8 +-
 .../airavata/helix/adaptor/SSHJAgentAdaptor.java   | 138 ++++++++++++-
 .../airavata/helix/adaptor/SSHJStorageAdaptor.java |   8 +-
 .../helix/impl/task/staging/ArchiveTask.java       |  12 +-
 .../helix/impl/task/staging/DataStagingTask.java   | 229 +++++++++++++++++----
 .../impl/task/staging/InputDataStagingTask.java    |  55 +----
 .../impl/task/submission/JobSubmissionTask.java    |   6 +-
 .../apache/airavata/common/utils/Constants.java    |   2 +
 .../airavata/common/utils/ServerSettings.java      |   4 +
 18 files changed, 686 insertions(+), 132 deletions(-)

diff --git a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2 b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
index 1e5d46d..e5ab3c3 100644
--- a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
@@ -72,4 +72,14 @@ zookeeper.server.connection={{ zookeeper_connection_url }}
 zookeeper.timeout=30000
 
 
-local.data.location={{ local_data_location }}
\ No newline at end of file
+local.data.location={{ local_data_location }}
+
+###########################################################################
+# Data Parsing Task Level Configurations
+###########################################################################
+data.parser.delete.container=True
+
+###########################################################################
+# Data Staging Task Level Configurations
+###########################################################################
+enable.streaming.transfer=True
\ No newline at end of file
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
index 5355d5c..107b9ff 100644
--- a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
@@ -20,6 +20,10 @@
 package org.apache.airavata.agents.api;
 
 import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.util.List;
 
 /**
@@ -40,13 +44,19 @@ public interface AgentAdaptor {
 
     public void createDirectory(String path, boolean recursive) throws AgentException;
 
-    public void copyFileTo(String localFile, String remoteFile) throws AgentException;
+    public void uploadFile(String localFile, String remoteFile) throws AgentException;
 
-    public void copyFileFrom(String remoteFile, String localFile) throws AgentException;
+    public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException;
+
+    public void downloadFile(String remoteFile, String localFile) throws AgentException;
+
+    public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException;
 
     public List<String> listDirectory(String path) throws AgentException;
 
     public Boolean doesFileExist(String filePath) throws AgentException;
 
     public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException;
+
+    public FileMetadata getFileMetadata(String remoteFile) throws AgentException;
 }
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/FileMetadata.java
similarity index 61%
copy from modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
copy to modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/FileMetadata.java
index 6db2f1b..e3c2b51 100644
--- a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/FileMetadata.java
@@ -19,9 +19,33 @@
  */
 package org.apache.airavata.agents.api;
 
-public interface StorageResourceAdaptor {
-    public void init(String storageResourceId, String gatewayId, String loginUser, String token) throws AgentException;
-    public void uploadFile(String sourceFile, String destFile) throws AgentException;
-    public void downloadFile(String sourceFile, String destFile) throws AgentException;
-    public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException;
+public class FileMetadata { // Plain mutable holder for a file's name, size and permission mask.
+
+    private String name; // null until setName is called.
+    private long size; // NOTE(review): unit is not stated in this class; presumably bytes - confirm against callers.
+    private int permissions = 420; // Default value: decimal 420 == octal 0644.
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+    public void setSize(long size) {
+        this.size = size;
+    }
+
+    public int getPermissions() { // Returns 420 (octal 0644) unless setPermissions has replaced it.
+        return permissions;
+    }
+
+    public void setPermissions(int permissions) { // Stored as given; no range or format validation is performed.
+        this.permissions = permissions;
+    }
 }
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
index 6db2f1b..c59fb19 100644
--- a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
@@ -19,9 +19,21 @@
  */
 package org.apache.airavata.agents.api;
 
-public interface StorageResourceAdaptor {
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+public interface StorageResourceAdaptor extends AgentAdaptor {
     public void init(String storageResourceId, String gatewayId, String loginUser, String token) throws AgentException;
-    public void uploadFile(String sourceFile, String destFile) throws AgentException;
-    public void downloadFile(String sourceFile, String destFile) throws AgentException;
+    public void uploadFile(String localFile, String remoteFile) throws AgentException;
+    public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException;
+    public void downloadFile(String remoteFile, String localFile) throws AgentException;
+    public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException;
     public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException;
-}
+    public void createDirectory(String path) throws AgentException;
+    public void createDirectory(String path, boolean recursive) throws AgentException;
+    public List<String> listDirectory(String path) throws AgentException;
+    public Boolean doesFileExist(String filePath) throws AgentException;
+    public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException;
+    public FileMetadata getFileMetadata(String remoteFile) throws AgentException;
+}
\ No newline at end of file
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/TransferResult.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/TransferResult.java
new file mode 100644
index 0000000..d317142
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/TransferResult.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.agents.streaming;
+
+public class TransferResult { // Mutable bean describing the outcome of one transfer; every field is null until set.
+
+    public static enum TransferStatus { // States in which a transfer can be reported.
+        COMPLETED, FAILED, PAUSED
+    }
+
+    private String transferId; // Identifier of the transfer this result refers to.
+    private TransferStatus transferStatus; // One of COMPLETED, FAILED or PAUSED.
+    private String message; // Free-form text accompanying the status.
+    private Throwable error; // NOTE(review): presumably only populated for FAILED results; nothing in this class enforces that.
+
+    public String getTransferId() {
+        return transferId;
+    }
+
+    public void setTransferId(String transferId) {
+        this.transferId = transferId;
+    }
+
+    public TransferStatus getTransferStatus() {
+        return transferStatus;
+    }
+
+    public void setTransferStatus(TransferStatus transferStatus) {
+        this.transferStatus = transferStatus;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public Throwable getError() {
+        return error;
+    }
+
+    public void setError(Throwable error) {
+        this.error = error;
+    }
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualInputStream.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualInputStream.java
new file mode 100644
index 0000000..3d49f23
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualInputStream.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.agents.streaming;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class VirtualInputStream extends InputStream { // InputStream whose data comes from a BlockingQueue<Integer>, one queue element per byte.
+
+    private BlockingQueue<Integer> queue; // Source of the data; each element is returned by one read() call.
+    private long byteCount; // Number of elements taken from the queue so far (incremented in read()).
+    private long streamLength; // Expected total length; the bulk read reports end of stream once byteCount reaches it.
+
+    public VirtualInputStream(BlockingQueue<Integer> queue, long streamLength) {
+        this.queue = queue;
+        this.streamLength = streamLength;
+    }
+
+    public int read(byte b[], int off, int len) throws IOException { // Bulk read: fills up to len bytes of b, starting at off, via repeated read() calls.
+        if (b == null) {
+            throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return 0;
+        }
+
+        if (byteCount == streamLength) { // Everything expected has been consumed: report end of stream.
+            return -1;
+        }
+
+        int c = read(); // First byte: an IOException here (e.g. the queue timeout) propagates to the caller.
+
+        b[off] = (byte)c;
+
+        int i = 1; // Count of bytes stored into b so far.
+        try {
+            for (; i < len ; i++) {
+                if (byteCount == streamLength) { // Stop early once the expected length is reached.
+                    break;
+                }
+                c = read();
+                b[off + i] = (byte)c;
+            }
+        } catch (IOException ee) { // NOTE(review): a failure after the first byte is swallowed; the bytes read so far are returned instead.
+        }
+        return i;
+    }
+
+    @Override
+    public int read() throws IOException { // NOTE(review): never returns -1 itself; past streamLength it waits on the queue and then throws on timeout.
+        try {
+            Integer cont =  queue.poll(10, TimeUnit.SECONDS); // Waits at most 10 seconds for the next element.
+            if (cont == null) {
+                throw new IOException("Timed out reading from the queue");
+            }
+            byteCount++;
+            return cont; // NOTE(review): returned unmasked; VirtualOutputStream.write enqueues its int argument as-is, so values outside 0..255 (including -1) may appear - verify.
+        } catch (InterruptedException e) { // NOTE(review): the thread's interrupt flag is not restored before rethrowing.
+            throw new IOException("Read was interrupted", e);
+        }
+    }
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualOutputStream.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualOutputStream.java
new file mode 100644
index 0000000..b5d639d
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualOutputStream.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.agents.streaming;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class VirtualOutputStream extends OutputStream { // OutputStream that hands each written value to a BlockingQueue<Integer>.
+
+    private BlockingQueue<Integer> queue; // Destination queue; one element is offered per write(int) call.
+    private long byteCount; // Number of values successfully enqueued so far.
+    private long streamLength; // Maximum number of values that may be written.
+
+    public VirtualOutputStream(BlockingQueue<Integer> queue, long streamLength) {
+        this.queue = queue;
+        this.streamLength = streamLength;
+    }
+
+    @Override
+    public void write(int b) throws IOException { // Enqueues b; throws IOException on over-length write, timeout or interruption.
+        try {
+            if (byteCount == streamLength) { // Refuse to write past the declared stream length.
+                throw new IOException("Can not write more than the stream length " + streamLength);
+            }
+            boolean status = this.queue.offer(b, 10, TimeUnit.SECONDS); // Waits at most 10 seconds for space; NOTE(review): b is enqueued as-is, not masked to its low 8 bits.
+            if (!status) {
+                throw new IOException("Timed out writing into the queue");
+            }
+            byteCount++;
+            //System.out.println("Writing byte " + byteCount);
+        } catch (InterruptedException e) { // NOTE(review): the thread's interrupt flag is not restored before rethrowing.
+            throw new IOException("Write was interrupted", e);
+        }
+    }
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualStreamProducer.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualStreamProducer.java
new file mode 100644
index 0000000..c8b71ef
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualStreamProducer.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.agents.streaming;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class VirtualStreamProducer { // Creates a VirtualInputStream/VirtualOutputStream pair that share one bounded queue.
+
+    private InputStream inputStream; // Reading end of the pair.
+    private OutputStream outputStream; // Writing end of the pair.
+
+    public VirtualStreamProducer(int bufferSize, long streamLength) { // bufferSize is the queue capacity; streamLength is passed to both streams.
+        ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); // NOTE(review): raw-typed and never used in this constructor; candidate for removal.
+        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(bufferSize); // Bounded queue shared by the two streams below.
+        inputStream = new VirtualInputStream(queue, streamLength);
+        outputStream = new VirtualOutputStream(queue, streamLength);
+    }
+
+    public InputStream getInputStream() {
+        return inputStream;
+    }
+
+    public void setInputStream(InputStream inputStream) { // Replaces the reference only; the new stream is not connected to the shared queue by this class.
+        this.inputStream = inputStream;
+    }
+
+    public OutputStream getOutputStream() {
+        return outputStream;
+    }
+
+    public void setOutputStream(OutputStream outputStream) { // Replaces the reference only; the new stream is not connected to the shared queue by this class.
+        this.outputStream = outputStream;
+    }
+}
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
index e4cffe9..b7a6bad 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -27,6 +27,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
@@ -176,7 +178,7 @@ public class SshAgentAdaptor implements AgentAdaptor {
         }
     }
 
-    public void copyFileTo(String localFile, String remoteFile) throws AgentException {
+    public void uploadFile(String localFile, String remoteFile) throws AgentException {
 
         FileInputStream fis;
         boolean ptimestamp = true;
@@ -282,8 +284,13 @@ public class SshAgentAdaptor implements AgentAdaptor {
         }
     }
 
+    @Override
+    public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException { // Stream-based upload is not implemented in this adaptor.
+        throw new AgentException("Operation not implemented"); // Always thrown; none of the arguments is inspected.
+    }
+
     // TODO file not found does not return exception
-    public void copyFileFrom(String remoteFile, String localFile) throws AgentException {
+    public void downloadFile(String remoteFile, String localFile) throws AgentException {
         FileOutputStream fos = null;
         ChannelExec channelExec = null;
         try {
@@ -414,6 +421,11 @@ public class SshAgentAdaptor implements AgentAdaptor {
     }
 
     @Override
+    public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException { // Stream-based download is not implemented in this adaptor.
+        throw new AgentException("Operation not implemented"); // Always thrown; none of the arguments is inspected.
+    }
+
+    @Override
     public List<String> listDirectory(String path) throws AgentException {
         String command = "ls " + path;
         ChannelExec channelExec = null;
@@ -515,6 +527,11 @@ public class SshAgentAdaptor implements AgentAdaptor {
         throw new AgentException("Operation not implemented");
     }
 
+    @Override
+    public FileMetadata getFileMetadata(String remoteFile) throws AgentException { // Metadata lookup is not implemented in this adaptor.
+        throw new AgentException("Operation not implemented"); // Always thrown; remoteFile is not inspected.
+    }
+
     private static class DefaultUserInfo implements UserInfo, UIKeyboardInteractive {
 
         private String userName;
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
index c36e34b..229f174 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
@@ -66,13 +66,13 @@ public class StorageResourceAdaptorImpl extends SshAgentAdaptor implements Stora
     }
 
     @Override
-    public void uploadFile(String sourceFile, String destFile) throws AgentException {
-        super.copyFileTo(sourceFile, destFile);
+    public void uploadFile(String localFile, String remoteFile) throws AgentException {
+        super.uploadFile(localFile, remoteFile);
     }
 
     @Override
-    public void downloadFile(String sourceFile, String destFile) throws AgentException {
-        super.copyFileFrom(sourceFile, destFile);
+    public void downloadFile(String remoteFile, String localFile) throws AgentException {
+        super.downloadFile(remoteFile, localFile);
     }
 
     @Override
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
index edbd16e..05ce146 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
@@ -19,6 +19,7 @@
  */
 package org.apache.airavata.helix.adaptor;
 
+import com.google.common.collect.Lists;
 import net.schmizz.keepalive.KeepAliveProvider;
 import net.schmizz.sshj.DefaultConfig;
 import net.schmizz.sshj.connection.channel.direct.Session;
@@ -31,10 +32,11 @@ import net.schmizz.sshj.userauth.method.ChallengeResponseProvider;
 import net.schmizz.sshj.userauth.password.PasswordFinder;
 import net.schmizz.sshj.userauth.password.PasswordUtils;
 import net.schmizz.sshj.userauth.password.Resource;
-import org.apache.airavata.agents.api.AgentAdaptor;
-import org.apache.airavata.agents.api.AgentException;
-import org.apache.airavata.agents.api.AgentUtils;
-import org.apache.airavata.agents.api.CommandOutput;
+import net.schmizz.sshj.xfer.FilePermission;
+import net.schmizz.sshj.xfer.LocalDestFile;
+import net.schmizz.sshj.xfer.LocalFileFilter;
+import net.schmizz.sshj.xfer.LocalSourceFile;
+import org.apache.airavata.agents.api.*;
 import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
 import org.apache.airavata.helix.agent.ssh.StandardOutReader;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
@@ -45,11 +47,8 @@ import org.apache.airavata.model.credential.store.SSHCredential;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
+import java.io.*;
+import java.util.*;
 import java.util.stream.Collectors;
 
 public class SSHJAgentAdaptor implements AgentAdaptor {
@@ -203,7 +202,7 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
     }
 
     @Override
-    public void copyFileTo(String localFile, String remoteFile) throws AgentException {
+    public void uploadFile(String localFile, String remoteFile) throws AgentException {
         try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
             fileTransfer.upload(localFile, remoteFile);
         } catch (Exception e) {
@@ -212,7 +211,66 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
     }
 
     @Override
-    public void copyFileFrom(String remoteFile, String localFile) throws AgentException {
+    public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException { // Uploads the content of localInStream to remoteFile through an SCP transfer wrapper.
+        try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) { // try-with-resources closes the wrapper when the block exits.
+            fileTransfer.upload(new LocalSourceFile() { // Anonymous adapter that presents the stream plus metadata as a single regular file.
+                @Override
+                public String getName() {
+                    return metadata.getName();
+                }
+
+                @Override
+                public long getLength() { // Length reported for the upload comes from the caller-supplied metadata.
+                    return metadata.getSize();
+                }
+
+                @Override
+                public InputStream getInputStream() throws IOException { // Hands out the same caller-supplied stream on every call.
+                    return localInStream;
+                }
+
+                @Override
+                public int getPermissions() throws IOException {
+                    return 420; // NOTE(review): hard-coded 420 (octal 0644); metadata.getPermissions() is not consulted here.
+                }
+
+                @Override
+                public boolean isFile() {
+                    return true;
+                }
+
+                @Override
+                public boolean isDirectory() {
+                    return false;
+                }
+
+                @Override
+                public Iterable<? extends LocalSourceFile> getChildren(LocalFileFilter filter) throws IOException {
+                    return null; // NOTE(review): presumably never called because isDirectory() is false - confirm against the sshj uploader.
+                }
+
+                @Override
+                public boolean providesAtimeMtime() { // No timestamps are offered, so the two time getters below return 0.
+                    return false;
+                }
+
+                @Override
+                public long getLastAccessTime() throws IOException {
+                    return 0;
+                }
+
+                @Override
+                public long getLastModifiedTime() throws IOException {
+                    return 0;
+                }
+            }, remoteFile);
+        } catch (Exception e) { // Any failure is rethrown wrapped in an AgentException.
+            throw new AgentException(e);
+        }
+    }
+
+    @Override
+    public void downloadFile(String remoteFile, String localFile) throws AgentException {
         try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
             fileTransfer.download(remoteFile, localFile);
         } catch (Exception e) {
@@ -221,6 +279,50 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
     }
 
     @Override
+    public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException { // Downloads remoteFile into localOutStream; NOTE(review): the metadata parameter is never used in this body.
+        try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) { // try-with-resources closes the wrapper when the block exits.
+            fileTransfer.download(remoteFile, new LocalDestFile() { // Anonymous adapter that presents the output stream as the destination file.
+                @Override
+                public OutputStream getOutputStream() throws IOException { // Hands out the same caller-supplied stream on every call.
+                    return localOutStream;
+                }
+
+                @Override
+                public LocalDestFile getChild(String name) {
+                    return null;
+                }
+
+                @Override
+                public LocalDestFile getTargetFile(String filename) throws IOException { // The file name is ignored; this adapter itself is the target.
+                    return this;
+                }
+
+                @Override
+                public LocalDestFile getTargetDirectory(String dirname) throws IOException {
+                    return null; // NOTE(review): presumably unused for single-file downloads - confirm against the sshj downloader.
+                }
+
+                @Override
+                public void setPermissions(int perms) throws IOException { // Intentionally a no-op: the value is discarded.
+
+                }
+
+                @Override
+                public void setLastAccessedTime(long t) throws IOException { // Intentionally a no-op: the value is discarded.
+
+                }
+
+                @Override
+                public void setLastModifiedTime(long t) throws IOException { // Intentionally a no-op: the value is discarded.
+
+                }
+            });
+        } catch (Exception e) { // Any failure is rethrown wrapped in an AgentException.
+            throw new AgentException(e);
+        }
+    }
+
+    @Override
     public List<String> listDirectory(String path) throws AgentException {
         try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
             List<RemoteResourceInfo> ls = sftpClient.ls(path);
@@ -250,6 +352,20 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
         }
     }
 
+    @Override
+    public FileMetadata getFileMetadata(String remoteFile) throws AgentException { // Builds a FileMetadata for remoteFile from an SFTP stat call.
+        try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) { // try-with-resources closes the SFTP client when the block exits.
+            FileAttributes stat = sftpClient.stat(remoteFile);
+            FileMetadata metadata = new FileMetadata();
+            metadata.setName(new File(remoteFile).getName()); // java.io.File is used only to take the last segment of the remote path.
+            metadata.setSize(stat.getSize());
+            metadata.setPermissions(FilePermission.toMask(stat.getPermissions())); // Converts the stat permission set into an int mask.
+            return metadata;
+        } catch (Exception e) { // Any failure is rethrown wrapped in an AgentException.
+            throw new AgentException(e);
+        }
+    }
+
     private boolean isMatch(String s, String p) {
         int i = 0;
         int j = 0;
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
index 0cc7ff9..c1656a0 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
@@ -80,13 +80,13 @@ public class SSHJStorageAdaptor extends SSHJAgentAdaptor implements StorageResou
     }
 
     @Override
-    public void uploadFile(String sourceFile, String destFile) throws AgentException {
-        super.copyFileTo(sourceFile, destFile);
+    public void uploadFile(String localFile, String remoteFile) throws AgentException {
+        super.uploadFile(localFile, remoteFile);
     }
 
     @Override
-    public void downloadFile(String sourceFile, String destFile) throws AgentException {
-        super.copyFileFrom(sourceFile, destFile);
+    public void downloadFile(String remoteFile, String localFile) throws AgentException {
+        super.downloadFile(remoteFile, localFile);
     }
 
     @Override
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
index 4f1129f..6be6fce 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
@@ -87,7 +87,7 @@ public class ArchiveTask extends DataStagingTask {
                 CommandOutput tarCommandOutput = adaptor.executeCommand(tarringCommand, null);
                 if (tarCommandOutput.getExitCode() != 0) {
                     throw new TaskOnFailException("Failed while running the tar command " + tarringCommand + ". Sout : " +
-                            tarCommandOutput.getStdOut() + ". Serr " + tarCommandOutput.getStdError(), true, null);
+                            tarCommandOutput.getStdOut() + ". Serr " + tarCommandOutput.getStdError(), false, null);
                 }
 
             } catch (AgentException e) {
@@ -98,7 +98,7 @@ public class ArchiveTask extends DataStagingTask {
 
             if (!fileTransferred) {
                 logger.error("Failed to transfer created archive file " + tarCreationAbsPath);
-                throw new TaskOnFailException("Failed to transfer created archive file " + tarCreationAbsPath, true, null);
+                throw new TaskOnFailException("Failed to transfer created archive file " + tarCreationAbsPath, false, null);
             }
 
             String deleteTarCommand = "rm " + tarCreationAbsPath;
@@ -108,11 +108,11 @@ public class ArchiveTask extends DataStagingTask {
                 CommandOutput rmCommandOutput = adaptor.executeCommand(deleteTarCommand, null);
                 if (rmCommandOutput.getExitCode() != 0) {
                     throw new TaskOnFailException("Failed while running the rm command " + deleteTarCommand + ". Sout : " +
-                            rmCommandOutput.getStdOut() + ". Serr " + rmCommandOutput.getStdError(), true, null);
+                            rmCommandOutput.getStdOut() + ". Serr " + rmCommandOutput.getStdError(), false, null);
                 }
 
             } catch (AgentException e) {
-                throw new TaskOnFailException("Failed while running the rm command " + tarringCommand, true, null);
+                throw new TaskOnFailException("Failed while running the rm command " + tarringCommand, false, null);
             }
 
             String destParent = destFilePath.substring(0, destFilePath.lastIndexOf("/"));
@@ -125,10 +125,10 @@ public class ArchiveTask extends DataStagingTask {
                 CommandOutput unTarCommandOutput = storageResourceAdaptor.executeCommand(unArchiveTarCommand, destParent);
                 if (unTarCommandOutput.getExitCode() != 0) {
                     throw new TaskOnFailException("Failed while running the untar command " + deleteTarCommand + ". Sout : " +
-                            unTarCommandOutput.getStdOut() + ". Serr " + unTarCommandOutput.getStdError(), true, null);
+                            unTarCommandOutput.getStdOut() + ". Serr " + unTarCommandOutput.getStdError(), false, null);
                 }
             } catch (AgentException e) {
-                throw new TaskOnFailException("Failed while running the untar command " + tarringCommand, true, null);
+                throw new TaskOnFailException("Failed while running the untar command " + tarringCommand, false, null);
             }
 
             return onSuccess("Archival task successfully completed");
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
index b4a0ea8..a0f51e2 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
@@ -21,7 +21,10 @@ package org.apache.airavata.helix.impl.task.staging;
 
 import org.apache.airavata.agents.api.AgentAdaptor;
 import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.FileMetadata;
 import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.agents.streaming.TransferResult;
+import org.apache.airavata.agents.streaming.VirtualStreamProducer;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.impl.task.AiravataTask;
 import org.apache.airavata.helix.impl.task.TaskOnFailException;
@@ -35,12 +38,21 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
 
 @SuppressWarnings("WeakerAccess")
 public abstract class DataStagingTask extends AiravataTask {
 
     private final static Logger logger = LoggerFactory.getLogger(DataStagingTask.class);
 
+    // Cached pool rather than a fixed pool with an unbounded queue: such a pool never grows past its core size, so an
+    // "Out" task could stay queued behind the "In" task feeding it (assumes the virtual stream blocks - TODO confirm).
+    private final static ExecutorService PASS_THROUGH_EXECUTOR = Executors.newCachedThreadPool();
+
     @SuppressWarnings("WeakerAccess")
     protected DataStagingTaskModel getDataStagingTaskModel() throws TaskOnFailException {
         try {
@@ -48,10 +60,10 @@ public abstract class DataStagingTask extends AiravataTask {
             if (subTaskModel != null) {
                 return DataStagingTaskModel.class.cast(subTaskModel);
             } else {
-                throw new TaskOnFailException("Data staging task model can not be null for task " + getTaskId(), true, null);
+                throw new TaskOnFailException("Data staging task model can not be null for task " + getTaskId(), false, null);
             }
         } catch (Exception e) {
-            throw new TaskOnFailException("Failed while obtaining data staging task model for task " + getTaskId(), true, e);
+            throw new TaskOnFailException("Failed while obtaining data staging task model for task " + getTaskId(), false, e);
         }
     }
 
@@ -59,7 +71,7 @@ public abstract class DataStagingTask extends AiravataTask {
     protected StorageResourceDescription getStorageResource() throws TaskOnFailException {
         StorageResourceDescription storageResource = getTaskContext().getStorageResource();
         if (storageResource == null) {
-            throw new TaskOnFailException("Storage resource can not be null for task " + getTaskId(), true, null);
+            throw new TaskOnFailException("Storage resource can not be null for task " + getTaskId(), false, null);
         }
         return storageResource;
     }
@@ -80,7 +92,7 @@ public abstract class DataStagingTask extends AiravataTask {
             return storageResourceAdaptor;
         } catch (AgentException e) {
             throw new TaskOnFailException("Failed to obtain adaptor for storage resource " + getTaskContext().getStorageResourceId() +
-                    " in task " + getTaskId(), true, e);
+                    " in task " + getTaskId(), false, e);
         }
     }
 
@@ -95,7 +107,7 @@ public abstract class DataStagingTask extends AiravataTask {
                     getTaskContext().getComputeResourceLoginUserName());
         } catch (Exception e) {
             throw new TaskOnFailException("Failed to obtain adaptor for compute resource " + getTaskContext().getComputeResourceId() +
-                    " in task " + getTaskId(), true, e);
+                    " in task " + getTaskId(), false, e);
         }
     }
 
@@ -134,6 +146,173 @@ public abstract class DataStagingTask extends AiravataTask {
         return filePath;
     }
 
+    /** Copies sourceFile to destFile in two hops through the local file tempFile; tempFile is cleaned up in finally. */
+    public void naiveTransfer(AgentAdaptor srcAdaptor, String sourceFile, AgentAdaptor destAdaptor, String destFile,
+                              String tempFile) throws TaskOnFailException {
+        logger.info("Using naive transfer to transfer " + sourceFile + " to " + destFile);
+        try {
+            try {
+                logger.info("Downloading file " + sourceFile + " to loacl temp file " + tempFile);
+                srcAdaptor.downloadFile(sourceFile, tempFile);
+            } catch (AgentException e) {
+                throw new TaskOnFailException("Failed downloading file " + sourceFile + " to the local path " +
+                        tempFile, false, e);
+            }
+            // A download that returns without having created the temp file is reported as a failure here.
+            File localFile = new File(tempFile);
+            if (!localFile.exists()) {
+                throw new TaskOnFailException("Local file does not exist at " + tempFile, false, null);
+            }
+            try {
+                logger.info("Uploading file form local temp file " + tempFile + " to " + destFile);
+                destAdaptor.uploadFile(tempFile, destFile);
+            } catch (AgentException e) {
+                throw new TaskOnFailException("Failed uploading file to " + destFile + " from local path " +
+                        tempFile, false, e);
+            }
+        } finally {
+            logger.info("Deleting temporary file " + tempFile);
+            deleteTempFile(tempFile);
+        }
+    }
+
+    /**
+     * Streams {@code sourceFile} from {@code srcAdaptor} to {@code destFile} on {@code destAdaptor} without a local
+     * temporary file. A download ("In") and an upload ("Out") task are submitted to PASS_THROUGH_EXECUTOR and joined
+     * through the streams of one VirtualStreamProducer; the first failure cancels whichever task is still pending.
+     *
+     * @throws TaskOnFailException if the metadata lookup, either transfer, or the monitoring of the transfers fails
+     */
+    public static void passThroughTransfer(AgentAdaptor srcAdaptor, String sourceFile, AgentAdaptor destAdaptor,
+                                           String destFile) throws TaskOnFailException {
+        logger.info("Using pass through transfer to transfer " + sourceFile + " to " + destFile);
+
+        final FileMetadata fileMetadata;
+        try {
+            fileMetadata = srcAdaptor.getFileMetadata(sourceFile);
+        } catch (AgentException e) {
+            throw new TaskOnFailException("Failed to obtain metadata for file " + sourceFile, false, e);
+        }
+
+        VirtualStreamProducer streamProducer = new VirtualStreamProducer(1024, fileMetadata.getSize());
+        OutputStream os = streamProducer.getOutputStream();
+        InputStream is = streamProducer.getInputStream();
+
+        Callable<TransferResult> inCallable = () -> {
+            TransferResult result = new TransferResult();
+            result.setTransferId("In");
+
+            try {
+                logger.info("Executing in-bound transfer for file " + sourceFile);
+                srcAdaptor.downloadFile(sourceFile, os, fileMetadata);
+                logger.info("Completed in-bound transfer for file " + sourceFile);
+                result.setTransferStatus(TransferResult.TransferStatus.COMPLETED);
+                result.setMessage("Successfully completed the transfer");
+            } catch (Exception e) {
+                result.setMessage("In-bound transfer failed for file " + sourceFile + ". Reason : " + e.getMessage());
+                result.setTransferStatus(TransferResult.TransferStatus.FAILED);
+                result.setError(e);
+            }
+            return result;
+        };
+
+        Callable<TransferResult> outCallable = () -> {
+            TransferResult result = new TransferResult();
+            result.setTransferId("Out");
+
+            try {
+                logger.info("Executing out-bound transfer for file " + destFile);
+                destAdaptor.uploadFile(is, fileMetadata, destFile);
+                logger.info("Completed out-bound transfer for file " + destFile);
+                result.setTransferStatus(TransferResult.TransferStatus.COMPLETED);
+                result.setMessage("Successfully completed the transfer");
+            } catch (Exception e) {
+                result.setMessage("Out-bound transfer failed for file " + destFile + ". Reason : " + e.getMessage());
+                result.setTransferStatus(TransferResult.TransferStatus.FAILED);
+                result.setError(e);
+            }
+            return result;
+        };
+
+        CompletionService<TransferResult> completionService = new ExecutorCompletionService<>(PASS_THROUGH_EXECUTOR);
+
+        Map<String, Future<TransferResult>> unResolvedFutures = new HashMap<>();
+
+        unResolvedFutures.put("In", completionService.submit(inCallable));
+        unResolvedFutures.put("Out", completionService.submit(outCallable));
+
+        int completed = 0;
+        int failed = 0;
+        TransferResult failedResult = null;
+
+        try {
+            while (completed < 2 && failed == 0) {
+                try {
+                    TransferResult outcome = completionService.take().get();
+                    if (outcome.getTransferStatus() == TransferResult.TransferStatus.COMPLETED) {
+                        completed++;
+                        logger.debug("Transfer " + outcome.getTransferId() + " completed");
+                    } else {
+                        failed++;
+                        failedResult = outcome;
+                        logger.warn("Transfer " + outcome.getTransferId() + " failed", failedResult.getError());
+                    }
+                    unResolvedFutures.remove(outcome.getTransferId());
+                } catch (Exception e) {
+                    if (e instanceof InterruptedException) {
+                        Thread.currentThread().interrupt(); // keep the interruption visible to the caller
+                    }
+                    logger.error("Error occurred while monitoring transfers", e);
+                    throw new TaskOnFailException("Error occurred while monitoring transfers", false, e);
+                }
+            }
+
+            if (failed > 0) {
+                logger.error("Transfer from " + sourceFile + " to " + destFile + " failed. " + failedResult.getMessage(),
+                        failedResult.getError());
+                throw new TaskOnFailException("Pass through file transfer failed from " + sourceFile + " to " +
+                        destFile, false, failedResult.getError());
+            } else {
+                logger.info("Transfer from " + sourceFile + " to " + destFile + " completed");
+            }
+        } finally {
+            // Cancel (with interruption) any transfer that has not reported a result yet
+            unResolvedFutures.forEach((id, future) -> {
+                try {
+                    logger.warn("Cancelling transfer " + id);
+                    future.cancel(true);
+                } catch (Exception e) {
+                    // Ignore
+                    logger.warn(e.getMessage());
+                }
+            });
+        }
+    }
+
+    /** Copies sourcePath on the storage resource to destPath on the compute resource, rejecting zero-length files. */
+    protected void transferFileToComputeResource(String sourcePath, String destPath, AgentAdaptor computeAdaptor,
+                                                 StorageResourceAdaptor storageAdaptor) throws TaskOnFailException {
+        try {
+            FileMetadata fileMetadata = storageAdaptor.getFileMetadata(sourcePath);
+            if (fileMetadata.getSize() == 0) {
+                logger.error("File " + sourcePath +" size is 0 so ignoring the upload");
+                throw new TaskOnFailException("Input staging has failed as file " + sourcePath + " size is 0", false, null);
+            }
+        } catch (AgentException e) {
+            logger.error("Failed to fetch metadata for file " + sourcePath, e);
+            throw new TaskOnFailException("Failed to fetch metadata for file " + sourcePath, false, e);
+        }
+
+        if  (ServerSettings.isSteamingEnabled()) {
+            passThroughTransfer(storageAdaptor, sourcePath, computeAdaptor, destPath);
+        } else {
+            // Remote paths are '/'-separated (callers pass URI.getPath()), so do not split on File.separator.
+            String sourceFileName = sourcePath.substring(sourcePath.lastIndexOf('/') + 1);
+            String tempPath = getLocalDataPath(sourceFileName);
+            naiveTransfer(storageAdaptor, sourcePath, computeAdaptor, destPath, tempPath);
+        }
+    }
+
     protected boolean transferFileToStorage(String sourcePath, String destPath, String fileName, AgentAdaptor adaptor,
                               StorageResourceAdaptor storageResourceAdaptor) throws TaskOnFailException {
 
@@ -164,42 +343,14 @@ public abstract class DataStagingTask extends AiravataTask {
             throw new TaskOnFailException("Error while checking the file " + sourcePath + " existence", false, e);
         }
 
-        String localSourceFilePath = getLocalDataPath(fileName);
 
-        try {
-            try {
-                logger.info("Downloading output file " + sourcePath + " to the local path " + localSourceFilePath);
-                adaptor.copyFileFrom(sourcePath, localSourceFilePath);
-                logger.info("Output file downloaded to " + localSourceFilePath);
-            } catch (AgentException e) {
-                throw new TaskOnFailException("Failed downloading output file " + sourcePath + " to the local path " +
-                        localSourceFilePath, false, e);
-            }
-
-            File localFile = new File(localSourceFilePath);
-            if (localFile.exists()) {
-                /*if (localFile.length() == 0) {
-                    logger.warn("Local file " + localSourceFilePath +" size is 0 so ignoring the upload");
-                    return false;
-                }*/
-            } else {
-                throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, false, null);
-            }
-            // Uploading output file to the storage resource
-            try {
-                logger.info("Uploading the output file to " + destPath + " from local path " + localSourceFilePath);
-                storageResourceAdaptor.uploadFile(localSourceFilePath, destPath);
-                logger.info("Output file uploaded to " + destPath);
-            } catch (AgentException e) {
-                throw new TaskOnFailException("Failed uploading the output file to " + destPath + " from local path " +
-                        localSourceFilePath, false, e);
-            }
-
-            return true;
-        } finally {
-            logger.info("Deleting temporary file " + localSourceFilePath);
-            deleteTempFile(localSourceFilePath);
+        if  (ServerSettings.isSteamingEnabled()) {
+            passThroughTransfer(adaptor, sourcePath, storageResourceAdaptor, destPath);
+        } else {
+            String tempPath = getLocalDataPath(fileName);
+            naiveTransfer(adaptor, sourcePath, storageResourceAdaptor, destPath, tempPath);
         }
+        return true;
     }
 
     protected void deleteTempFile(String filePath) {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
index 60120aa..6ba0c24 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
@@ -79,12 +79,17 @@ public class InputDataStagingTask extends DataStagingTask {
                     sourceUrls = new String[]{dataStagingTaskModel.getSource()};
                 }
 
+                // Fetch and validate storage adaptor
+                StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
+                // Fetch and validate compute resource adaptor
+                AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
+
                 for (String url : sourceUrls) {
                     URI sourceURI = new URI(url);
                     URI destinationURI = new URI(dataStagingTaskModel.getDestination());
 
                     logger.info("Source file " + sourceURI.getPath() + ", destination uri " + destinationURI.getPath() + " for task " + getTaskId());
-                    copySingleFile(sourceURI, destinationURI, taskHelper);
+                    transferFileToComputeResource(sourceURI.getPath(), destinationURI.getPath(), adaptor, storageResourceAdaptor);
                 }
 
             } catch (URISyntaxException e) {
@@ -107,54 +112,6 @@ public class InputDataStagingTask extends DataStagingTask {
         }
     }
 
-    private void copySingleFile(URI sourceURI, URI destinationURI, TaskHelper taskHelper) throws TaskOnFailException {
-
-        String sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
-                    sourceURI.getPath().length());
-
-        // Fetch and validate storage adaptor
-        StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
-
-        // Fetch and validate compute resource adaptor
-        AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
-
-        String localSourceFilePath = getLocalDataPath(sourceFileName);
-        // Downloading input file from the storage resource
-
-        try {
-            try {
-                logger.info("Downloading input file " + sourceURI.getPath() + " to the local path " + localSourceFilePath);
-                storageResourceAdaptor.downloadFile(sourceURI.getPath(), localSourceFilePath);
-                logger.info("Input file downloaded to " + localSourceFilePath);
-            } catch (AgentException e) {
-                throw new TaskOnFailException("Failed downloading input file " + sourceFileName + " to the local path " + localSourceFilePath, false, e);
-            }
-
-            File localFile = new File(localSourceFilePath);
-            if (localFile.exists()) {
-                if (localFile.length() == 0) {
-                    logger.error("Local file " + localSourceFilePath +" size is 0 so ignoring the upload");
-                    throw new TaskOnFailException("Input staging has failed as file " + localSourceFilePath + " size is 0", true, null);
-                }
-            } else {
-                throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, false, null);
-            }
-
-            // Uploading input file to the compute resource
-            try {
-                logger.info("Uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath);
-                adaptor.copyFileTo(localSourceFilePath, destinationURI.getPath());
-                logger.info("Input file uploaded to " + destinationURI.getPath());
-            } catch (AgentException e) {
-                throw new TaskOnFailException("Failed uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath, false, e);
-            }
-
-        } finally {
-            logger.info("Deleting temporary file " + localSourceFilePath);
-            deleteTempFile(localSourceFilePath);
-        }
-    }
-
     @Override
     public void onCancel(TaskContext taskContext) {
 
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
index 5f9bda2..0934a87 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
@@ -31,12 +31,8 @@ import org.apache.airavata.helix.impl.task.submission.config.GroovyMapData;
 import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
 import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
 import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
-import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
 import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.messaging.event.JobIdentifier;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.status.JobStatus;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
@@ -77,7 +73,7 @@ public abstract class JobSubmissionTask extends AiravataTask {
 
         logger.info("Copying file form " + tempJobFile.getAbsolutePath() + " to remote path " + workingDirectory +
                 " of compute resource " + getTaskContext().getComputeResourceId());
-        agentAdaptor.copyFileTo(tempJobFile.getAbsolutePath(), workingDirectory);
+        agentAdaptor.uploadFile(tempJobFile.getAbsolutePath(), workingDirectory);
 
         RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, tempJobFile.getPath());
 
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
index 8820d45..63c99ba 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -69,4 +69,6 @@ public final class Constants {
     public static final String TRUSTED_CERTIFICATE_SYSTEM_PROPERTY = "X509_CERT_DIR";
 
     public static final String NEWLINE = System.getProperty("line.separator");
+
+    public static final String ENABLE_STREAMING_TRANSFER = "enable.streaming.transfer";
 }
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index e8ff43f..e740cf5 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -502,4 +502,8 @@ public class ServerSettings extends ApplicationSettings {
     public static String getSharingRegistryHost() {
         return getSetting(SHARING_REGISTRY_HOST, "localhost");
     }
+    /** Reads the enable.streaming.transfer setting ("True" is passed as the fallback) as a Boolean. */
+    public static Boolean isSteamingEnabled() {
+        return Boolean.parseBoolean(getSetting(Constants.ENABLE_STREAMING_TRANSFER, "True"));
+    }
 }