You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2018/03/07 21:10:00 UTC

[airavata] 03/17: Stabalizing DefaultJobSubmission Task

This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch helix-integration
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit b199bc2090ff9bc8b9c5827adeedcbfbfe181cc0
Author: dimuthu <di...@gmail.com>
AuthorDate: Mon Feb 26 01:11:29 2018 -0500

    Stabalizing DefaultJobSubmission Task
---
 .../airavata-helix/agent-impl/ssh-agent/pom.xml    |  21 ++--
 .../airavata/helix/agent/ssh/SshAgentAdaptor.java  | 132 ++++++++++++---------
 .../helix/agent/ssh/StandardOutReader.java         |  80 ++++---------
 .../helix/task/api/support/AdaptorSupport.java     |  34 +-----
 modules/airavata-helix/task-core/pom.xml           |   5 +
 .../helix/core/support/AdaptorSupportImpl.java     |  19 +--
 .../airavata/helix/impl/task/EnvSetupTask.java     |   4 +-
 .../airavata/helix/impl/task/TaskContext.java      |   9 +-
 .../impl/task/submission/GroovyMapBuilder.java     |  74 +++++++++++-
 .../helix/impl/task/submission/GroovyMapData.java  |  10 +-
 .../submission/task/DefaultJobSubmissionTask.java  |   8 +-
 .../submission/task/ForkJobSubmissionTask.java     |   4 +-
 .../task/submission/task/JobSubmissionTask.java    |  13 +-
 .../submission/task/LocalJobSubmissionTask.java    |   6 +-
 .../src/main/resources/airavata-server.properties  |   4 +-
 .../src/main/resources/log4j.properties            |   2 +
 16 files changed, 242 insertions(+), 183 deletions(-)

diff --git a/modules/airavata-helix/agent-impl/ssh-agent/pom.xml b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
index 44cf919..bc78971 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
+++ b/modules/airavata-helix/agent-impl/ssh-agent/pom.xml
@@ -3,9 +3,10 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>agent-impl</artifactId>
-        <groupId>org.apache</groupId>
-        <version>1.0-SNAPSHOT</version>
+        <artifactId>airavata-helix</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.17-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
@@ -33,19 +34,15 @@
             <artifactId>airavata-credential-store</artifactId>
             <version>0.17-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>agent-api</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.5.1</version>
-                <configuration>
-                    <source>${java.version}</source>
-                    <target>${java.version}</target>
-                </configuration>
-            </plugin>
 
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
index 19b429c..ef8d580 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java
@@ -13,7 +13,6 @@ import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.AppCatalogException;
-import org.apache.airavata.registry.cpi.ComputeResource;
 
 import java.io.*;
 import java.util.Arrays;
@@ -130,47 +129,64 @@ public class SshAgentAdaptor implements AgentAdaptor {
 
     public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
         StandardOutReader commandOutput = new StandardOutReader();
+        ChannelExec channelExec = null;
         try {
-            ChannelExec channelExec = ((ChannelExec) session.openChannel("exec"));
+            channelExec = ((ChannelExec) session.openChannel("exec"));
             channelExec.setCommand(command);
             channelExec.setInputStream(null);
-            channelExec.setErrStream(commandOutput.getStandardError());
+            InputStream out = channelExec.getInputStream();
+            InputStream err = channelExec.getErrStream();
             channelExec.connect();
-            commandOutput.onOutput(channelExec);
+
+            commandOutput.setExitCode(channelExec.getExitStatus());
+            commandOutput.readStdOutFromStream(out);
+            commandOutput.readStdErrFromStream(err);
             return commandOutput;
         } catch (JSchException e) {
+            e.printStackTrace();
+            throw new AgentException(e);
+        } catch (IOException e) {
+            e.printStackTrace();
             throw new AgentException(e);
+        } finally {
+            if (channelExec != null) {
+                channelExec.disconnect();
+            }
         }
     }
 
     public void createDirectory(String path) throws AgentException {
+        String command = "mkdir -p " + path;
+        ChannelExec channelExec = null;
         try {
-            String command = "mkdir -p " + path;
-            Channel channel = session.openChannel("exec");
+            channelExec = (ChannelExec)session.openChannel("exec");
             StandardOutReader stdOutReader = new StandardOutReader();
 
-            ((ChannelExec) channel).setCommand(command);
+            channelExec.setCommand(command);
+            InputStream out = channelExec.getInputStream();
+            InputStream err = channelExec.getErrStream();
+            channelExec.connect();
+
+            stdOutReader.readStdOutFromStream(out);
+            stdOutReader.readStdErrFromStream(err);
 
-            ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
-            try {
-                channel.connect();
-            } catch (JSchException e) {
 
-                channel.disconnect();
-                System.out.println("Unable to retrieve command output. Command - " + command +
-                        " on server - " + session.getHost() + ":" + session.getPort() +
-                        " connecting user name - "
-                        + session.getUserName());
-                throw new AgentException(e);
-            }
-            stdOutReader.onOutput(channel);
-            if (stdOutReader.getStdErrorString().contains("mkdir:")) {
-                throw new AgentException(stdOutReader.getStdErrorString());
+            if (stdOutReader.getStdError() != null && stdOutReader.getStdError().contains("mkdir:")) {
+                throw new AgentException(stdOutReader.getStdError());
             }
-
-            channel.disconnect();
         } catch (JSchException e) {
+            System.out.println("Unable to retrieve command output. Command - " + command +
+                    " on server - " + session.getHost() + ":" + session.getPort() +
+                    " connecting user name - "
+                    + session.getUserName());
+            throw new AgentException(e);
+        } catch (IOException e) {
+            e.printStackTrace();
             throw new AgentException(e);
+        } finally {
+            if (channelExec != null) {
+                channelExec.disconnect();
+            }
         }
     }
 
@@ -182,20 +198,22 @@ public class SshAgentAdaptor implements AgentAdaptor {
         }
         boolean ptimestamp = true;
 
+        ChannelExec channelExec = null;
         try {
             // exec 'scp -t rfile' remotely
             String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + remoteFile;
-            Channel channel = session.openChannel("exec");
+            channelExec = (ChannelExec)session.openChannel("exec");
 
             StandardOutReader stdOutReader = new StandardOutReader();
-            ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
-            ((ChannelExec) channel).setCommand(command);
+            //channelExec.setErrStream(stdOutReader.getStandardError());
+            channelExec.setCommand(command);
 
             // get I/O streams for remote scp
-            OutputStream out = channel.getOutputStream();
-            InputStream in = channel.getInputStream();
+            OutputStream out = channelExec.getOutputStream();
+            InputStream in = channelExec.getInputStream();
+            InputStream err = channelExec.getErrStream();
 
-            channel.connect();
+            channelExec.connect();
 
             if (checkAck(in) != 0) {
                 String error = "Error Reading input Stream";
@@ -255,12 +273,10 @@ public class SshAgentAdaptor implements AgentAdaptor {
                 throw new AgentException(error);
             }
             out.close();
-            stdOutReader.onOutput(channel);
-
+            stdOutReader.readStdErrFromStream(err);
 
-            channel.disconnect();
-            if (stdOutReader.getStdErrorString().contains("scp:")) {
-                throw new AgentException(stdOutReader.getStdErrorString());
+            if (stdOutReader.getStdError().contains("scp:")) {
+                throw new AgentException(stdOutReader.getStdError());
             }
             //since remote file is always a file  we just return the file
             //return remoteFile;
@@ -273,43 +289,47 @@ public class SshAgentAdaptor implements AgentAdaptor {
         } catch (IOException e) {
             e.printStackTrace();
             throw new AgentException(e);
+        } finally {
+            if (channelExec != null) {
+                channelExec.disconnect();
+            }
         }
     }
 
     @Override
     public List<String> listDirectory(String path) throws AgentException {
-
+        String command = "ls " + path;
+        ChannelExec channelExec = null;
         try {
-            String command = "ls " + path;
-            Channel channel = session.openChannel("exec");
+            channelExec = (ChannelExec)session.openChannel("exec");
             StandardOutReader stdOutReader = new StandardOutReader();
 
-            ((ChannelExec) channel).setCommand(command);
+            channelExec.setCommand(command);
 
+            InputStream out = channelExec.getInputStream();
+            InputStream err = channelExec.getErrStream();
 
-            ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError());
-            try {
-                channel.connect();
-            } catch (JSchException e) {
-
-                channel.disconnect();
-//            session.disconnect();
+            channelExec.connect();
 
-                throw new AgentException("Unable to retrieve command output. Command - " + command +
-                        " on server - " + session.getHost() + ":" + session.getPort() +
-                        " connecting user name - "
-                        + session.getUserName(), e);
-            }
-            stdOutReader.onOutput(channel);
-            stdOutReader.getStdOutputString();
-            if (stdOutReader.getStdErrorString().contains("ls:")) {
-                throw new AgentException(stdOutReader.getStdErrorString());
+            stdOutReader.readStdOutFromStream(out);
+            stdOutReader.readStdErrFromStream(err);
+            if (stdOutReader.getStdError().contains("ls:")) {
+                throw new AgentException(stdOutReader.getStdError());
             }
-            channel.disconnect();
-            return Arrays.asList(stdOutReader.getStdOutputString().split("\n"));
+            return Arrays.asList(stdOutReader.getStdOut().split("\n"));
 
         } catch (JSchException e) {
+            throw new AgentException("Unable to retrieve command output. Command - " + command +
+                    " on server - " + session.getHost() + ":" + session.getPort() +
+                    " connecting user name - "
+                    + session.getUserName(), e);
+        } catch (IOException e) {
+            e.printStackTrace();
             throw new AgentException(e);
+        } finally {
+            if (channelExec != null) {
+                channelExec.disconnect();
+            }
         }
     }
 
diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java
index 49c036e..94ba566 100644
--- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java
+++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/StandardOutReader.java
@@ -2,11 +2,9 @@ package org.apache.airavata.helix.agent.ssh;
 
 import com.jcraft.jsch.Channel;
 import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.commons.io.IOUtils;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.*;
 
 /**
  * TODO: Class level comments please
@@ -16,68 +14,38 @@ import java.io.OutputStream;
  */
 public class StandardOutReader implements CommandOutput {
 
-    // Todo improve this. We need to direct access of std out and exit code
+    private String stdOut;
+    private String stdError;
+    private Integer exitCode;
 
-    String stdOutputString = null;
-    ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
-    private int exitCode;
-
-    public void onOutput(Channel channel) {
-        try {
-            StringBuffer pbsOutput = new StringBuffer("");
-            InputStream inputStream =  channel.getInputStream();
-            byte[] tmp = new byte[1024];
-            do {
-                while (inputStream.available() > 0) {
-                    int i = inputStream.read(tmp, 0, 1024);
-                    if (i < 0) break;
-                    pbsOutput.append(new String(tmp, 0, i));
-                }
-            } while (!channel.isClosed()) ;
-            String output = pbsOutput.toString();
-            this.setStdOutputString(output);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    public void exitCode(int code) {
-        System.out.println("Program exit code - " + code);
-        this.exitCode = code;
-    }
-
-    public int getExitCode() {
-        return exitCode;
-    }
-
-    public String getStdOutputString() {
-        return stdOutputString;
-    }
-
-    public void setStdOutputString(String stdOutputString) {
-        this.stdOutputString = stdOutputString;
+    @Override
+    public String getStdOut() {
+        return this.stdOut;
     }
 
-    public String getStdErrorString() {
-        return errorStream.toString();
+    @Override
+    public String getStdError() {
+        return this.stdError;
     }
 
-    public OutputStream getStandardError() {
-        return errorStream;
+    @Override
+    public Integer getExitCode() {
+        return this.exitCode;
     }
 
-    @Override
-    public String getStdOut() {
-        return null;
+    public void readStdOutFromStream(InputStream is) throws IOException {
+        StringWriter writer = new StringWriter();
+        IOUtils.copy(is, writer, "UTF-8");
+        this.stdOut = writer.toString();
     }
 
-    @Override
-    public String getStdError() {
-        return null;
+    public void readStdErrFromStream(InputStream is) throws IOException {
+        StringWriter writer = new StringWriter();
+        IOUtils.copy(is, writer, "UTF-8");
+        this.stdError = writer.toString();
     }
 
-    @Override
-    public String getExitCommand() {
-        return null;
+    public void setExitCode(Integer exitCode) {
+        this.exitCode = exitCode;
     }
 }
diff --git a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
index 3e24aaa..4b6e11e 100644
--- a/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
+++ b/modules/airavata-helix/task-api/src/main/java/org/apache/airavata/helix/task/api/support/AdaptorSupport.java
@@ -15,38 +15,6 @@ import java.io.File;
 public interface AdaptorSupport {
     public void initializeAdaptor();
 
-    public AgentAdaptor fetchAdaptor(String computeResource, String protocol, String authToken) throws Exception;
+    public AgentAdaptor fetchAdaptor(String gatewayId, String computeResource, String protocol, String authToken, String userId) throws Exception;
 
-
-    /**
-     *
-     * @param command
-     * @param workingDirectory
-     * @param computeResourceId
-     * @param protocol
-     * @param authToken
-     * @throws Exception
-     */
-    public CommandOutput executeCommand(String command, String workingDirectory, String computeResourceId, String protocol, String authToken) throws Exception;
-
-    /**
-     *
-     * @param path
-     * @param computeResourceId
-     * @param protocol
-     * @param authToken
-     * @throws Exception
-     */
-    public void createDirectory(String path, String computeResourceId, String protocol, String authToken) throws Exception;
-
-    /**
-     *
-     * @param sourceFile
-     * @param destinationFile
-     * @param computeResourceId
-     * @param protocol
-     * @param authToken
-     * @throws Exception
-     */
-    public void copyFile(String sourceFile, String destinationFile, String computeResourceId, String protocol, String authToken) throws Exception;
 }
diff --git a/modules/airavata-helix/task-core/pom.xml b/modules/airavata-helix/task-core/pom.xml
index df72dac..bf860f8 100644
--- a/modules/airavata-helix/task-core/pom.xml
+++ b/modules/airavata-helix/task-core/pom.xml
@@ -28,6 +28,11 @@
             <artifactId>agent-api</artifactId>
             <version>0.17-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>ssh-agent</artifactId>
+            <version>0.17-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
     <!--<build>
diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
index 87a1e17..a98b8f0 100644
--- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
+++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/support/AdaptorSupportImpl.java
@@ -1,6 +1,7 @@
 package org.apache.airavata.helix.core.support;
 
 import org.apache.airavata.agents.api.*;
+import org.apache.airavata.helix.agent.ssh.SshAgentAdaptor;
 import org.apache.airavata.helix.task.api.support.AdaptorSupport;
 
 import java.io.File;
@@ -29,19 +30,9 @@ public class AdaptorSupportImpl implements AdaptorSupport {
     public void initializeAdaptor() {
     }
 
-    public CommandOutput executeCommand(String command, String workingDirectory, String computeResourceId, String protocol, String authToken) throws AgentException {
-        return fetchAdaptor(computeResourceId, protocol, authToken).executeCommand(command, workingDirectory);
-    }
-
-    public void createDirectory(String path, String computeResourceId, String protocol, String authToken) throws AgentException {
-        fetchAdaptor(computeResourceId, protocol, authToken).createDirectory(path);
-    }
-
-    public void copyFile(String sourceFile, String destinationFile, String computeResourceId, String protocol, String authToken) throws AgentException {
-        fetchAdaptor(computeResourceId, protocol, authToken).copyFile(sourceFile, destinationFile);
-    }
-
-    public AgentAdaptor fetchAdaptor(String computeResource, String protocol, String authToken) throws AgentException {
-         return agentStore.fetchAdaptor(computeResource, protocol, authToken);
+    public AgentAdaptor fetchAdaptor(String gatewayId, String computeResource, String protocol, String authToken, String userId) throws AgentException {
+        SshAgentAdaptor agentAdaptor = new SshAgentAdaptor();
+        agentAdaptor.init(computeResource, gatewayId, userId, authToken);
+        return agentAdaptor;
     }
 }
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
index f079b9f..cabc014 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java
@@ -24,9 +24,11 @@ public class EnvSetupTask extends AiravataTask {
         try {
             publishTaskState(TaskState.EXECUTING);
             AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+                    getTaskContext().getGatewayId(),
                     getTaskContext().getComputeResourceId(),
                     getTaskContext().getJobSubmissionProtocol().name(),
-                    getTaskContext().getComputeResourceCredentialToken());
+                    getTaskContext().getComputeResourceCredentialToken(),
+                    getTaskContext().getComputeResourceLoginUserName());
 
             adaptor.createDirectory(workingDirectory);
             publishTaskState(TaskState.COMPLETED);
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 7de738e..f33d8a1 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -85,7 +85,7 @@ public class TaskContext {
      * Note: process context property use lazy loading approach. In runtime you will see some properties as null
      * unless you have access it previously. Once that property access using the api,it will be set to correct value.
      */
-    private TaskContext(String taskId, String processId, String gatewayId) {
+    private TaskContext(String processId, String gatewayId, String taskId) {
         this.processId = processId;
         this.gatewayId = gatewayId;
         this.taskId = taskId;
@@ -784,7 +784,12 @@ public class TaskContext {
             ctx.setGatewayResourceProfile(gatewayResourceProfile);
             ctx.setGatewayComputeResourcePreference(gatewayComputeResourcePreference);
             ctx.setGatewayStorageResourcePreference(gatewayStorageResourcePreference);
-
+            ctx.setApplicationDeploymentDescription(appCatalog.getApplicationDeployment()
+                    .getApplicationDeployement(processModel.getApplicationDeploymentId()));
+            ctx.setApplicationInterfaceDescription(appCatalog.getApplicationInterface()
+                    .getApplicationInterface(processModel.getApplicationInterfaceId()));
+            ctx.setComputeResourceDescription(appCatalog.getComputeResource().getComputeResource
+                    (ctx.getComputeResourceId()));
             return ctx;
         }
 
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
index 0b92922..16e8114 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java
@@ -2,17 +2,21 @@ package org.apache.airavata.helix.impl.task.submission;
 
 import groovy.text.GStringTemplateEngine;
 import groovy.text.TemplateEngine;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.helix.impl.task.TaskContext;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appdeployment.CommandObject;
 import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths;
-import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
+import org.apache.airavata.model.appcatalog.computeresource.*;
 import org.apache.airavata.model.application.io.DataType;
 import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.parallelism.ApplicationParallelismType;
+import org.apache.airavata.model.process.ProcessModel;
 import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
 import org.apache.airavata.model.task.JobSubmissionTaskModel;
+import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
@@ -38,6 +42,8 @@ public class GroovyMapBuilder {
 
     public GroovyMapData build() throws Exception {
         GroovyMapData mapData = new GroovyMapData();
+
+        setMailAddresses(taskContext, mapData);
         mapData.setInputDir(taskContext.getInputDir());
         mapData.setOutputDir(taskContext.getOutputDir());
         mapData.setExecutablePath(taskContext.getApplicationDeploymentDescription().getExecutablePath());
@@ -51,6 +57,7 @@ public class GroovyMapBuilder {
         mapData.setAccountString(taskContext.getAllocationProjectNumber());
         mapData.setReservation(taskContext.getReservation());
         mapData.setJobName("A" + String.valueOf(generateJobName()));
+        mapData.setWorkingDirectory(taskContext.getWorkingDir());
 
         List<String> inputValues = getProcessInputValues(taskContext.getProcessModel().getProcessInputs(), true);
         inputValues.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), true));
@@ -332,4 +339,69 @@ public class GroovyMapBuilder {
         }
     }
 
+    private static void setMailAddresses(TaskContext taskContext, GroovyMapData groovyMap) throws AppCatalogException,
+            ApplicationSettingsException {
+
+        ProcessModel processModel =  taskContext.getProcessModel();
+        String emailIds = null;
+        if (isEmailBasedJobMonitor(taskContext)) {
+            emailIds = ServerSettings.getEmailBasedMonitorAddress();
+        }
+        if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
+            String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+            if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) {
+                if (emailIds != null && !emailIds.isEmpty()) {
+                    emailIds += ("," + userJobNotifEmailIds);
+                } else {
+                    emailIds = userJobNotifEmailIds;
+                }
+            }
+            if (processModel.isEnableEmailNotification()) {
+                List<String> emailList = processModel.getEmailAddresses();
+                String elist = listToCsv(emailList, ',');
+                if (elist != null && !elist.isEmpty()) {
+                    if (emailIds != null && !emailIds.isEmpty()) {
+                        emailIds = emailIds + "," + elist;
+                    } else {
+                        emailIds = elist;
+                    }
+                }
+            }
+        }
+        if (emailIds != null && !emailIds.isEmpty()) {
+            logger.info("Email list: " + emailIds);
+            groovyMap.setMailAddress(emailIds);
+        }
+    }
+
+    public static boolean isEmailBasedJobMonitor(TaskContext taskContext) throws AppCatalogException {
+        JobSubmissionProtocol jobSubmissionProtocol = taskContext.getPreferredJobSubmissionProtocol();
+        JobSubmissionInterface jobSubmissionInterface = taskContext.getPreferredJobSubmissionInterface();
+        if (jobSubmissionProtocol == JobSubmissionProtocol.SSH) {
+            String jobSubmissionInterfaceId = jobSubmissionInterface.getJobSubmissionInterfaceId();
+            SSHJobSubmission sshJobSubmission = taskContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+            MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+            return monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR;
+        } else {
+            return false;
+        }
+    }
+
+    public static String listToCsv(List<String> listOfStrings, char separator) {
+        StringBuilder sb = new StringBuilder();
+
+        // all but last
+        for (int i = 0; i < listOfStrings.size() - 1; i++) {
+            sb.append(listOfStrings.get(i));
+            sb.append(separator);
+        }
+
+        // last string, no separator
+        if (listOfStrings.size() > 0) {
+            sb.append(listOfStrings.get(listOfStrings.size() - 1));
+        }
+
+        return sb.toString();
+    }
+
 }
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
index 995f772..6ebde21 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapData.java
@@ -1,10 +1,11 @@
 package org.apache.airavata.helix.impl.task.submission;
 
-import com.google.common.collect.ImmutableMap;
 import groovy.lang.Writable;
 import groovy.text.GStringTemplateEngine;
 import groovy.text.TemplateEngine;
 import org.apache.airavata.common.utils.ApplicationSettings;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.File;
 import java.lang.reflect.Field;
@@ -15,6 +16,8 @@ import java.util.Map;
 
 public class GroovyMapData {
 
+    private static final Logger logger = LogManager.getLogger(GroovyMapData.class);
+
     @ScriptTag(name = "inputDir")
     private String inputDir;
 
@@ -453,6 +456,11 @@ public class GroovyMapData {
         } catch (Exception e) {
             throw new Exception("Error while generating script using groovy map");
         }
+
+        if (logger.isTraceEnabled()) {
+            logger.trace("Groovy map as string for template " + templateName);
+            logger.trace(make.toString());
+        }
         return make.toString();
     }
 }
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
index fab4747..c85e18b 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java
@@ -50,9 +50,11 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
             if (mapData != null) {
                 //jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
                 AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+                        getTaskContext().getGatewayId(),
                         getTaskContext().getComputeResourceId(),
                         getTaskContext().getJobSubmissionProtocol().name(),
-                        getTaskContext().getComputeResourceCredentialToken());
+                        getTaskContext().getComputeResourceCredentialToken(),
+                        getTaskContext().getComputeResourceLoginUserName());
 
                 JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory());
 
@@ -69,6 +71,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
                         statusList.add(new JobStatus(JobState.FAILED));
                         statusList.get(0).setReason(submissionOutput.getFailureReason());
                         jobModel.setJobStatuses(statusList);
+                        jobModel.setJobDescription("Sample description");
                         saveJobModel(jobModel);
                         logger.error("expId: " + getExperimentId() + ", processid: " + getProcessId()+ ", taskId: " +
                                 getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName());
@@ -83,6 +86,8 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
                         //taskStatus.setReason("Job submission command didn't return a jobId");
                         //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                         //taskContext.setTaskStatus(taskStatus);
+                        logger.error("Standard error message : " + submissionOutput.getStdErr());
+                        logger.error("Standard out message : " + submissionOutput.getStdOut());
                         return onFail("Job submission command didn't return a jobId", false, null);
 
                     } else {
@@ -116,6 +121,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask {
 
                     //TODO save task status??
                 } else if (jobId != null && !jobId.isEmpty()) {
+                    logger.info("Received job id " + jobId + " from compute resource");
                     jobModel.setJobId(jobId);
                     saveJobModel(jobModel);
                     JobStatus jobStatus = new JobStatus();
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
index 58b70ef..2e4a052 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java
@@ -36,9 +36,11 @@ public class ForkJobSubmissionTask extends JobSubmissionTask {
             if (mapData != null) {
                 //jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
                 AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+                        getTaskContext().getGatewayId(),
                         getTaskContext().getComputeResourceId(),
                         getTaskContext().getJobSubmissionProtocol().name(),
-                        getTaskContext().getComputeResourceCredentialToken());
+                        getTaskContext().getComputeResourceCredentialToken(),
+                        getTaskContext().getComputeResourceLoginUserName());
 
                 JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory());
 
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
index 11e59eb..1a024a7 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java
@@ -28,6 +28,8 @@ import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.File;
 import java.security.SecureRandom;
@@ -35,7 +37,7 @@ import java.util.*;
 
 public abstract class JobSubmissionTask extends AiravataTask {
 
-
+    private static final Logger logger = LogManager.getLogger(JobSubmissionTask.class);
 
     @Override
     public void init(HelixManager manager, String workflowName, String jobName, String taskName) {
@@ -52,10 +54,19 @@ public abstract class JobSubmissionTask extends AiravataTask {
         int number = new SecureRandom().nextInt();
         number = (number < 0 ? -number : number);
         File tempJobFile = new File(getLocalDataDir(), "job_" + Integer.toString(number) + jobManagerConfiguration.getScriptExtension());
+
         FileUtils.writeStringToFile(tempJobFile, scriptAsString);
+        logger.info("Job submission file for process " + getProcessId() + " was created at : " + tempJobFile.getAbsolutePath());
 
+        logger.info("Copying file form " + tempJobFile.getAbsolutePath() + " to remote path " + workingDirectory +
+                " of compute resource " + getTaskContext().getComputeResourceId());
+        agentAdaptor.copyFile(tempJobFile.getAbsolutePath(), workingDirectory);
         // TODO transfer file
         RawCommandInfo submitCommand = jobManagerConfiguration.getSubmitCommand(workingDirectory, tempJobFile.getPath());
+
+        logger.debug("Submit command for process id " + getProcessId() + " : " + submitCommand.getRawCommand());
+        logger.debug("Working directory for process id " + getProcessId() + " : " + workingDirectory);
+
         CommandOutput commandOutput = agentAdaptor.executeCommand(submitCommand.getRawCommand(), workingDirectory);
 
         JobSubmissionOutput jsoutput = new JobSubmissionOutput();
diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
index 67ad0db..e3ae4fa 100644
--- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
+++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java
@@ -43,10 +43,12 @@ public class LocalJobSubmissionTask extends JobSubmissionTask {
                 saveJobModel(jobModel);
 
                 AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+                        getTaskContext().getGatewayId(),
                         getTaskContext().getComputeResourceId(),
                         getTaskContext().getJobSubmissionProtocol().name(),
-                        getTaskContext().getComputeResourceCredentialToken());
-
+                        getTaskContext().getComputeResourceCredentialToken(),
+                        getTaskContext().getComputeResourceLoginUserName());
+                
                 GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build();
                 JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, groovyMapData.getWorkingDirectory());
 
diff --git a/modules/helix-spectator/src/main/resources/airavata-server.properties b/modules/helix-spectator/src/main/resources/airavata-server.properties
index 5f47d79..b54b28c 100644
--- a/modules/helix-spectator/src/main/resources/airavata-server.properties
+++ b/modules/helix-spectator/src/main/resources/airavata-server.properties
@@ -202,9 +202,9 @@ job.notification.flags=abe
 ###########################################################################
 # Credential Store module Configuration
 ###########################################################################
-credential.store.keystore.url=/home/pga/master-deployment/keystores/cred_store.jks
+credential.store.keystore.url=/Users/dimuthu/code/fork/airavata/modules/helix-spectator/src/main/resources/cred_store.jks
 credential.store.keystore.alias=seckey
-credential.store.keystore.password=123456
+credential.store.keystore.password=credstore123
 credential.store.jdbc.url=jdbc:mariadb://149.165.168.248:3306/credential_store
 credential.store.jdbc.user=eroma
 credential.store.jdbc.password=eroma123456
diff --git a/modules/helix-spectator/src/main/resources/log4j.properties b/modules/helix-spectator/src/main/resources/log4j.properties
index e910f32..69a4301 100644
--- a/modules/helix-spectator/src/main/resources/log4j.properties
+++ b/modules/helix-spectator/src/main/resources/log4j.properties
@@ -3,6 +3,8 @@ log4j.rootLogger=INFO, A1
 
 log4j.category.org.apache.helix=WARN
 log4j.category.org.apache.zookeeper=WARN
+log4j.category.org.apache.airavata.helix.impl.task.submission.GroovyMapData=TRACE
+log4j.category.org.apache.airavata.helix.impl.task.submission.task.JobSubmissionTask=DEBUG
 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender
 

-- 
To stop receiving notification emails like this one, please contact
dimuthuupe@apache.org.