You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2013/10/12 17:12:21 UTC
svn commit: r1531540 - in /airavata/trunk/modules/gfac-core/src:
main/java/org/apache/airavata/gfac/handler/
main/java/org/apache/airavata/gfac/notification/events/
main/java/org/apache/airavata/gfac/notification/listeners/
main/java/org/apache/airavat...
Author: lahiru
Date: Sat Oct 12 15:12:20 2013
New Revision: 1531540
URL: http://svn.apache.org/r1531540
Log:
adding ssh hpc support to gfac-core.
Added:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/JobIDEvent.java
- copied, changed from r1530411, airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/GramJobIDEvent.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/listeners/GSISSHJobSubmissionListener.java
Removed:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/GramJobIDEvent.java
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2Test.java
airavata/trunk/modules/gfac-core/src/test/resources/airavata-server.properties
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPInputHandler.java Sat Oct 12 15:12:20 2013
@@ -24,6 +24,7 @@ import java.io.File;
import java.io.IOException;
import java.util.*;
+import com.sun.tools.javac.util.Paths;
import net.schmizz.sshj.xfer.scp.SCPFileTransfer;
import org.apache.airavata.common.utils.StringUtil;
@@ -82,14 +83,14 @@ public class SCPInputHandler implements
SSHSecurityContext securityContext = (SSHSecurityContext)context.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
Cluster pbsCluster = securityContext.getPbsCluster();
ApplicationDeploymentDescriptionType app = context.getApplicationContext().getApplicationDeploymentDescription().getType();
- String remoteFile = app.getInputDataDirectory() + File.separatorChar + paramValue;
-
+ int i = paramValue.lastIndexOf(File.separator);
+ String substring = paramValue.substring(i + 1);
try {
- return pbsCluster.scpTo(app.getInputDataDirectory(), remoteFile);
+ String targetFile = app.getInputDataDirectory()+ File.separator + substring;
+ pbsCluster.scpTo(targetFile, paramValue);
+ return targetFile;
} catch (SSHApiException e) {
throw new GFacHandlerException("Error while input File Staging", context, e, e.getLocalizedMessage());
- } finally {
- return null;
}
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/handler/SCPOutputHandler.java Sat Oct 12 15:12:20 2013
@@ -22,65 +22,80 @@ package org.apache.airavata.gfac.handler
import java.io.File;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.net.URISyntaxException;
+import java.util.*;
import net.schmizz.sshj.connection.ConnectionException;
import net.schmizz.sshj.transport.TransportException;
+import org.apache.airavata.common.utils.StringUtil;
import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.ToolsException;
import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.external.GridFtp;
+import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gfac.utils.OutputUtils;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
import org.apache.xmlbeans.XmlException;
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SCPOutputHandler implements GFacHandler{
+ private static final Logger log = LoggerFactory.getLogger(SCPOutputHandler.class);
+ public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
- .getApplicationDeploymentDescription().getType();
- try {
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+ .getApplicationDeploymentDescription().getType();
+ try {
SSHSecurityContext securityContext = (SSHSecurityContext) jobExecutionContext
- .getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
+ .getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
Cluster pbsCluster = securityContext.getPbsCluster();
- // Get the Stdouts and StdErrs
- String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName());
- File localStdOutFile = File.createTempFile(timeStampedServiceName, "stdout");
- File localStdErrFile = File.createTempFile(timeStampedServiceName, "stderr");
-
- pbsCluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath());
- pbsCluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath());
-
- String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
- String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
- Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
- Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
- stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
- if (stringMap == null || stringMap.isEmpty()) {
- throw new GFacHandlerException(
- "Empty Output returned from the Application, Double check the application"
- + "and ApplicationDescriptor output Parameter Names");
- }
-
- } catch (XmlException e) {
- throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e);
- } catch (ConnectionException e) {
- throw new GFacHandlerException(e.getMessage(), e);
- } catch (TransportException e) {
- throw new GFacHandlerException(e.getMessage(), e);
- } catch (IOException e) {
- throw new GFacHandlerException(e.getMessage(), e);
- } catch (Exception e) {
- throw new GFacHandlerException("Error in retrieving results", e);
- }
+ // Get the Stdouts and StdErrs
+ String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName());
+ File localStdOutFile = File.createTempFile(timeStampedServiceName, "stdout");
+ File localStdErrFile = File.createTempFile(timeStampedServiceName, "stderr");
+
+ log.info("Downloading file : " + app.getStandardError() + " to : " + localStdErrFile.getAbsolutePath());
+ pbsCluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath());
+ log.info("Downloading file : " + app.getStandardOutput() + " to : " + localStdOutFile.getAbsolutePath());
+ pbsCluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath());
+
+ String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
+ String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
+
+ Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
+ Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+ stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+ if (stringMap == null || stringMap.isEmpty()) {
+ throw new GFacHandlerException(
+ "Empty Output returned from the Application, Double check the application"
+ + "and ApplicationDescriptor output Parameter Names");
+ }
+ } catch (XmlException e) {
+ throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e);
+ } catch (ConnectionException e) {
+ throw new GFacHandlerException(e.getMessage(), e);
+ } catch (TransportException e) {
+ throw new GFacHandlerException(e.getMessage(), e);
+ } catch (IOException e) {
+ throw new GFacHandlerException(e.getMessage(), e);
+ } catch (Exception e) {
+ throw new GFacHandlerException("Error in retrieving results", e);
+ }
- }
+ }
public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
Copied: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/JobIDEvent.java (from r1530411, airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/GramJobIDEvent.java)
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/JobIDEvent.java?p2=airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/JobIDEvent.java&p1=airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/GramJobIDEvent.java&r1=1530411&r2=1531540&rev=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/GramJobIDEvent.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/events/JobIDEvent.java Sat Oct 12 15:12:20 2013
@@ -21,12 +21,12 @@
package org.apache.airavata.gfac.notification.events;
-public class GramJobIDEvent extends GFacEvent {
+public class JobIDEvent extends GFacEvent {
String statusMessage;
- public GramJobIDEvent(String message) {
+ public JobIDEvent(String message) {
statusMessage = message;
- this.eventType = GramJobIDEvent.class.getSimpleName();
+ this.eventType = JobIDEvent.class.getSimpleName();
}
public String getStatusMessage() {
Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/listeners/GSISSHJobSubmissionListener.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/listeners/GSISSHJobSubmissionListener.java?rev=1531540&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/listeners/GSISSHJobSubmissionListener.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/notification/listeners/GSISSHJobSubmissionListener.java Sat Oct 12 15:12:20 2013
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.notification.listeners;
+
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.notification.events.StatusChangeEvent;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
+
+public class GSISSHJobSubmissionListener extends JobSubmissionListener {
+
+ JobExecutionContext context;
+
+ public GSISSHJobSubmissionListener(JobExecutionContext context) {
+ this.context = context;
+ }
+
+ public void statusChanged(JobDescriptor jobDescriptor) throws SSHApiException {
+ this.context.getNotifier().publish(new StatusChangeEvent("Job status has changed to : " + jobDescriptor.getStatus()));
+ }
+
+ @Override
+ public void statusChanged(JobStatus jobStatus) throws SSHApiException {
+ this.context.getNotifier().publish(new StatusChangeEvent("Job status has changed to : " + jobStatus.toString()));
+ }
+
+ @Override
+ public boolean isJobDone() throws SSHApiException {
+ return getJobStatus().equals(JobStatus.C);
+ }
+
+ public void setContext(JobExecutionContext context) {
+ this.context = context;
+ }
+}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java Sat Oct 12 15:12:20 2013
@@ -22,41 +22,50 @@
package org.apache.airavata.gfac.provider;
import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class GFacProviderException extends Exception {
+ private static final Logger log = LoggerFactory.getLogger(GFacProviderException.class);
+
private String aditionalInfo[] = null;
public GFacProviderException(String message) {
super(message);
+ log.error(message);
}
public GFacProviderException(String message, Throwable cause) {
super(message, cause);
+ log.error(message);
}
- public GFacProviderException(String message, Throwable cause,JobExecutionContext context) {
+ public GFacProviderException(String message, Throwable cause, JobExecutionContext context) {
super(message, cause);
- sendFaultNotification(message,context,new Exception(cause));
+ sendFaultNotification(message, context, new Exception(cause));
+ log.error(message);
}
public GFacProviderException(String message, JobExecutionContext context) {
super(message);
- sendFaultNotification(message,context,new Exception(message));
+ sendFaultNotification(message, context, new Exception(message));
+ log.error(message);
}
- public GFacProviderException(String message, JobExecutionContext context,Exception e,String... additionExceptiondata) {
+ public GFacProviderException(String message, JobExecutionContext context, Exception e, String... additionExceptiondata) {
super(message);
this.aditionalInfo = additionExceptiondata;
- sendFaultNotification(message,context,e, additionExceptiondata);
+ sendFaultNotification(message, context, e, additionExceptiondata);
+ log.error(message);
}
private void sendFaultNotification(String message,
- JobExecutionContext executionContext, Exception e,
- String... additionalExceptiondata) {
- if (additionalExceptiondata==null || additionalExceptiondata.length==0){
- additionalExceptiondata=new String[]{message,e.getLocalizedMessage()};
+ JobExecutionContext executionContext, Exception e,
+ String... additionalExceptiondata) {
+ if (additionalExceptiondata == null || additionalExceptiondata.length == 0) {
+ additionalExceptiondata = new String[]{message, e.getLocalizedMessage()};
}
- }
+ }
public String[] getAditionalInfo() {
return aditionalInfo;
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java Sat Oct 12 15:12:20 2013
@@ -27,13 +27,18 @@ import org.apache.airavata.gfac.GFacExce
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.notification.events.JobIDEvent;
import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.notification.listeners.GSISSHJobSubmissionListener;
import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.DefaultJobSubmissionListener;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.listener.JobSubmissionListener;
import org.apache.airavata.schemas.gfac.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,8 +57,9 @@ public class GSISSHProvider implements G
}
public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ log.info("Invoking GSISSH Provider Invoke ...");
jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
- GsisshHostType host = (GsisshHostType) jobExecutionContext.getApplicationContext().
+ HostDescriptionType host = jobExecutionContext.getApplicationContext().
getHostDescription().getType();
HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
getApplicationDeploymentDescription().getType();
@@ -76,10 +82,10 @@ public class GSISSHProvider implements G
jobDescriptor.setProcessesPerNode(app.getProcessorsPerNode());
jobDescriptor.setMaxWallTime(String.valueOf(app.getMaxWallTime()));
jobDescriptor.setJobSubmitter(app.getJobSubmitterCommand());
- if (app.getProjectAccount() != null) {
+ if (app.getProjectAccount().getProjectAccountNumber() != null) {
jobDescriptor.setAcountString(app.getProjectAccount().getProjectAccountNumber());
}
- if (app.getQueue() != null) {
+ if (app.getQueue().getQueueName() != null) {
jobDescriptor.setQueueName(app.getQueue().getQueueName());
}
jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName());
@@ -108,8 +114,64 @@ public class GSISSHProvider implements G
}
jobDescriptor.setInputValues(inputValues);
- System.out.println(jobDescriptor.toXML());
- cluster.submitBatchJob(jobDescriptor);
+ log.info(jobDescriptor.toXML());
+ String jobID = cluster.submitBatchJob(jobDescriptor);
+ log.info("Job Submitted successfully and returned Job ID: " + jobID);
+ jobExecutionContext.getNotifier().publish(new JobIDEvent(jobID));
+
+ JobSubmissionListener listener = new GSISSHJobSubmissionListener(jobExecutionContext);
+ try {
+// // Wait 5 seconds to start the first poll, this is hard coded, user doesn't have
+// // to configure this.
+// Thread.sleep(5000);
+// } catch (InterruptedException e) {
+// log.error("Error during job status monitoring");
+// throw new SSHApiException("Error during job status monitoring", e);
+// }
+// // Get the job status first
+// try {
+////
+//// Thread t = new Thread() {
+//// @Override
+//// public void run() {
+//// try {
+ // p
+ JobStatus jobStatus = cluster.getJobStatus(jobID);
+ listener.statusChanged(jobStatus);
+ while (true) {
+ while (!jobStatus.equals(JobStatus.C)) {
+ if (!jobStatus.equals(listener.getJobStatus().toString())) {
+ listener.setJobStatus(jobStatus);
+ listener.statusChanged(jobStatus);
+ }
+ Thread.sleep(60000);
+
+ jobStatus = cluster.getJobStatus(jobID);
+ }
+ //Set the job status to Complete
+ listener.setJobStatus(JobStatus.C);
+ listener.statusChanged(jobStatus);
+ break;
+ }
+// } catch (InterruptedException e) {
+// log.error("Error listening to the submitted job", e);
+// } catch (SSHApiException e) {
+// log.error("Error listening to the submitted job", e);
+// }
+// }
+// };
+ // This thread runs until the program termination, so that the user can provide
+// // any action in onChange method of the listener, without worrying about waiting in the caller thread.
+ //t.setDaemon(true);
+// t.start();
+ } catch (Exception e) {
+ String error = "Error during job status monitoring";
+ log.error(error);
+ throw new GFacProviderException(error, e);
+ }
+ while (!listener.isJobDone()) {
+ Thread.sleep(10000);
+ }
} catch (SSHApiException e) {
String error = "Error submitting the job to host " + host.getHostAddress() + e.getMessage();
log.error(error);
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java Sat Oct 12 15:12:20 2013
@@ -33,7 +33,7 @@ import org.apache.airavata.gfac.GFacExce
import org.apache.airavata.gfac.JobSubmissionFault;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.security.GSISecurityContext;
-import org.apache.airavata.gfac.notification.events.GramJobIDEvent;
+import org.apache.airavata.gfac.notification.events.JobIDEvent;
import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
@@ -191,7 +191,7 @@ public class GramProvider implements GFa
applicationSaved=true;
String jobStatusMessage = "Un-submitted JobID= " + job.getIDAsString();
log.info(jobStatusMessage);
- jobExecutionContext.getNotifier().publish(new GramJobIDEvent(jobStatusMessage));
+ jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
log.info("JobID = " + job.getIDAsString());
@@ -226,7 +226,7 @@ public class GramProvider implements GFa
jobStatusMessage = "Submitted JobID= " + job.getIDAsString();
log.info(jobStatusMessage);
- jobExecutionContext.getNotifier().publish(new GramJobIDEvent(jobStatusMessage));
+ jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
} catch (GSSException e) {
// Renew credentials and re-submit
@@ -258,7 +258,7 @@ public class GramProvider implements GFa
String jobStatusMessage = "Un-submitted JobID= " + job.getIDAsString();
log.info(jobStatusMessage);
- jobExecutionContext.getNotifier().publish(new GramJobIDEvent(jobStatusMessage));
+ jobExecutionContext.getNotifier().publish(new JobIDEvent(jobStatusMessage));
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java Sat Oct 12 15:12:20 2013
@@ -63,78 +63,80 @@ import sun.reflect.generics.reflectiveOb
* Execute application using remote SSH
*/
public class SSHProvider implements GFacProvider {
- private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
- private SSHSecurityContext securityContext;
- private String jobID=null;
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
- if(!((SSHHostType)jobExecutionContext.getApplicationContext().getHostDescription()).getHpcResource()){
- jobID="SSH_"+jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress()+"_"+Calendar.getInstance().getTimeInMillis();
-
- securityContext = (SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
- saveApplicationJob(jobExecutionContext, remoteFile);
- log.info(remoteFile);
- try {
- File runscript = createShellScript(jobExecutionContext);
- SCPFileTransfer fileTransfer = securityContext.getSSHClient().newSCPFileTransfer();
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.STAGING);
- fileTransfer.upload(runscript.getAbsolutePath(), remoteFile);
- } catch (IOException e) {
- throw new GFacProviderException(e.getLocalizedMessage(), e);
- }
+ private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
+ private SSHSecurityContext securityContext;
+ private String jobID = null;
+
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
+ jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
+
+ securityContext = (SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT);
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+ saveApplicationJob(jobExecutionContext, remoteFile);
+ log.info(remoteFile);
+ try {
+ File runscript = createShellScript(jobExecutionContext);
+ SCPFileTransfer fileTransfer = securityContext.getSSHClient().newSCPFileTransfer();
+ GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.STAGING);
+ fileTransfer.upload(runscript.getAbsolutePath(), remoteFile);
+ } catch (IOException e) {
+ throw new GFacProviderException(e.getLocalizedMessage(), e);
+ }
}
- }
+ }
- private void saveApplicationJob(JobExecutionContext jobExecutionContext, String executableName) {
- ApplicationJob job = GFacUtils.createApplicationJob(jobExecutionContext);
- job.setJobId(jobID);
- job.setStatus(ApplicationJobStatus.INITIALIZE);
- job.setSubmittedTime(Calendar.getInstance().getTime());
- job.setStatusUpdateTime(job.getSubmittedTime());
- job.setJobData(executableName);
- GFacUtils.recordApplicationJob(jobExecutionContext, job);
- }
-
- public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- if(((SSHHostType)jobExecutionContext.getApplicationContext().getHostDescription()).getHpcResource()){
- ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
- Session session = null;
- try {
- session = securityContext.getSession(jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress());
- /*
- * Execute
- */
- String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.SUBMITTED);
- Command cmd = session.exec("/bin/chmod 755 " + execuable + "; " + execuable);
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.RESULTS_RETRIEVE);
- log.info("stdout=" + GFacUtils.readFromStream(session.getInputStream()));
- cmd.join(Constants.COMMAND_EXECUTION_TIMEOUT, TimeUnit.SECONDS);
-
- /*
- * check return value. usually not very helpful to draw conclusions
- * based on return values so don't bother. just provide warning in
- * the log messages
- */
- if (cmd.getExitStatus() != 0) {
- log.error("Process finished with non zero return value. Process may have failed");
- } else {
- log.info("Process finished with return value of zero.");
- }
-
- GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.FINISHED);
- } catch (ConnectionException e) {
- throw new GFacProviderException(e.getMessage(), e);
- } catch (TransportException e) {
- throw new GFacProviderException(e.getMessage(), e);
- } catch (IOException e) {
- throw new GFacProviderException(e.getMessage(), e);
- }finally{
- securityContext.closeSession(session);
- }
- }else {
+ private void saveApplicationJob(JobExecutionContext jobExecutionContext, String executableName) {
+ ApplicationJob job = GFacUtils.createApplicationJob(jobExecutionContext);
+ job.setJobId(jobID);
+ job.setStatus(ApplicationJobStatus.INITIALIZE);
+ job.setSubmittedTime(Calendar.getInstance().getTime());
+ job.setStatusUpdateTime(job.getSubmittedTime());
+ job.setJobData(executableName);
+ GFacUtils.recordApplicationJob(jobExecutionContext, job);
+ }
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) {
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ Session session = null;
+ try {
+ session = securityContext.getSession(jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress());
+ /*
+ * Execute
+ */
+ String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME;
+ GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.SUBMITTED);
+ Command cmd = session.exec("/bin/chmod 755 " + execuable + "; " + execuable);
+ GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.RESULTS_RETRIEVE);
+ log.info("stdout=" + GFacUtils.readFromStream(session.getInputStream()));
+ cmd.join(Constants.COMMAND_EXECUTION_TIMEOUT, TimeUnit.SECONDS);
+
+ /*
+ * check return value. usually not very helpful to draw conclusions
+ * based on return values so don't bother. just provide warning in
+ * the log messages
+ */
+ if (cmd.getExitStatus() != 0) {
+ log.error("Process finished with non zero return value. Process may have failed");
+ } else {
+ log.info("Process finished with return value of zero.");
+ }
+
+ GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobID, ApplicationJobStatus.FINISHED);
+ } catch (ConnectionException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ } catch (TransportException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ } catch (IOException e) {
+ throw new GFacProviderException(e.getMessage(), e);
+ } finally {
+ if (securityContext != null) {
+ securityContext.closeSession(session);
+ }
+ }
+ } else {
GSISSHProvider gsisshProvider = new GSISSHProvider();
try {
gsisshProvider.execute(jobExecutionContext);
@@ -143,10 +145,10 @@ public class SSHProvider implements GFac
}
}
- }
+ }
- public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
- }
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ }
@Override
public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
@@ -155,80 +157,81 @@ public class SSHProvider implements GFac
private File createShellScript(JobExecutionContext context) throws IOException {
- ApplicationDeploymentDescriptionType app = context.getApplicationContext()
- .getApplicationDeploymentDescription().getType();
- String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
- + new Random().nextLong();
-
- File shellScript = File.createTempFile(uniqueDir, "sh");
- OutputStream out = new FileOutputStream(shellScript);
-
- out.write("#!/bin/bash\n".getBytes());
- out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
- out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
- out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
- .getBytes());
- // get the env of the host and the application
- NameValuePairType[] env = app.getApplicationEnvironmentArray();
-
- Map<String, String> nv = new HashMap<String, String>();
- if (env != null) {
- for (int i = 0; i < env.length; i++) {
- String key = env[i].getName();
- String value = env[i].getValue();
- nv.put(key, value);
- }
- }
- for (Entry<String, String> entry : nv.entrySet()) {
- log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
- out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-
- }
-
- // prepare the command
- final String SPACE = " ";
- StringBuffer cmd = new StringBuffer();
- cmd.append(app.getExecutableLocation());
- cmd.append(SPACE);
-
- MessageContext input = context.getInMessageContext();
- ;
- Map<String, Object> inputs = input.getParameters();
- Set<String> keys = inputs.keySet();
- for (String paramName : keys) {
- ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
- if ("URIArray".equals(actualParameter.getType().getType().toString())) {
- String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
- for (String value : values) {
- cmd.append(value);
- cmd.append(SPACE);
- }
- } else {
- String paramValue = MappingFactory.toString(actualParameter);
- cmd.append(paramValue);
- cmd.append(SPACE);
- }
- }
- // We redirect the error and stdout to remote files, they will be read
- // in later
- cmd.append(SPACE);
- cmd.append("1>");
- cmd.append(SPACE);
- cmd.append(app.getStandardOutput());
- cmd.append(SPACE);
- cmd.append("2>");
- cmd.append(SPACE);
- cmd.append(app.getStandardError());
-
- String cmdStr = cmd.toString();
- log.info("Command = " + cmdStr);
- out.write((cmdStr + "\n").getBytes());
- String message = "\"execuationSuceeded\"";
- out.write(("echo " + message + "\n").getBytes());
- out.close();
+ ApplicationDeploymentDescriptionType app = context.getApplicationContext()
+ .getApplicationDeploymentDescription().getType();
+ String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
+ + new Random().nextLong();
+
+ File shellScript = File.createTempFile(uniqueDir, "sh");
+ OutputStream out = new FileOutputStream(shellScript);
+
+ out.write("#!/bin/bash\n".getBytes());
+ out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
+ out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
+ out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
+ .getBytes());
+ // get the env of the host and the application
+ NameValuePairType[] env = app.getApplicationEnvironmentArray();
+
+ Map<String, String> nv = new HashMap<String, String>();
+ if (env != null) {
+ for (int i = 0; i < env.length; i++) {
+ String key = env[i].getName();
+ String value = env[i].getValue();
+ nv.put(key, value);
+ }
+ }
+ for (Entry<String, String> entry : nv.entrySet()) {
+ log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
+ out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+
+ }
+
+ // prepare the command
+ final String SPACE = " ";
+ StringBuffer cmd = new StringBuffer();
+ cmd.append(app.getExecutableLocation());
+ cmd.append(SPACE);
+
+ MessageContext input = context.getInMessageContext();
+ ;
+ Map<String, Object> inputs = input.getParameters();
+ Set<String> keys = inputs.keySet();
+ for (String paramName : keys) {
+ ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
+ if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
+ for (String value : values) {
+ cmd.append(value);
+ cmd.append(SPACE);
+ }
+ } else {
+ String paramValue = MappingFactory.toString(actualParameter);
+ cmd.append(paramValue);
+ cmd.append(SPACE);
+ }
+ }
+ // We redirect the error and stdout to remote files, they will be read
+ // in later
+ cmd.append(SPACE);
+ cmd.append("1>");
+ cmd.append(SPACE);
+ cmd.append(app.getStandardOutput());
+ cmd.append(SPACE);
+ cmd.append("2>");
+ cmd.append(SPACE);
+ cmd.append(app.getStandardError());
+
+ String cmdStr = cmd.toString();
+ log.info("Command = " + cmdStr);
+ out.write((cmdStr + "\n").getBytes());
+ String message = "\"execuationSuceeded\"";
+ out.write(("echo " + message + "\n").getBytes());
+ out.close();
+
+ return shellScript;
+ }
- return shellScript;
- }
public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
}
Modified: airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2Test.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2Test.java?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2Test.java (original)
+++ airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/BigRed2Test.java Sat Oct 12 15:12:20 2013
@@ -54,11 +54,18 @@ public class BigRed2Test {
private static final String hostAddress = "bigred2";
private static final String hostName = "bigred2.uits.iu.edu";
- private static final String userName = "lginnali";
- private static final String password = "";
+ private static String userName = "lginnali";
+ private static String password = "";
@Before
public void setUp() throws Exception {
+
+ if(System.getProperty("bigred2.password") == null || System.getProperty("bigred2.username") == null){
+ System.out.println("set the bigred2 password/username in maven command : mvn clean install -Dbigred2.username=xxx -Dbigred2.password=yyy");
+ throw new Exception("Wrong inputs given");
+ }
+ userName = System.getProperty("bigred2.username");
+ password = System.getProperty("bigred2.password");
URL resource = GramProviderTest.class.getClassLoader().getResource("gfac-config.xml");
assert resource != null;
System.out.println(resource.getFile());
@@ -76,9 +83,10 @@ public class BigRed2Test {
/*
* Host
*/
- HostDescription host = new HostDescription(GsisshHostType.type);
+ HostDescription host = new HostDescription(SSHHostType.type);
host.getType().setHostAddress(hostAddress);
host.getType().setHostName(hostName);
+ ((SSHHostType)host.getType()).setHpcResource(true);
/*
* App
*/
Modified: airavata/trunk/modules/gfac-core/src/test/resources/airavata-server.properties
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/test/resources/airavata-server.properties?rev=1531540&r1=1531539&r2=1531540&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/test/resources/airavata-server.properties (original)
+++ airavata/trunk/modules/gfac-core/src/test/resources/airavata-server.properties Sat Oct 12 15:12:20 2013
@@ -202,7 +202,14 @@ broker.delivery.method=serial
#
# Advanced Message Box Configurations
-#
+#
+#trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates
+public.ssh.key=/Users/lahirugunathilake/.ssh/id_dsa.pub
+# SSH PKI key pair if SSH based authentication is used.
+ssh.key=/home/user/.ssh/id_rsa
+ssh.username=lginnali
+private.ssh.key=/Users/lahirugunathilake/.ssh/id_dsa
+ssh.password=
msgBox.usedatabase=true
messagePreservationDays=2
messagePreservationHours=0