You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2013/01/31 20:52:57 UTC
svn commit: r1441167 - in /airavata/trunk/modules/gfac-core: ./
src/main/java/org/apache/airavata/gfac/
src/main/java/org/apache/airavata/gfac/context/
src/main/java/org/apache/airavata/gfac/phoebus/
src/main/java/org/apache/airavata/gfac/provider/ src...
Author: lahiru
Date: Thu Jan 31 19:52:57 2013
New Revision: 1441167
URL: http://svn.apache.org/viewvc?rev=1441167&view=rev
Log:
Adding GramProvider into the new gfac structure.
Added:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/JobSubmissionFault.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramRSLGenerator.java
airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java
Modified:
airavata/trunk/modules/gfac-core/pom.xml
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/phoebus/PhoebusGridConfigurationHandler.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
Modified: airavata/trunk/modules/gfac-core/pom.xml
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/pom.xml?rev=1441167&r1=1441166&r2=1441167&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/pom.xml (original)
+++ airavata/trunk/modules/gfac-core/pom.xml Thu Jan 31 19:52:57 2013
@@ -41,26 +41,26 @@
</dependency>
-->
<dependency>
- <groupId>cog-4_1_6_rc2</groupId>
+ <groupId>lead-security</groupId>
<artifactId>puretls</artifactId>
<type>jar</type>
- <version>cog-4_1_6</version>
+ <version>0.9b4-1</version>
</dependency>
- <!--dependency>
- <groupId>cog-4_1_6_rc2</groupId>
+ <dependency>
+ <groupId>lead-security</groupId>
<artifactId>cryptix32</artifactId>
- <version>cog-4_1_6</version>
+ <version>versionless</version>
</dependency>
<dependency>
- <groupId>cog-4_1_6_rc2</groupId>
+ <groupId>lead-security</groupId>
<artifactId>cryptix-asn1</artifactId>
- <version>cog-4_1_6</version>
- </dependency-->
- <!--dependency>
- <groupId>bouncycastle</groupId>
+ <version>versionless</version>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15</artifactId>
- <version>143</version>
- </dependency-->
+ <version>1.45</version>
+ </dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/JobSubmissionFault.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/JobSubmissionFault.java?rev=1441167&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/JobSubmissionFault.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/JobSubmissionFault.java Thu Jan 31 19:52:57 2013
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac;
+
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.provider.GFacProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+
+/**
+ * Provider exception raised when submitting or running a job fails.
+ *
+ * The exception message is taken from the supplied cause. An optional free-form
+ * reason can be attached with {@link #setReason(String)} and read back with
+ * {@link #getReason()}.
+ */
+public class JobSubmissionFault extends GFacProviderException{
+
+    /** Reason code string for a cancelled job. */
+    public static final String JOB_CANCEL = "JOB_CANCEL";
+
+    /** Reason code string for a failed job. */
+    public static final String JOB_FAILED = "JOB_FAILED";
+
+    private String reason;
+
+    /**
+     * Creates a fault wrapping {@code cause}.
+     *
+     * @param provider            provider that detected the failure (currently not stored)
+     * @param cause               underlying failure; its message becomes this fault's message
+     * @param submitHost          host the job was submitted from/to (currently not stored)
+     * @param contact             gatekeeper contact string (currently not stored)
+     * @param rsl                 RSL of the failed job (currently not stored)
+     * @param jobExecutionContext context of the failed job, passed to the superclass
+     */
+    public JobSubmissionFault(GFacProvider provider, Throwable cause, String submitHost, String contact, String rsl, JobExecutionContext jobExecutionContext) {
+        // Guard against a null cause so that building the fault never raises a NullPointerException itself.
+        super(cause == null ? null : cause.getMessage(), cause, jobExecutionContext);
+    }
+
+    /**
+     * Sets the reason associated with this fault.
+     *
+     * @param reason reason text, e.g. {@link #JOB_CANCEL} or {@link #JOB_FAILED}
+     */
+    public void setReason(String reason) {
+        this.reason = reason;
+    }
+
+    /**
+     * Returns the reason previously stored with {@link #setReason(String)}.
+     *
+     * @return the reason, or {@code null} if none was set
+     */
+    public String getReason() {
+        return reason;
+    }
+
+    /**
+     * Placeholder for publishing a fault notification; the body is intentionally
+     * empty in this revision, so calling it has no effect.
+     *
+     * @param message                 notification text
+     * @param jobExecutionContext     context of the failed job
+     * @param e                       exception being reported
+     * @param additionalExceptiondata optional extra details
+     */
+    public void sendFaultNotification(String message,
+            JobExecutionContext jobExecutionContext, Exception e,
+            String... additionalExceptiondata) {
+        // no-op
+    }
+}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java?rev=1441167&r1=1441166&r2=1441167&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java Thu Jan 31 19:52:57 2013
@@ -24,7 +24,9 @@ package org.apache.airavata.gfac;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.provider.GFacProvider;
+import org.apache.airavata.gfac.provider.GramProvider;
import org.apache.airavata.gfac.provider.impl.LocalProvider;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,8 +60,11 @@ public class Scheduler {
*/
private static GFacProvider getProvider(JobExecutionContext jobExecutionContext){
HostDescription hostDescription = jobExecutionContext.getApplicationContext().getHostDescription();
-
- return new LocalProvider();
+ if(hostDescription.getType() instanceof GlobusHostType){
+ return new GramProvider();
+ }else{
+ return new LocalProvider();
+ }
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java?rev=1441167&r1=1441166&r2=1441167&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java Thu Jan 31 19:52:57 2013
@@ -44,5 +44,8 @@ public class MessageContext extends Abst
parameters.put(name, value);
}
+ /**
+ * Returns the parameter map of this message context.
+ *
+ * The map field itself is returned (no copy is made), so changes made
+ * through the returned reference are visible to this context.
+ *
+ * @return the map of parameter names to values
+ */
+ public Map<String,Object> getParameters(){
+ return parameters;
+ }
}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/phoebus/PhoebusGridConfigurationHandler.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/phoebus/PhoebusGridConfigurationHandler.java?rev=1441167&r1=1441166&r2=1441167&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/phoebus/PhoebusGridConfigurationHandler.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/phoebus/PhoebusGridConfigurationHandler.java Thu Jan 31 19:52:57 2013
@@ -26,14 +26,14 @@ import org.globus.ftp.DataChannelAuthent
import org.globus.ftp.GridFTPClient;
public class PhoebusGridConfigurationHandler implements GridConfigurationHandler{
- @Override
+
public void handleSourceFTPClient(GridFTPClient client) throws Exception {
if (PhoebusUtils.isPhoebusDriverConfigurationsDefined(client.getHost())) {
client.setDataChannelAuthentication(DataChannelAuthentication.NONE);
client.site("SITE SETNETSTACK phoebus:" + PhoebusUtils.getPhoebusDataChannelXIODriverParameters(client.getHost()));
}
}
- @Override
+
public void handleDestinationFTPClient(GridFTPClient client)
throws Exception {
Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java?rev=1441167&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java Thu Jan 31 19:52:57 2013
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.provider;
+
+import org.apache.airavata.gfac.JobSubmissionFault;
+import org.apache.airavata.gfac.context.GSISecurityContext;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.notification.events.ExecutionFailEvent;
+import org.apache.airavata.gfac.utils.GramJobSubmissionListener;
+import org.apache.airavata.gfac.utils.GramProviderUtils;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.globus.gram.GramException;
+import org.globus.gram.GramJob;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GramProvider implements GFacProvider {
+ private static final Logger log = LoggerFactory.getLogger(GramJobSubmissionListener.class);
+
+ private GramJob job;
+ private GramJobSubmissionListener listener;
+
+ // This method precpare the environment before the application invocation.
+ public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ GramProviderUtils.makeDirectory(jobExecutionContext);
+ job = GramProviderUtils.setupEnvironment(jobExecutionContext);
+ listener = new GramJobSubmissionListener(job, jobExecutionContext);
+ job.addListener(listener);
+ }
+
+ public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ System.out.println("Executing the job");
+ GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
+ ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+
+ StringBuffer buf = new StringBuffer();
+ try {
+
+ /*
+ * Set Security
+ */
+ GSISecurityContext gssContext = new GSISecurityContext(jobExecutionContext.getGFacConfiguration());
+ GSSCredential gssCred = gssContext.getGssCredentails();
+ job.setCredentials(gssCred);
+ // We do not support multiple gatekeepers in XBaya GUI, so we simply pick the 0th element in the array
+ String gateKeeper = host.getGlobusGateKeeperEndPointArray(0);
+ log.debug("Request to contact:" + gateKeeper);
+
+ buf.append("Finished launching job, Host = ").append(host.getHostAddress()).append(" RSL = ")
+ .append(job.getRSL()).append(" working directory = ").append(app.getStaticWorkingDirectory())
+ .append(" temp directory = ").append(app.getScratchWorkingDirectory())
+ .append(" Globus GateKeeper Endpoint = ").append(gateKeeper);
+
+ /*
+ * The first boolean is to specify the job is a batch job - use true for interactive and false for batch.
+ * The second boolean is to specify to use the full proxy and not delegate a limited proxy.
+ */
+ job.request(gateKeeper, false, false);
+ String gramJobid = job.getIDAsString();
+ log.info("JobID = " + gramJobid);
+
+ log.info(buf.toString());
+ /*
+ * Block untill job is done
+ */
+ listener.waitFor();
+
+ /*
+ * Remove listener
+ */
+ job.removeListener(listener);
+
+ /*
+ * Fail job
+ */
+ int jobStatus = listener.getStatus();
+
+ if (job.getExitCode() != 0 || jobStatus == GramJob.STATUS_FAILED) {
+ int errCode = listener.getError();
+ String errorMsg = "Job " + job.getID() + " on host " + host.getHostAddress() + " Job Exit Code = "
+ + listener.getError();
+ JobSubmissionFault error = new JobSubmissionFault(this, new Exception(errorMsg), "GFAC HOST",
+ gateKeeper, job.getRSL(), jobExecutionContext);
+ jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(error.getCause()));
+ throw error;
+ }
+ } catch (GramException e) {
+ JobSubmissionFault error = new JobSubmissionFault(this, e, host.getHostAddress(),
+ host.getGlobusGateKeeperEndPointArray(0), job.getRSL(), jobExecutionContext);
+ jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(error.getCause()));
+ } catch (GSSException e) {
+ throw new GFacProviderException(e.getMessage(), e, jobExecutionContext);
+ } catch (InterruptedException e) {
+ throw new GFacProviderException("Thread", e, jobExecutionContext);
+ } catch (SecurityException e) {
+ throw new GFacProviderException(e.getMessage(), e, jobExecutionContext);
+ } finally {
+ if (job != null) {
+ try {
+ job.cancel();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+ }
+}
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java?rev=1441167&r1=1441166&r2=1441167&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java Thu Jan 31 19:52:57 2013
@@ -20,5 +20,86 @@
*/
package org.apache.airavata.gfac.utils;
+import org.apache.airavata.gfac.ToolsException;
+import org.apache.airavata.gfac.context.GSISecurityContext;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.external.GridFtp;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.globus.gram.GramAttributes;
+import org.globus.gram.GramJob;
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
public class GramProviderUtils {
+    // Log under this class's own category (the original used GramJobSubmissionListener.class).
+    private static final Logger log = LoggerFactory.getLogger(GramProviderUtils.class);
+
+    /**
+     * Creates the scratch, working, input and output directories of the
+     * application on the remote host via GridFTP.
+     *
+     * The host's GridFTP endpoints are tried in order until one succeeds. If the
+     * host description lists no endpoint, the host address itself is used as the
+     * only endpoint.
+     *
+     * @param jobExecutionContext context providing the host and application descriptions
+     * @throws GFacProviderException if the directories could not be created on any
+     *                               endpoint (the failure of the last endpoint tried
+     *                               is reported) or credentials cannot be obtained
+     */
+    public static void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType();
+        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+        GridFtp ftp = new GridFtp();
+
+        try {
+            GSISecurityContext gssContext = new GSISecurityContext(jobExecutionContext.getGFacConfiguration());
+            GSSCredential gssCred = gssContext.getGssCredentails();
+            String[] hostgridFTP = host.getGridFTPEndPointArray();
+            if (hostgridFTP == null || hostgridFTP.length == 0) {
+                hostgridFTP = new String[]{host.getHostAddress()};
+            }
+            boolean success = false;
+            GFacProviderException pe = null;
+            // Iterate the resolved endpoint list. The original looped over
+            // host.getGridFTPEndPointArray() again, which bypassed the fallback above and
+            // ended in "throw null" when no endpoint was configured.
+            for (String endpoint : hostgridFTP) {
+                try {
+
+                    URI tmpdirURI = GFacUtils.createGsiftpURI(endpoint, app.getScratchWorkingDirectory());
+                    URI workingDirURI = GFacUtils.createGsiftpURI(endpoint, app.getStaticWorkingDirectory());
+                    URI inputURI = GFacUtils.createGsiftpURI(endpoint, app.getInputDataDirectory());
+                    URI outputURI = GFacUtils.createGsiftpURI(endpoint, app.getOutputDataDirectory());
+
+                    // Report the endpoint actually being tried, not always the first one.
+                    log.info("Host FTP = " + endpoint);
+                    log.info("temp directory = " + tmpdirURI);
+                    log.info("Working directory = " + workingDirURI);
+                    log.info("Input directory = " + inputURI);
+                    log.info("Output directory = " + outputURI);
+
+                    ftp.makeDir(tmpdirURI, gssCred);
+                    ftp.makeDir(workingDirURI, gssCred);
+                    ftp.makeDir(inputURI, gssCred);
+                    ftp.makeDir(outputURI, gssCred);
+
+                    success = true;
+                    break;
+                } catch (URISyntaxException e) {
+                    // Remember the failure and move on to the next endpoint.
+                    pe = new GFacProviderException("URI is malformatted:" + e.getMessage(), e, jobExecutionContext);
+
+                } catch (ToolsException e) {
+                    pe = new GFacProviderException(e.getMessage(), e, jobExecutionContext);
+                }
+            }
+            if (!success) {
+                // The endpoint list is never empty here, so pe holds the last failure.
+                throw pe;
+            }
+        } catch (SecurityException e) {
+            throw new GFacProviderException(e.getMessage(), e, jobExecutionContext);
+        }
+    }
+
+    /**
+     * Builds the GRAM job for the given execution context from the RSL produced
+     * by {@link GramRSLGenerator#configureRemoteJob}.
+     *
+     * @param jobExecutionContext context providing the application description and inputs
+     * @return a new, not yet submitted {@link GramJob}
+     * @throws GFacProviderException if the RSL attributes cannot be generated
+     */
+    public static GramJob setupEnvironment(JobExecutionContext jobExecutionContext) throws GFacProviderException {
+        log.debug("Searching for Gate Keeper");
+        try {
+            GramAttributes jobAttr = GramRSLGenerator.configureRemoteJob(jobExecutionContext);
+            String rsl = jobAttr.toRSL();
+
+            log.debug("RSL = " + rsl);
+            return new GramJob(rsl);
+        } catch (ToolsException te) {
+            throw new GFacProviderException(te.getMessage(), te, jobExecutionContext);
+        }
+    }
}
Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramRSLGenerator.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramRSLGenerator.java?rev=1441167&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramRSLGenerator.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramRSLGenerator.java Thu Jan 31 19:52:57 2013
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.utils;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.ToolsException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.apache.airavata.schemas.gfac.NameValuePairType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.globus.gram.GramAttributes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class GramRSLGenerator {
+ protected static final Logger log = LoggerFactory.getLogger(GramRSLGenerator.class);
+
+ private enum JobType {
+ SERIAL, SINGLE, MPI, MULTIPLE, CONDOR
+ }
+
+ ;
+
+ public static GramAttributes configureRemoteJob(JobExecutionContext context) throws ToolsException {
+ HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) context.getApplicationContext().getApplicationDeploymentDescription().getType();
+ GramAttributes jobAttr = new GramAttributes();
+ jobAttr.setExecutable(app.getExecutableLocation());
+ jobAttr.setDirectory(app.getStaticWorkingDirectory());
+ jobAttr.setStdout(app.getStandardOutput());
+ jobAttr.setStderr(app.getStandardError());
+
+ /*
+ * The env here contains the env of the host and the application. i.e the env specified in the host description
+ * and application description documents
+ */
+ NameValuePairType[] env = app.getApplicationEnvironmentArray();
+ if (env.length != 0) {
+ Map<String, String> nv = new HashMap<String, String>();
+ for (int i = 0; i < env.length; i++) {
+ String key = env[i].getName();
+ String value = env[i].getValue();
+ nv.put(key, value);
+ }
+
+ for (Map.Entry<String, String> entry : nv.entrySet()) {
+ jobAttr.addEnvVariable(entry.getKey(), entry.getValue());
+ }
+ }
+ jobAttr.addEnvVariable(Constants.INPUT_DATA_DIR_VAR_NAME, app.getInputDataDirectory());
+ jobAttr.addEnvVariable(Constants.OUTPUT_DATA_DIR_VAR_NAME, app.getOutputDataDirectory());
+
+ if (app.getMaxWallTime() > 0) {
+ log.debug("Setting max wall clock time to " + app.getMaxWallTime());
+
+ if (app.getMaxWallTime() > 30 && app.getQueue() != null && app.getQueue().getQueueName().equals("debug")) {
+ throw new ToolsException("NCSA debug Queue only support jobs < 30 minutes");
+ }
+
+ jobAttr.setMaxWallTime(app.getMaxWallTime());
+ jobAttr.set("proxy_timeout", "1");
+ } else {
+ jobAttr.setMaxWallTime(30);
+ }
+
+ if (app.getStandardInput() != null && !"".equals(app.getStandardInput())) {
+ jobAttr.setStdin(app.getStandardInput());
+ } else {
+ MessageContext input = context.getInMessageContext();;
+ Map<String,Object> inputs = input.getParameters();
+ Set<String> keys = inputs.keySet();
+ for (String paramName : keys ) {
+ ActualParameter actualParameter = (ActualParameter) inputs.get(paramName);
+ if ("URIArray".equals(actualParameter.getType().getType().toString())) {
+ String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
+ for (String value : values) {
+ jobAttr.addArgument(value);
+ }
+ } else {
+ String paramValue = MappingFactory.toString(actualParameter);
+ jobAttr.addArgument(paramValue);
+ }
+ }
+ }
+ // Using the workflowContext Header values if user provided them in the request and overwrite the default values in DD
+ //todo finish the scheduling based on workflow execution context
+// ContextHeaderDocument.ContextHeader currentContextHeader = WorkflowContextHeaderBuilder.getCurrentContextHeader();
+// if (currentContextHeader.getWorkflowSchedulingContext() != null) {
+// if (currentContextHeader != null &&
+// currentContextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray() != null &&
+// currentContextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray().length > 0) {
+// try {
+// int cpuCount = currentContextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray()[0].getCpuCount();
+// if(cpuCount>0){
+// app.setCpuCount(cpuCount);
+// }
+// } catch (NullPointerException e) {
+// log.debug("No Value sent in WorkflowContextHeader for CPU Count, value in the Deployment Descriptor will be used");
+// context.getNotifier().publish(new ExecutionFailEvent(context, e, "No Value sent in WorkflowContextHeader for Node Count, value in the Deployment Descriptor will be used");
+// }
+// try {
+// int nodeCount = currentContextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray()[0].getNodeCount();
+// if(nodeCount>0){
+// app.setNodeCount(nodeCount);
+// }
+// } catch (NullPointerException e) {
+// log.debug("No Value sent in WorkflowContextHeader for Node Count, value in the Deployment Descriptor will be used");
+// context.getExecutionContext().getNotifier().executionFail(context, e, "No Value sent in WorkflowContextHeader for Node Count, value in the Deployment Descriptor will be used");
+// }
+// try {
+// String queueName = currentContextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray()[0].getQueueName();
+// if (queueName != null) {
+// if(app.getQueue() == null){
+// QueueType queueType = app.addNewQueue();
+// queueType.setQueueName(queueName);
+// }else{
+// app.getQueue().setQueueName(queueName);
+// }
+// }
+// } catch (NullPointerException e) {
+// log.debug("No Value sent in WorkflowContextHeader for Node Count, value in the Deployment Descriptor will be used");
+// context.getExecutionContext().getNotifier().executionFail(context, e, "No Value sent in WorkflowContextHeader for Node Count, value in the Deployment Descriptor will be used");
+// }
+// }
+// }
+// if(currentContextHeader.getWorkflowOutputDataHandling() != null){
+// if(currentContextHeader.getWorkflowOutputDataHandling().getApplicationOutputDataHandlingArray().length != 0)
+// app.setOutputDataDirectory(currentContextHeader.getWorkflowOutputDataHandling().getApplicationOutputDataHandlingArray()[0].getOutputDataDirectory());
+// }
+ if (app.getNodeCount() > 0) {
+ jobAttr.set("hostCount", String.valueOf(app.getNodeCount()));
+ log.debug("Setting number of Nodes to " + app.getCpuCount());
+ }
+ if (app.getCpuCount() > 0) {
+ log.debug("Setting number of procs to " + app.getCpuCount());
+ jobAttr.setNumProcs(app.getCpuCount());
+ }
+ if (app.getMinMemory() > 0) {
+ log.debug("Setting minimum memory to " + app.getMinMemory());
+ jobAttr.setMinMemory(app.getMinMemory());
+ }
+ if (app.getMaxMemory() > 0) {
+ log.debug("Setting maximum memory to " + app.getMaxMemory());
+ jobAttr.setMaxMemory(app.getMaxMemory());
+ }
+ if (app.getProjectAccount() != null) {
+ if (app.getProjectAccount().getProjectAccountNumber() != null) {
+ log.debug("Setting project to " + app.getProjectAccount().getProjectAccountNumber());
+ jobAttr.setProject(app.getProjectAccount().getProjectAccountNumber());
+ }
+ }
+ if (app.getQueue() != null) {
+ if (app.getQueue().getQueueName() != null) {
+ log.debug("Setting job queue to " + app.getQueue().getQueueName());
+ jobAttr.setQueue(app.getQueue().getQueueName());
+ }
+ }
+ String jobType = JobType.SINGLE.toString();
+ if (app.getJobType() != null) {
+ jobType = app.getJobType().toString();
+ }
+ if (jobType.equalsIgnoreCase(JobType.SINGLE.toString())) {
+ log.debug("Setting job type to single");
+ jobAttr.setJobType(GramAttributes.JOBTYPE_SINGLE);
+ } if (jobType.equalsIgnoreCase(JobType.SERIAL.toString())) {
+ log.debug("Setting job type to single");
+ jobAttr.setJobType(GramAttributes.JOBTYPE_SINGLE);
+ } else if (jobType.equalsIgnoreCase(JobType.MPI.toString())) {
+ log.debug("Setting job type to mpi");
+ jobAttr.setJobType(GramAttributes.JOBTYPE_MPI);
+ } else if (jobType.equalsIgnoreCase(JobType.MULTIPLE.toString())) {
+ log.debug("Setting job type to multiple");
+ jobAttr.setJobType(GramAttributes.JOBTYPE_MULTIPLE);
+ } else if (jobType.equalsIgnoreCase(JobType.CONDOR.toString())) {
+ jobAttr.setJobType(GramAttributes.JOBTYPE_CONDOR);
+ }
+
+ return jobAttr;
+ }
+}
Added: airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java?rev=1441167&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java (added)
+++ airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java Thu Jan 31 19:52:57 2013
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.core.gfac.services.impl;
+
+import com.amazonaws.services.importexport.model.JobType;
+import org.apache.airavata.client.AiravataAPIFactory;
+import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.commons.gfac.type.*;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacAPI;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.ApplicationContext;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.schemas.gfac.*;
+import org.apache.commons.lang.SystemUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Test that submits a "/bin/echo" job through {@link GFacAPI} using a Globus
+ * host description and checks the value of the "echo_output" parameter.
+ *
+ * NOTE(review): presumably this needs network access to the configured
+ * MyProxy server, gatekeeper and GridFTP endpoints, so it is an integration
+ * test rather than a unit test - confirm before running it in a default build.
+ */
+public class GramProviderTest {
+ // Context built in setUp() and submitted in testGramProvider().
+ private JobExecutionContext jobExecutionContext;
+ /**
+ * Builds the GFac configuration, host, application and service descriptions
+ * and the in/out message contexts used by the test.
+ */
+ @Before
+ public void setUp() throws Exception {
+
+ GFacConfiguration gFacConfiguration = new GFacConfiguration(null);
+ gFacConfiguration.setMyProxyLifeCycle(3600);
+ gFacConfiguration.setMyProxyServer("myproxy.teragrid.org");
+ gFacConfiguration.setMyProxyUser("ogce");
+ // NOTE(review): hard-coded credential committed to source control; it should be
+ // supplied externally (e.g. system property or environment) and rotated.
+ gFacConfiguration.setMyProxyPassphrase("Jdas7wph");
+ // NOTE(review): machine-specific absolute path; the test cannot work on another machine as is.
+ gFacConfiguration.setTrustedCertLocation("/Users/lahirugunathilake/Downloads/certificates");
+ // TODO: the in-flow and out-flow handlers still have to be set.
+ jobExecutionContext = new JobExecutionContext(gFacConfiguration);
+ ApplicationContext applicationContext = new ApplicationContext();
+ jobExecutionContext.setApplicationContext(applicationContext);
+ /*
+ * Host
+ */
+ HostDescription host = new HostDescription(GlobusHostType.type);
+ host.getType().setHostAddress("ranger.tacc.teragrid.org");
+ host.getType().setHostName("ranger");
+ ((GlobusHostType)host.getType()).setGlobusGateKeeperEndPointArray(new String[]{"gatekeeper.ranger.tacc.teragrid.org:2119/jobmanager-sge"});
+ ((GlobusHostType)host.getType()).setGridFTPEndPointArray(new String[]{"gsiftp://gridftp.ranger.tacc.teragrid.org:2811/"});
+ applicationContext.setHostDescription(host);
+ /*
+ * App
+ */
+ ApplicationDescription appDesc = new ApplicationDescription(HpcApplicationDeploymentType.type);
+ HpcApplicationDeploymentType app = (HpcApplicationDeploymentType)appDesc.getType();
+ ApplicationDeploymentDescriptionType.ApplicationName name = ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance();
+ name.setStringValue("EchoLocal");
+ app.setApplicationName(name);
+ ProjectAccountType projectAccountType = app.addNewProjectAccount();
+ projectAccountType.setProjectAccountNumber("TG-STA110014S");
+
+ QueueType queueType = app.addNewQueue();
+ queueType.setQueueName("development");
+
+ app.setCpuCount(1);
+ app.setJobType(JobTypeType.SERIAL);
+ app.setNodeCount(1);
+ app.setProcessorsPerNode(1);
+
+ /*
+ * Executable to run on the remote host.
+ */
+ app.setExecutableLocation("/bin/echo");
+
+ /*
+ * Default tmp location
+ */
+ String tempDir = "/scratch/01437/ogce/test";
+ // Make the directory name unique per run: timestamp (spaces and colons replaced) plus a random UUID.
+ String date = (new Date()).toString();
+ date = date.replaceAll(" ", "_");
+ date = date.replaceAll(":", "_");
+
+ // NOTE(review): File.separator is the local platform's separator while the path
+ // is a remote one; on a non-Unix client this would produce a wrong path.
+ tempDir = tempDir + File.separator
+ + "SimpleEcho" + "_" + date + "_" + UUID.randomUUID();
+
+ System.out.println(tempDir);
+ app.setScratchWorkingDirectory(tempDir);
+ app.setStaticWorkingDirectory(tempDir);
+ app.setInputDataDirectory(tempDir + File.separator + "input");
+ app.setOutputDataDirectory(tempDir + File.separator + "output");
+ app.setStandardOutput(tempDir + File.separator + "echo.stdout");
+ app.setStandardError(tempDir + File.separator + "echo.stderr");
+
+ applicationContext.setApplicationDeploymentDescription(appDesc);
+
+ /*
+ * Service
+ */
+ ServiceDescription serv = new ServiceDescription();
+ serv.getType().setName("SimpleEcho");
+
+ // One string input parameter named "echo_input".
+ List<InputParameterType> inputList = new ArrayList<InputParameterType>();
+ InputParameterType input = InputParameterType.Factory.newInstance();
+ input.setParameterName("echo_input");
+ input.setParameterType(StringParameterType.Factory.newInstance());
+ inputList.add(input);
+ InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList
+ .size()]);
+
+ // One string output parameter named "echo_output".
+ List<OutputParameterType> outputList = new ArrayList<OutputParameterType>();
+ OutputParameterType output = OutputParameterType.Factory.newInstance();
+ output.setParameterName("echo_output");
+ output.setParameterType(StringParameterType.Factory.newInstance());
+ outputList.add(output);
+ OutputParameterType[] outputParamList = outputList
+ .toArray(new OutputParameterType[outputList.size()]);
+
+ serv.getType().setInputParametersArray(inputParamList);
+ serv.getType().setOutputParametersArray(outputParamList);
+
+ applicationContext.setServiceDescription(serv);
+
+ // Input message: the echo argument is "echo_output=hello".
+ MessageContext inMessage = new MessageContext();
+ ActualParameter echo_input = new ActualParameter();
+ ((StringParameterType)echo_input.getType()).setValue("echo_output=hello");
+ inMessage.addParameter("echo_input", echo_input);
+
+ jobExecutionContext.setInMessageContext(inMessage);
+
+ // Output message: an empty "echo_output" parameter is registered up front.
+ MessageContext outMessage = new MessageContext();
+ ActualParameter echo_out = new ActualParameter();
+// ((StringParameterType)echo_input.getType()).setValue("echo_output=hello");
+ outMessage.addParameter("echo_output", echo_out);
+
+ jobExecutionContext.setOutMessageContext(outMessage);
+
+ }
+
+ /**
+ * Submits the job and expects the "echo_output" parameter to be "hello".
+ */
+ @Test
+ public void testGramProvider() throws GFacException {
+ GFacAPI gFacAPI = new GFacAPI();
+ gFacAPI.submitJob(jobExecutionContext);
+ MessageContext outMessageContext = jobExecutionContext.getOutMessageContext();
+ // NOTE(review): JUnit's assertEquals takes (expected, actual); the arguments are
+ // swapped here, which only affects the wording of a failure message.
+ Assert.assertEquals(MappingFactory.toString((ActualParameter)outMessageContext.getParameter("echo_output")), "hello");
+ }
+}