You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:10:00 UTC
[airavata] 03/17: Stabalizing DefaultJobSubmission Task
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit b199bc2090ff9bc8b9c5827adeedcbfbfe181cc0
Author: dimuthu <di...@gmail.com>
AuthorDate: Mon Feb 26 01:11:29 2018 -0500
Stabalizing DefaultJobSubmission Task
---
.../airavata-helix/agent-impl/ssh-agent/pom.xml | 21 ++--
.../airavata/helix/agent/ssh/SshAgentAdaptor.java | 132 ++++++++++++---------
.../helix/agent/ssh/StandardOutReader.java | 80 ++++---------
.../helix/task/api/support/AdaptorSupport.java | 34 +-----
modules/airavata-helix/task-core/pom.xml | 5 +
.../helix/core/support/AdaptorSupportImpl.java | 19 +--
.../airavata/helix/impl/task/EnvSetupTask.java | 4 +-
.../airavata/helix/impl/task/TaskContext.java | 9 +-
.../impl/task/submission/GroovyMapBuilder.java | 74 +++++++++++-
.../helix/impl/task/submission/GroovyMapData.java | 10 +-
.../submission/task/DefaultJobSubmissionTask.java | 8 +-
.../submission/task/ForkJobSubmissionTask.java | 4 +-
.../task/submission/task/JobSubmissionTask.java | 13 +-
.../submission/task/LocalJobSubmissionTask.java | 6 +-
.../src/main/resources/airavata-server.properties | 4 +-
.../src/main/resources/log4j.properties | 2 +
16 files changed, 242 insertions(+), 183 deletions(-)
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/pom.xml b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
index 44cf919..bc78971 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
+++ b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
@@ -3,9 +3,10 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>agent-impl</artifactId>
- <groupId>org.apache</groupId>
- <version>1.0-SNAPSHOT</version>
+ <artifactId>airavata-helix</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.17-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -33,19 +34,15 @@
<artifactId>airavata-credential-store</artifactId>
<version>0.17-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>agent-api</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
</dependencies>
<build>
<plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.5.1</version>
- <configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
- </configuration>
- </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
index 19b429c..ef8d580 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -13,7 +13,6 @@ import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.AppCatalogException;
-import org.apache.airavata.registry.cpi.ComputeResource;
import java.io.*;
import java.util.Arrays;
@@ -130,47 +129,64 @@ public class SshAgentAdaptor implements AgentAdaptor {
public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
StandardOutReader commandOutput = new StandardOutReader();
+ ChannelExec channelExec = null;
try {
- ChannelExec channelExec = ((ChannelExec) session.openChannel("exec"));
+ channelExec = ((ChannelExec) session.openChannel("exec"));
channelExec.setCommand(command);
channelExec.setInputStream(null);
- channelExec.setErrStream(commandOutput.getStandardError());
+ InputStream out = channelExec.getInputStream();
+ InputStream err = channelExec.getErrStream();
channelExec.connect();
- commandOutput.onOutput(channelExec);
+
+ commandOutput.setExitCode(channelExec.getExitStatus());
+ commandOutput.readStdOutFromStream(out);
+ commandOutput.readStdErrFromStream(err);
return commandOutput;
} catch (JSchException e) {
+ e.printStackTrace();
+ throw new AgentException(e);
+ } catch (IOException e) {
+ e.printStackTrace();
throw new AgentException(e);
+ } finally {
+ if (channelExec != null) {
+ channelExec.disconnect();
+ }
}
}
public void createDirectory(String path) throws AgentException {
+ String command = "mkdir -p " + path;
+ ChannelExec channelExec = null;
try {
- String command = "mkdir -p " + path;
- Channel channel = session.openChannel("exec");
+ channelExec = (ChannelExec)session.openChannel("exec");
StandardOutReader stdOutReader = new StandardOutReader();
- ((ChannelExec) channel).setCommand(command);
+ channelExec.setCommand(command);
+ InputStream out = channelExec.getInputStream();
+ InputStream err = channelExec.getErrStream();
+ channelExec.connect();
+
+ stdOutReader.readStdOutFromStream(out);
+ stdOutReader.readStdErrFromStream(err);
- ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
- try {
- channel.connect();
- } catch (JSchException e) {
- channel.disconnect();
- System.out.println("Unable to retrieve command output. Command - " + command +
- " on server - " + session.getHost() + ":" + session.getPort() +
- " connecting user name - "
- + session.getUserName());
- throw new AgentException(e);
- }
- stdOutReader.onOutput(channel);
- if (stdOutReader.getStdErrorString().contains("mkdir:")) {
- throw new AgentException(stdOutReader.getStdErrorString());
+ if (stdOutReader.getStdError() != null && stdOutReader.getStdError().contains("mkdir:")) {
+ throw new AgentException(stdOutReader.getStdError());
}
-
- channel.disconnect();
} catch (JSchException e) {
+ System.out.println("Unable to retrieve command output. Command - " + command +
+ " on server - " + session.getHost() + ":" + session.getPort() +
+ " connecting user name - "
+ + session.getUserName());
+ throw new AgentException(e);
+ } catch (IOException e) {
+ e.printStackTrace();
throw new AgentException(e);
+ } finally {
+ if (channelExec != null) {
+ channelExec.disconnect();
+ }
}
}
@@ -182,20 +198,22 @@ public class SshAgentAdaptor implements AgentAdaptor {
}
boolean ptimestamp = true;
+ ChannelExec channelExec = null;
try {
// exec 'scp -t rfile' remotely
String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile;
- Channel channel = session.openChannel("exec");
+ channelExec = (ChannelExec)session.openChannel("exec");
StandardOutReader stdOutReader = new StandardOutReader();
- ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
- ((ChannelExec) channel).setCommand(command);
+ //channelExec.setErrStream(stdOutReader.getStandardError());
+ channelExec.setCommand(command);
// get I/O streams for remote scp
- OutputStream out = channel.getOutputStream();
- InputStream in = channel.getInputStream();
+ OutputStream out = channelExec.getOutputStream();
+ InputStream in = channelExec.getInputStream();
+ InputStream err = channelExec.getErrStream();
- channel.connect();
+ channelExec.connect();
if (checkAck(in) != 0) {
String error = "Error Reading input Stream";
@@ -255,12 +273,10 @@ public class SshAgentAdaptor implements AgentAdaptor {
throw new AgentException(error);
}
out.close();
- stdOutReader.onOutput(channel);
-
+ stdOutReader.readStdErrFromStream(err);
- channel.disconnect();
- if (stdOutReader.getStdErrorString().contains("scp:")) {
- throw new AgentException(stdOutReader.getStdErrorString());
+ if (stdOutReader.getStdError().contains("scp:")) {
+ throw new AgentException(stdOutReader.getStdError());
}
//since remote file is always a file we just return the file
//return remoteFile;
@@ -273,43 +289,47 @@ public class SshAgentAdaptor implements AgentAdaptor {
} catch (IOException e) {
e.printStackTrace();
throw new AgentException(e);
+ } finally {
+ if (channelExec != null) {
+ channelExec.disconnect();
+ }
}
}
@Override
public List<String> listDirectory(String path) throws AgentException {
-
+ String command = "ls " + path;
+ ChannelExec channelExec = null;
try {
- String command = "ls " + path;
- Channel channel = session.openChannel("exec");
+ channelExec = (ChannelExec)session.openChannel("exec");
StandardOutReader stdOutReader = new StandardOutReader();
- ((ChannelExec) channel).setCommand(command);
+ channelExec.setCommand(command);
+ InputStream out = channelExec.getInputStream();
+ InputStream err = channelExec.getErrStream();
- ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
- try {
- channel.connect();
- } catch (JSchException e) {
-
- channel.disconnect();
-// session.disconnect();
+ channelExec.connect();
- throw new AgentException("Unable to retrieve command output. Command - " + command +
- " on server - " + session.getHost() + ":" + session.getPort() +
- " connecting user name - "
- + session.getUserName(), e);
- }
- stdOutReader.onOutput(channel);
- stdOutReader.getStdOutputString();
- if (stdOutReader.getStdErrorString().contains("ls:")) {
- throw new AgentException(stdOutReader.getStdErrorString());
+ stdOutReader.readStdOutFromStream(out);
+ stdOutReader.readStdErrFromStream(err);
+ if (stdOutReader.getStdError().contains("ls:")) {
+ throw new AgentException(stdOutReader.getStdError());
}
- channel.disconnect();
- return Arrays.asList(stdOutReader.getStdOutputString().split("\n"));
+ return Arrays.asList(stdOutReader.getStdOut().split("\n"));
} catch (JSchException e) {
+ throw new AgentException("Unable to retrieve command output. Command - " + command +
+ " on server - " + session.getHost() + ":" + session.getPort() +
+ " connecting user name - "
+ + session.getUserName(), e);
+ } catch (IOException e) {
+ e.printStackTrace();
throw new AgentException(e);
+ } finally {
+ if (channelExec != null) {
+ channelExec.disconnect();
+ }
}
}
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java
index 49c036e..94ba566 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java
@@ -2,11 +2,9 @@ package org.apache.airavata.helix.agent.ssh;
import com.jcraft.jsch.Channel;
import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.commons.io.IOUtils;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.*;
/**
* TODO: Class level comments please
@@ -16,68 +14,38 @@ import java.io.OutputStream;
*/
public class StandardOutReader implements CommandOutput {
- // Todo improve this. We need to direct access of std out and exit code
+ private String stdOut;
+ private String stdError;
+ private Integer exitCode;
- String stdOutputString = null;
- ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
- private int exitCode;
-
- public void onOutput(Channel channel) {
- try {
- StringBuffer pbsOutput = new StringBuffer("");
- InputStream inputStream = channel.getInputStream();
- byte[] tmp = new byte[1024];
- do {
- while (inputStream.available() > 0) {
- int i = inputStream.read(tmp, 0, 1024);
- if (i < 0) break;
- pbsOutput.append(new String(tmp, 0, i));
- }
- } while (!channel.isClosed()) ;
- String output = pbsOutput.toString();
- this.setStdOutputString(output);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void exitCode(int code) {
- System.out.println("Program exit code - " + code);
- this.exitCode = code;
- }
-
- public int getExitCode() {
- return exitCode;
- }
-
- public String getStdOutputString() {
- return stdOutputString;
- }
-
- public void setStdOutputString(String stdOutputString) {
- this.stdOutputString = stdOutputString;
+ @Override
+ public String getStdOut() {
+ return this.stdOut;
}
- public String getStdErrorString() {
- return errorStream.toString();
+ @Override
+ public String getStdError() {
+ return this.stdError;
}
- public OutputStream getStandardError() {
- return errorStream;
+ @Override
+ public Integer getExitCode() {
+ return this.exitCode;
}
- @Override
- public String getStdOut() {
- return null;
+ public void readStdOutFromStream(InputStream is) throws IOException {
+ StringWriter writer = new StringWriter();
+ IOUtils.copy(is, writer, "UTF-8");
+ this.stdOut = writer.toString();
}
- @Override
- public String getStdError() {
- return null;
+ public void readStdErrFromStream(InputStream is) throws IOException {
+ StringWriter writer = new StringWriter();
+ IOUtils.copy(is, writer, "UTF-8");
+ this.stdError = writer.toString();
}
- @Override
- public String getExitCommand() {
- return null;
+ public void setExitCode(Integer exitCode) {
+ this.exitCode = exitCode;
}
}
diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
index 3e24aaa..4b6e11e 100644
--- a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
@@ -15,38 +15,6 @@ import java.io.File;
public interface AdaptorSupport {
public void initializeAdaptor();
- public AgentAdaptor fetchAdaptor(String computeResource, String protocol, String authToken) throws Exception;
+ public AgentAdaptor fetchAdaptor(String gatewayId, String computeResource, String protocol, String authToken, String userId) throws Exception;
-
- /**
- *
- * @param command
- * @param workingDirectory
- * @param computeResourceId
- * @param protocol
- * @param authToken
- * @throws Exception
- */
- public CommandOutput executeCommand(String command, String workingDirectory, String computeResourceId, String protocol, String authToken) throws Exception;
-
- /**
- *
- * @param path
- * @param computeResourceId
- * @param protocol
- * @param authToken
- * @throws Exception
- */
- public void createDirectory(String path, String computeResourceId, String protocol, String authToken) throws Exception;
-
- /**
- *
- * @param sourceFile
- * @param destinationFile
- * @param computeResourceId
- * @param protocol
- * @param authToken
- * @throws Exception
- */
- public void copyFile(String sourceFile, String destinationFile, String computeResourceId, String protocol, String authToken) throws Exception;
}
diff --git a/modules/airavata-helix/task-core/pom.xml b/modules/airavata-helix/task-core/pom.xml
index df72dac..bf860f8 100644
--- a/modules/airavata-helix/task-core/pom.xml
+++ b/modules/airavata-helix/task-core/pom.xml
@@ -28,6 +28,11 @@
<artifactId>agent-api</artifactId>
<version>0.17-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>ssh-agent</artifactId>
+ <version>0.17-SNAPSHOT</version>
+ </dependency>
</dependencies>
<!--<build>
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
index 87a1e17..a98b8f0 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
@@ -1,6 +1,7 @@
package org.apache.airavata.helix.core.support;
import org.apache.airavata.agents.api.*;
+import org.apache.airavata.helix.agent.ssh.SshAgentAdaptor;
import org.apache.airavata.helix.task.api.support.AdaptorSupport;
import java.io.File;
@@ -29,19 +30,9 @@ public class AdaptorSupportImpl implements AdaptorSupport {
public void initializeAdaptor() {
}
- public CommandOutput executeCommand(String command, String workingDirectory, String computeResourceId, String protocol, String authToken) throws AgentException {
- return fetchAdaptor(computeResourceId, protocol, authToken).executeCommand(command, workingDirectory);
- }
-
- public void createDirectory(String path, String computeResourceId, String protocol, String authToken) throws AgentException {
- fetchAdaptor(computeResourceId, protocol, authToken).createDirectory(path);
- }
-
- public void copyFile(String sourceFile, String destinationFile, String computeResourceId, String protocol, String authToken) throws AgentException {
- fetchAdaptor(computeResourceId, protocol, authToken).copyFile(sourceFile, destinationFile);
- }
-
- public AgentAdaptor fetchAdaptor(String computeResource, String protocol, String authToken) throws AgentException {
- return agentStore.fetchAdaptor(computeResource, protocol, authToken);
+ public AgentAdaptor fetchAdaptor(String gatewayId, String computeResource, String protocol, String authToken, String userId) throws AgentException {
+ SshAgentAdaptor agentAdaptor = new SshAgentAdaptor();
+ agentAdaptor.init(computeResource, gatewayId, userId, authToken);
+ return agentAdaptor;
}
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
index f079b9f..cabc014 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
@@ -24,9 +24,11 @@ public class EnvSetupTask extends AiravataTask {
try {
publishTaskState(TaskState.EXECUTING);
AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getGatewayId(),
getTaskContext().getComputeResourceId(),
getTaskContext().getJobSubmissionProtocol().name(),
- getTaskContext().getComputeResourceCredentialToken());
+ getTaskContext().getComputeResourceCredentialToken(),
+ getTaskContext().getComputeResourceLoginUserName());
adaptor.createDirectory(workingDirectory);
publishTaskState(TaskState.COMPLETED);
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 7de738e..f33d8a1 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -85,7 +85,7 @@ public class TaskContext {
* Note: process context property use lazy loading approach. In runtime you will see some properties as null
* unless you have access it previously. Once that property access using the api,it will be set to correct value.
*/
- private TaskContext(String taskId, String processId, String gatewayId) {
+ private TaskContext(String processId, String gatewayId, String taskId) {
this.processId = processId;
this.gatewayId = gatewayId;
this.taskId = taskId;
@@ -784,7 +784,12 @@ public class TaskContext {
ctx.setGatewayResourceProfile(gatewayResourceProfile);
ctx.setGatewayComputeResourcePreference(gatewayComputeResourcePreference);
ctx.setGatewayStorageResourcePreference(gatewayStorageResourcePreference);
-
+ ctx.setApplicationDeploymentDescription(appCatalog.getApplicationDeployment()
+ .getApplicationDeployement(processModel.getApplicationDeploymentId()));
+ ctx.setApplicationInterfaceDescription(appCatalog.getApplicationInterface()
+ .getApplicationInterface(processModel.getApplicationInterfaceId()));
+ ctx.setComputeResourceDescription(appCatalog.getComputeResource().getComputeResource
+ (ctx.getComputeResourceId()));
return ctx;
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
index 0b92922..16e8114 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
@@ -2,17 +2,21 @@ package org.apache.airavata.helix.impl.task.submission;
import groovy.text.GStringTemplateEngine;
import groovy.text.TemplateEngine;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appdeployment.CommandObject;
import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.appcatalog.computeresource.*;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.parallelism.ApplicationParallelismType;
+import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
import org.apache.airavata.model.task.JobSubmissionTaskModel;
+import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
@@ -38,6 +42,8 @@ public class GroovyMapBuilder {
public GroovyMapData build() throws Exception {
GroovyMapData mapData = new GroovyMapData();
+
+ setMailAddresses(taskContext, mapData);
mapData.setInputDir(taskContext.getInputDir());
mapData.setOutputDir(taskContext.getOutputDir());
mapData.setExecutablePath(taskContext.getApplicationDeploymentDescription().getExecutablePath());
@@ -51,6 +57,7 @@ public class GroovyMapBuilder {
mapData.setAccountString(taskContext.getAllocationProjectNumber());
mapData.setReservation(taskContext.getReservation());
mapData.setJobName("A" + String.valueOf(generateJobName()));
+ mapData.setWorkingDirectory(taskContext.getWorkingDir());
List<String> inputValues = getProcessInputValues(taskContext.getProcessModel().getProcessInputs(), true);
inputValues.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), true));
@@ -332,4 +339,69 @@ public class GroovyMapBuilder {
}
}
+ private static void setMailAddresses(TaskContext taskContext, GroovyMapData groovyMap) throws AppCatalogException,
+ ApplicationSettingsException {
+
+ ProcessModel processModel = taskContext.getProcessModel();
+ String emailIds = null;
+ if (isEmailBasedJobMonitor(taskContext)) {
+ emailIds = ServerSettings.getEmailBasedMonitorAddress();
+ }
+ if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
+ String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+ if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) {
+ if (emailIds != null && !emailIds.isEmpty()) {
+ emailIds += ("," + userJobNotifEmailIds);
+ } else {
+ emailIds = userJobNotifEmailIds;
+ }
+ }
+ if (processModel.isEnableEmailNotification()) {
+ List<String> emailList = processModel.getEmailAddresses();
+ String elist = listToCsv(emailList, ',');
+ if (elist != null && !elist.isEmpty()) {
+ if (emailIds != null && !emailIds.isEmpty()) {
+ emailIds = emailIds + "," + elist;
+ } else {
+ emailIds = elist;
+ }
+ }
+ }
+ }
+ if (emailIds != null && !emailIds.isEmpty()) {
+ logger.info("Email list: " + emailIds);
+ groovyMap.setMailAddress(emailIds);
+ }
+ }
+
+ public static boolean isEmailBasedJobMonitor(TaskContext taskContext) throws AppCatalogException {
+ JobSubmissionProtocol jobSubmissionProtocol = taskContext.getPreferredJobSubmissionProtocol();
+ JobSubmissionInterface jobSubmissionInterface = taskContext.getPreferredJobSubmissionInterface();
+ if (jobSubmissionProtocol == JobSubmissionProtocol.SSH) {
+ String jobSubmissionInterfaceId = jobSubmissionInterface.getJobSubmissionInterfaceId();
+ SSHJobSubmission sshJobSubmission = taskContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+ MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+ return monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR;
+ } else {
+ return false;
+ }
+ }
+
+ public static String listToCsv(List<String> listOfStrings, char separator) {
+ StringBuilder sb = new StringBuilder();
+
+ // all but last
+ for (int i = 0; i < listOfStrings.size() - 1; i++) {
+ sb.append(listOfStrings.get(i));
+ sb.append(separator);
+ }
+
+ // last string, no separator
+ if (listOfStrings.size() > 0) {
+ sb.append(listOfStrings.get(listOfStrings.size() - 1));
+ }
+
+ return sb.toString();
+ }
+
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
index 995f772..6ebde21 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
@@ -1,10 +1,11 @@
package org.apache.airavata.helix.impl.task.submission;
-import com.google.common.collect.ImmutableMap;
import groovy.lang.Writable;
import groovy.text.GStringTemplateEngine;
import groovy.text.TemplateEngine;
import org.apache.airavata.common.utils.ApplicationSettings;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.File;
import java.lang.reflect.Field;
@@ -15,6 +16,8 @@ import java.util.Map;
public class GroovyMapData {
+ private static final Logger logger = LogManager.getLogger(GroovyMapData.class);
+
@ScriptTag(name = "inputDir")
private String inputDir;
@@ -453,6 +456,11 @@ public class GroovyMapData {
} catch (Exception e) {
throw new Exception("Error while generating script using groovy map");
}
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("Groovy map as string for template " + templateName);
+ logger.trace(make.toString());
+ }
return make.toString();
}
}
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
index fab4747..c85e18b 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -50,9 +50,11 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
if (mapData != null) {
//jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getGatewayId(),
getTaskContext().getComputeResourceId(),
getTaskContext().getJobSubmissionProtocol().name(),
- getTaskContext().getComputeResourceCredentialToken());
+ getTaskContext().getComputeResourceCredentialToken(),
+ getTaskContext().getComputeResourceLoginUserName());
JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory());
@@ -69,6 +71,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
statusList.add(new JobStatus(JobState.FAILED));
statusList.get(0).setReason(submissionOutput.getFailureReason());
jobModel.setJobStatuses(statusList);
+ jobModel.setJobDescription("Sample description");
saveJobModel(jobModel);
logger.error("expId: " + getExperimentId() + ", processid: " + getProcessId()+ ", taskId: " +
getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName());
@@ -83,6 +86,8 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
//taskStatus.setReason("Job submission command didn't return a jobId");
//taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
//taskContext.setTaskStatus(taskStatus);
+ logger.error("Standard error message : " + submissionOutput.getStdErr());
+ logger.error("Standard out message : " + submissionOutput.getStdOut());
return onFail("Job submission command didn't return a jobId", false, null);
} else {
@@ -116,6 +121,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
//TODO save task status??
} else if (jobId != null && !jobId.isEmpty()) {
+ logger.info("Received job id " + jobId + " from compute resource");
jobModel.setJobId(jobId);
saveJobModel(jobModel);
JobStatus jobStatus = new JobStatus();
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
index 58b70ef..2e4a052 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
@@ -36,9 +36,11 @@ public class ForkJobSubmissionTask extends JobSubmissionTask {
if (mapData != null) {
//jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getGatewayId(),
getTaskContext().getComputeResourceId(),
getTaskContext().getJobSubmissionProtocol().name(),
- getTaskContext().getComputeResourceCredentialToken());
+ getTaskContext().getComputeResourceCredentialToken(),
+ getTaskContext().getComputeResourceLoginUserName());
JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory());
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
index 11e59eb..1a024a7 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
@@ -28,6 +28,8 @@ import org.apache.airavata.model.status.JobStatus;
import org.apache.airavata.registry.cpi.*;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.File;
import java.security.SecureRandom;
@@ -35,7 +37,7 @@ import java.util.*;
public abstract class JobSubmissionTask extends AiravataTask {
-
+ private static final Logger logger = LogManager.getLogger(JobSubmissionTask.class);
@Override
public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
@@ -52,10 +54,19 @@ public abstract class JobSubmissionTask extends AiravataTask {
int number = new SecureRandom().nextInt();
number = (number < 0 ? -number : number);
File tempJobFile = new File(getLocalDataDir(), "job_" + Integer.toString(number) + jobManagerConfiguration.getScriptExtension());
+
FileUtils.writeStringToFile(tempJobFile, scriptAsString);
+ logger.info("Job submission file for process " + getProcessId() + " was created at : " + tempJobFile.getAbsolutePath());
+ logger.info("Copying file form " + tempJobFile.getAbsolutePath() + " to remote path " + workingDirectory +
+ " of compute resource " + getTaskContext().getComputeResourceId());
+ agentAdaptor.copyFile(tempJobFile.getAbsolutePath(), workingDirectory);
// TODO transfer file
RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, tempJobFile.getPath());
+
+ logger.debug("Submit command for process id " + getProcessId() + " : " + submitCommand.getRawCommand());
+ logger.debug("Working directory for process id " + getProcessId() + " : " + workingDirectory);
+
CommandOutput commandOutput = agentAdaptor.executeCommand(submitCommand.getRawCommand(), workingDirectory);
JobSubmissionOutput jsoutput = new JobSubmissionOutput();
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
index 67ad0db..e3ae4fa 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
@@ -43,10 +43,12 @@ public class LocalJobSubmissionTask extends JobSubmissionTask {
saveJobModel(jobModel);
AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+ getTaskContext().getGatewayId(),
getTaskContext().getComputeResourceId(),
getTaskContext().getJobSubmissionProtocol().name(),
- getTaskContext().getComputeResourceCredentialToken());
-
+ getTaskContext().getComputeResourceCredentialToken(),
+ getTaskContext().getComputeResourceLoginUserName());
+
GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build();
JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, groovyMapData.getWorkingDirectory());
diff --git a/modules/helix-spectator/src/main/resources/airavata-server.properties b/modules/helix-spectator/src/main/resources/airavata-server.properties
index 5f47d79..b54b28c 100644
--- a/modules/helix-spectator/src/main/resources/airavata-server.properties
+++ b/modules/helix-spectator/src/main/resources/airavata-server.properties
@@ -202,9 +202,9 @@ job.notification.flags=abe
###########################################################################
# Credential Store module Configuration
###########################################################################
-credential.store.keystore.url=/home/pga/master-deployment/keystores/cred_store.jks
+credential.store.keystore.url=/Users/dimuthu/code/fork/airavata/modules/helix-spectator/src/main/resources/cred_store.jks
credential.store.keystore.alias=seckey
-credential.store.keystore.password=123456
+credential.store.keystore.password=credstore123
credential.store.jdbc.url=jdbc:mariadb://149.165.168.248:3306/credential_store
credential.store.jdbc.user=eroma
credential.store.jdbc.password=eroma123456
diff --git a/modules/helix-spectator/src/main/resources/log4j.properties b/modules/helix-spectator/src/main/resources/log4j.properties
index e910f32..69a4301 100644
--- a/modules/helix-spectator/src/main/resources/log4j.properties
+++ b/modules/helix-spectator/src/main/resources/log4j.properties
@@ -3,6 +3,8 @@ log4j.rootLogger=INFO, A1
log4j.category.org.apache.helix=WARN
log4j.category.org.apache.zookeeper=WARN
+log4j.category.org.apache.airavata.helix.impl.task.submission.GroovyMapData=TRACE
+log4j.category.org.apache.airavata.helix.impl.task.submission.task.JobSubmissionTask=DEBUG
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
--
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.