You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/09 19:14:16 UTC

airavata git commit: Added LocalRemoteCluster implementation and LocalCommandOutput reader

Repository: airavata
Updated Branches:
  refs/heads/master 08cdad264 -> c62f74a5e


Added LocalRemoteCluster implementation and LocalCommandOutput reader


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/c62f74a5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/c62f74a5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/c62f74a5

Branch: refs/heads/master
Commit: c62f74a5ecaa9866f7337a3c25009dcf8ee52f35
Parents: 08cdad2
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Nov 9 13:14:06 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Nov 9 13:14:06 2015 -0500

----------------------------------------------------------------------
 .../gfac/core/cluster/RemoteCluster.java        |   4 +-
 .../airavata/gfac/impl/GFacEngineImpl.java      |  26 +--
 .../airavata/gfac/impl/HPCRemoteCluster.java    |   6 +-
 .../airavata/gfac/impl/LocalCommandOutput.java  |  60 ++++++
 .../airavata/gfac/impl/LocalRemoteCluster.java  | 182 +++++++++++++++++++
 .../gfac/impl/task/SCPDataStageTask.java        |   4 +-
 6 files changed, 257 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index 59e7ff5..9e4544c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -51,7 +51,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
 	 * @param remoteFile remote file location, this can be a directory too
 	 * @throws SSHApiException throws exception during error
 	 */
-	public void scpTo(String localFile, String remoteFile) throws SSHApiException;
+	public void copyTo(String localFile, String remoteFile) throws SSHApiException;
 
 	/**
 	 * This will copy a remote file in path rFile to local file lFile
@@ -59,7 +59,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
 	 * @param remoteFile      remote file path, this has to be a full qualified path
 	 * @param localFile This is the local file to copy, this can be a directory too
 	 */
-	public void scpFrom(String remoteFile, String localFile) throws SSHApiException;
+	public void copyFrom(String remoteFile, String localFile) throws SSHApiException;
 
 	/**
 	 * This wil copy source remote file to target remote file.

http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 4b67ffd..6444eb4 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -343,21 +343,7 @@ public class GFacEngineImpl implements GFacEngine {
             taskContext.setTaskStatus(taskStatus);
             GFacUtils.saveAndPublishTaskStatus(taskContext);
 
-            if (taskStatus.getState() == TaskState.FAILED) {
-                log.error("expId: {}, processId: {}, taskId: {} type: {},:- Job submission task failed, " +
-                        "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
-                        .getParentProcessContext().getProcessId(), taskContext.getTaskId(), jobSubmissionTask.getType
-                        ().name(), taskStatus.getReason());
-                String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
-                        .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
-                        .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Job submission task failed. Reason: ")
-                        .append(taskStatus.getReason()).toString();
-                ErrorModel errorModel = new ErrorModel();
-                errorModel.setUserFriendlyMessage("Job submission task failed");
-                errorModel.setActualErrorMessage(errorMsg);
-                GFacUtils.saveTaskError(taskContext, errorModel);
-                throw new GFacException("Job submission task failed");
-            }
+            checkFailures(taskContext, taskStatus, jobSubmissionTask);
             return false;
         } catch (TException e) {
             throw new GFacException(e);
@@ -419,13 +405,18 @@ public class GFacEngineImpl implements GFacEngine {
         taskContext.setTaskStatus(taskStatus);
         GFacUtils.saveAndPublishTaskStatus(taskContext);
 
+        checkFailures(taskContext, taskStatus, dMoveTask);
+        return false;
+    }
+
+    private void checkFailures(TaskContext taskContext, TaskStatus taskStatus, Task dMoveTask) throws GFacException {
         if (taskStatus.getState() == TaskState.FAILED) {
             log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " +
                     "reason:" + " {}", taskContext.getParentProcessContext().getExperimentId(), taskContext
                     .getParentProcessContext().getProcessId(), taskContext.getTaskId(), dMoveTask.getType
                     ().name(), taskStatus.getReason());
-            String errorMsg = new StringBuilder("expId: ").append(processContext.getExperimentId()).append(", processId: ")
-                    .append(processContext.getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
+            String errorMsg = new StringBuilder("expId: ").append(taskContext.getParentProcessContext().getExperimentId()).append(", processId: ")
+                    .append(taskContext.getParentProcessContext().getProcessId()).append(", taskId: ").append(taskContext.getTaskId())
                     .append(", type: ").append(taskContext.getTaskType().name()).append(" :- Input staging failed. Reason: ")
                     .append(taskStatus.getReason()).toString();
             ErrorModel errorModel = new ErrorModel();
@@ -434,7 +425,6 @@ public class GFacEngineImpl implements GFacEngine {
             GFacUtils.saveTaskError(taskContext, errorModel);
             throw new GFacException("Error while staging input data");
         }
-        return false;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 288c98c..3711f7c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -83,7 +83,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 	@Override
 	public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
 		JobSubmissionOutput jsoutput = new JobSubmissionOutput();
-		scpTo(jobScriptFilePath, workingDirectory); // scp script file to working directory
+		copyTo(jobScriptFilePath, workingDirectory); // scp script file to working directory
 		RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath);
 		submitCommand.setRawCommand("cd " + workingDirectory + "; " + submitCommand.getRawCommand());
 		StandardOutReader reader = new StandardOutReader();
@@ -97,7 +97,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 	}
 
 	@Override
-	public void scpTo(String localFile, String remoteFile) throws SSHApiException {
+	public void copyTo(String localFile, String remoteFile) throws SSHApiException {
 		int retry = 3;
 		while (retry > 0) {
 			try {
@@ -128,7 +128,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
 	}
 
 	@Override
-	public void scpFrom(String remoteFile, String localFile) throws SSHApiException {
+	public void copyFrom(String remoteFile, String localFile) throws SSHApiException {
 		int retry = 3;
 		while(retry>0) {
 			try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
new file mode 100644
index 0000000..e9d683d
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalCommandOutput.java
@@ -0,0 +1,60 @@
+package org.apache.airavata.gfac.impl;
+
+import com.jcraft.jsch.Channel;
+import org.apache.airavata.gfac.core.cluster.CommandOutput;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+
+/**
+ * Created by syodage on 11/9/15.
+ */
+public class LocalCommandOutput implements CommandOutput {
+    private Process process;
+
+    @Override
+    public void onOutput(Channel channel) {
+
+    }
+
+    public void readOutputs(Process process) {
+        this.process = process;
+    }
+
+    public String getStandardOut() throws IOException {
+        BufferedReader stdInput = new BufferedReader(new InputStreamReader(process.getInputStream()));
+        StringBuilder sb = new StringBuilder();
+        String s = null;
+        while ((s = stdInput.readLine()) != null) {
+            sb.append(s);
+        }
+        return sb.toString();
+    }
+
+    public String getStandardErrorString() throws IOException {
+        BufferedReader stdError = new BufferedReader(new InputStreamReader(process.getErrorStream()));
+        StringBuilder sb = new StringBuilder();
+        String s = null;
+        while ((s = stdError.readLine()) != null) {
+            sb.append(s);
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public OutputStream getStandardError() {
+        return null;
+    }
+
+    @Override
+    public void exitCode(int code) {
+
+    }
+
+    @Override
+    public int getExitCode() {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
new file mode 100644
index 0000000..d1aa6e0
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/LocalRemoteCluster.java
@@ -0,0 +1,182 @@
+package org.apache.airavata.gfac.impl;
+
+import com.jcraft.jsch.Session;
+import org.apache.airavata.gfac.core.JobManagerConfiguration;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.cluster.*;
+import org.apache.airavata.model.status.JobStatus;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.*;
+
+/**
+ * Created by shameera on 11/9/15.
+ */
+public class LocalRemoteCluster extends AbstractRemoteCluster {
+
+    public LocalRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, AuthenticationInfo authenticationInfo) {
+        super(serverInfo, jobManagerConfiguration, authenticationInfo);
+    }
+
+    @Override
+    public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
+        try {
+            JobSubmissionOutput jsoutput = new JobSubmissionOutput();
+            copyTo(jobScriptFilePath, workingDirectory); // scp script file to working directory
+            RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath);
+            submitCommand.setRawCommand("cd " + workingDirectory + "; " + submitCommand.getRawCommand());
+            LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+            executeCommand(submitCommand, localCommandOutput);
+            jsoutput.setJobId(outputParser.parseJobSubmission(localCommandOutput.getStandardOut()));
+            jsoutput.setExitCode(localCommandOutput.getExitCode());
+            jsoutput.setStdOut(localCommandOutput.getStandardOut());
+            jsoutput.setStdErr(localCommandOutput.getStandardErrorString());
+            return jsoutput;
+        } catch (IOException e) {
+            throw new SSHApiException("Error while submitting local batch job", e);
+        }
+    }
+
+    @Override
+    public void copyTo(String localFile, String remoteFile) throws SSHApiException {
+        Path sourcePath = Paths.get(localFile);
+        Path targetPath = Paths.get(remoteFile);
+        try {
+            Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
+        } catch (IOException e) {
+            throw new SSHApiException("Error while copying sourceFile: " + sourcePath.toString()
+                    + ", to destinationFile: " + targetPath.toString(), e);
+        }
+    }
+
+    @Override
+    public void copyFrom(String remoteFile, String localFile) throws SSHApiException {
+        Path sourcePath = Paths.get(remoteFile);
+        Path targetPath = Paths.get(localFile);
+        try {
+            Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
+        } catch (IOException e) {
+            throw new SSHApiException("Error while copying sourceFile: " + sourcePath.toString()
+                    + ", to destinationFile: " + targetPath.toString(), e);
+        }
+    }
+
+    @Override
+    public void scpThirdParty(String sourceFile, String destinationFile, Session session, DIRECTION inOrOut) throws SSHApiException {
+        throw new UnsupportedOperationException("Scp third party is not support with LocalRemoteCluster");
+    }
+
+    @Override
+    public void makeDirectory(String directoryPath) throws SSHApiException {
+        Path dirPath = Paths.get(directoryPath);
+        Set<PosixFilePermission> perms = new HashSet<>();
+        // add permission as rw-r--r-- 644
+        perms.add(PosixFilePermission.OWNER_WRITE);
+        perms.add(PosixFilePermission.OWNER_READ);
+        perms.add(PosixFilePermission.GROUP_READ);
+        perms.add(PosixFilePermission.OTHERS_READ);
+        FileAttribute<Set<PosixFilePermission>> fileAttributes = PosixFilePermissions.asFileAttribute(perms);
+        try {
+            Files.createDirectory(dirPath, fileAttributes);
+        } catch (IOException e) {
+            throw new SSHApiException("Error making directory", e);
+        }
+
+    }
+
+    @Override
+    public JobStatus cancelJob(String jobID) throws SSHApiException {
+        JobStatus oldStatus = getJobStatus(jobID);
+        RawCommandInfo cancelCommand = jobManagerConfiguration.getCancelCommand(jobID);
+        execute(cancelCommand);
+        return oldStatus;
+    }
+
+
+    @Override
+    public JobStatus getJobStatus(String jobID) throws SSHApiException {
+        RawCommandInfo monitorCommand = jobManagerConfiguration.getMonitorCommand(jobID);
+        LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+        try {
+            executeCommand(monitorCommand, localCommandOutput);
+            return outputParser.parseJobStatus(jobID, localCommandOutput.getStandardErrorString());
+        } catch (IOException e) {
+            throw new SSHApiException("Error while getting jobStatus", e);
+        }
+    }
+
+    @Override
+    public String getJobIdByJobName(String jobName, String userName) throws SSHApiException {
+        try {
+            RawCommandInfo jobIdMonitorCommand = jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName);
+            LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+            executeCommand(jobIdMonitorCommand, localCommandOutput);
+            return outputParser.parseJobId(jobName, localCommandOutput.getStandardOut());
+        } catch (IOException e) {
+            throw new SSHApiException("Error while getting jobId using JobName", e);
+        }
+    }
+
+    @Override
+    public void getJobStatuses(String userName, Map<String, JobStatus> jobStatusMap) throws SSHApiException {
+        try {
+            RawCommandInfo userBasedMonitorCommand = jobManagerConfiguration.getUserBasedMonitorCommand(userName);
+            LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+            executeCommand(userBasedMonitorCommand, localCommandOutput);
+            outputParser.parseJobStatuses(userName, jobStatusMap, localCommandOutput.getStandardOut());
+        } catch (IOException e) {
+            throw new SSHApiException("Error while getting job statuses", e);
+        }
+    }
+
+    @Override
+    public List<String> listDirectory(String directoryPath) throws SSHApiException {
+        File directory = new File(directoryPath);
+        List<String> results = new ArrayList<>();
+        File[] files = directory.listFiles();
+        for (File file : files) {
+            results.add(file.getName());
+        }
+        return results;
+    }
+
+    @Override
+    public boolean execute(CommandInfo commandInfo) throws SSHApiException {
+        LocalCommandOutput localCommandOutput = new LocalCommandOutput();
+        try {
+            executeCommand(commandInfo, localCommandOutput);
+        } catch (IOException e) {
+            throw new SSHApiException("Error while executing command " + commandInfo.getCommand(), e);
+        }
+        return true;
+    }
+
+    private void executeCommand(CommandInfo commandInfo, LocalCommandOutput localCommandOutput) throws IOException {
+        Process process = Runtime.getRuntime().exec(commandInfo.getCommand());
+        localCommandOutput.readOutputs(process);
+    }
+
+    @Override
+    public Session getSession() throws SSHApiException {
+        return null;
+    }
+
+    @Override
+    public void disconnect() throws SSHApiException {
+
+    }
+
+    @Override
+    public ServerInfo getServerInfo() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/c62f74a5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 9ffa6f2..32ee31b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -66,13 +66,13 @@ public class SCPDataStageTask implements Task {
 					/**
 					 * copy local file to compute resource.
 					 */
-					taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(), destinationURI
+					taskContext.getParentProcessContext().getRemoteCluster().copyTo(sourceURI.getPath(), destinationURI
 							.getPath());
 				} else if (processState == ProcessState.OUTPUT_DATA_STAGING) {
 					/**
 					 * copy remote file from compute resource.
 					 */
-					taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), destinationURI
+					taskContext.getParentProcessContext().getRemoteCluster().copyFrom(sourceURI.getPath(), destinationURI
 							.getPath());
 				}
 				status.setReason("Successfully staged data");