You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2019/09/16 04:44:35 UTC
[airavata] branch master updated: Enabling streaming in file
transfers to_avoid using temporary storage (Copied from staging)
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new f0809fb Enabling streaming in file transfers to_avoid using temporary storage (Copied from staging)
f0809fb is described below
commit f0809fb3348fd5d2433c2e53f31e02edc32c4dac
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Mon Sep 16 00:44:26 2019 -0400
Enabling streaming in file transfers to_avoid using temporary storage (Copied from staging)
---
.../participant/airavata-server.properties.j2 | 12 +-
.../apache/airavata/agents/api/AgentAdaptor.java | 14 +-
...orageResourceAdaptor.java => FileMetadata.java} | 34 ++-
.../agents/api/StorageResourceAdaptor.java | 20 +-
.../airavata/agents/streaming/TransferResult.java | 64 ++++++
.../agents/streaming/VirtualInputStream.java | 82 ++++++++
.../agents/streaming/VirtualOutputStream.java | 54 +++++
.../agents/streaming/VirtualStreamProducer.java | 55 +++++
.../airavata/helix/agent/ssh/SshAgentAdaptor.java | 21 +-
.../agent/storage/StorageResourceAdaptorImpl.java | 8 +-
.../airavata/helix/adaptor/SSHJAgentAdaptor.java | 138 ++++++++++++-
.../airavata/helix/adaptor/SSHJStorageAdaptor.java | 8 +-
.../helix/impl/task/staging/ArchiveTask.java | 12 +-
.../helix/impl/task/staging/DataStagingTask.java | 229 +++++++++++++++++----
.../impl/task/staging/InputDataStagingTask.java | 55 +----
.../impl/task/submission/JobSubmissionTask.java | 6 +-
.../apache/airavata/common/utils/Constants.java | 2 +
.../airavata/common/utils/ServerSettings.java | 4 +
18 files changed, 686 insertions(+), 132 deletions(-)
diff --git a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2 b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
index 1e5d46d..e5ab3c3 100644
--- a/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/helix_setup/templates/participant/airavata-server.properties.j2
@@ -72,4 +72,14 @@ zookeeper.server.connection={{ zookeeper_connection_url }}
zookeeper.timeout=30000
-local.data.location={{ local_data_location }}
\ No newline at end of file
+local.data.location={{ local_data_location }}
+
+###########################################################################
+# Data Parsing Task Level Configurations
+###########################################################################
+data.parser.delete.container=True
+
+###########################################################################
+# Data Staging Task Level Configurations
+###########################################################################
+enable.streaming.transfer=True
\ No newline at end of file
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
index 5355d5c..107b9ff 100644
--- a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentAdaptor.java
@@ -20,6 +20,10 @@
package org.apache.airavata.agents.api;
import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
import java.util.List;
/**
@@ -40,13 +44,19 @@ public interface AgentAdaptor {
public void createDirectory(String path, boolean recursive) throws AgentException;
- public void copyFileTo(String localFile, String remoteFile) throws AgentException;
+ public void uploadFile(String localFile, String remoteFile) throws AgentException;
- public void copyFileFrom(String remoteFile, String localFile) throws AgentException;
+ public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException;
+
+ public void downloadFile(String remoteFile, String localFile) throws AgentException;
+
+ public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException;
public List<String> listDirectory(String path) throws AgentException;
public Boolean doesFileExist(String filePath) throws AgentException;
public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException;
+
+ public FileMetadata getFileMetadata(String remoteFile) throws AgentException;
}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/FileMetadata.java
similarity index 61%
copy from modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
copy to modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/FileMetadata.java
index 6db2f1b..e3c2b51 100644
--- a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/FileMetadata.java
@@ -19,9 +19,33 @@
*/
package org.apache.airavata.agents.api;
-public interface StorageResourceAdaptor {
- public void init(String storageResourceId, String gatewayId, String loginUser, String token) throws AgentException;
- public void uploadFile(String sourceFile, String destFile) throws AgentException;
- public void downloadFile(String sourceFile, String destFile) throws AgentException;
- public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException;
+public class FileMetadata {
+
+ private String name;
+ private long size;
+ private int permissions = 420;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public void setSize(long size) {
+ this.size = size;
+ }
+
+ public int getPermissions() {
+ return permissions;
+ }
+
+ public void setPermissions(int permissions) {
+ this.permissions = permissions;
+ }
}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
index 6db2f1b..c59fb19 100644
--- a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/StorageResourceAdaptor.java
@@ -19,9 +19,21 @@
*/
package org.apache.airavata.agents.api;
-public interface StorageResourceAdaptor {
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+public interface StorageResourceAdaptor extends AgentAdaptor {
public void init(String storageResourceId, String gatewayId, String loginUser, String token) throws AgentException;
- public void uploadFile(String sourceFile, String destFile) throws AgentException;
- public void downloadFile(String sourceFile, String destFile) throws AgentException;
+ public void uploadFile(String localFile, String remoteFile) throws AgentException;
+ public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException;
+ public void downloadFile(String remoteFile, String localFile) throws AgentException;
+ public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException;
public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException;
-}
+ public void createDirectory(String path) throws AgentException;
+ public void createDirectory(String path, boolean recursive) throws AgentException;
+ public List<String> listDirectory(String path) throws AgentException;
+ public Boolean doesFileExist(String filePath) throws AgentException;
+ public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException;
+ public FileMetadata getFileMetadata(String remoteFile) throws AgentException;
+}
\ No newline at end of file
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/TransferResult.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/TransferResult.java
new file mode 100644
index 0000000..d317142
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/TransferResult.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.agents.streaming;
+
+public class TransferResult {
+
+ public static enum TransferStatus {
+ COMPLETED, FAILED, PAUSED
+ }
+
+ private String transferId;
+ private TransferStatus transferStatus;
+ private String message;
+ private Throwable error;
+
+ public String getTransferId() {
+ return transferId;
+ }
+
+ public void setTransferId(String transferId) {
+ this.transferId = transferId;
+ }
+
+ public TransferStatus getTransferStatus() {
+ return transferStatus;
+ }
+
+ public void setTransferStatus(TransferStatus transferStatus) {
+ this.transferStatus = transferStatus;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+
+ public void setError(Throwable error) {
+ this.error = error;
+ }
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualInputStream.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualInputStream.java
new file mode 100644
index 0000000..3d49f23
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualInputStream.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.agents.streaming;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class VirtualInputStream extends InputStream {
+
+ private BlockingQueue<Integer> queue;
+ private long byteCount;
+ private long streamLength;
+
+ public VirtualInputStream(BlockingQueue<Integer> queue, long streamLength) {
+ this.queue = queue;
+ this.streamLength = streamLength;
+ }
+
+ public int read(byte b[], int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+
+ if (byteCount == streamLength) {
+ return -1;
+ }
+
+ int c = read();
+
+ b[off] = (byte)c;
+
+ int i = 1;
+ try {
+ for (; i < len ; i++) {
+ if (byteCount == streamLength) {
+ break;
+ }
+ c = read();
+ b[off + i] = (byte)c;
+ }
+ } catch (IOException ee) {
+ }
+ return i;
+ }
+
+ @Override
+ public int read() throws IOException {
+ try {
+ Integer cont = queue.poll(10, TimeUnit.SECONDS);
+ if (cont == null) {
+ throw new IOException("Timed out reading from the queue");
+ }
+ byteCount++;
+ return cont;
+ } catch (InterruptedException e) {
+ throw new IOException("Read was interrupted", e);
+ }
+ }
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualOutputStream.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualOutputStream.java
new file mode 100644
index 0000000..b5d639d
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualOutputStream.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.agents.streaming;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class VirtualOutputStream extends OutputStream {
+
+ private BlockingQueue<Integer> queue;
+ private long byteCount;
+ private long streamLength;
+
+ public VirtualOutputStream(BlockingQueue<Integer> queue, long streamLength) {
+ this.queue = queue;
+ this.streamLength = streamLength;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ try {
+ if (byteCount == streamLength) {
+ throw new IOException("Can not write more than the stream length " + streamLength);
+ }
+ boolean status = this.queue.offer(b, 10, TimeUnit.SECONDS);
+ if (!status) {
+ throw new IOException("Timed out writing into the queue");
+ }
+ byteCount++;
+ //System.out.println("Writing byte " + byteCount);
+ } catch (InterruptedException e) {
+ throw new IOException("Write was interrupted", e);
+ }
+ }
+}
diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualStreamProducer.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualStreamProducer.java
new file mode 100644
index 0000000..c8b71ef
--- /dev/null
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/streaming/VirtualStreamProducer.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.agents.streaming;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class VirtualStreamProducer {
+
+ private InputStream inputStream;
+ private OutputStream outputStream;
+
+ public VirtualStreamProducer(int bufferSize, long streamLength) {
+ ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
+ BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(bufferSize);
+ inputStream = new VirtualInputStream(queue, streamLength);
+ outputStream = new VirtualOutputStream(queue, streamLength);
+ }
+
+ public InputStream getInputStream() {
+ return inputStream;
+ }
+
+ public void setInputStream(InputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ public OutputStream getOutputStream() {
+ return outputStream;
+ }
+
+ public void setOutputStream(OutputStream outputStream) {
+ this.outputStream = outputStream;
+ }
+}
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
index e4cffe9..b7a6bad 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -27,6 +27,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
@@ -176,7 +178,7 @@ public class SshAgentAdaptor implements AgentAdaptor {
}
}
- public void copyFileTo(String localFile, String remoteFile) throws AgentException {
+ public void uploadFile(String localFile, String remoteFile) throws AgentException {
FileInputStream fis;
boolean ptimestamp = true;
@@ -282,8 +284,13 @@ public class SshAgentAdaptor implements AgentAdaptor {
}
}
+ @Override
+ public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException {
+ throw new AgentException("Operation not implemented");
+ }
+
// TODO file not found does not return exception
- public void copyFileFrom(String remoteFile, String localFile) throws AgentException {
+ public void downloadFile(String remoteFile, String localFile) throws AgentException {
FileOutputStream fos = null;
ChannelExec channelExec = null;
try {
@@ -414,6 +421,11 @@ public class SshAgentAdaptor implements AgentAdaptor {
}
@Override
+ public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException {
+ throw new AgentException("Operation not implemented");
+ }
+
+ @Override
public List<String> listDirectory(String path) throws AgentException {
String command = "ls " + path;
ChannelExec channelExec = null;
@@ -515,6 +527,11 @@ public class SshAgentAdaptor implements AgentAdaptor {
throw new AgentException("Operation not implemented");
}
+ @Override
+ public FileMetadata getFileMetadata(String remoteFile) throws AgentException {
+ throw new AgentException("Operation not implemented");
+ }
+
private static class DefaultUserInfo implements UserInfo, UIKeyboardInteractive {
private String userName;
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
index c36e34b..229f174 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/storage/StorageResourceAdaptorImpl.java
@@ -66,13 +66,13 @@ public class StorageResourceAdaptorImpl extends SshAgentAdaptor implements Stora
}
@Override
- public void uploadFile(String sourceFile, String destFile) throws AgentException {
- super.copyFileTo(sourceFile, destFile);
+ public void uploadFile(String localFile, String remoteFile) throws AgentException {
+ super.uploadFile(localFile, remoteFile);
}
@Override
- public void downloadFile(String sourceFile, String destFile) throws AgentException {
- super.copyFileFrom(sourceFile, destFile);
+ public void downloadFile(String remoteFile, String localFile) throws AgentException {
+ super.downloadFile(remoteFile, localFile);
}
@Override
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
index edbd16e..05ce146 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJAgentAdaptor.java
@@ -19,6 +19,7 @@
*/
package org.apache.airavata.helix.adaptor;
+import com.google.common.collect.Lists;
import net.schmizz.keepalive.KeepAliveProvider;
import net.schmizz.sshj.DefaultConfig;
import net.schmizz.sshj.connection.channel.direct.Session;
@@ -31,10 +32,11 @@ import net.schmizz.sshj.userauth.method.ChallengeResponseProvider;
import net.schmizz.sshj.userauth.password.PasswordFinder;
import net.schmizz.sshj.userauth.password.PasswordUtils;
import net.schmizz.sshj.userauth.password.Resource;
-import org.apache.airavata.agents.api.AgentAdaptor;
-import org.apache.airavata.agents.api.AgentException;
-import org.apache.airavata.agents.api.AgentUtils;
-import org.apache.airavata.agents.api.CommandOutput;
+import net.schmizz.sshj.xfer.FilePermission;
+import net.schmizz.sshj.xfer.LocalDestFile;
+import net.schmizz.sshj.xfer.LocalFileFilter;
+import net.schmizz.sshj.xfer.LocalSourceFile;
+import org.apache.airavata.agents.api.*;
import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
import org.apache.airavata.helix.agent.ssh.StandardOutReader;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
@@ -45,11 +47,8 @@ import org.apache.airavata.model.credential.store.SSHCredential;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
+import java.io.*;
+import java.util.*;
import java.util.stream.Collectors;
public class SSHJAgentAdaptor implements AgentAdaptor {
@@ -203,7 +202,7 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
}
@Override
- public void copyFileTo(String localFile, String remoteFile) throws AgentException {
+ public void uploadFile(String localFile, String remoteFile) throws AgentException {
try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
fileTransfer.upload(localFile, remoteFile);
} catch (Exception e) {
@@ -212,7 +211,66 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
}
@Override
- public void copyFileFrom(String remoteFile, String localFile) throws AgentException {
+ public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException {
+ try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+ fileTransfer.upload(new LocalSourceFile() {
+ @Override
+ public String getName() {
+ return metadata.getName();
+ }
+
+ @Override
+ public long getLength() {
+ return metadata.getSize();
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return localInStream;
+ }
+
+ @Override
+ public int getPermissions() throws IOException {
+ return 420; //metadata.getPermissions();
+ }
+
+ @Override
+ public boolean isFile() {
+ return true;
+ }
+
+ @Override
+ public boolean isDirectory() {
+ return false;
+ }
+
+ @Override
+ public Iterable<? extends LocalSourceFile> getChildren(LocalFileFilter filter) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean providesAtimeMtime() {
+ return false;
+ }
+
+ @Override
+ public long getLastAccessTime() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long getLastModifiedTime() throws IOException {
+ return 0;
+ }
+ }, remoteFile);
+ } catch (Exception e) {
+ throw new AgentException(e);
+ }
+ }
+
+ @Override
+ public void downloadFile(String remoteFile, String localFile) throws AgentException {
try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
fileTransfer.download(remoteFile, localFile);
} catch (Exception e) {
@@ -221,6 +279,50 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
}
@Override
+ public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException {
+ try(SCPFileTransferWrapper fileTransfer = sshjClient.newSCPFileTransferWrapper()) {
+ fileTransfer.download(remoteFile, new LocalDestFile() {
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return localOutStream;
+ }
+
+ @Override
+ public LocalDestFile getChild(String name) {
+ return null;
+ }
+
+ @Override
+ public LocalDestFile getTargetFile(String filename) throws IOException {
+ return this;
+ }
+
+ @Override
+ public LocalDestFile getTargetDirectory(String dirname) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void setPermissions(int perms) throws IOException {
+
+ }
+
+ @Override
+ public void setLastAccessedTime(long t) throws IOException {
+
+ }
+
+ @Override
+ public void setLastModifiedTime(long t) throws IOException {
+
+ }
+ });
+ } catch (Exception e) {
+ throw new AgentException(e);
+ }
+ }
+
+ @Override
public List<String> listDirectory(String path) throws AgentException {
try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
List<RemoteResourceInfo> ls = sftpClient.ls(path);
@@ -250,6 +352,20 @@ public class SSHJAgentAdaptor implements AgentAdaptor {
}
}
+ @Override
+ public FileMetadata getFileMetadata(String remoteFile) throws AgentException {
+ try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
+ FileAttributes stat = sftpClient.stat(remoteFile);
+ FileMetadata metadata = new FileMetadata();
+ metadata.setName(new File(remoteFile).getName());
+ metadata.setSize(stat.getSize());
+ metadata.setPermissions(FilePermission.toMask(stat.getPermissions()));
+ return metadata;
+ } catch (Exception e) {
+ throw new AgentException(e);
+ }
+ }
+
private boolean isMatch(String s, String p) {
int i = 0;
int j = 0;
diff --git a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
index 0cc7ff9..c1656a0 100644
--- a/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
+++ b/modules/airavata-helix/agent-impl/sshj-agent/src/main/java/org/apache/airavata/helix/adaptor/SSHJStorageAdaptor.java
@@ -80,13 +80,13 @@ public class SSHJStorageAdaptor extends SSHJAgentAdaptor implements StorageResou
}
@Override
- public void uploadFile(String sourceFile, String destFile) throws AgentException {
- super.copyFileTo(sourceFile, destFile);
+ public void uploadFile(String localFile, String remoteFile) throws AgentException {
+ super.uploadFile(localFile, remoteFile);
}
@Override
- public void downloadFile(String sourceFile, String destFile) throws AgentException {
- super.copyFileFrom(sourceFile, destFile);
+ public void downloadFile(String remoteFile, String localFile) throws AgentException {
+ super.downloadFile(remoteFile, localFile);
}
@Override
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
index 4f1129f..6be6fce 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
@@ -87,7 +87,7 @@ public class ArchiveTask extends DataStagingTask {
CommandOutput tarCommandOutput = adaptor.executeCommand(tarringCommand, null);
if (tarCommandOutput.getExitCode() != 0) {
throw new TaskOnFailException("Failed while running the tar command " + tarringCommand + ". Sout : " +
- tarCommandOutput.getStdOut() + ". Serr " + tarCommandOutput.getStdError(), true, null);
+ tarCommandOutput.getStdOut() + ". Serr " + tarCommandOutput.getStdError(), false, null);
}
} catch (AgentException e) {
@@ -98,7 +98,7 @@ public class ArchiveTask extends DataStagingTask {
if (!fileTransferred) {
logger.error("Failed to transfer created archive file " + tarCreationAbsPath);
- throw new TaskOnFailException("Failed to transfer created archive file " + tarCreationAbsPath, true, null);
+ throw new TaskOnFailException("Failed to transfer created archive file " + tarCreationAbsPath, false, null);
}
String deleteTarCommand = "rm " + tarCreationAbsPath;
@@ -108,11 +108,11 @@ public class ArchiveTask extends DataStagingTask {
CommandOutput rmCommandOutput = adaptor.executeCommand(deleteTarCommand, null);
if (rmCommandOutput.getExitCode() != 0) {
throw new TaskOnFailException("Failed while running the rm command " + deleteTarCommand + ". Sout : " +
- rmCommandOutput.getStdOut() + ". Serr " + rmCommandOutput.getStdError(), true, null);
+ rmCommandOutput.getStdOut() + ". Serr " + rmCommandOutput.getStdError(), false, null);
}
} catch (AgentException e) {
- throw new TaskOnFailException("Failed while running the rm command " + tarringCommand, true, null);
+ throw new TaskOnFailException("Failed while running the rm command " + tarringCommand, false, null);
}
String destParent = destFilePath.substring(0, destFilePath.lastIndexOf("/"));
@@ -125,10 +125,10 @@ public class ArchiveTask extends DataStagingTask {
CommandOutput unTarCommandOutput = storageResourceAdaptor.executeCommand(unArchiveTarCommand, destParent);
if (unTarCommandOutput.getExitCode() != 0) {
throw new TaskOnFailException("Failed while running the untar command " + deleteTarCommand + ". Sout : " +
- unTarCommandOutput.getStdOut() + ". Serr " + unTarCommandOutput.getStdError(), true, null);
+ unTarCommandOutput.getStdOut() + ". Serr " + unTarCommandOutput.getStdError(), false, null);
}
} catch (AgentException e) {
- throw new TaskOnFailException("Failed while running the untar command " + tarringCommand, true, null);
+ throw new TaskOnFailException("Failed while running the untar command " + tarringCommand, false, null);
}
return onSuccess("Archival task successfully completed");
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
index b4a0ea8..a0f51e2 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
@@ -21,7 +21,10 @@ package org.apache.airavata.helix.impl.task.staging;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.FileMetadata;
import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.agents.streaming.TransferResult;
+import org.apache.airavata.agents.streaming.VirtualStreamProducer;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.impl.task.AiravataTask;
import org.apache.airavata.helix.impl.task.TaskOnFailException;
@@ -35,12 +38,21 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
@SuppressWarnings("WeakerAccess")
public abstract class DataStagingTask extends AiravataTask {
private final static Logger logger = LoggerFactory.getLogger(DataStagingTask.class);
+ private final static ExecutorService PASS_THROUGH_EXECUTOR =
+ new ThreadPoolExecutor(10, 60, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>());
+
@SuppressWarnings("WeakerAccess")
protected DataStagingTaskModel getDataStagingTaskModel() throws TaskOnFailException {
try {
@@ -48,10 +60,10 @@ public abstract class DataStagingTask extends AiravataTask {
if (subTaskModel != null) {
return DataStagingTaskModel.class.cast(subTaskModel);
} else {
- throw new TaskOnFailException("Data staging task model can not be null for task " + getTaskId(), true, null);
+ throw new TaskOnFailException("Data staging task model can not be null for task " + getTaskId(), false, null);
}
} catch (Exception e) {
- throw new TaskOnFailException("Failed while obtaining data staging task model for task " + getTaskId(), true, e);
+ throw new TaskOnFailException("Failed while obtaining data staging task model for task " + getTaskId(), false, e);
}
}
@@ -59,7 +71,7 @@ public abstract class DataStagingTask extends AiravataTask {
protected StorageResourceDescription getStorageResource() throws TaskOnFailException {
StorageResourceDescription storageResource = getTaskContext().getStorageResource();
if (storageResource == null) {
- throw new TaskOnFailException("Storage resource can not be null for task " + getTaskId(), true, null);
+ throw new TaskOnFailException("Storage resource can not be null for task " + getTaskId(), false, null);
}
return storageResource;
}
@@ -80,7 +92,7 @@ public abstract class DataStagingTask extends AiravataTask {
return storageResourceAdaptor;
} catch (AgentException e) {
throw new TaskOnFailException("Failed to obtain adaptor for storage resource " + getTaskContext().getStorageResourceId() +
- " in task " + getTaskId(), true, e);
+ " in task " + getTaskId(), false, e);
}
}
@@ -95,7 +107,7 @@ public abstract class DataStagingTask extends AiravataTask {
getTaskContext().getComputeResourceLoginUserName());
} catch (Exception e) {
throw new TaskOnFailException("Failed to obtain adaptor for compute resource " + getTaskContext().getComputeResourceId() +
- " in task " + getTaskId(), true, e);
+ " in task " + getTaskId(), false, e);
}
}
@@ -134,6 +146,173 @@ public abstract class DataStagingTask extends AiravataTask {
return filePath;
}
+ public void naiveTransfer(AgentAdaptor srcAdaptor, String sourceFile, AgentAdaptor destAdaptor, String destFile,
+ String tempFile) throws TaskOnFailException {
+ logger.info("Using naive transfer to transfer " + sourceFile + " to " + destFile);
+ try {
+ try {
+ logger.info("Downloading file " + sourceFile + " to loacl temp file " + tempFile);
+ srcAdaptor.downloadFile(sourceFile, tempFile);
+ } catch (AgentException e) {
+ throw new TaskOnFailException("Failed downloading file " + sourceFile + " to the local path " +
+ tempFile, false, e);
+ }
+
+ File localFile = new File(tempFile);
+ if (!localFile.exists()) {
+ throw new TaskOnFailException("Local file does not exist at " + tempFile, false, null);
+ }
+
+ try {
+ logger.info("Uploading file form local temp file " + tempFile + " to " + destFile);
+ destAdaptor.uploadFile(tempFile, destFile);
+ } catch (AgentException e) {
+ throw new TaskOnFailException("Failed uploading file to " + destFile + " from local path " +
+ tempFile, false, e);
+ }
+ } finally {
+ logger.info("Deleting temporary file " + tempFile);
+ deleteTempFile(tempFile);
+ }
+ }
+
+ public static void passThroughTransfer(AgentAdaptor srcAdaptor, String sourceFile, AgentAdaptor destAdaptor,
+ String destFile) throws TaskOnFailException {
+ logger.info("Using pass through transfer to transfer " + sourceFile + " to " + destFile);
+
+ FileMetadata tempMetadata;
+ try {
+ tempMetadata = srcAdaptor.getFileMetadata(sourceFile);
+ } catch (AgentException e) {
+ throw new TaskOnFailException("Failed to obtain metadata for file " + sourceFile, false, e );
+ }
+
+ final FileMetadata fileMetadata = tempMetadata;
+
+ VirtualStreamProducer streamProducer = new VirtualStreamProducer(1024, fileMetadata.getSize());
+
+ OutputStream os = streamProducer.getOutputStream();
+ InputStream is = streamProducer.getInputStream();
+
+ Callable<TransferResult> inCallable = () -> {
+ TransferResult result = new TransferResult();
+ result.setTransferId("In");
+
+ try {
+ logger.info("Executing in-bound transfer for file " + sourceFile);
+ srcAdaptor.downloadFile(sourceFile, os, fileMetadata);
+ logger.info("Completed in-bound transfer for file " + sourceFile);
+ result.setTransferStatus(TransferResult.TransferStatus.COMPLETED);
+ result.setMessage("Successfully completed the transfer");
+
+ } catch (Exception e) {
+ result.setMessage("In-bound transfer failed for file " + sourceFile + ". Reason : " + e.getMessage());
+ result.setTransferStatus(TransferResult.TransferStatus.FAILED);
+ result.setError(e);
+ }
+ return result;
+ };
+
+ Callable<TransferResult> outCallable = () -> {
+ TransferResult result = new TransferResult();
+ result.setTransferId("Out");
+
+ try {
+ logger.info("Executing out-bound transfer for file " + destFile);
+ destAdaptor.uploadFile(is, fileMetadata, destFile);
+ logger.info("Completed out-bound transfer for file " + destFile);
+ result.setTransferStatus(TransferResult.TransferStatus.COMPLETED);
+ result.setMessage("Successfully completed the transfer");
+
+ } catch (Exception e) {
+ result.setMessage("Out-bound transfer failed for file " + destFile + ". Reason : " + e.getMessage());
+ result.setTransferStatus(TransferResult.TransferStatus.FAILED);
+ result.setError(e);
+ }
+
+ return result;
+ };
+
+ CompletionService<TransferResult> completionService = new ExecutorCompletionService<TransferResult>(PASS_THROUGH_EXECUTOR);
+
+ Map<String, Future<TransferResult>> unResolvedFutures = new HashMap<>();
+
+ unResolvedFutures.put("In", completionService.submit(inCallable));
+ unResolvedFutures.put("Out", completionService.submit(outCallable));
+
+ int completed = 0;
+ int failed = 0;
+ TransferResult failedResult = null;
+
+ try {
+ while (completed < 2 && failed == 0) {
+ try {
+ Future<TransferResult> res = completionService.take();
+ if (res.get().getTransferStatus() == TransferResult.TransferStatus.COMPLETED) {
+ completed++;
+ logger.debug("Transfer " + res.get().getTransferId() + " completed");
+ } else {
+ failed++;
+ failedResult = res.get();
+ logger.warn("Transfer " + res.get().getTransferId() + " failed", failedResult.getError());
+ }
+ unResolvedFutures.remove(res.get().getTransferId());
+
+ } catch (Exception e) {
+ logger.error("Error occurred while monitoring transfers", e);
+ throw new TaskOnFailException("Error occurred while monitoring transfers", false, e);
+ }
+ }
+
+ if (failed > 0) {
+ logger.error("Transfer from " + sourceFile + " to " + destFile + " failed. " + failedResult.getMessage(),
+ failedResult.getError());
+ throw new TaskOnFailException("Pass through file transfer failed from " + sourceFile + " to " +
+ destFile, false, failedResult.getError());
+ } else {
+ logger.info("Transfer from " + sourceFile + " to " + destFile + " completed");
+ }
+
+ } finally {
+ // Cleaning up unresolved transfers
+ if (unResolvedFutures.size() > 0) {
+ unResolvedFutures.forEach((id, future) -> {
+ try {
+ logger.warn("Cancelling transfer " + id);
+ future.cancel(true);
+ } catch (Exception e) {
+ // Ignore
+ logger.warn(e.getMessage());
+ }
+ });
+ }
+ }
+ }
+
+ protected void transferFileToComputeResource(String sourcePath, String destPath, AgentAdaptor computeAdaptor,
+ StorageResourceAdaptor storageAdaptor) throws TaskOnFailException {
+
+ try {
+ FileMetadata fileMetadata = storageAdaptor.getFileMetadata(sourcePath);
+ if (fileMetadata.getSize() == 0) {
+ logger.error("File " + sourcePath +" size is 0 so ignoring the upload");
+ throw new TaskOnFailException("Input staging has failed as file " + sourcePath + " size is 0", false, null);
+ }
+ } catch (AgentException e) {
+ logger.error("Failed to fetch metadata for file " + sourcePath, e);
+ throw new TaskOnFailException("Failed to fetch metadata for file " + sourcePath, false, e);
+ }
+
+ if (ServerSettings.isSteamingEnabled()) {
+ passThroughTransfer(storageAdaptor, sourcePath, computeAdaptor, destPath);
+ } else {
+ String sourceFileName = sourcePath.substring(sourcePath.lastIndexOf(File.separator) + 1, sourcePath.length());
+ String tempPath = getLocalDataPath(sourceFileName);
+ naiveTransfer(storageAdaptor, sourcePath, computeAdaptor, destPath, tempPath);
+ }
+
+ }
+
protected boolean transferFileToStorage(String sourcePath, String destPath, String fileName, AgentAdaptor adaptor,
StorageResourceAdaptor storageResourceAdaptor) throws TaskOnFailException {
@@ -164,42 +343,14 @@ public abstract class DataStagingTask extends AiravataTask {
throw new TaskOnFailException("Error while checking the file " + sourcePath + " existence", false, e);
}
- String localSourceFilePath = getLocalDataPath(fileName);
- try {
- try {
- logger.info("Downloading output file " + sourcePath + " to the local path " + localSourceFilePath);
- adaptor.copyFileFrom(sourcePath, localSourceFilePath);
- logger.info("Output file downloaded to " + localSourceFilePath);
- } catch (AgentException e) {
- throw new TaskOnFailException("Failed downloading output file " + sourcePath + " to the local path " +
- localSourceFilePath, false, e);
- }
-
- File localFile = new File(localSourceFilePath);
- if (localFile.exists()) {
- /*if (localFile.length() == 0) {
- logger.warn("Local file " + localSourceFilePath +" size is 0 so ignoring the upload");
- return false;
- }*/
- } else {
- throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, false, null);
- }
- // Uploading output file to the storage resource
- try {
- logger.info("Uploading the output file to " + destPath + " from local path " + localSourceFilePath);
- storageResourceAdaptor.uploadFile(localSourceFilePath, destPath);
- logger.info("Output file uploaded to " + destPath);
- } catch (AgentException e) {
- throw new TaskOnFailException("Failed uploading the output file to " + destPath + " from local path " +
- localSourceFilePath, false, e);
- }
-
- return true;
- } finally {
- logger.info("Deleting temporary file " + localSourceFilePath);
- deleteTempFile(localSourceFilePath);
+ if (ServerSettings.isSteamingEnabled()) {
+ passThroughTransfer(adaptor, sourcePath, storageResourceAdaptor, destPath);
+ } else {
+ String tempPath = getLocalDataPath(fileName);
+ naiveTransfer(adaptor, sourcePath, storageResourceAdaptor, destPath, tempPath);
}
+ return true;
}
protected void deleteTempFile(String filePath) {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
index 60120aa..6ba0c24 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
@@ -79,12 +79,17 @@ public class InputDataStagingTask extends DataStagingTask {
sourceUrls = new String[]{dataStagingTaskModel.getSource()};
}
+ // Fetch and validate storage adaptor
+ StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
+ // Fetch and validate compute resource adaptor
+ AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
+
for (String url : sourceUrls) {
URI sourceURI = new URI(url);
URI destinationURI = new URI(dataStagingTaskModel.getDestination());
logger.info("Source file " + sourceURI.getPath() + ", destination uri " + destinationURI.getPath() + " for task " + getTaskId());
- copySingleFile(sourceURI, destinationURI, taskHelper);
+ transferFileToComputeResource(sourceURI.getPath(), destinationURI.getPath(), adaptor, storageResourceAdaptor);
}
} catch (URISyntaxException e) {
@@ -107,54 +112,6 @@ public class InputDataStagingTask extends DataStagingTask {
}
}
- private void copySingleFile(URI sourceURI, URI destinationURI, TaskHelper taskHelper) throws TaskOnFailException {
-
- String sourceFileName = sourceURI.getPath().substring(sourceURI.getPath().lastIndexOf(File.separator) + 1,
- sourceURI.getPath().length());
-
- // Fetch and validate storage adaptor
- StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
-
- // Fetch and validate compute resource adaptor
- AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
-
- String localSourceFilePath = getLocalDataPath(sourceFileName);
- // Downloading input file from the storage resource
-
- try {
- try {
- logger.info("Downloading input file " + sourceURI.getPath() + " to the local path " + localSourceFilePath);
- storageResourceAdaptor.downloadFile(sourceURI.getPath(), localSourceFilePath);
- logger.info("Input file downloaded to " + localSourceFilePath);
- } catch (AgentException e) {
- throw new TaskOnFailException("Failed downloading input file " + sourceFileName + " to the local path " + localSourceFilePath, false, e);
- }
-
- File localFile = new File(localSourceFilePath);
- if (localFile.exists()) {
- if (localFile.length() == 0) {
- logger.error("Local file " + localSourceFilePath +" size is 0 so ignoring the upload");
- throw new TaskOnFailException("Input staging has failed as file " + localSourceFilePath + " size is 0", true, null);
- }
- } else {
- throw new TaskOnFailException("Local file does not exist at " + localSourceFilePath, false, null);
- }
-
- // Uploading input file to the compute resource
- try {
- logger.info("Uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath);
- adaptor.copyFileTo(localSourceFilePath, destinationURI.getPath());
- logger.info("Input file uploaded to " + destinationURI.getPath());
- } catch (AgentException e) {
- throw new TaskOnFailException("Failed uploading the input file to " + destinationURI.getPath() + " from local path " + localSourceFilePath, false, e);
- }
-
- } finally {
- logger.info("Deleting temporary file " + localSourceFilePath);
- deleteTempFile(localSourceFilePath);
- }
- }
-
@Override
public void onCancel(TaskContext taskContext) {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
index 5f9bda2..0934a87 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
@@ -31,12 +31,8 @@ import org.apache.airavata.helix.impl.task.submission.config.GroovyMapData;
import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfiguration;
import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
-import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.messaging.event.JobIdentifier;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.status.JobStatus;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
@@ -77,7 +73,7 @@ public abstract class JobSubmissionTask extends AiravataTask {
logger.info("Copying file form " + tempJobFile.getAbsolutePath() + " to remote path " + workingDirectory +
" of compute resource " + getTaskContext().getComputeResourceId());
- agentAdaptor.copyFileTo(tempJobFile.getAbsolutePath(), workingDirectory);
+ agentAdaptor.uploadFile(tempJobFile.getAbsolutePath(), workingDirectory);
RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, tempJobFile.getPath());
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
index 8820d45..63c99ba 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -69,4 +69,6 @@ public final class Constants {
public static final String TRUSTED_CERTIFICATE_SYSTEM_PROPERTY = "X509_CERT_DIR";
public static final String NEWLINE = System.getProperty("line.separator");
+
+ public static final String ENABLE_STREAMING_TRANSFER = "enable.streaming.transfer";
}
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index e8ff43f..e740cf5 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -502,4 +502,8 @@ public class ServerSettings extends ApplicationSettings {
public static String getSharingRegistryHost() {
return getSetting(SHARING_REGISTRY_HOST, "localhost");
}
+
+ public static Boolean isSteamingEnabled() {
+ return Boolean.valueOf(getSetting(Constants.ENABLE_STREAMING_TRANSFER, "True"));
+ }
}