You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/09 19:14:16 UTC
airavata git commit: Added LocalRemoteCluster implementation and
LocalCommandOutput reader
Repository: airavata
Updated Branches:
refs/heads/master 08cdad264 -> c62f74a5e
Added LocalRemoteCluster implementation and LocalCommandOutput reader
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/c62f74a5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/c62f74a5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/c62f74a5
Branch: refs/heads/master
Commit: c62f74a5ecaa9866f7337a3c25009dcf8ee52f35
Parents: 08cdad2
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Nov 9 13:14:06 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Nov 9 13:14:06 2015 -0500
----------------------------------------------------------------------
.../gfac/core/cluster/RemoteCluster.java | 4 +-
.../airavata/gfac/impl/GFacEngineImpl.java | 26 +--
.../airavata/gfac/impl/HPCRemoteCluster.java | 6 +-
.../airavata/gfac/impl/LocalCommandOutput.java | 60 ++++++
.../airavata/gfac/impl/LocalRemoteCluster.java | 182 +++++++++++++++++++
.../gfac/impl/task/SCPDataStageTask.java | 4 +-
6 files changed, 257 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index 59e7ff5..9e4544c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -51,7 +51,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @param remoteFile remote file location, this can be a directory too
* @throws SSHApiException throws exception during error
*/
- public void scpTo(String localFile, String remoteFile) throws SSHApiException;
+ public void copyTo(String localFile, String remoteFile) throws SSHApiException;
/**
* This will copy a remote file in path rFile to local file lFile
@@ -59,7 +59,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @param remoteFile remote file path, this has to be a full qualified path
* @param localFile This is the local file to copy, this can be a directory too
*/
- public void scpFrom(String remoteFile, String localFile) throws SSHApiException;
+ public void copyFrom(String remoteFile, String localFile) throws SSHApiException;
/**
* This wil copy source remote file to target remote file.
http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 4b67ffd..6444eb4 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -343,21 +343,7 @@ public class GFacEngineImpl implements GFacEngine {
taskContext.setTaskStatus(taskStatus);
GFacUtils.saveAndPublishTaskStatus(taskContext);
- if (taskStatus.getState() == TaskState.FAILED) {
- log.error("expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " +
- "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
- .getParentProcessContext().getProcessId(), taskContext.getTaskId(), jobSubmissionTask.getType
- ().name(), taskStatus.getReason());
- String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
- .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
- .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Job submission task failed. Reason: ")
- .append(taskStatus.getReason()).toString();
- ErrorModel errorModel = new ErrorModel();
- errorModel.setUserFriendlyMessage("Job submission task failed");
- errorModel.setActualErrorMessage(errorMsg);
- GFacUtils.saveTaskError(taskContext, errorModel);
- throw new GFacException("Job submission task failed");
- }
+ checkFailures(taskContext, taskStatus, jobSubmissionTask);
return false;
} catch (TException e) {
throw new GFacException(e);
@@ -419,13 +405,18 @@ public class GFacEngineImpl implements GFacEngine {
taskContext.setTaskStatus(taskStatus);
GFacUtils.saveAndPublishTaskStatus(taskContext);
+ checkFailures(taskContext, taskStatus, dMoveTask);
+ return false;
+ }
+
+ private void checkFailures(TaskContext taskContext, TaskStatus taskStatus, Task dMoveTask) throws GFacException {
if (taskStatus.getState() == TaskState.FAILED) {
log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
"reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
.getParentProcessContext().getProcessId(), taskContext.getTaskId(), dMoveTask.getType
().name(), taskStatus.getReason());
- String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
- .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+ String errorMsg = new StringBuilder("expId: ").append(taskContext.getParentProcessContext().getExperimentId()).append(", processId: ")
+ .append(taskContext.getParentProcessContext().getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
.append(", type: ").append(taskContext.getTaskType().name()).append(" :- Input staging failed. Reason: ")
.append(taskStatus.getReason()).toString();
ErrorModel errorModel = new ErrorModel();
@@ -434,7 +425,6 @@ public class GFacEngineImpl implements GFacEngine {
GFacUtils.saveTaskError(taskContext, errorModel);
throw new GFacException("Error while staging input data");
}
- return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 288c98c..3711f7c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -83,7 +83,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
@Override
public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
JobSubmissionOutput jsoutput = new JobSubmissionOutput();
- scpTo(jobScriptFilePath, workingDirectory); // scp script file to working directory
+ copyTo(jobScriptFilePath, workingDirectory); // scp script file to working directory
RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath);
submitCommand.setRawCommand("cd " + workingDirectory + "; " + submitCommand.getRawCommand());
StandardOutReader reader = new StandardOutReader();
@@ -97,7 +97,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
}
@Override
- public void scpTo(String localFile, String remoteFile) throws SSHApiException {
+ public void copyTo(String localFile, String remoteFile) throws SSHApiException {
int retry = 3;
while (retry > 0) {
try {
@@ -128,7 +128,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
}
@Override
- public void scpFrom(String remoteFile, String localFile) throws SSHApiException {
+ public void copyFrom(String remoteFile, String localFile) throws SSHApiException {
int retry = 3;
while(retry>0) {
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
new file mode 100644
index 0000000..e9d683d
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
@@ -0,0 +1,60 @@
+package org.apache.airavata.gfac.impl;
+
+import com.jcraft.jsch.Channel;
+import org.apache.airavata.gfac.core.cluster.CommandOutput;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+
+/**
+ * Created by syodage on 11/9/15.
+ */
+public class LocalCommandOutput implements CommandOutput {
+ private Process process;
+
+ @Override
+ public void onOutput(Channel channel) {
+
+ }
+
+ public void readOutputs(Process process) {
+ this.process = process;
+ }
+
+ public String getStandardOut() throws IOException {
+ BufferedReader stdInput = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ StringBuilder sb = new StringBuilder();
+ String s = null;
+ while ((s = stdInput.readLine()) != null) {
+ sb.append(s);
+ }
+ return sb.toString();
+ }
+
+ public String getStandardErrorString() throws IOException {
+ BufferedReader stdError = new BufferedReader(new InputStreamReader(process.getErrorStream()));
+ StringBuilder sb = new StringBuilder();
+ String s = null;
+ while ((s = stdError.readLine()) != null) {
+ sb.append(s);
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public OutputStream getStandardError() {
+ return null;
+ }
+
+ @Override
+ public void exitCode(int code) {
+
+ }
+
+ @Override
+ public int getExitCode() {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
new file mode 100644
index 0000000..d1aa6e0
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
@@ -0,0 +1,182 @@
+package org.apache.airavata.gfac.impl;
+
+import com.jcraft.jsch.Session;
+import org.apache.airavata.gfac.core.JobManagerConfiguration;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.cluster.*;
+import org.apache.airavata.model.status.JobStatus;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.*;
+
+/**
+ * Created by shameera on 11/9/15.
+ */
+public class LocalRemoteCluster extends AbstractRemoteCluster {
+
+ public LocalRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, AuthenticationInfo authenticationInfo) {
+ super(serverInfo, jobManagerConfiguration, authenticationInfo);
+ }
+
+ @Override
+ public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
+ try {
+ JobSubmissionOutput jsoutput = new JobSubmissionOutput();
+ copyTo(jobScriptFilePath, workingDirectory); // scp script file to working directory
+ RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath);
+ submitCommand.setRawCommand("cd " + workingDirectory + "; " + submitCommand.getRawCommand());
+ LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+ executeCommand(submitCommand, localCommandOutput);
+ jsoutput.setJobId(outputParser.parseJobSubmission(localCommandOutput.getStandardOut()));
+ jsoutput.setExitCode(localCommandOutput.getExitCode());
+ jsoutput.setStdOut(localCommandOutput.getStandardOut());
+ jsoutput.setStdErr(localCommandOutput.getStandardErrorString());
+ return jsoutput;
+ } catch (IOException e) {
+ throw new SSHApiException("Error while submitting local batch job", e);
+ }
+ }
+
+ @Override
+ public void copyTo(String localFile, String remoteFile) throws SSHApiException {
+ Path sourcePath = Paths.get(localFile);
+ Path targetPath = Paths.get(remoteFile);
+ try {
+ Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
+ } catch (IOException e) {
+ throw new SSHApiException("Error while copying sourceFile: " + sourcePath.toString()
+ + ", to destinationFile: " + targetPath.toString(), e);
+ }
+ }
+
+ @Override
+ public void copyFrom(String remoteFile, String localFile) throws SSHApiException {
+ Path sourcePath = Paths.get(remoteFile);
+ Path targetPath = Paths.get(localFile);
+ try {
+ Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
+ } catch (IOException e) {
+ throw new SSHApiException("Error while copying sourceFile: " + sourcePath.toString()
+ + ", to destinationFile: " + targetPath.toString(), e);
+ }
+ }
+
+ @Override
+ public void scpThirdParty(String sourceFile, String destinationFile, Session session, DIRECTION inOrOut) throws SSHApiException {
+ throw new UnsupportedOperationException("Scp third party is not support with LocalRemoteCluster");
+ }
+
+ @Override
+ public void makeDirectory(String directoryPath) throws SSHApiException {
+ Path dirPath = Paths.get(directoryPath);
+ Set<PosixFilePermission> perms = new HashSet<>();
+ // add permission as rw-r--r-- 644
+ perms.add(PosixFilePermission.OWNER_WRITE);
+ perms.add(PosixFilePermission.OWNER_READ);
+ perms.add(PosixFilePermission.GROUP_READ);
+ perms.add(PosixFilePermission.OTHERS_READ);
+ FileAttribute<Set<PosixFilePermission>> fileAttributes = PosixFilePermissions.asFileAttribute(perms);
+ try {
+ Files.createDirectory(dirPath, fileAttributes);
+ } catch (IOException e) {
+ throw new SSHApiException("Error making directory", e);
+ }
+
+ }
+
+ @Override
+ public JobStatus cancelJob(String jobID) throws SSHApiException {
+ JobStatus oldStatus = getJobStatus(jobID);
+ RawCommandInfo cancelCommand = jobManagerConfiguration.getCancelCommand(jobID);
+ execute(cancelCommand);
+ return oldStatus;
+ }
+
+
+ @Override
+ public JobStatus getJobStatus(String jobID) throws SSHApiException {
+ RawCommandInfo monitorCommand = jobManagerConfiguration.getMonitorCommand(jobID);
+ LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+ try {
+ executeCommand(monitorCommand, localCommandOutput);
+ return outputParser.parseJobStatus(jobID, localCommandOutput.getStandardErrorString());
+ } catch (IOException e) {
+ throw new SSHApiException("Error while getting jobStatus", e);
+ }
+ }
+
+ @Override
+ public String getJobIdByJobName(String jobName, String userName) throws SSHApiException {
+ try {
+ RawCommandInfo jobIdMonitorCommand = jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName);
+ LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+ executeCommand(jobIdMonitorCommand, localCommandOutput);
+ return outputParser.parseJobId(jobName, localCommandOutput.getStandardOut());
+ } catch (IOException e) {
+ throw new SSHApiException("Error while getting jobId using JobName", e);
+ }
+ }
+
+ @Override
+ public void getJobStatuses(String userName, Map<String, JobStatus> jobStatusMap) throws SSHApiException {
+ try {
+ RawCommandInfo userBasedMonitorCommand = jobManagerConfiguration.getUserBasedMonitorCommand(userName);
+ LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+ executeCommand(userBasedMonitorCommand, localCommandOutput);
+ outputParser.parseJobStatuses(userName, jobStatusMap, localCommandOutput.getStandardOut());
+ } catch (IOException e) {
+ throw new SSHApiException("Error while getting job statuses", e);
+ }
+ }
+
+ @Override
+ public List<String> listDirectory(String directoryPath) throws SSHApiException {
+ File directory = new File(directoryPath);
+ List<String> results = new ArrayList<>();
+ File[] files = directory.listFiles();
+ for (File file : files) {
+ results.add(file.getName());
+ }
+ return results;
+ }
+
+ @Override
+ public boolean execute(CommandInfo commandInfo) throws SSHApiException {
+ LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+ try {
+ executeCommand(commandInfo, localCommandOutput);
+ } catch (IOException e) {
+ throw new SSHApiException("Error while executing command " + commandInfo.getCommand(), e);
+ }
+ return true;
+ }
+
+ private void executeCommand(CommandInfo commandInfo, LocalCommandOutput localCommandOutput) throws IOException {
+ Process process = Runtime.getRuntime().exec(commandInfo.getCommand());
+ localCommandOutput.readOutputs(process);
+ }
+
+ @Override
+ public Session getSession() throws SSHApiException {
+ return null;
+ }
+
+ @Override
+ public void disconnect() throws SSHApiException {
+
+ }
+
+ @Override
+ public ServerInfo getServerInfo() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 9ffa6f2..32ee31b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -66,13 +66,13 @@ public class SCPDataStageTask implements Task {
/**
* copy local file to compute resource.
*/
- taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(), destinationURI
+ taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
.getPath());
} else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
/**
* copy remote file from compute resource.
*/
- taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), destinationURI
+ taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
.getPath());
}
status.setReason("Successfully staged data");