You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/11/02 21:30:01 UTC
[1/3] airavata git commit: Reverted scp refactoring
Repository: airavata
Updated Branches:
refs/heads/develop db027a53e -> fe6ebe9c0
Reverted scp refactoring
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dbcfd77b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dbcfd77b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dbcfd77b
Branch: refs/heads/develop
Commit: dbcfd77be3abd26f97a185f65649cd3ea1ac46c0
Parents: 4a92da4
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 2 17:24:27 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 2 17:24:27 2016 -0400
----------------------------------------------------------------------
.../gfac/core/cluster/RemoteCluster.java | 6 +-
.../airavata/gfac/impl/BESRemoteCluster.java | 6 +-
.../airavata/gfac/impl/HPCRemoteCluster.java | 19 +-
.../airavata/gfac/impl/LocalRemoteCluster.java | 9 +-
.../org/apache/airavata/gfac/impl/SSHUtils.java | 198 +++++++++----------
.../airavata/gfac/impl/task/ArchiveTask.java | 9 +-
.../gfac/impl/task/BESJobSubmissionTask.java | 5 +-
.../airavata/gfac/impl/task/DataStageTask.java | 13 +-
.../gfac/impl/task/EnvironmentSetupTask.java | 4 +-
.../gfac/impl/task/SCPDataStageTask.java | 44 +++--
.../gfac/impl/task/utils/StreamData.java | 43 ++--
11 files changed, 178 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index f59a9e3..5f8d0ec 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -52,7 +52,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @param remoteFile remote file location, this can be a directory too
* @throws SSHApiException throws exception during error
*/
- void copyTo(String localFile, String remoteFile, SessionConsumer<Session> sessionSessionConsumer) throws SSHApiException;
+ void copyTo(String localFile, String remoteFile) throws SSHApiException;
/**
* This will copy a remote file in path rFile to local file lFile
@@ -60,7 +60,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @param remoteFile remote file path, this has to be a full qualified path
* @param localFile This is the local file to copy, this can be a directory too
*/
- void copyFrom(String remoteFile, String localFile, SessionConsumer<Session> sessionSessionConsumer) throws SSHApiException;
+ void copyFrom(String remoteFile, String localFile) throws SSHApiException;
/**
* This wil copy source remote file to target remote file.
@@ -77,7 +77,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @param directoryPath the full qualified path for the directory user wants to create
* @throws SSHApiException throws during error
*/
- void makeDirectory(String directoryPath, SessionConsumer<Session> sessionConsumer) throws SSHApiException;
+ void makeDirectory(String directoryPath) throws SSHApiException;
/**
* This will delete the given job from the queue
http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
index 828a34e..0f517b5 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
@@ -43,12 +43,12 @@ public class BESRemoteCluster extends AbstractRemoteCluster{
}
@Override
- public void copyTo(String localFile, String remoteFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+ public void copyTo(String localFile, String remoteFile) throws SSHApiException {
}
@Override
- public void copyFrom(String remoteFile, String localFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+ public void copyFrom(String remoteFile, String localFile) throws SSHApiException {
}
@@ -57,7 +57,7 @@ public class BESRemoteCluster extends AbstractRemoteCluster{
}
@Override
- public void makeDirectory(String directoryPath, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+ public void makeDirectory(String directoryPath) throws SSHApiException {
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index b322cef..c3566b8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -86,8 +86,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
@Override
public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
JobSubmissionOutput jsoutput = new JobSubmissionOutput();
- copyTo(jobScriptFilePath, workingDirectory,
- session -> SSHUtils.scpTo(jobScriptFilePath, workingDirectory, session)); // scp script file to working directory
+ copyTo(jobScriptFilePath, workingDirectory); // scp script file to working directory
RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath);
submitCommand.setRawCommand("cd " + workingDirectory + "; " + submitCommand.getRawCommand());
StandardOutReader reader = new StandardOutReader();
@@ -113,13 +112,13 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
}
@Override
- public void copyTo(String localFile, String remoteFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+ public void copyTo(String localFile, String remoteFile) throws SSHApiException {
int retry = 3;
while (retry > 0) {
try {
session = Factory.getSSHSession(authenticationInfo, serverInfo);
log.info("Transferring localhost:" + localFile + " to " + serverInfo.getHost() + ":" + remoteFile);
- sessionConsumer.consume(session);
+ SSHUtils.scpTo(localFile, remoteFile, session);
retry = 0;
} catch (Exception e) {
retry--;
@@ -140,13 +139,13 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
}
@Override
- public void copyFrom(String remoteFile, String localFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+ public void copyFrom(String remoteFile, String localFile) throws SSHApiException {
int retry = 3;
while(retry>0) {
try {
session = Factory.getSSHSession(authenticationInfo, serverInfo);
log.info("Transferring " + serverInfo.getHost() + ":" + remoteFile + " To localhost:" + localFile);
- sessionConsumer.consume(session);
+ SSHUtils.scpFrom(remoteFile, localFile, session);
retry=0;
} catch (Exception e) {
retry--;
@@ -191,7 +190,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
}
@Override
- public void makeDirectory(String directoryPath, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+ public void makeDirectory(String directoryPath) throws SSHApiException {
int retryCount = 0;
try {
while (retryCount < MAX_RETRY_COUNT) {
@@ -199,9 +198,9 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
session = Factory.getSSHSession(authenticationInfo, serverInfo);
log.info("Creating directory: " + serverInfo.getHost() + ":" + directoryPath);
try {
- sessionConsumer.consume(session);
+ SSHUtils.makeDirectory(directoryPath, session);
break; // Exit while loop
- } catch (DataStagingException e) {
+ } catch (JSchException e) {
if (retryCount == MAX_RETRY_COUNT) {
log.error("Retry count " + MAX_RETRY_COUNT + " exceeded for creating directory: "
+ serverInfo.getHost() + ":" + directoryPath, e);
@@ -211,7 +210,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
log.error("Issue with jsch, Retry creating directory: " + serverInfo.getHost() + ":" + directoryPath);
}
}
- } catch (AiravataException | DataStagingException e) {
+ } catch (JSchException | AiravataException | IOException e) {
throw new SSHApiException("Failed to create directory " + serverInfo.getHost() + ":" + directoryPath, e);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
index aa79a0c..d5422d2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
@@ -60,8 +60,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster {
public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
try {
JobSubmissionOutput jsoutput = new JobSubmissionOutput();
- String remoteFile = workingDirectory + File.separator + new File(jobScriptFilePath).getName();
- copyTo(jobScriptFilePath, remoteFile, session -> SSHUtils.scpTo(jobScriptFilePath, remoteFile, session)); // scp script file to working directory
+ copyTo(jobScriptFilePath, workingDirectory + File.separator + new File(jobScriptFilePath).getName()); // scp script file to working directory
RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath);
submitCommand.setRawCommand(submitCommand.getRawCommand());
LocalCommandOutput localCommandOutput = new LocalCommandOutput();
@@ -77,7 +76,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster {
}
@Override
- public void copyTo(String localFile, String remoteFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+ public void copyTo(String localFile, String remoteFile) throws SSHApiException {
Path sourcePath = Paths.get(localFile);
Path targetPath = Paths.get(remoteFile);
try {
@@ -89,7 +88,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster {
}
@Override
- public void copyFrom(String remoteFile, String localFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+ public void copyFrom(String remoteFile, String localFile) throws SSHApiException {
Path sourcePath = Paths.get(remoteFile);
Path targetPath = Paths.get(localFile);
try {
@@ -126,7 +125,7 @@ public class LocalRemoteCluster extends AbstractRemoteCluster {
}
@Override
- public void makeDirectory(String directoryPath, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+ public void makeDirectory(String directoryPath) throws SSHApiException {
Path dirPath = Paths.get(directoryPath);
Set<PosixFilePermission> perms = new HashSet<>();
// add permission as rwxr--r-- 744
http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
index 07e1799..cd5651e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
@@ -24,7 +24,6 @@ import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
-import org.apache.airavata.gfac.core.DataStagingException;
import org.apache.airavata.gfac.core.SSHApiException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,100 +53,97 @@ public class SSHUtils {
* @param localFile Local file to transfer, this can be a directory
* @return returns the final remote file path, so that users can use the new file location
*/
- public static String scpTo(String localFile, String remoteFile, Session session) throws DataStagingException {
+ public static String scpTo(String localFile, String remoteFile, Session session) throws IOException,
+ JSchException, SSHApiException {
FileInputStream fis = null;
String prefix = null;
if (new File(localFile).isDirectory()) {
prefix = localFile + File.separator;
}
boolean ptimestamp = true;
- Channel channel = null;
+
// exec 'scp -t rfile' remotely
String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile;
- try {
- channel = session.openChannel("exec");
+ Channel channel = session.openChannel("exec");
- StandardOutReader stdOutReader = new StandardOutReader();
- ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
- ((ChannelExec) channel).setCommand(command);
+ StandardOutReader stdOutReader = new StandardOutReader();
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
+ ((ChannelExec) channel).setCommand(command);
- // get I/O streams for remote scp
- try (OutputStream out = channel.getOutputStream();
- InputStream in = channel.getInputStream()) {
+ // get I/O streams for remote scp
+ OutputStream out = channel.getOutputStream();
+ InputStream in = channel.getInputStream();
- channel.connect();
- if (checkAck(in) != 0) {
- String error = "Error Reading input Stream";
- log.error(error);
- throw new DataStagingException(error);
- }
+ channel.connect();
- File _lfile = new File(localFile);
-
- if (ptimestamp) {
- command = "T" + (_lfile.lastModified() / 1000) + " 0";
- // The access time should be sent here,
- // but it is not accessible with JavaAPI ;-<
- command += (" " + (_lfile.lastModified() / 1000) + " 0\n");
- out.write(command.getBytes());
- out.flush();
- if (checkAck(in) != 0) {
- String error = "Error Reading input Stream";
- log.error(error);
- throw new DataStagingException(error);
- }
- }
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
- // send "C0644 filesize filename", where filename should not include '/'
- long filesize = _lfile.length();
- command = "C0644 " + filesize + " ";
- if (localFile.lastIndexOf('/') > 0) {
- command += localFile.substring(localFile.lastIndexOf('/') + 1);
- } else {
- command += localFile;
- }
- command += "\n";
- out.write(command.getBytes());
- out.flush();
- if (checkAck(in) != 0) {
- String error = "Error Reading input Stream";
- log.error(error);
- throw new DataStagingException(error);
- }
+ File _lfile = new File(localFile);
- // send a content of localFile
- fis = new FileInputStream(localFile);
- byte[] buf = new byte[1024];
- while (true) {
- int len = fis.read(buf, 0, buf.length);
- if (len <= 0) break;
- out.write(buf, 0, len); //out.flush();
- }
- fis.close();
- fis = null;
- // send '\0'
- buf[0] = 0;
- out.write(buf, 0, 1);
- out.flush();
- if (checkAck(in) != 0) {
- String error = "Error Reading input Stream";
- log.error(error);
- throw new DataStagingException(error);
- }
- }
- stdOutReader.onOutput(channel);
- if (stdOutReader.getStdErrorString().contains("scp:")) {
- throw new DataStagingException(stdOutReader.getStdErrorString());
- }
- //since remote file is always a file we just return the file
- return remoteFile;
- } catch (IOException | JSchException e) {
- throw new DataStagingException(e);
- } finally {
- if (channel != null && channel.isConnected()) {
- channel.disconnect();
+ if (ptimestamp) {
+ command = "T" + (_lfile.lastModified() / 1000) + " 0";
+ // The access time should be sent here,
+ // but it is not accessible with JavaAPI ;-<
+ command += (" " + (_lfile.lastModified() / 1000) + " 0\n");
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
}
}
+
+ // send "C0644 filesize filename", where filename should not include '/'
+ long filesize = _lfile.length();
+ command = "C0644 " + filesize + " ";
+ if (localFile.lastIndexOf('/') > 0) {
+ command += localFile.substring(localFile.lastIndexOf('/') + 1);
+ } else {
+ command += localFile;
+ }
+ command += "\n";
+ out.write(command.getBytes());
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+
+ // send a content of localFile
+ fis = new FileInputStream(localFile);
+ byte[] buf = new byte[1024];
+ while (true) {
+ int len = fis.read(buf, 0, buf.length);
+ if (len <= 0) break;
+ out.write(buf, 0, len); //out.flush();
+ }
+ fis.close();
+ fis = null;
+ // send '\0'
+ buf[0] = 0;
+ out.write(buf, 0, 1);
+ out.flush();
+ if (checkAck(in) != 0) {
+ String error = "Error Reading input Stream";
+ log.error(error);
+ throw new SSHApiException(error);
+ }
+ out.close();
+ stdOutReader.onOutput(channel);
+
+
+ channel.disconnect();
+ if (stdOutReader.getStdErrorString().contains("scp:")) {
+ throw new SSHApiException(stdOutReader.getStdErrorString());
+ }
+ //since remote file is always a file we just return the file
+ return remoteFile;
}
/**
@@ -157,7 +153,8 @@ public class SSHUtils {
* @param localFile This is the local file to copy, this can be a directory too
* @return returns the final local file path of the new file came from the remote resource
*/
- public static void scpFrom(String remoteFile, String localFile, Session session) throws DataStagingException {
+ public static void scpFrom(String remoteFile, String localFile, Session session) throws IOException,
+ JSchException, SSHApiException {
FileOutputStream fos = null;
try {
String prefix = null;
@@ -258,7 +255,6 @@ public class SSHUtils {
} catch (Exception e) {
log.error(e.getMessage(), e);
- throw new DataStagingException(e);
} finally {
try {
if (fos != null) fos.close();
@@ -276,11 +272,8 @@ public class SSHUtils {
* @param destinationSession JSch Session for target
* @return returns the final local file path of the new file came from the remote resource
*/
- public static void scpThirdParty(String sourceFile,
- Session sourceSession,
- String destinationFile,
- Session destinationSession,
- boolean ignoreEmptyFile) throws DataStagingException {
+ public static void scpThirdParty(String sourceFile, Session sourceSession, String destinationFile, Session destinationSession, boolean ignoreEmptyFile) throws
+ IOException, JSchException {
OutputStream sout = null;
InputStream sin = null;
OutputStream dout = null;
@@ -408,7 +401,7 @@ public class SSHUtils {
} catch (Exception e) {
log.error(e.getMessage(), e);
- throw new DataStagingException(e.getMessage());
+ throw new JSchException(e.getMessage());
} finally {
try {
if (dout != null) dout.close();
@@ -433,32 +426,35 @@ public class SSHUtils {
}
}
- public static void makeDirectory(String path, Session session) throws DataStagingException {
+ public static void makeDirectory(String path, Session session) throws IOException, JSchException, SSHApiException {
- Channel channel = null;
+ // exec 'scp -t rfile' remotely
String command = "mkdir -p " + path;
+ Channel channel = session.openChannel("exec");
+ StandardOutReader stdOutReader = new StandardOutReader();
+
+ ((ChannelExec) channel).setCommand(command);
+
+
+ ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
try {
- // exec 'scp -t rfile' remotely
- channel = session.openChannel("exec");
- StandardOutReader stdOutReader = new StandardOutReader();
channel.connect();
- stdOutReader.onOutput(channel);
- if (stdOutReader.getStdErrorString().contains("mkdir:")) {
- throw new DataStagingException(stdOutReader.getStdErrorString());
- }
-
} catch (JSchException e) {
+ channel.disconnect();
// session.disconnect();
log.error("Unable to retrieve command output. Command - " + command +
" on server - " + session.getHost() + ":" + session.getPort() +
" connecting user name - "
+ session.getUserName());
- throw new DataStagingException(e);
- }finally {
- if(channel != null && channel.isConnected())
- channel.disconnect();
+ throw e;
+ }
+ stdOutReader.onOutput(channel);
+ if (stdOutReader.getStdErrorString().contains("mkdir:")) {
+ throw new SSHApiException(stdOutReader.getStdErrorString());
}
+
+ channel.disconnect();
}
public static List<String> listDirectory(String path, Session session) throws IOException, JSchException,
http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
index 85d3fd2..88661f8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
@@ -138,8 +138,13 @@ public class ArchiveTask implements Task {
// move tar to storage resource
remoteCluster.execute(commandInfo);
URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, archiveTar);
- remoteCluster.thirdPartyTransfer(resourceAbsTarFilePath, destinationURI.getPath(),
- session -> SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true));
+ remoteCluster.thirdPartyTransfer(resourceAbsTarFilePath ,destinationURI.getPath(), session -> {
+ try {
+ SSHUtils.scpThirdParty(sourceURI.getPath(),session, destinationURI.getPath(), sshSession, true);
+ } catch (Exception e) {
+ throw new DataStagingException("Error while transferring " + sourceURI.getPath() + " to " + destinationURI.getPath());
+ }
+ });
// delete tar in remote computer resource
commandInfo = new RawCommandInfo("rm " + resourceAbsTarFilePath);
http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
index 1811286..990b9ea 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
@@ -31,7 +31,6 @@ import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
import eu.unicore.util.httpclient.DefaultClientConfiguration;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.core.DataStagingException;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.SSHApiException;
@@ -274,7 +273,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
break;
}
}
- } catch (DataStagingException | AiravataException | URISyntaxException e) {
+ } catch (IOException | JSchException | AiravataException | SSHApiException | URISyntaxException e) {
log.error("Error while coping local file " + localFilePath + " to remote " + remoteFilePath, e);
throw new GFacException("Error while scp output files to remote storage file location", e);
}
@@ -318,7 +317,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask {
input.setValue("file:/" + localFilePath);
}
}
- } catch ( AiravataException | DataStagingException| URISyntaxException e) {
+ } catch (IOException | JSchException | AiravataException | SSHApiException | URISyntaxException e) {
log.error("Error while coping remote file " + remoteFilePath + " to local " + localFilePath, e);
throw new GFacException("Error while scp input files to local file location", e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
index a171f7f..8c6a125 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DataStageTask.java
@@ -24,7 +24,6 @@ import org.apache.airavata.gfac.core.SSHApiException;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.SSHUtils;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.TaskState;
@@ -66,18 +65,14 @@ public class DataStageTask implements Task {
/**
* copy local file to compute resource.
*/
- taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo(
- sourceURI.getPath(),
- destinationURI.getPath(),
- session -> SSHUtils.scpTo(sourceURI.getPath(), destinationURI.getPath(), session));
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
+ .getPath());
} else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
/**
* copy remote file from compute resource.
*/
- taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom(
- sourceURI.getPath(),
- destinationURI.getPath(),
- session -> SSHUtils.scpFrom(sourceURI.getPath(), destinationURI.getPath(), session));
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
+ .getPath());
}
status.setReason("Successfully staged data");
} catch (SSHApiException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
index bbe77d8..7de0282 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/EnvironmentSetupTask.java
@@ -25,7 +25,6 @@ import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.SSHUtils;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.status.TaskState;
import org.apache.airavata.model.status.TaskStatus;
@@ -49,8 +48,7 @@ public class EnvironmentSetupTask implements Task {
TaskStatus status = new TaskStatus(TaskState.COMPLETED);
try {
RemoteCluster remoteCluster = taskContext.getParentProcessContext().getJobSubmissionRemoteCluster();
- String workingDir = taskContext.getParentProcessContext().getWorkingDir();
- remoteCluster.makeDirectory(workingDir, session -> SSHUtils.makeDirectory(workingDir, session));
+ remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
status.setReason("Successfully created environment");
} catch (SSHApiException e) {
String msg = "Error while environment setup";
http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 4189f81..2788535 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -182,7 +182,7 @@ public class SCPDataStageTask implements Task {
errorModel.setUserFriendlyMessage(msg);
taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
return status;
- } catch (ApplicationSettingsException e) {
+ } catch (ApplicationSettingsException | FileNotFoundException e) {
String msg = "Failed while reading credentials";
log.error(msg, e);
status.setState(TaskState.FAILED);
@@ -219,7 +219,7 @@ public class SCPDataStageTask implements Task {
errorModel.setActualErrorMessage(e.getMessage());
errorModel.setUserFriendlyMessage(msg);
taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel));
- } catch (DataStagingException e) {
+ } catch (JSchException | IOException e) {
String msg = "Failed to do scp with client";
log.error(msg, e);
status.setState(TaskState.FAILED);
@@ -249,35 +249,39 @@ public class SCPDataStageTask implements Task {
}
private void inputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI
- destinationURI) throws SSHApiException {
+ destinationURI) throws SSHApiException, IOException, JSchException {
/**
* scp third party file transfer 'to' compute resource.
*/
- taskContext.getParentProcessContext().getDataMovementRemoteCluster().thirdPartyTransfer(
- sourceURI.getPath(),
- destinationURI.getPath(),
- session -> SSHUtils.scpThirdParty(sourceURI.getPath(), sshSession, destinationURI.getPath(), session, false));
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster()
+ .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> {
+ try {
+ SSHUtils.scpThirdParty(sourceURI.getPath(), sshSession, destinationURI.getPath(), session, false);
+ } catch (Exception e) {
+ throw new DataStagingException("Error while file staging, from " + sourceURI.getPath()
+ + " to " + destinationURI.getPath());
+ }
+ });
}
private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
- throws SSHApiException, AiravataException, GFacException {
+ throws SSHApiException, AiravataException, IOException, JSchException, GFacException {
/**
* scp third party file transfer 'from' comute resource.
*/
- taskContext.getParentProcessContext().getDataMovementRemoteCluster().thirdPartyTransfer(
- sourceURI.getPath(),
- destinationURI.getPath(),
- session -> SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true));
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster()
+ .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> {
+ try {
+ SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true);
+ } catch (Exception e) {
+ throw new DataStagingException("Error while file staging, from " + sourceURI.getPath()
+ + " to " + destinationURI.getPath());
+ }
+ });
// update output locations
- GFacUtils.saveExperimentOutput(
- taskContext.getParentProcessContext(),
- taskContext.getProcessOutput().getName(),
- destinationURI.toString());
- GFacUtils.saveProcessOutput(
- taskContext.getParentProcessContext(),
- taskContext.getProcessOutput().getName(),
- destinationURI.toString());
+ GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString());
+ GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString());
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dbcfd77b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
index f5fd14d..fccce0d 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
@@ -21,8 +21,10 @@
package org.apache.airavata.gfac.impl.task.utils;
+import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.credential.store.store.CredentialStoreException;
import org.apache.airavata.gfac.core.DataStagingException;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
@@ -30,6 +32,7 @@ import org.apache.airavata.gfac.core.SSHApiException;
import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
import org.apache.airavata.gfac.core.cluster.CommandInfo;
import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.cluster.ServerInfo;
import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.impl.Factory;
@@ -41,6 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.TimerTask;
@@ -75,23 +79,25 @@ public class StreamData extends TimerTask {
}
// output staging should end when the job is complete
- if (jobStatus != null && jobStatus.getJobState().equals(JobState.COMPLETE)
- || jobStatus.getJobState().equals(JobState.CANCELED)
- || jobStatus.getJobState().equals(JobState.FAILED)){
+ if (jobStatus != null && jobStatus.getJobState().equals(JobState.COMPLETE) || jobStatus.getJobState().equals(JobState.CANCELED) || jobStatus.getJobState().equals(JobState.FAILED)){
this.cancel();
}
} catch (URISyntaxException e) {
log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Erroneous path specified",
taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
taskContext.getProcessOutput().getName());
- } catch (AiravataException | SSHApiException e) {
+ } catch (IllegalAccessException | InstantiationException | AiravataException | IOException | JSchException | SSHApiException e) {
log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error occurred while streaming data",
taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
taskContext.getProcessOutput().getName());
+ } catch (CredentialStoreException e) {
+ log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error occurred while connecting with credential store",
+ taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
+ taskContext.getProcessOutput().getName());
}
}
- public void runOutputStaging() throws URISyntaxException, AiravataException, SSHApiException {
+ public void runOutputStaging() throws URISyntaxException, IllegalAccessException, InstantiationException, CredentialStoreException, AiravataException, IOException, JSchException, SSHApiException {
try {
AuthenticationInfo authenticationInfo = null;
@@ -117,7 +123,7 @@ public class StreamData extends TimerTask {
String targetPath = destinationURI.getPath().substring(0, destinationURI.getPath().lastIndexOf('/'));
SSHUtils.makeDirectory(targetPath, sshSession);
outputDataStaging(taskContext, sshSession, sourceURI, destinationURI);
- } catch (DataStagingException | GFacException e) {
+ } catch (GFacException e) {
log.error("expId: {}, processId:{}, taskId: {}:- Couldn't stage file {} , Error while output staging",
taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(),
taskContext.getProcessOutput().getName());
@@ -141,24 +147,23 @@ public class StreamData extends TimerTask {
}
private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
- throws SSHApiException, GFacException {
+ throws SSHApiException, AiravataException, IOException, JSchException, GFacException {
/**
* scp third party file transfer 'from' comute resource.
*/
- taskContext.getParentProcessContext().getDataMovementRemoteCluster().thirdPartyTransfer(
- sourceURI.getPath(),
- destinationURI.getPath(),
- session -> SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true));
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster()
+ .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> {
+ try {
+ SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true);
+ } catch (Exception e) {
+ throw new DataStagingException("Error while file staging, from " + sourceURI.getPath()
+ + " to " + destinationURI.getPath());
+ }
+ });
// update output locations
- GFacUtils.saveExperimentOutput(
- taskContext.getParentProcessContext(),
- taskContext.getProcessOutput().getName(),
- destinationURI.getPath());
- GFacUtils.saveProcessOutput(
- taskContext.getParentProcessContext(),
- taskContext.getProcessOutput().getName(),
- destinationURI.getPath());
+ GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
+ GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
}
[2/3] airavata git commit: Revert scp thrird party refacotring
Posted by sh...@apache.org.
Revert scp thrird party refacotring
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a45294e4
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a45294e4
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a45294e4
Branch: refs/heads/develop
Commit: a45294e461cca0ea5b79fccdd75e969a201052e7
Parents: dbcfd77
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 2 17:29:28 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 2 17:29:28 2016 -0400
----------------------------------------------------------------------
.../gfac/core/DataStagingException.java | 43 --------------------
.../gfac/core/cluster/RemoteCluster.java | 33 +++++++--------
.../gfac/core/cluster/SessionConsumer.java | 28 -------------
.../airavata/gfac/impl/BESRemoteCluster.java | 3 +-
.../airavata/gfac/impl/HPCRemoteCluster.java | 18 ++++----
.../airavata/gfac/impl/LocalRemoteCluster.java | 17 ++++----
.../airavata/gfac/impl/task/ArchiveTask.java | 13 ++----
.../gfac/impl/task/SCPDataStageTask.java | 23 ++---------
.../gfac/impl/task/utils/StreamData.java | 12 +-----
9 files changed, 49 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/DataStagingException.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/DataStagingException.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/DataStagingException.java
deleted file mode 100644
index 67ed0c7..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/DataStagingException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.gfac.core;
-
-public class DataStagingException extends Exception {
-
- public DataStagingException() {
- super();
- }
-
- public DataStagingException(String message) {
- super(message);
- }
-
- public DataStagingException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public DataStagingException(Throwable cause) {
- super(cause);
- }
-
- protected DataStagingException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index 5f8d0ec..3916573 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -43,7 +43,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @return jobId after successful job submission
* @throws SSHApiException throws exception during error
*/
- JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException;
+ public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException;
/**
* This will copy the localFile to remoteFile location in configured cluster
@@ -52,7 +52,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @param remoteFile remote file location, this can be a directory too
* @throws SSHApiException throws exception during error
*/
- void copyTo(String localFile, String remoteFile) throws SSHApiException;
+ public void copyTo(String localFile, String remoteFile) throws SSHApiException;
/**
* This will copy a remote file in path rFile to local file lFile
@@ -60,16 +60,18 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @param remoteFile remote file path, this has to be a full qualified path
* @param localFile This is the local file to copy, this can be a directory too
*/
- void copyFrom(String remoteFile, String localFile) throws SSHApiException;
+ public void copyFrom(String remoteFile, String localFile) throws SSHApiException;
/**
* This wil copy source remote file to target remote file.
*
* @param sourceFile remote file path, this has to be a full qualified path
* @param destinationFile This is the local file to copy, this can be a directory too
+ * @param session jcraft session of other coner of thirdparty file transfer.
+ * @param inOrOut direction to file transfer , to the remote cluster(DIRECTION.IN) or from the remote cluster(DIRECTION.OUT)
*
*/
- void thirdPartyTransfer(String sourceFile, String destinationFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException;
+ public void scpThirdParty(String sourceFile, String destinationFile ,Session session , DIRECTION inOrOut, boolean ignoreEmptyFile) throws SSHApiException;
/**
* This will create directories in computing resources
@@ -77,7 +79,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @param directoryPath the full qualified path for the directory user wants to create
* @throws SSHApiException throws during error
*/
- void makeDirectory(String directoryPath) throws SSHApiException;
+ public void makeDirectory(String directoryPath) throws SSHApiException;
/**
* This will delete the given job from the queue
@@ -86,7 +88,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @return return the description of the deleted job
* @throws SSHApiException throws exception during error
*/
- JobStatus cancelJob(String jobID) throws SSHApiException;
+ public JobStatus cancelJob(String jobID) throws SSHApiException;
/**
* This will get the job status of the the job associated with this jobId
@@ -95,7 +97,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @return job status of the given jobID
* @throws SSHApiException throws exception during error
*/
- JobStatus getJobStatus(String jobID) throws SSHApiException;
+ public JobStatus getJobStatus(String jobID) throws SSHApiException;
/**
* This will get the job status of the the job associated with this jobId
@@ -104,7 +106,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @return jobId of the given jobName
* @throws SSHApiException throws exception during error
*/
- String getJobIdByJobName(String jobName, String userName) throws SSHApiException;
+ public String getJobIdByJobName(String jobName, String userName) throws SSHApiException;
/**
* This method can be used to poll the jobstatuses based on the given
@@ -115,7 +117,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @param userName userName of the jobs which required to get the status
* @param jobIDs precises set of jobIDs
*/
- void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws SSHApiException;
+ public void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws SSHApiException;
/**
* This will list directories in computing resources
@@ -123,7 +125,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @param directoryPath the full qualified path for the directory user wants to create
* @throws SSHApiException throws during error
*/
- List<String> listDirectory(String directoryPath) throws SSHApiException;
+ public List<String> listDirectory(String directoryPath) throws SSHApiException;
/**
* This method can use to execute custom command on remote compute resource.
@@ -131,27 +133,26 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @return <code>true</code> if command successfully executed, <code>false</code> otherwise.
* @throws SSHApiException
*/
- boolean execute(CommandInfo commandInfo) throws SSHApiException;
+ public boolean execute(CommandInfo commandInfo) throws SSHApiException;
/**
* This method can be used to get created ssh session
* to reuse the created session.
*/
- Session getSession() throws SSHApiException;
+ public Session getSession() throws SSHApiException;
/**
* This method can be used to close the connections initialized
* to handle graceful shutdown of the system
*/
- void disconnect() throws SSHApiException;
+ public void disconnect() throws SSHApiException;
/**
* This gives the server Info
*/
- ServerInfo getServerInfo();
-
- AuthenticationInfo getAuthentication();
+ public ServerInfo getServerInfo();
+ public AuthenticationInfo getAuthentication();
enum DIRECTION {
TO,
FROM
http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/SessionConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/SessionConsumer.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/SessionConsumer.java
deleted file mode 100644
index 2f39d97..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/SessionConsumer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.gfac.core.cluster;
-
-import org.apache.airavata.gfac.core.DataStagingException;
-
-public interface SessionConsumer<S> {
-
- void consume(S session) throws DataStagingException;
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
index 0f517b5..3586ee8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
@@ -53,7 +53,8 @@ public class BESRemoteCluster extends AbstractRemoteCluster{
}
@Override
- public void thirdPartyTransfer(String sourceFile, String destinationFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+ public void scpThirdParty(String sourceFile, String destinationFile, Session session, DIRECTION inOrOut, boolean ignoreEmptyFile) throws SSHApiException {
+
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index c3566b8..725b6d0 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -26,7 +26,6 @@ import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.UserInfo;
import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.gfac.core.DataStagingException;
import org.apache.airavata.gfac.core.JobManagerConfiguration;
import org.apache.airavata.gfac.core.SSHApiException;
import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
@@ -165,7 +164,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
}
@Override
- public void thirdPartyTransfer(String sourceFile, String destinationFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
+ public void scpThirdParty(String sourceFile, String destinationFile, Session clientSession, DIRECTION direction, boolean ignoreEmptyFile) throws SSHApiException {
int retryCount= 0;
try {
while (retryCount < MAX_RETRY_COUNT) {
@@ -173,19 +172,24 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
session = Factory.getSSHSession(authenticationInfo, serverInfo);
log.info("Transferring from:" + sourceFile + " To: " + destinationFile);
try {
- sessionConsumer.consume(session);
+ if (direction == DIRECTION.TO) {
+ SSHUtils.scpThirdParty(sourceFile, clientSession, destinationFile, session, ignoreEmptyFile);
+ } else {
+ SSHUtils.scpThirdParty(sourceFile, session, destinationFile, clientSession, ignoreEmptyFile);
+ }
break; // exit while loop
- } catch (DataStagingException e) {
+ } catch (JSchException e) {
if (retryCount == MAX_RETRY_COUNT) {
log.error("Retry count " + MAX_RETRY_COUNT + " exceeded for transferring from:"
+ sourceFile + " To: " + destinationFile, e);
throw e;
}
- log.error("Issue with file staging, Retry transferring from:" + sourceFile + " To: " + destinationFile, e);
+ log.error("Issue with jsch, Retry transferring from:" + sourceFile + " To: " + destinationFile, e);
}
}
- } catch (AiravataException| DataStagingException e) {
- throw new SSHApiException("Failed scp file:" + sourceFile + " to remote file " + destinationFile, e);
+ } catch (IOException | AiravataException| JSchException e) {
+ throw new SSHApiException("Failed scp file:" + sourceFile + " to remote file "
+ +destinationFile , e);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
index d5422d2..9294470 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
@@ -23,7 +23,6 @@ package org.apache.airavata.gfac.impl;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.gfac.core.DataStagingException;
import org.apache.airavata.gfac.core.JobManagerConfiguration;
import org.apache.airavata.gfac.core.SSHApiException;
import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
@@ -100,16 +99,20 @@ public class LocalRemoteCluster extends AbstractRemoteCluster {
}
@Override
- public void thirdPartyTransfer(String sourceFile, String destinationFile, SessionConsumer<Session> sessionConsumer) throws SSHApiException {
- int retryCount = 0;
+ public void scpThirdParty(String sourceFile, String destinationFile, Session session, DIRECTION inOrOut, boolean ignoreEmptyFile) throws SSHApiException {
+ int retryCount= 0;
try {
while (retryCount < MAX_RETRY_COUNT) {
retryCount++;
log.info("Transferring from:" + sourceFile + " To: " + destinationFile);
try {
- sessionConsumer.consume(null);
+ if (inOrOut == DIRECTION.TO) {
+ SSHUtils.scpThirdParty(sourceFile, session, destinationFile, session, ignoreEmptyFile);
+ } else {
+ SSHUtils.scpThirdParty(sourceFile, session, destinationFile, session, ignoreEmptyFile);
+ }
break; // exit while loop
- } catch (DataStagingException e) {
+ } catch (JSchException e) {
if (retryCount == MAX_RETRY_COUNT) {
log.error("Retry count " + MAX_RETRY_COUNT + " exceeded for transferring from:"
+ sourceFile + " To: " + destinationFile, e);
@@ -118,9 +121,9 @@ public class LocalRemoteCluster extends AbstractRemoteCluster {
log.error("Issue with jsch, Retry transferring from:" + sourceFile + " To: " + destinationFile, e);
}
}
- } catch (DataStagingException e) {
+ } catch (IOException | JSchException e) {
throw new SSHApiException("Failed scp file:" + sourceFile + " to remote file "
- + destinationFile, e);
+ +destinationFile , e);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
index 88661f8..df22654 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
@@ -26,7 +26,6 @@ import com.jcraft.jsch.Session;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.gfac.core.DataStagingException;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.SSHApiException;
import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
@@ -40,7 +39,6 @@ import org.apache.airavata.gfac.core.context.TaskContext;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.task.TaskException;
import org.apache.airavata.gfac.impl.Factory;
-import org.apache.airavata.gfac.impl.SSHUtils;
import org.apache.airavata.gfac.impl.StandardOutReader;
import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
@@ -121,6 +119,7 @@ public class ArchiveTask implements Task {
ServerInfo serverInfo = new ServerInfo(userName, hostName, DEFAULT_SSH_PORT);
Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo);
URI sourceURI = new URI(subTaskModel.getSource());
+ URI destinationURI = null;
String workingDirName = null, path = null;
if (sourceURI.getPath().endsWith("/")) {
path = sourceURI.getPath().substring(0, sourceURI.getPath().length() - 1);
@@ -137,14 +136,8 @@ public class ArchiveTask implements Task {
// move tar to storage resource
remoteCluster.execute(commandInfo);
- URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, archiveTar);
- remoteCluster.thirdPartyTransfer(resourceAbsTarFilePath ,destinationURI.getPath(), session -> {
- try {
- SSHUtils.scpThirdParty(sourceURI.getPath(),session, destinationURI.getPath(), sshSession, true);
- } catch (Exception e) {
- throw new DataStagingException("Error while transferring " + sourceURI.getPath() + " to " + destinationURI.getPath());
- }
- });
+ destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, archiveTar);
+ remoteCluster.scpThirdParty(resourceAbsTarFilePath ,destinationURI.getPath() , sshSession, RemoteCluster.DIRECTION.FROM, true);
// delete tar in remote computer resource
commandInfo = new RawCommandInfo("rm " + resourceAbsTarFilePath);
http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 2788535..6a8800e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -24,7 +24,6 @@ import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.gfac.core.DataStagingException;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.SSHApiException;
@@ -253,15 +252,8 @@ public class SCPDataStageTask implements Task {
/**
* scp third party file transfer 'to' compute resource.
*/
- taskContext.getParentProcessContext().getDataMovementRemoteCluster()
- .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> {
- try {
- SSHUtils.scpThirdParty(sourceURI.getPath(), sshSession, destinationURI.getPath(), session, false);
- } catch (Exception e) {
- throw new DataStagingException("Error while file staging, from " + sourceURI.getPath()
- + " to " + destinationURI.getPath());
- }
- });
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.TO, false);
}
private void outputDataStaging(TaskContext taskContext, Session sshSession, URI sourceURI, URI destinationURI)
@@ -270,15 +262,8 @@ public class SCPDataStageTask implements Task {
/**
* scp third party file transfer 'from' comute resource.
*/
- taskContext.getParentProcessContext().getDataMovementRemoteCluster()
- .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> {
- try {
- SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true);
- } catch (Exception e) {
- throw new DataStagingException("Error while file staging, from " + sourceURI.getPath()
- + " to " + destinationURI.getPath());
- }
- });
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM, true);
// update output locations
GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString());
GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.toString());
http://git-wip-us.apache.org/repos/asf/airavata/blob/a45294e4/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
index fccce0d..375e570 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/StreamData.java
@@ -25,7 +25,6 @@ import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.credential.store.store.CredentialStoreException;
-import org.apache.airavata.gfac.core.DataStagingException;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.SSHApiException;
@@ -152,15 +151,8 @@ public class StreamData extends TimerTask {
/**
* scp third party file transfer 'from' comute resource.
*/
- taskContext.getParentProcessContext().getDataMovementRemoteCluster()
- .thirdPartyTransfer(sourceURI.getPath(), destinationURI.getPath(), session -> {
- try {
- SSHUtils.scpThirdParty(sourceURI.getPath(), session, destinationURI.getPath(), sshSession, true);
- } catch (Exception e) {
- throw new DataStagingException("Error while file staging, from " + sourceURI.getPath()
- + " to " + destinationURI.getPath());
- }
- });
+ taskContext.getParentProcessContext().getDataMovementRemoteCluster().scpThirdParty(sourceURI.getPath(),
+ destinationURI.getPath(), sshSession, RemoteCluster.DIRECTION.FROM, true);
// update output locations
GFacUtils.saveExperimentOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
GFacUtils.saveProcessOutput(taskContext.getParentProcessContext(), taskContext.getProcessOutput().getName(), destinationURI.getPath());
[3/3] airavata git commit: Merge with develop
Posted by sh...@apache.org.
Merge with develop
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/fe6ebe9c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/fe6ebe9c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/fe6ebe9c
Branch: refs/heads/develop
Commit: fe6ebe9c0ad075ac13e2a932b494dba13a1de0a1
Parents: a45294e db027a5
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 2 17:29:55 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 2 17:29:55 2016 -0400
----------------------------------------------------------------------
.../airavata/cloud/test/CloudIntfTest.java | 21 +++++++++++---------
1 file changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------