You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2011/08/09 19:35:10 UTC
svn commit: r1155439 - in /incubator/airavata/trunk/modules:
commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/
commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/app/
commons/gfac-schema/src/main/java/org/apache/ai...
Author: smarru
Date: Tue Aug 9 17:35:09 2011
New Revision: 1155439
URL: http://svn.apache.org/viewvc?rev=1155439&view=rev
Log:
accepting patch from Patanachai to address AIRAVATA-71
Removed:
incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/type/
Modified:
incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/Parameter.java
incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/app/GramApplicationDeployment.java
incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/host/GlobusHost.java
incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/GramProvider.java
incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/GramRSLGenerator.java
incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java
Modified: incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/Parameter.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/Parameter.java?rev=1155439&r1=1155438&r2=1155439&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/Parameter.java (original)
+++ incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/Parameter.java Tue Aug 9 17:35:09 2011
@@ -23,6 +23,7 @@ package org.apache.airavata.core.gfac.ty
public class Parameter implements Type{
private String name;
+ private String description;
private DataType type;
public String getName() {
@@ -30,8 +31,14 @@ public class Parameter implements Type{
}
public void setName(String name) {
this.name = name;
- }
- public DataType getType() {
+ }
+ public String getDescription() {
+ return description;
+ }
+ public void setDescription(String description) {
+ this.description = description;
+ }
+ public DataType getType() {
return type;
}
public void setType(DataType type) {
Modified: incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/app/GramApplicationDeployment.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/app/GramApplicationDeployment.java?rev=1155439&r1=1155438&r2=1155439&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/app/GramApplicationDeployment.java (original)
+++ incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/app/GramApplicationDeployment.java Tue Aug 9 17:35:09 2011
@@ -27,8 +27,9 @@ public class GramApplicationDeployment e
private int wallTime;
private int nodeCount;
private int cpuCount;
-
- public String getProjectName() {
+ private String jobType;
+
+ public String getProjectName() {
return projectName;
}
public void setProjectName(String projectName) {
@@ -57,5 +58,11 @@ public class GramApplicationDeployment e
}
public void setCpuCount(int cpuCount) {
this.cpuCount = cpuCount;
- }
+ }
+ public String getJobType() {
+ return jobType;
+ }
+ public void setJobType(String jobType) {
+ this.jobType = jobType;
+ }
}
Modified: incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/host/GlobusHost.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/host/GlobusHost.java?rev=1155439&r1=1155438&r2=1155439&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/host/GlobusHost.java (original)
+++ incubator/airavata/trunk/modules/commons/gfac-schema/src/main/java/org/apache/airavata/core/gfac/type/host/GlobusHost.java Tue Aug 9 17:35:09 2011
@@ -25,12 +25,21 @@ import org.apache.airavata.core.gfac.typ
public class GlobusHost extends HostDescription {
private String gridFTPEndPoint;
+ private String globusGateKeeperEndPoint;
- public String getGridFTPEndPoint() {
+ public String getGridFTPEndPoint() {
return gridFTPEndPoint;
}
public void setGridFTPEndPoint(String gridFTPEndPoint) {
this.gridFTPEndPoint = gridFTPEndPoint;
- }
+ }
+
+ public String getGlobusGateKeeperEndPoint() {
+ return globusGateKeeperEndPoint;
+ }
+
+ public void setGlobusGateKeeperEndPoint(String globusGateKeeperEndPoint) {
+ this.globusGateKeeperEndPoint = globusGateKeeperEndPoint;
+ }
}
\ No newline at end of file
Modified: incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/GramProvider.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/GramProvider.java?rev=1155439&r1=1155438&r2=1155439&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/GramProvider.java (original)
+++ incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/GramProvider.java Tue Aug 9 17:35:09 2011
@@ -35,10 +35,13 @@ import org.apache.airavata.core.gfac.exc
import org.apache.airavata.core.gfac.exception.GfacException.FaultCode;
import org.apache.airavata.core.gfac.exception.JobSubmissionFault;
import org.apache.airavata.core.gfac.external.GridFtp;
-import org.apache.airavata.core.gfac.model.ExecutionModel;
import org.apache.airavata.core.gfac.notification.NotificationService;
import org.apache.airavata.core.gfac.provider.utils.GramRSLGenerator;
import org.apache.airavata.core.gfac.provider.utils.JobSubmissionListener;
+import org.apache.airavata.core.gfac.type.ServiceDescription;
+import org.apache.airavata.core.gfac.type.app.GramApplicationDeployment;
+import org.apache.airavata.core.gfac.type.app.ShellApplicationDeployment;
+import org.apache.airavata.core.gfac.type.host.GlobusHost;
import org.apache.airavata.core.gfac.utils.ErrorCodes;
import org.apache.airavata.core.gfac.utils.GFacOptions.CurrentProviders;
import org.apache.airavata.core.gfac.utils.GfacUtils;
@@ -48,7 +51,6 @@ import org.globus.gram.GramException;
import org.globus.gram.GramJob;
import org.ietf.jgss.GSSCredential;
import org.ietf.jgss.GSSException;
-import org.ogce.schemas.gfac.documents.GlobusGatekeeperType;
import edu.indiana.extreme.lead.workflow_tracking.common.DurationObj;
@@ -57,29 +59,24 @@ public class GramProvider extends Abstra
public static final String MYPROXY_SECURITY_CONTEXT = "myproxy";
public void initialize(InvocationContext invocationContext) throws GfacException {
- ExecutionContext appExecContext = invocationContext.getExecutionContext();
- ExecutionModel model = appExecContext.getExecutionModel();
-
+ GlobusHost host = (GlobusHost)invocationContext.getGfacContext().getHost();
+ ShellApplicationDeployment app = (ShellApplicationDeployment)invocationContext.getGfacContext().getApp();
+
GridFtp ftp = new GridFtp();
try {
GSSCredential gssCred = ((GSISecurityContext) invocationContext
.getSecurityContext(MYPROXY_SECURITY_CONTEXT)).getGssCredentails();
- // get Hostname
- String hostgridFTP = null;
-
- if (model.getHostDesc().getHostConfiguration().getGridFTPArray() != null
- && model.getHostDesc().getHostConfiguration().getGridFTPArray().length > 0) {
- hostgridFTP = model.getHostDesc().getHostConfiguration().getGridFTPArray(0).getEndPointReference();
- } else {
- hostgridFTP = model.getHost();
+ String hostgridFTP = host.getGridFTPEndPoint();
+ if (host.getGridFTPEndPoint() == null){
+ hostgridFTP = host.getName();
}
- URI tmpdirURI = GfacUtils.createGsiftpURI(hostgridFTP, model.getTmpDir());
- URI workingDirURI = GfacUtils.createGsiftpURI(hostgridFTP, model.getWorkingDir());
- URI inputURI = GfacUtils.createGsiftpURI(hostgridFTP, model.getInputDataDir());
- URI outputURI = GfacUtils.createGsiftpURI(hostgridFTP, model.getOutputDataDir());
+ URI tmpdirURI = GfacUtils.createGsiftpURI(hostgridFTP, app.getTmpDir());
+ URI workingDirURI = GfacUtils.createGsiftpURI(hostgridFTP, app.getWorkingDir());
+ URI inputURI = GfacUtils.createGsiftpURI(hostgridFTP, app.getInputDir());
+ URI outputURI = GfacUtils.createGsiftpURI(hostgridFTP, app.getOutputDir());
log.info("Host FTP = " + hostgridFTP);
log.info("temp directory = " + tmpdirURI);
@@ -98,17 +95,16 @@ public class GramProvider extends Abstra
}
public void execute(InvocationContext invocationContext) throws GfacException {
- ExecutionContext context = invocationContext.getExecutionContext();
+ GlobusHost host = (GlobusHost)invocationContext.getGfacContext().getHost();
+ GramApplicationDeployment app = (GramApplicationDeployment)invocationContext.getGfacContext().getApp();
+ ServiceDescription service = invocationContext.getGfacContext().getService();
- String contact = null;
log.info("Searching for Gate Keeper");
- GlobusGatekeeperType gatekeeper = context.getExecutionModel().getGatekeeper();
+ String gatekeeper = host.getGlobusGateKeeperEndPoint();
if (gatekeeper == null) {
- contact = context.getExecutionModel().getHost();
- } else {
- contact = gatekeeper.getEndPointReference();
+ gatekeeper = host.getName();
}
- log.info("Using Globus GateKeeper " + contact);
+ log.info("Using Globus GateKeeper " + gatekeeper);
GramJob job = null;
boolean jobSucsseful = false;
@@ -116,45 +112,49 @@ public class GramProvider extends Abstra
int errCode = 0;
try {
- GSSCredential gssCred = ((GSISecurityContext) context.getSecurityContext()).getGssCredentails();
-
- log.info("Host desc = " + context.getExecutionModel().getHostDesc().xmlText());
+ GSSCredential gssCred = ((GSISecurityContext) invocationContext
+ .getSecurityContext(MYPROXY_SECURITY_CONTEXT)).getGssCredentails();
- GramAttributes jobAttr = GramRSLGenerator.configureRemoteJob(context);
+ GramAttributes jobAttr = GramRSLGenerator.configureRemoteJob(invocationContext);
rsl = jobAttr.toRSL();
job = new GramJob(rsl);
job.setCredentials(gssCred);
log.info("RSL = " + rsl);
- NotificationService notifier = context.getNotificationService();
+ NotificationService notifier = invocationContext.getExecutionContext().getNotificationService();
DurationObj compObj = notifier.computationStarted();
StringBuffer buf = new StringBuffer();
- JobSubmissionListener listener = new JobSubmissionListener(job, context);
+ JobSubmissionListener listener = new JobSubmissionListener(job, invocationContext);
job.addListener(listener);
- log.info("Request to contact:" + contact);
- // The first boolean is to specify the job is a batch job - use true
- // for interactive and false for batch.
- // the second boolean is to specify to use the full proxy and not
- // delegate a limited proxy.
- job.request(contact, false, false);
+ log.info("Request to contact:" + gatekeeper);
+ /*
+ * The first boolean is to specify the job is a batch job - use true for interactive and false for batch.
+ * The second boolean is to specify to use the full proxy and not delegate a limited proxy.
+ */
+ job.request(gatekeeper, false, false);
log.info("JobID = " + job.getIDAsString());
// Gram.request(contact, job, false, false);
- buf.append("Finished launching job, Host = ").append(context.getExecutionModel().getHost())
- .append(" RSL = ").append(job.getRSL()).append("working directory =")
- .append(context.getExecutionModel().getWorkingDir()).append("tempDirectory =")
- .append(context.getExecutionModel().getTmpDir()).append("Globus GateKeeper cantact = ")
- .append(contact);
- context.getNotificationService().info(buf.toString());
+ buf.append("Finished launching job, Host = ")
+ .append(host.getName())
+ .append(" RSL = ")
+ .append(job.getRSL())
+ .append(" working directory = ")
+ .append(app.getWorkingDir())
+ .append(" tempDirectory = ")
+ .append(app.getTmpDir())
+ .append(" Globus GateKeeper cantact = ")
+ .append(gatekeeper);
+ invocationContext.getExecutionContext().getNotificationService().info(buf.toString());
String gramJobid = job.getIDAsString();
- context.getNotificationService().info("JobID=" + gramJobid);
+ invocationContext.getExecutionContext().getNotificationService().info("JobID=" + gramJobid);
log.info(buf.toString());
// Send Audit Notifications
- notifier.appAudit(invocationContext.getServiceName(), new URI(job.getIDAsString()), contact, null, null,
+ notifier.appAudit(invocationContext.getServiceName(), new URI(job.getIDAsString()), gatekeeper, null, null,
gssCred.getName().toString(), null, job.getRSL());
listener.waitFor();
@@ -162,57 +162,15 @@ public class GramProvider extends Abstra
int jobStatus = listener.getStatus();
if (jobStatus == GramJob.STATUS_FAILED) {
- errCode = listener.getError();
- // Adding retry for error code to properties files as
- // gfac.retryonJobErrorCodes with comma separated
- if (context.getServiceContext().getGlobalConfiguration().getRetryonErrorCodes()
- .contains(Integer.toString(errCode))) {
- try {
- log.info("Job Failed with Error code " + errCode + " and job id: " + gramJobid);
- log.info("Retry job sumttion one more time for error code" + errCode);
- job = new GramJob(rsl);
- job.setCredentials(gssCred);
- listener = new JobSubmissionListener(job, context);
- job.addListener(listener);
- job.request(contact, false, false);
- String newGramJobid = job.getIDAsString();
- String jobStatusMessage = GfacUtils.formatJobStatus(newGramJobid, "RETRY");
- context.getNotificationService().info(jobStatusMessage);
- context.getNotificationService().info("JobID=" + newGramJobid);
- notifier.appAudit(context.getServiceContext().getService().getService().getServiceName()
- .getStringValue(), new URI(job.getIDAsString()), contact, null, null, gssCred.getName()
- .toString(), null, job.getRSL());
- listener.waitFor();
- job.removeListener(listener);
- int jobStatus1 = listener.getStatus();
- if (jobStatus1 == GramJob.STATUS_FAILED) {
- int errCode1 = listener.getError();
- String errorMsg = "Job " + job.getID() + " on host "
- + context.getExecutionModel().getHost() + " Error Code = " + errCode1;
- String localHost = context.getServiceContext().getGlobalConfiguration().getLocalHost();
- throw new JobSubmissionFault(new Exception(errorMsg), localHost, "", "",
- CurrentProviders.Gram);
- }
- } catch (Exception e) {
- String localHost = context.getServiceContext().getGlobalConfiguration().getLocalHost();
- throw new JobSubmissionFault(e, localHost, "", "", CurrentProviders.Gram);
- }
+ errCode = listener.getError();
+ String errorMsg = "Job " + job.getID() + " on host " + host.getName() + " Error Code = " + errCode;
+ GfacException error = new JobSubmissionFault(new Exception(errorMsg), "GFAC HOST", gatekeeper, rsl, CurrentProviders.Gram);
+ if (errCode == 8) {
+ error.setFaultCode(ErrorCodes.JOB_CANCELED);
} else {
- String errorMsg = "Job " + job.getID() + " on host " + context.getExecutionModel().getHost()
- + " Error Code = " + errCode;
- String localHost = context.getServiceContext().getGlobalConfiguration().getLocalHost();
- GfacException error = new JobSubmissionFault(new Exception(errorMsg), localHost, contact, rsl,
- CurrentProviders.Gram);
- if (errCode == 8) {
- error.setFaultCode(ErrorCodes.JOB_CANCELED);
- } else {
- error.setFaultCode(ErrorCodes.JOB_FAILED);
- }
- // error.addProperty(ErrorCodes.JOB_TYPE,
- // ErrorCodes.JobType.Gram.toString());
- // error.addProperty(ErrorCodes.CONTACT, contact);
- throw error;
+ error.setFaultCode(ErrorCodes.JOB_FAILED);
}
+ throw error;
}
notifier.computationFinished(compObj);
@@ -221,23 +179,13 @@ public class GramProvider extends Abstra
*/
GridFtp ftp = new GridFtp();
- // get Hostname
- String hostgridFTP = null;
-
- if (invocationContext.getExecutionContext().getExecutionModel().getHostDesc().getHostConfiguration()
- .getGridFTPArray() != null
- && invocationContext.getExecutionContext().getExecutionModel().getHostDesc().getHostConfiguration()
- .getGridFTPArray().length > 0) {
- hostgridFTP = invocationContext.getExecutionContext().getExecutionModel().getHostDesc()
- .getHostConfiguration().getGridFTPArray(0).getEndPointReference();
- } else {
- hostgridFTP = invocationContext.getExecutionContext().getExecutionModel().getHost();
+ String hostgridFTP = host.getGridFTPEndPoint();
+ if (host.getGridFTPEndPoint() == null){
+ hostgridFTP = host.getName();
}
- URI stdoutURI = GfacUtils.createGsiftpURI(hostgridFTP, invocationContext.getExecutionContext()
- .getExecutionModel().getStdOut());
- URI stderrURI = GfacUtils.createGsiftpURI(hostgridFTP, invocationContext.getExecutionContext()
- .getExecutionModel().getStderr());
+ URI stdoutURI = GfacUtils.createGsiftpURI(hostgridFTP, app.getStdOut());
+ URI stderrURI = GfacUtils.createGsiftpURI(hostgridFTP, app.getStdErr());
System.out.println(stdoutURI);
System.out.println(stderrURI);
@@ -262,19 +210,15 @@ public class GramProvider extends Abstra
jobSucsseful = true;
} catch (GramException e) {
String localHost = "xxxx";
- GfacException error = new JobSubmissionFault(e, localHost, contact, rsl, CurrentProviders.Gram);
+ GfacException error = new JobSubmissionFault(e, localHost, gatekeeper, rsl, CurrentProviders.Gram);
if (errCode == 8) {
error.setFaultCode(ErrorCodes.JOB_CANCELED);
} else {
error.setFaultCode(ErrorCodes.JOB_FAILED);
}
- // error.addProperty(ErrorCodes.JOB_TYPE,
- // ErrorCodes.JobType.Gram.toString());
- // error.addProperty(ErrorCodes.CONTACT, contact);
throw error;
} catch (GSSException e) {
- String localHost = context.getServiceContext().getGlobalConfiguration().getLocalHost();
- throw new JobSubmissionFault(e, localHost, contact, rsl, CurrentProviders.Gram);
+ throw new JobSubmissionFault(e, "GFAC HOST", gatekeeper, rsl, CurrentProviders.Gram);
} catch (URISyntaxException e) {
throw new GfacException(e, FaultCode.ErrorAtDependentService);
} catch (InterruptedException e) {
Modified: incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/GramRSLGenerator.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/GramRSLGenerator.java?rev=1155439&r1=1155438&r2=1155439&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/GramRSLGenerator.java (original)
+++ incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/GramRSLGenerator.java Tue Aug 9 17:35:09 2011
@@ -21,18 +21,20 @@
package org.apache.airavata.core.gfac.provider.utils;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
-import org.apache.airavata.core.gfac.context.ExecutionContext;
+import org.apache.airavata.core.gfac.context.InvocationContext;
import org.apache.airavata.core.gfac.exception.GfacException;
import org.apache.airavata.core.gfac.exception.GfacException.FaultCode;
+import org.apache.airavata.core.gfac.type.ServiceDescription;
+import org.apache.airavata.core.gfac.type.app.GramApplicationDeployment;
+import org.apache.airavata.core.gfac.type.host.GlobusHost;
import org.apache.airavata.core.gfac.utils.GFacConstants;
import org.globus.gram.GramAttributes;
import org.ogce.namespaces.x2010.x08.x30.workflowContextHeader.WorkflowContextHeaderDocument.WorkflowContextHeader;
import org.ogce.namespaces.x2010.x08.x30.workflowResourceMapping.ResourceMappingDocument.ResourceMapping;
-import org.ogce.schemas.gfac.documents.ApplicationDescriptionType;
-import org.ogce.schemas.gfac.documents.RSLParmType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,92 +45,96 @@ public class GramRSLGenerator {
SINGLE, MPI, MULTIPLE, CONDOR
};
- public static GramAttributes configureRemoteJob(ExecutionContext appExecContext) throws GfacException {
+ public static GramAttributes configureRemoteJob(InvocationContext context) throws GfacException {
+ GlobusHost host = (GlobusHost)context.getGfacContext().getHost();
+ GramApplicationDeployment app = (GramApplicationDeployment)context.getGfacContext().getApp();
+ ServiceDescription service = context.getGfacContext().getService();
+
+
GramAttributes jobAttr = new GramAttributes();
- jobAttr.setExecutable(appExecContext.getExecutionModel().getExecutable());
- jobAttr.setDirectory(appExecContext.getExecutionModel().getWorkingDir());
- jobAttr.setStdout(appExecContext.getExecutionModel().getStdOut());
- jobAttr.setStderr(appExecContext.getExecutionModel().getStderr());
-
- // The env here contains the env of the host and the application. i.e
- // the env specified in
- // the host description and application description documents
- Map<String, String> nv = appExecContext.getExecutionModel().getEnv();
-
+ jobAttr.setExecutable(app.getExecutable());
+ jobAttr.setDirectory(app.getWorkingDir());
+ jobAttr.setStdout(app.getStdOut());
+ jobAttr.setStderr(app.getStdErr());
+
+ /*
+ * The env here contains the env of the host and the application. i.e the env specified in the host description and application description documents
+ */
+ Map<String, String> nv = app.getEnv();
for (String key : nv.keySet()) {
jobAttr.addEnvVariable(key, nv.get(key));
}
- jobAttr.addEnvVariable(GFacConstants.INPUT_DATA_DIR, appExecContext.getExecutionModel().getInputDataDir());
- String outputDataDir = GFacConstants.OUTPUT_DATA_DIR;
- if (!outputDataDir.isEmpty()) {
- jobAttr.addEnvVariable(outputDataDir, appExecContext.getExecutionModel().getOutputDataDir());
- }
- ApplicationDescriptionType app = appExecContext.getExecutionModel().getAplicationDesc();
- WorkflowContextHeader contextHeader = appExecContext.getWorkflowHeader();
+ jobAttr.addEnvVariable(GFacConstants.INPUT_DATA_DIR, app.getInputDir());
+ jobAttr.addEnvVariable(GFacConstants.INPUT_DATA_DIR, app.getOutputDir());
+
+ WorkflowContextHeader contextHeader = context.getExecutionContext().getWorkflowHeader();
ResourceMapping resourceMapping = null;
if (contextHeader != null) {
resourceMapping = contextHeader.getResourceMappings().getResourceMappingArray(0);
}
-
- log.info("Configure using App Desc = " + app.xmlText());
+
if (resourceMapping != null && resourceMapping.getMaxWallTime() > 0) {
log.info("Header Setting Max Wall Time" + resourceMapping.getMaxWallTime());
jobAttr.setMaxWallTime(resourceMapping.getMaxWallTime());
- } else if (app.getMaxWallTime() > 0) {
- log.info("Setting max wall clock time to " + app.getMaxWallTime());
+ } else if (app.getWallTime() > 0) {
+ log.info("Setting max wall clock time to " + app.getWallTime());
- if (app.getMaxWallTime() > 30 && app.getQueue() != null && app.getQueue().equals("debug")) {
+ if (app.getWallTime() > 30 && app.getQueueName() != null && app.getQueueName().equals("debug")) {
throw new GfacException("NCSA debug Queue only support jobs < 30 minutes", FaultCode.InvalidConfig);
}
- jobAttr.setMaxWallTime(app.getMaxWallTime());
+ jobAttr.setMaxWallTime(app.getWallTime());
jobAttr.set("proxy_timeout", "1");
} else {
jobAttr.setMaxWallTime(29);
}
- if (appExecContext.getExecutionModel().getStdIn() != null) {
- jobAttr.setStdin(appExecContext.getExecutionModel().getStdIn());
+ if (app.getStdIn() != null) {
+ jobAttr.setStdin(app.getStdIn());
} else {
- Iterator<String> values = appExecContext.getExecutionModel().getInputParameters().iterator();
- while (values.hasNext()) {
- jobAttr.addArgument(values.next());
- }
+ // input parameter
+ ArrayList<String> tmp = new ArrayList<String>();
+ for (Iterator<String> iterator = context.getMessageContext("input").getParameterNames(); iterator.hasNext();) {
+ String key = iterator.next();
+ jobAttr.addArgument(context.getMessageContext("input").getStringParameterValue(key));
+ }
}
if (resourceMapping != null && resourceMapping.getNodeCount() > 0) {
log.info("Setting number of procs to " + resourceMapping.getNodeCount());
jobAttr.set("hostCount", String.valueOf(resourceMapping.getNodeCount()));
- } else if (app.getHostCount() > 1) {
- jobAttr.set("hostCount", String.valueOf(app.getHostCount()));
+ } else if (app.getNodeCount() > 1) {
+ jobAttr.set("hostCount", String.valueOf(app.getNodeCount()));
}
if (resourceMapping != null && resourceMapping.getCpuCount() > 0) {
log.info("Setting host count to " + resourceMapping.getCpuCount());
jobAttr.setNumProcs(resourceMapping.getCpuCount());
- } else if (app.getCount() > 1) {
- log.info("Setting number of procs to " + app.getCount());
- jobAttr.setNumProcs(app.getCount());
+ } else if (app.getCpuCount() > 1) {
+ log.info("Setting number of procs to " + app.getCpuCount());
+ jobAttr.setNumProcs(app.getCpuCount());
}
- if (app.getProject() != null && app.getProject().getProjectName() != null) {
- log.info("Setting project to " + app.getProject());
- jobAttr.setProject(app.getProject().getProjectName());
+ if (app.getProjectName() != null){
+ log.info("Setting project to " + app.getProjectName());
+ jobAttr.setProject(app.getProjectName());
}
if (resourceMapping != null && resourceMapping.getQueueName() != null) {
jobAttr.setQueue(resourceMapping.getQueueName());
- } else if (app.getQueue() != null && app.getQueue().getQueueName() != null) {
- log.info("Setting job queue to " + app.getQueue());
- jobAttr.setQueue(app.getQueue().getQueueName());
+ } else if (app.getQueueName() != null) {
+ log.info("Setting job queue to " + app.getQueueName());
+ jobAttr.setQueue(app.getQueueName());
}
+
+
String jobType = JobType.SINGLE.toString();
-
if (app.getJobType() != null) {
jobType = app.getJobType().toString();
}
+
if (jobType.equalsIgnoreCase(JobType.SINGLE.toString())) {
log.info("Setting job type to single");
jobAttr.setJobType(GramAttributes.JOBTYPE_SINGLE);
@@ -140,27 +146,9 @@ public class GramRSLGenerator {
jobAttr.setJobType(GramAttributes.JOBTYPE_MULTIPLE);
} else if (jobType.equalsIgnoreCase(JobType.CONDOR.toString())) {
jobAttr.setJobType(GramAttributes.JOBTYPE_CONDOR);
- }
+ }
- // Support to add the Additional RSL parameters
- RSLParmType[] rslParams = app.getRslparmArray();
- if (rslParams.length > 0) {
- for (RSLParmType rslType : rslParams) {
- log.info("Adding rsl param of [" + rslType.getName() + "," + rslType.getStringValue() + "]");
- if (rslType.getName() != "") {
- jobAttr.set(rslType.getName(), rslType.getStringValue());
- }
- }
- }
-
- // support urgency/SPRUCE case
- // only add spruce rsl parameter if this host has a spruce jobmanager
- // configured
- if (appExecContext.getWorkflowHeader() != null && appExecContext.getWorkflowHeader().getURGENCY() != null
- // && GfacUtils.getSpruceGatekeeper(appExecContext) != null
- ) {
- jobAttr.set("urgency", appExecContext.getWorkflowHeader().getURGENCY());
- }
+ //TODO rsl parameter & urgency/SPRUCE
return jobAttr;
}
Modified: incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java?rev=1155439&r1=1155438&r2=1155439&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java (original)
+++ incubator/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/provider/utils/JobSubmissionListener.java Tue Aug 9 17:35:09 2011
@@ -22,6 +22,8 @@
package org.apache.airavata.core.gfac.provider.utils;
import org.apache.airavata.core.gfac.context.ExecutionContext;
+import org.apache.airavata.core.gfac.context.GFACContext;
+import org.apache.airavata.core.gfac.context.InvocationContext;
import org.apache.airavata.core.gfac.context.impl.GSISecurityContext;
import org.apache.airavata.core.gfac.exception.GfacException;
import org.apache.airavata.core.gfac.utils.GfacUtils;
@@ -35,16 +37,18 @@ import org.slf4j.LoggerFactory;
public class JobSubmissionListener implements GramJobListener {
+ public static final String MYPROXY_SECURITY_CONTEXT = "myproxy";
+
private boolean finished;
private int error;
private int status;
- private ExecutionContext executionContext;
+ private InvocationContext context;
private GramJob job;
protected final Logger log = LoggerFactory.getLogger(JobSubmissionListener.class);
- public JobSubmissionListener(GramJob job, ExecutionContext executionContext) {
+ public JobSubmissionListener(GramJob job, InvocationContext context) {
this.job = job;
- this.executionContext = executionContext;
+ this.context = context;
}
// waits for DONE or FAILED status
@@ -53,9 +57,9 @@ public class JobSubmissionListener imple
int proxyExpTime = job.getCredentials().getRemainingLifetime();
if (proxyExpTime < 900) {
log.info("Job proxy expired. Trying to renew proxy");
- GSSCredential newgssCred = ((GSISecurityContext) executionContext.getSecurityContext())
- .getGssCredentails();
- job.renew(newgssCred);
+ GSSCredential gssCred = ((GSISecurityContext) context
+ .getSecurityContext(MYPROXY_SECURITY_CONTEXT)).getGssCredentails();
+ job.renew(gssCred);
}
// job status is changed but method isn't invoked
if (status != 0) {
@@ -82,7 +86,7 @@ public class JobSubmissionListener imple
String jobStatusMessage = GfacUtils.formatJobStatus(jobId, statusString);
log.info(jobStatusMessage);
status = jobStatus;
- executionContext.getNotificationService().info(jobStatusMessage);
+ context.getExecutionContext().getNotificationService().info(jobStatusMessage);
if (jobStatus == GramJob.STATUS_DONE) {
finished = true;
} else if (jobStatus == GramJob.STATUS_FAILED) {