You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2013/10/12 17:12:21 UTC

svn commit: r1531540 - in /airavata/trunk/modules/gfac-core/src: main/java/org/apache/airavata/gfac/handler/ main/java/org/apache/airavata/gfac/notification/events/ main/java/org/apache/airavata/gfac/notification/listeners/ main/java/org/apache/airavat...

Author: lahiru
Date: Sat Oct 12 15:12:20 2013
New Revision: 1531540

URL: http://svn.apache.org/r1531540
Log:
adding ssh hpc support to gfac-core.

Added:
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/JobIDEvent.java
      - copied, changed from r1530411, airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/GramJobIDEvent.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/listeners/GSISSHJobSubmissionListener.java
Removed:
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/GramJobIDEvent.java
Modified:
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
    airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2Test.java
    airavata/trunk/modules/gfac-core/src/test/resources/airavata-server.properties

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java Sat Oct 12 15:12:20 2013
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
+import com.sun.tools.javac.util.Paths;
 import net.schmizz.sshj.xfer.scp.SCPFileTransfer;
 
 import org.apache.airavata.common.utils.StringUtil;
@@ -82,14 +83,14 @@ public class SCPInputHandler implements 
         SSHSecurityContext securityContext = (SSHSecurityContext)context.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
         Cluster pbsCluster = securityContext.getPbsCluster();
         ApplicationDeploymentDescriptionType app = context.getApplicationContext().getApplicationDeploymentDescription().getType();
-        String remoteFile = app.getInputDataDirectory() + File.separatorChar + paramValue;
-
+        int i = paramValue.lastIndexOf(File.separator);
+        String substring = paramValue.substring(i + 1);
         try {
-            return pbsCluster.scpTo(app.getInputDataDirectory(), remoteFile);
+            String targetFile = app.getInputDataDirectory()+ File.separator + substring;
+            pbsCluster.scpTo(targetFile, paramValue);
+            return targetFile;
         } catch (SSHApiException e) {
             throw new GFacHandlerException("Error while input File Staging", context, e, e.getLocalizedMessage());
-        } finally {
-            return null;
         }
     }
 

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java Sat Oct 12 15:12:20 2013
@@ -22,65 +22,80 @@ package org.apache.airavata.gfac.handler
 
 import java.io.File;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.net.URISyntaxException;
+import java.util.*;
 
 import net.schmizz.sshj.connection.ConnectionException;
 import net.schmizz.sshj.transport.TransportException;
 
+import org.apache.airavata.common.utils.StringUtil;
 import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
 import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.ToolsException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
 import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.external.GridFtp;
+import org.apache.airavata.gfac.provider.GFacProviderException;
 import org.apache.airavata.gfac.utils.GFacUtils;
 import org.apache.airavata.gfac.utils.OutputUtils;
 import org.apache.airavata.gsi.ssh.api.Cluster;
 import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
 import org.apache.xmlbeans.XmlException;
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SCPOutputHandler implements GFacHandler{
+    private static final Logger log = LoggerFactory.getLogger(SCPOutputHandler.class);
 
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
 
-	public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-		ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
-				.getApplicationDeploymentDescription().getType();
-		try {
+        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+                .getApplicationDeploymentDescription().getType();
+        try {
             SSHSecurityContext securityContext = (SSHSecurityContext) jobExecutionContext
-					.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
+                    .getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
             Cluster pbsCluster = securityContext.getPbsCluster();
 
-			// Get the Stdouts and StdErrs
-			String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName());
-			File localStdOutFile = File.createTempFile(timeStampedServiceName, "stdout");
-			File localStdErrFile = File.createTempFile(timeStampedServiceName, "stderr");
-
-			pbsCluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath());
-			pbsCluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath());
-
-			String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
-			String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
-			Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
-			Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
-			stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
-			if (stringMap == null || stringMap.isEmpty()) {
-				throw new GFacHandlerException(
-						"Empty Output returned from the Application, Double check the application"
-								+ "and ApplicationDescriptor output Parameter Names");
-			}
-
-		} catch (XmlException e) {
-			throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e);
-		} catch (ConnectionException e) {
-			throw new GFacHandlerException(e.getMessage(), e);
-		} catch (TransportException e) {
-			throw new GFacHandlerException(e.getMessage(), e);
-		} catch (IOException e) {
-			throw new GFacHandlerException(e.getMessage(), e);
-		} catch (Exception e) {
-			throw new GFacHandlerException("Error in retrieving results", e);
-		}
+            // Get the Stdouts and StdErrs
+            String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName());
+            File localStdOutFile = File.createTempFile(timeStampedServiceName, "stdout");
+            File localStdErrFile = File.createTempFile(timeStampedServiceName, "stderr");
+
+            log.info("Downloading file : " + app.getStandardError() + " to : " + localStdErrFile.getAbsolutePath());
+            pbsCluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath());
+            log.info("Downloading file : " + app.getStandardOutput() + " to : " + localStdOutFile.getAbsolutePath());
+            pbsCluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath());
+
+            String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
+            String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
+
+            Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
+            Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+            stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+            if (stringMap == null || stringMap.isEmpty()) {
+                throw new GFacHandlerException(
+                        "Empty Output returned from the Application, Double check the application"
+                                + "and ApplicationDescriptor output Parameter Names");
+            }
+        } catch (XmlException e) {
+            throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e);
+        } catch (ConnectionException e) {
+            throw new GFacHandlerException(e.getMessage(), e);
+        } catch (TransportException e) {
+            throw new GFacHandlerException(e.getMessage(), e);
+        } catch (IOException e) {
+            throw new GFacHandlerException(e.getMessage(), e);
+        } catch (Exception e) {
+            throw new GFacHandlerException("Error in retrieving results", e);
+        }
 
-	}
+    }
 
     public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
 

Copied: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/JobIDEvent.java (from r1530411, airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/GramJobIDEvent.java)
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/JobIDEvent.java?p2=airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/JobIDEvent.java&p1=airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/GramJobIDEvent.java&r1=1530411&r2=1531540&rev=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/GramJobIDEvent.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/JobIDEvent.java Sat Oct 12 15:12:20 2013
@@ -21,12 +21,12 @@
 
 package org.apache.airavata.gfac.notification.events;
 
-public class GramJobIDEvent extends GFacEvent {
+public class JobIDEvent extends GFacEvent {
 	String statusMessage;
 
-	public GramJobIDEvent(String message) {
+	public JobIDEvent(String message) {
 		statusMessage = message;
-		this.eventType = GramJobIDEvent.class.getSimpleName();
+		this.eventType = JobIDEvent.class.getSimpleName();
 	}
 
 	public String getStatusMessage() {

Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/listeners/GSISSHJobSubmissionListener.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/listeners/GSISSHJobSubmissionListener.java?rev=1531540&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/listeners/GSISSHJobSubmissionListener.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/listeners/GSISSHJobSubmissionListener.java Sat Oct 12 15:12:20 2013
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.notification.listeners;
+
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.notification.events.StatusChangeEvent;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
+
+public class GSISSHJobSubmissionListener extends JobSubmissionListener {
+
+    JobExecutionContext context;
+
+    public GSISSHJobSubmissionListener(JobExecutionContext context) {
+        this.context = context;
+    }
+
+    public void statusChanged(JobDescriptor jobDescriptor) throws SSHApiException {
+        this.context.getNotifier().publish(new StatusChangeEvent("Job status has changed to : " + jobDescriptor.getStatus()));
+    }
+
+    @Override
+    public void statusChanged(JobStatus jobStatus) throws SSHApiException {
+        this.context.getNotifier().publish(new StatusChangeEvent("Job status has changed to : " + jobStatus.toString()));
+    }
+
+    @Override
+    public boolean isJobDone() throws SSHApiException {
+        return getJobStatus().equals(JobStatus.C);
+    }
+
+    public void setContext(JobExecutionContext context) {
+        this.context = context;
+    }
+}

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java Sat Oct 12 15:12:20 2013
@@ -22,41 +22,50 @@
 package org.apache.airavata.gfac.provider;
 
 import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class GFacProviderException extends Exception {
+    private static final Logger log = LoggerFactory.getLogger(GFacProviderException.class);
+
     private String aditionalInfo[] = null;
 
     public GFacProviderException(String message) {
         super(message);
+        log.error(message);
     }
 
     public GFacProviderException(String message, Throwable cause) {
         super(message, cause);
+        log.error(message);
     }
 
-    public GFacProviderException(String message, Throwable cause,JobExecutionContext context) {
+    public GFacProviderException(String message, Throwable cause, JobExecutionContext context) {
         super(message, cause);
-        sendFaultNotification(message,context,new Exception(cause));
+        sendFaultNotification(message, context, new Exception(cause));
+        log.error(message);
     }
 
     public GFacProviderException(String message, JobExecutionContext context) {
         super(message);
-        sendFaultNotification(message,context,new Exception(message));
+        sendFaultNotification(message, context, new Exception(message));
+        log.error(message);
     }
 
-    public GFacProviderException(String message, JobExecutionContext context,Exception e,String... additionExceptiondata) {
+    public GFacProviderException(String message, JobExecutionContext context, Exception e, String... additionExceptiondata) {
         super(message);
         this.aditionalInfo = additionExceptiondata;
-        sendFaultNotification(message,context,e, additionExceptiondata);
+        sendFaultNotification(message, context, e, additionExceptiondata);
+        log.error(message);
     }
 
     private void sendFaultNotification(String message,
-			JobExecutionContext executionContext, Exception e,
-			String... additionalExceptiondata) {
-		if (additionalExceptiondata==null || additionalExceptiondata.length==0){
-        	additionalExceptiondata=new String[]{message,e.getLocalizedMessage()};
+                                       JobExecutionContext executionContext, Exception e,
+                                       String... additionalExceptiondata) {
+        if (additionalExceptiondata == null || additionalExceptiondata.length == 0) {
+            additionalExceptiondata = new String[]{message, e.getLocalizedMessage()};
         }
-	}
+    }
 
     public String[] getAditionalInfo() {
         return aditionalInfo;

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java Sat Oct 12 15:12:20 2013
@@ -27,13 +27,18 @@ import org.apache.airavata.gfac.GFacExce
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.context.MessageContext;
 import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.notification.events.JobIDEvent;
 import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.notification.listeners.GSISSHJobSubmissionListener;
 import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
 import org.apache.airavata.gsi.ssh.api.Cluster;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.DefaultJobSubmissionListener;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
 import org.apache.airavata.schemas.gfac.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,8 +57,9 @@ public class GSISSHProvider implements G
     }
 
     public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        log.info("Invoking GSISSH Provider Invoke ...");
         jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
-        GsisshHostType host = (GsisshHostType) jobExecutionContext.getApplicationContext().
+        HostDescriptionType host = jobExecutionContext.getApplicationContext().
                 getHostDescription().getType();
         HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
                 getApplicationDeploymentDescription().getType();
@@ -76,10 +82,10 @@ public class GSISSHProvider implements G
             jobDescriptor.setProcessesPerNode(app.getProcessorsPerNode());
             jobDescriptor.setMaxWallTime(String.valueOf(app.getMaxWallTime()));
             jobDescriptor.setJobSubmitter(app.getJobSubmitterCommand());
-            if (app.getProjectAccount() != null) {
+            if (app.getProjectAccount().getProjectAccountNumber() != null) {
                 jobDescriptor.setAcountString(app.getProjectAccount().getProjectAccountNumber());
             }
-            if (app.getQueue() != null) {
+            if (app.getQueue().getQueueName() != null) {
                 jobDescriptor.setQueueName(app.getQueue().getQueueName());
             }
             jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName());
@@ -108,8 +114,64 @@ public class GSISSHProvider implements G
             }
             jobDescriptor.setInputValues(inputValues);
 
-            System.out.println(jobDescriptor.toXML());
-            cluster.submitBatchJob(jobDescriptor);
+            log.info(jobDescriptor.toXML());
+            String jobID = cluster.submitBatchJob(jobDescriptor);
+            log.info("Job Submitted successfully and returned Job ID: " + jobID);
+            jobExecutionContext.getNotifier().publish(new JobIDEvent(jobID));
+
+            JobSubmissionListener listener = new GSISSHJobSubmissionListener(jobExecutionContext);
+            try {
+//            // Wait 5 seconds to start the first poll, this is hard coded, user doesn't have
+//            // to configure this.
+//            Thread.sleep(5000);
+//        } catch (InterruptedException e) {
+//            log.error("Error during job status monitoring");
+//            throw new SSHApiException("Error during job status monitoring", e);
+//        }
+//        // Get the job status first
+//        try {
+////
+////            Thread t = new Thread() {
+////                @Override
+////                public void run() {
+////                    try {
+                // p
+                JobStatus jobStatus = cluster.getJobStatus(jobID);
+                listener.statusChanged(jobStatus);
+                while (true) {
+                    while (!jobStatus.equals(JobStatus.C)) {
+                        if (!jobStatus.equals(listener.getJobStatus().toString())) {
+                            listener.setJobStatus(jobStatus);
+                            listener.statusChanged(jobStatus);
+                        }
+                        Thread.sleep(60000);
+
+                        jobStatus = cluster.getJobStatus(jobID);
+                    }
+                    //Set the job status to Complete
+                    listener.setJobStatus(JobStatus.C);
+                    listener.statusChanged(jobStatus);
+                    break;
+                }
+//                    } catch (InterruptedException e) {
+//                        log.error("Error listening to the submitted job", e);
+//                    } catch (SSHApiException e) {
+//                        log.error("Error listening to the submitted job", e);
+//                    }
+//                }
+//            };
+                //  This thread runs until the program termination, so that use can provide
+//            // any action in onChange method of the listener, without worrying for waiting in the caller thread.
+                //t.setDaemon(true);
+//            t.start();
+            } catch (Exception e) {
+                String error = "Error during job status monitoring";
+                log.error(error);
+                throw new GFacProviderException(error, e);
+            }
+            while (!listener.isJobDone()) {
+                Thread.sleep(10000);
+            }
         } catch (SSHApiException e) {
             String error = "Error submitting the job to host " + host.getHostAddress() + e.getMessage();
             log.error(error);

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java Sat Oct 12 15:12:20 2013
@@ -33,7 +33,7 @@ import org.apache.airavata.gfac.GFacExce
 import org.apache.airavata.gfac.JobSubmissionFault;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.context.security.GSISecurityContext;
-import org.apache.airavata.gfac.notification.events.GramJobIDEvent;
+import org.apache.airavata.gfac.notification.events.JobIDEvent;
 import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
 import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
@@ -191,7 +191,7 @@ public class GramProvider implements GFa
                 applicationSaved=true;
                 String jobStatusMessage = "Un-submitted JobID= " + job.getIDAsString();
                 log.info(jobStatusMessage);
-                jobExecutionContext.getNotifier().publish(new GramJobIDEvent(jobStatusMessage));
+                jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
 
                 log.info("JobID = " + job.getIDAsString());
 
@@ -226,7 +226,7 @@ public class GramProvider implements GFa
 
                 jobStatusMessage = "Submitted JobID= " + job.getIDAsString();
                 log.info(jobStatusMessage);
-                jobExecutionContext.getNotifier().publish(new GramJobIDEvent(jobStatusMessage));
+                jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
 
             } catch (GSSException e) {
                 // Renew credentials and re-submit
@@ -258,7 +258,7 @@ public class GramProvider implements GFa
 
             String jobStatusMessage = "Un-submitted JobID= " + job.getIDAsString();
             log.info(jobStatusMessage);
-            jobExecutionContext.getNotifier().publish(new GramJobIDEvent(jobStatusMessage));
+            jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
 
         }
 

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java Sat Oct 12 15:12:20 2013
@@ -63,78 +63,80 @@ import sun.reflect.generics.reflectiveOb
  * Execute application using remote SSH
  */
 public class SSHProvider implements GFacProvider {
-	private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
-	private SSHSecurityContext securityContext;
-	private String jobID=null;
-
-	public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
-        if(!((SSHHostType)jobExecutionContext.getApplicationContext().getHostDescription()).getHpcResource()){
-		jobID="SSH_"+jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress()+"_"+Calendar.getInstance().getTimeInMillis();
-		
-		securityContext = (SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
-		ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
-		String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
-		saveApplicationJob(jobExecutionContext, remoteFile);
-		log.info(remoteFile);
-		try {
-			File runscript = createShellScript(jobExecutionContext);
-			SCPFileTransfer fileTransfer = securityContext.getSSHClient().newSCPFileTransfer();
-			GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.STAGING);
-			fileTransfer.upload(runscript.getAbsolutePath(), remoteFile);
-		} catch (IOException e) {
-			throw new GFacProviderException(e.getLocalizedMessage(), e);
-		}
+    private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
+    private SSHSecurityContext securityContext;
+    private String jobID = null;
+
+    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+        if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
+            jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
+
+            securityContext = (SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
+            ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+            String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+            saveApplicationJob(jobExecutionContext, remoteFile);
+            log.info(remoteFile);
+            try {
+                File runscript = createShellScript(jobExecutionContext);
+                SCPFileTransfer fileTransfer = securityContext.getSSHClient().newSCPFileTransfer();
+                GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.STAGING);
+                fileTransfer.upload(runscript.getAbsolutePath(), remoteFile);
+            } catch (IOException e) {
+                throw new GFacProviderException(e.getLocalizedMessage(), e);
+            }
         }
-	}
+    }
 
-	private void saveApplicationJob(JobExecutionContext jobExecutionContext, String executableName) {
-		ApplicationJob job = GFacUtils.createApplicationJob(jobExecutionContext);
-		job.setJobId(jobID);
-		job.setStatus(ApplicationJobStatus.INITIALIZE);
-		job.setSubmittedTime(Calendar.getInstance().getTime());
-		job.setStatusUpdateTime(job.getSubmittedTime());
-		job.setJobData(executableName);
-		GFacUtils.recordApplicationJob(jobExecutionContext, job);
-	}
-
-	public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
-        if(((SSHHostType)jobExecutionContext.getApplicationContext().getHostDescription()).getHpcResource()){
-		ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
-		Session session = null;
-		try {
-			session = securityContext.getSession(jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress());
-			/*
-			 * Execute
-			 */
-			String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
-			GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.SUBMITTED);
-			Command cmd = session.exec("/bin/chmod 755 " + execuable + "; " + execuable);
-			GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.RESULTS_RETRIEVE);
-			log.info("stdout=" + GFacUtils.readFromStream(session.getInputStream()));
-			cmd.join(Constants.COMMAND_EXECUTION_TIMEOUT, TimeUnit.SECONDS);
-
-			/*
-			 * check return value. usually not very helpful to draw conclusions
-			 * based on return values so don't bother. just provide warning in
-			 * the log messages
-			 */
-			if (cmd.getExitStatus() != 0) {
-				log.error("Process finished with non zero return value. Process may have failed");
-			} else {
-				log.info("Process finished with return value of zero.");
-			}
-			
-			GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.FINISHED);
-		} catch (ConnectionException e) {
-			throw new GFacProviderException(e.getMessage(), e);
-		} catch (TransportException e) {
-			throw new GFacProviderException(e.getMessage(), e);
-		} catch (IOException e) {
-			throw new GFacProviderException(e.getMessage(), e);
-		}finally{
-			securityContext.closeSession(session);
-		}
-        }else {
+    private void saveApplicationJob(JobExecutionContext jobExecutionContext, String executableName) {
+        ApplicationJob job = GFacUtils.createApplicationJob(jobExecutionContext);
+        job.setJobId(jobID);
+        job.setStatus(ApplicationJobStatus.INITIALIZE);
+        job.setSubmittedTime(Calendar.getInstance().getTime());
+        job.setStatusUpdateTime(job.getSubmittedTime());
+        job.setJobData(executableName);
+        GFacUtils.recordApplicationJob(jobExecutionContext, job);
+    }
+
+    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
+            ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+            Session session = null;
+            try {
+                session = securityContext.getSession(jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress());
+                /*
+                 * Execute
+                 */
+                String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+                GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.SUBMITTED);
+                Command cmd = session.exec("/bin/chmod 755 " + execuable + "; " + execuable);
+                GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.RESULTS_RETRIEVE);
+                log.info("stdout=" + GFacUtils.readFromStream(session.getInputStream()));
+                cmd.join(Constants.COMMAND_EXECUTION_TIMEOUT, TimeUnit.SECONDS);
+
+                /*
+                 * check return value. usually not very helpful to draw conclusions
+                 * based on return values so don't bother. just provide warning in
+                 * the log messages
+                 */
+                if (cmd.getExitStatus() != 0) {
+                    log.error("Process finished with non zero return value. Process may have failed");
+                } else {
+                    log.info("Process finished with return value of zero.");
+                }
+
+                GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.FINISHED);
+            } catch (ConnectionException e) {
+                throw new GFacProviderException(e.getMessage(), e);
+            } catch (TransportException e) {
+                throw new GFacProviderException(e.getMessage(), e);
+            } catch (IOException e) {
+                throw new GFacProviderException(e.getMessage(), e);
+            } finally {
+                if (securityContext != null) {
+                    securityContext.closeSession(session);
+                }
+            }
+        } else {
             GSISSHProvider gsisshProvider = new GSISSHProvider();
             try {
                 gsisshProvider.execute(jobExecutionContext);
@@ -143,10 +145,10 @@ public class SSHProvider implements GFac
             }
         }
 
-	}
+    }
 
-	public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
-	}
+    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+    }
 
     @Override
     public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
@@ -155,80 +157,81 @@ public class SSHProvider implements GFac
 
 
     private File createShellScript(JobExecutionContext context) throws IOException {
-		ApplicationDeploymentDescriptionType app = context.getApplicationContext()
-				.getApplicationDeploymentDescription().getType();
-		String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
-				+ new Random().nextLong();
-
-		File shellScript = File.createTempFile(uniqueDir, "sh");
-		OutputStream out = new FileOutputStream(shellScript);
-
-		out.write("#!/bin/bash\n".getBytes());
-		out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
-		out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
-		out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
-				.getBytes());
-		// get the env of the host and the application
-		NameValuePairType[] env = app.getApplicationEnvironmentArray();
-
-		Map<String, String> nv = new HashMap<String, String>();
-		if (env != null) {
-			for (int i = 0; i < env.length; i++) {
-				String key = env[i].getName();
-				String value = env[i].getValue();
-				nv.put(key, value);
-			}
-		}
-		for (Entry<String, String> entry : nv.entrySet()) {
-			log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
-			out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-
-		}
-
-		// prepare the command
-		final String SPACE = " ";
-		StringBuffer cmd = new StringBuffer();
-		cmd.append(app.getExecutableLocation());
-		cmd.append(SPACE);
-
-		MessageContext input = context.getInMessageContext();
-		;
-		Map<String, Object> inputs = input.getParameters();
-		Set<String> keys = inputs.keySet();
-		for (String paramName : keys) {
-			ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
-			if ("URIArray".equals(actualParameter.getType().getType().toString())) {
-				String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
-				for (String value : values) {
-					cmd.append(value);
-					cmd.append(SPACE);
-				}
-			} else {
-				String paramValue = MappingFactory.toString(actualParameter);
-				cmd.append(paramValue);
-				cmd.append(SPACE);
-			}
-		}
-		// We redirect the error and stdout to remote files, they will be read
-		// in later
-		cmd.append(SPACE);
-		cmd.append("1>");
-		cmd.append(SPACE);
-		cmd.append(app.getStandardOutput());
-		cmd.append(SPACE);
-		cmd.append("2>");
-		cmd.append(SPACE);
-		cmd.append(app.getStandardError());
-
-		String cmdStr = cmd.toString();
-		log.info("Command = " + cmdStr);
-		out.write((cmdStr + "\n").getBytes());
-		String message = "\"execuationSuceeded\"";
-		out.write(("echo " + message + "\n").getBytes());
-		out.close();
+        ApplicationDeploymentDescriptionType app = context.getApplicationContext()
+                .getApplicationDeploymentDescription().getType();
+        String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
+                + new Random().nextLong();
+
+        File shellScript = File.createTempFile(uniqueDir, "sh");
+        OutputStream out = new FileOutputStream(shellScript);
+
+        out.write("#!/bin/bash\n".getBytes());
+        out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
+        out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
+        out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
+                .getBytes());
+        // get the env of the host and the application
+        NameValuePairType[] env = app.getApplicationEnvironmentArray();
+
+        Map<String, String> nv = new HashMap<String, String>();
+        if (env != null) {
+            for (int i = 0; i < env.length; i++) {
+                String key = env[i].getName();
+                String value = env[i].getValue();
+                nv.put(key, value);
+            }
+        }
+        for (Entry<String, String> entry : nv.entrySet()) {
+            log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
+            out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+
+        }
+
+        // prepare the command
+        final String SPACE = " ";
+        StringBuffer cmd = new StringBuffer();
+        cmd.append(app.getExecutableLocation());
+        cmd.append(SPACE);
+
+        MessageContext input = context.getInMessageContext();
+        ;
+        Map<String, Object> inputs = input.getParameters();
+        Set<String> keys = inputs.keySet();
+        for (String paramName : keys) {
+            ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
+            if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+                String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
+                for (String value : values) {
+                    cmd.append(value);
+                    cmd.append(SPACE);
+                }
+            } else {
+                String paramValue = MappingFactory.toString(actualParameter);
+                cmd.append(paramValue);
+                cmd.append(SPACE);
+            }
+        }
+        // We redirect the error and stdout to remote files, they will be read
+        // in later
+        cmd.append(SPACE);
+        cmd.append("1>");
+        cmd.append(SPACE);
+        cmd.append(app.getStandardOutput());
+        cmd.append(SPACE);
+        cmd.append("2>");
+        cmd.append(SPACE);
+        cmd.append(app.getStandardError());
+
+        String cmdStr = cmd.toString();
+        log.info("Command = " + cmdStr);
+        out.write((cmdStr + "\n").getBytes());
+        String message = "\"execuationSuceeded\"";
+        out.write(("echo " + message + "\n").getBytes());
+        out.close();
+
+        return shellScript;
+    }
 
-		return shellScript;
-	}
     public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
 
     }

Modified: airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2Test.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2Test.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2Test.java (original)
+++ airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2Test.java Sat Oct 12 15:12:20 2013
@@ -54,11 +54,18 @@ public class BigRed2Test {
 
     private static final String hostAddress = "bigred2";
     private static final String hostName = "bigred2.uits.iu.edu";
-    private static final String userName = "lginnali";
-    private static final String password = "";
+    private static  String userName = "lginnali";
+    private static  String password = "";
 
     @Before
     public void setUp() throws Exception {
+
+        if(System.getProperty("bigred2.password") == null || System.getProperty("bigred2.username") == null){
+            System.out.println("set the bigred2 password/username in maven command : mvn clean install -Dbigred2.username=xxx -Dbigred2.password=yyy");
+            throw new Exception("Wrong inputs given");
+        }
+        userName = System.getProperty("bigred2.username");
+        password = System.getProperty("bigred2.password");
         URL resource = GramProviderTest.class.getClassLoader().getResource("gfac-config.xml");
         assert resource != null;
         System.out.println(resource.getFile());
@@ -76,9 +83,10 @@ public class BigRed2Test {
         /*
         * Host
         */
-        HostDescription host = new HostDescription(GsisshHostType.type);
+        HostDescription host = new HostDescription(SSHHostType.type);
         host.getType().setHostAddress(hostAddress);
         host.getType().setHostName(hostName);
+        ((SSHHostType)host.getType()).setHpcResource(true);
         /*
         * App
         */

Modified: airavata/trunk/modules/gfac-core/src/test/resources/airavata-server.properties
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/test/resources/airavata-server.properties?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/test/resources/airavata-server.properties (original)
+++ airavata/trunk/modules/gfac-core/src/test/resources/airavata-server.properties Sat Oct 12 15:12:20 2013
@@ -202,7 +202,14 @@ broker.delivery.method=serial
 
 #
 # Advanced Message Box Configurations
-# 
+#
+#trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates
+public.ssh.key=/Users/lahirugunathilake/.ssh/id_dsa.pub
+# SSH PKI key pair is SSH based authentication is used.
+ssh.key=/home/user/.ssh/id_rsa
+ssh.username=lginnali
+private.ssh.key=/Users/lahirugunathilake/.ssh/id_dsa
+ssh.password=
 msgBox.usedatabase=true
 messagePreservationDays=2
 messagePreservationHours=0