You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sc...@apache.org on 2015/09/05 09:05:59 UTC
[11/16] airavata git commit: Added JobSubmissionOutput pojo class to
keep output of jobsubmission command with jobId
Added JobSubmissionOutput pojo class to keep output of jobsubmission command with jobId
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/840ae1ad
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/840ae1ad
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/840ae1ad
Branch: refs/heads/master
Commit: 840ae1add331a23663687efce23a782e679e986d
Parents: 88afef1
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Fri Sep 4 15:21:49 2015 -0400
Committer: Supun Nakandala <sc...@apache.org>
Committed: Sat Sep 5 12:24:24 2015 +0530
----------------------------------------------------------------------
.../gfac/core/cluster/CommandOutput.java | 7 ++
.../gfac/core/cluster/JobSubmissionOutput.java | 70 ++++++++++++++++++++
.../gfac/core/cluster/RemoteCluster.java | 2 +-
.../airavata/gfac/impl/HPCRemoteCluster.java | 11 ++-
.../airavata/gfac/impl/StandardOutReader.java | 12 +++-
.../impl/task/SSHForkJobSubmissionTask.java | 12 +++-
.../gfac/impl/task/SSHJobSubmissionTask.java | 10 ++-
7 files changed, 116 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/840ae1ad/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/CommandOutput.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/CommandOutput.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/CommandOutput.java
index e50d25a..b64d3fb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/CommandOutput.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/CommandOutput.java
@@ -46,4 +46,11 @@ public interface CommandOutput {
* @param code The program exit code
*/
void exitCode(int code);
+
+ /**
+ * Return the exit code of the command execution.
+ * @return exit code
+ */
+ int getExitCode();
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/840ae1ad/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
new file mode 100644
index 0000000..7506707
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/JobSubmissionOutput.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.core.cluster;
+
+public class JobSubmissionOutput {
+
+ private int exitCode;
+ private String stdOut;
+ private String stdErr;
+ private String command;
+ private String jobId;
+
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ public void setExitCode(int exitCode) {
+ this.exitCode = exitCode;
+ }
+
+ public String getStdOut() {
+ return stdOut;
+ }
+
+ public void setStdOut(String stdOut) {
+ this.stdOut = stdOut;
+ }
+
+ public String getStdErr() {
+ return stdErr;
+ }
+
+ public void setStdErr(String stdErr) {
+ this.stdErr = stdErr;
+ }
+
+ public String getCommand() {
+ return command;
+ }
+
+ public void setCommand(String command) {
+ this.command = command;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(String jobId) {
+ this.jobId = jobId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/840ae1ad/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
index 6afeb59..932451b 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cluster/RemoteCluster.java
@@ -42,7 +42,7 @@ public interface RemoteCluster { // FIXME: replace SSHApiException with suitable
* @return jobId after successful job submission
* @throws SSHApiException throws exception during error
*/
- public String submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException;
+ public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException;
/**
* This will copy the localFile to remoteFile location in configured cluster
http://git-wip-us.apache.org/repos/asf/airavata/blob/840ae1ad/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index a3fb80e..796dd1c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -33,6 +33,7 @@ import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
import org.apache.airavata.gfac.core.cluster.AbstractRemoteCluster;
import org.apache.airavata.gfac.core.cluster.CommandInfo;
import org.apache.airavata.gfac.core.cluster.CommandOutput;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
import org.apache.airavata.gfac.core.cluster.RawCommandInfo;
import org.apache.airavata.gfac.core.cluster.ServerInfo;
import org.apache.airavata.model.status.JobStatus;
@@ -80,14 +81,19 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
}
@Override
- public String submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
+ public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
+ JobSubmissionOutput jsoutput = new JobSubmissionOutput();
scpTo(jobScriptFilePath, workingDirectory); // scp script file to working directory
RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, jobScriptFilePath);
StandardOutReader reader = new StandardOutReader();
executeCommand(submitCommand, reader);
throwExceptionOnError(reader, submitCommand);
- return outputParser.parseJobSubmission(reader.getStdOutputString());
+ jsoutput.setJobId(outputParser.parseJobSubmission(reader.getStdOutputString()));
+ jsoutput.setExitCode(reader.getExitCode());
+ jsoutput.setStdOut(reader.getStdOutputString());
+ jsoutput.setStdErr(reader.getStdErrorString());
+ return jsoutput;
}
@Override
@@ -276,6 +282,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
}finally {
//Only disconnecting the channel, session can be reused
if (channelExec != null) {
+ commandOutput.exitCode(channelExec.getExitStatus());
channelExec.disconnect();
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/840ae1ad/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/StandardOutReader.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/StandardOutReader.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/StandardOutReader.java
index e34858b..b0f4c74 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/StandardOutReader.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/StandardOutReader.java
@@ -36,7 +36,9 @@ public class StandardOutReader implements CommandOutput {
private static final Logger logger = LoggerFactory.getLogger(StandardOutReader.class);
String stdOutputString = null;
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
- public void onOutput(Channel channel) {
+ private int exitCode;
+
+ public void onOutput(Channel channel) {
try {
StringBuffer pbsOutput = new StringBuffer("");
InputStream inputStream = channel.getInputStream();
@@ -59,9 +61,15 @@ public class StandardOutReader implements CommandOutput {
public void exitCode(int code) {
System.out.println("Program exit code - " + code);
+ this.exitCode = code;
}
- public String getStdOutputString() {
+ @Override
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ public String getStdOutputString() {
return stdOutputString;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/840ae1ad/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
index ddd33e1..471dbfd 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHForkJobSubmissionTask.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.impl.task;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.gfac.core.context.TaskContext;
@@ -72,8 +73,13 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
if (jobFile != null && jobFile.exists()) {
jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
- String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
- if (jobId != null && !jobId.isEmpty()) {
+ JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
+ processContext.getWorkingDir());
+ jobModel.setExitCode(jobSubmissionOutput.getExitCode());
+ jobModel.setStderr(jobSubmissionOutput.getStdErr());
+ jobModel.setStdout(jobSubmissionOutput.getStdOut());
+ String jobId = jobSubmissionOutput.getJobId();
+ if (jobId != null && !jobId.isEmpty()) {
jobModel.setJobId(jobId);
GFacUtils.saveJobModel(processContext, jobModel);
jobStatus.setJobState(JobState.SUBMITTED);
@@ -93,6 +99,8 @@ public class SSHForkJobSubmissionTask implements JobSubmissionTask {
taskStatus.setState(TaskState.FAILED);
taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
}
+
+ GFacUtils.saveJobModel(processContext, jobModel);
} else {
taskStatus.setState(TaskState.FAILED);
if (jobFile == null) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/840ae1ad/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index 34d0945..6acc312 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -24,6 +24,7 @@ package org.apache.airavata.gfac.impl.task;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.gfac.core.*;
+import org.apache.airavata.gfac.core.cluster.JobSubmissionOutput;
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.gfac.core.context.TaskContext;
@@ -73,7 +74,12 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
if (jobFile != null && jobFile.exists()) {
jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
- String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
+ JobSubmissionOutput jobSubmissionOutput = remoteCluster.submitBatchJob(jobFile.getPath(),
+ processContext.getWorkingDir());
+ jobModel.setExitCode(jobSubmissionOutput.getExitCode());
+ jobModel.setStderr(jobSubmissionOutput.getStdErr());
+ jobModel.setStdout(jobSubmissionOutput.getStdOut());
+ String jobId = jobSubmissionOutput.getJobId();
if (jobId != null && !jobId.isEmpty()) {
jobModel.setJobId(jobId);
GFacUtils.saveJobModel(processContext, jobModel);
@@ -121,6 +127,8 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
taskStatus.setState(TaskState.FAILED);
taskStatus.setReason("Couldn't find job id in both submitted and verified steps");
}
+
+ GFacUtils.saveJobModel(processContext, jobModel);
} else {
taskStatus.setState(TaskState.FAILED);
if (jobFile == null) {