You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/06/16 22:03:02 UTC
airavata git commit: updated process context and create job descriptor
Repository: airavata
Updated Branches:
refs/heads/master d9b2df033 -> a2b6bdfd9
updated process context and create job descriptor
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a2b6bdfd
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a2b6bdfd
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a2b6bdfd
Branch: refs/heads/master
Commit: a2b6bdfd99ebc27724b5cc97fd589055f6785d89
Parents: d9b2df0
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Tue Jun 16 16:02:58 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Tue Jun 16 16:02:58 2015 -0400
----------------------------------------------------------------------
.../apache/airavata/gfac/core/GFacUtils.java | 891 ++++++++++++-------
.../gfac/core/context/ProcessContext.java | 66 ++
.../gfac/impl/task/JobSubmissionTaskImpl.java | 79 ++
3 files changed, 713 insertions(+), 323 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/a2b6bdfd/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index ae48ed7..7fe289c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -20,42 +20,28 @@
*/
package org.apache.airavata.gfac.core;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.DBUtil;
-import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.states.GfacExperimentState;
-import org.apache.airavata.gfac.core.states.GfacHandlerState;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
-import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
-import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
-import org.apache.airavata.model.messaging.event.JobIdentifier;
-import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
-import org.apache.airavata.model.messaging.event.TaskIdentifier;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
-import org.apache.airavata.model.experiment.ActionableGroup;
-import org.apache.airavata.model.experiment.CorrectiveAction;
-import org.apache.airavata.model.experiment.ErrorCategory;
-import org.apache.airavata.model.experiment.ErrorDetails;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationParallelismType;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.experiment.ExperimentModel;
-import org.apache.airavata.model.experiment.ExperimentState;
-import org.apache.airavata.model.experiment.JobDetails;
-import org.apache.airavata.model.experiment.JobState;
-import org.apache.airavata.model.experiment.JobStatus;
-import org.apache.airavata.model.experiment.TaskState;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
+import org.apache.airavata.model.status.ExperimentState;
+import org.apache.airavata.model.status.ExperimentStatus;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.*;
-import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
@@ -70,27 +56,13 @@ import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpression;
-import javax.xml.xpath.XPathExpressionException;
-import javax.xml.xpath.XPathFactory;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
+import javax.xml.xpath.*;
+import java.io.*;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
//import org.apache.airavata.commons.gfac.type.ActualParameter;
@@ -132,20 +104,20 @@ public class GFacUtils {
}
}
- /**
- * This returns true if the give job is finished
- * otherwise false
- *
- * @param job
- * @return
- */
- public static boolean isJobFinished(JobDescriptor job) {
- if (org.apache.airavata.gfac.core.cluster.JobStatus.C.toString().equals(job.getStatus())) {
- return true;
- } else {
- return false;
- }
- }
+// /**
+// * This returns true if the give job is finished
+// * otherwise false
+// *
+// * @param job
+// * @return
+// */
+// public static boolean isJobFinished(JobDescriptor job) {
+// if (org.apache.airavata.gfac.core.cluster.JobStatus.C.toString().equals(job.getStatus())) {
+// return true;
+// } else {
+// return false;
+// }
+// }
/**
* This will read
@@ -171,22 +143,17 @@ public class GFacUtils {
return hours + ":" + minutes;
}
}
- /**
- * this can be used to do framework opertaions specific to different modes
- *
- * @param jobExecutionContext
- * @return
- */
- public static boolean isSynchronousMode(
- JobExecutionContext jobExecutionContext) {
- GFacConfiguration gFacConfiguration = jobExecutionContext
- .getGFacConfiguration();
- if (ExecutionMode.ASYNCHRONOUS.equals(gFacConfiguration
- .getExecutionMode())) {
- return false;
- }
- return true;
- }
+
+// public static boolean isSynchronousMode(
+// JobExecutionContext jobExecutionContext) {
+// GFacConfiguration gFacConfiguration = jobExecutionContext
+// .getGFacConfiguration();
+// if (ExecutionMode.ASYNCHRONOUS.equals(gFacConfiguration
+// .getExecutionMode())) {
+// return false;
+// }
+// return true;
+// }
public static String readFileToString(String file)
throws FileNotFoundException, IOException {
@@ -254,65 +221,61 @@ public class GFacUtils {
return buf.toString();
}
- public static void saveJobStatus(JobExecutionContext jobExecutionContext,
- JobDetails details, JobState state) throws GFacException {
- try {
- // first we save job details to the registry for sa and then save the job status.
- ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
- JobStatus status = new JobStatus();
- status.setJobState(state);
- details.setJobStatus(status);
- experimentCatalog.add(ExpCatChildDataType.JOB_DETAIL, details,
- new CompositeIdentifier(jobExecutionContext.getTaskData()
- .getTaskID(), details.getJobID()));
- JobIdentifier identifier = new JobIdentifier(details.getJobID(), jobExecutionContext.getTaskData().getTaskID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
- jobExecutionContext.getGatewayID());
- JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
- jobExecutionContext.getLocalEventPublisher().publish(jobStatusChangeRequestEvent);
- } catch (Exception e) {
- throw new GFacException("Error persisting job status"
- + e.getLocalizedMessage(), e);
- }
- }
-
- public static void updateJobStatus(JobExecutionContext jobExecutionContext,
- JobDetails details, JobState state) throws GFacException {
- try {
- ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
- JobStatus status = new JobStatus();
- status.setJobState(state);
- status.setTimeOfStateChange(Calendar.getInstance()
- .getTimeInMillis());
- details.setJobStatus(status);
- experimentCatalog.update(
- ExperimentCatalogModelType.JOB_DETAIL,
- details, details.getJobID());
- } catch (Exception e) {
- throw new GFacException("Error persisting job status"
- + e.getLocalizedMessage(), e);
- }
- }
-
- public static void saveErrorDetails(
- JobExecutionContext jobExecutionContext, String errorMessage,
- CorrectiveAction action, ErrorCategory errorCatogory)
- throws GFacException {
- try {
- ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
- ErrorDetails details = new ErrorDetails();
- details.setActualErrorMessage(errorMessage);
- details.setCorrectiveAction(action);
- details.setActionableGroup(ActionableGroup.GATEWAYS_ADMINS);
- details.setCreationTime(Calendar.getInstance().getTimeInMillis());
- details.setErrorCategory(errorCatogory);
- experimentCatalog.add(ExpCatChildDataType.ERROR_DETAIL, details,
- jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e) {
- throw new GFacException("Error persisting job status"
- + e.getLocalizedMessage(), e);
- }
- }
+// public static void saveJobStatus(JobExecutionContext jobExecutionContext,
+// JobDetails details, JobState state) throws GFacException {
+// try {
+// // first we save job details to the registry for sa and then save the job status.
+// ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
+// JobStatus status = new JobStatus();
+// status.setJobState(state);
+// details.setJobStatus(status);
+// experimentCatalog.add(ExpCatChildDataType.JOB_DETAIL, details,
+// new CompositeIdentifier(jobExecutionContext.getTaskData()
+// .getTaskID(), details.getJobID()));
+// JobIdentifier identifier = new JobIdentifier(details.getJobID(), jobExecutionContext.getTaskData().getTaskID(),
+// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
+// jobExecutionContext.getGatewayID());
+// JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
+// jobExecutionContext.getLocalEventPublisher().publish(jobStatusChangeRequestEvent);
+// } catch (Exception e) {
+// throw new GFacException("Error persisting job status"
+// + e.getLocalizedMessage(), e);
+// }
+// }
+
+// public static void updateJobStatus(JobExecutionContext jobExecutionContext,
+// JobDetails details, JobState state) throws GFacException {
+// try {
+// ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
+// JobStatus status = new JobStatus();
+// status.setJobState(state);
+// status.setTimeOfStateChange(Calendar.getInstance()
+// .getTimeInMillis());
+// details.setJobStatus(status);
+// experimentCatalog.update(
+// ExperimentCatalogModelType.JOB_DETAIL,
+// details, details.getJobID());
+// } catch (Exception e) {
+// throw new GFacException("Error persisting job status"
+// + e.getLocalizedMessage(), e);
+// }
+// }
+
+// public static void saveErrorDetails(
+// JobExecutionContext jobExecutionContext, String errorMessage)
+// throws GFacException {
+// try {
+// ExperimentCatalog experimentCatalog = jobExecutionContext.getExperimentCatalog();
+// ErrorModel details = new ErrorModel();
+// details.setActualErrorMessage(errorMessage);
+// details.setCreationTime(Calendar.getInstance().getTimeInMillis());
+// experimentCatalog.add(ExpCatChildDataType.ERROR_DETAIL, details,
+// jobExecutionContext.getTaskData().getTaskID());
+// } catch (Exception e) {
+// throw new GFacException("Error persisting job status"
+// + e.getLocalizedMessage(), e);
+// }
+// }
public static Map<String, Object> getInputParamMap(List<InputDataObjectType> experimentData) throws GFacException {
Map<String, Object> map = new HashMap<String, Object>();
@@ -330,155 +293,155 @@ public class GFacUtils {
return map;
}
- public static GfacExperimentState getZKExperimentState(CuratorFramework curatorClient,
- JobExecutionContext jobExecutionContext)
- throws Exception {
- String expState = AiravataZKUtils.getExpState(curatorClient, jobExecutionContext
- .getExperimentID());
- if (expState == null || expState.isEmpty()) {
- return GfacExperimentState.UNKNOWN;
- }
- return GfacExperimentState.findByValue(Integer.valueOf(expState));
- }
-
- public static boolean createHandlerZnode(CuratorFramework curatorClient,
- JobExecutionContext jobExecutionContext, String className)
- throws Exception {
- String expState = AiravataZKUtils.getExpZnodeHandlerPath(
- jobExecutionContext.getExperimentID(), className);
- Stat exists = curatorClient.checkExists().forPath(expState);
- if (exists == null) {
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(expState, new byte[0]);
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
- .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
- } else {
- exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
- if (exists == null) {
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
- .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
- }
- }
-
- exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
- if (exists != null) {
- curatorClient.setData().withVersion(exists.getVersion())
- .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(GfacHandlerState.INVOKING.getValue()).getBytes());
- }
- return true;
- }
-
- public static boolean createHandlerZnode(CuratorFramework curatorClient,
- JobExecutionContext jobExecutionContext, String className,
- GfacHandlerState state) throws Exception {
- String expState = AiravataZKUtils.getExpZnodeHandlerPath(
- jobExecutionContext.getExperimentID(), className);
- Stat exists = curatorClient.checkExists().forPath(expState);
- if (exists == null) {
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
- .forPath(expState, new byte[0]);
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
- .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
- } else {
- exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
- if (exists == null) {
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
- .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
- }
- }
-
- exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
- if (exists != null) {
- curatorClient.setData().withVersion(exists.getVersion())
- .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(state.getValue()).getBytes());
- }
- return true;
- }
-
- public static boolean updateHandlerState(CuratorFramework curatorClient,
- JobExecutionContext jobExecutionContext, String className,
- GfacHandlerState state) throws Exception {
- String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
- jobExecutionContext.getExperimentID(), className);
- Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
- if (exists != null) {
- curatorClient.setData().withVersion(exists.getVersion())
- .forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes());
- } else {
- createHandlerZnode(curatorClient, jobExecutionContext, className, state);
- }
- return false;
- }
-
- public static GfacHandlerState getHandlerState(CuratorFramework curatorClient,
- JobExecutionContext jobExecutionContext, String className) {
- try {
- String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath( jobExecutionContext.getExperimentID(), className);
- Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
- if (exists != null) {
- String stateVal = new String(curatorClient.getData().storingStatIn(exists)
- .forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE));
- return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
- }
- return GfacHandlerState.UNKNOWN; // if the node doesn't exist or any other error we
- // return false
- } catch (Exception e) {
- log.error("Error occured while getting zk node status", e);
- return null;
- }
- }
-
- // This method is dangerous because of moving the experiment data
- public static boolean createExperimentEntryForPassive(String experimentID,
- String taskID, CuratorFramework curatorClient, String experimentNode,
- String pickedChild, String tokenId, long deliveryTag) throws Exception {
- String experimentPath = experimentNode + File.separator + pickedChild;
- String newExperimentPath = experimentPath + File.separator + experimentID;
- Stat exists1 = curatorClient.checkExists().forPath(newExperimentPath);
- String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, curatorClient);
- if (oldExperimentPath == null) { // this means this is a very new experiment
- // are going to create a new node
- log.info("This is a new Job, so creating all the experiment docs from the scratch");
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath, new byte[0]);
- String stateNodePath = curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
- .forPath(newExperimentPath + File.separator + "state",
- String .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes());
-
- if(curatorClient.checkExists().forPath(stateNodePath)!=null) {
- log.info("Created the node: " + stateNodePath + " successfully !");
- }else {
- log.error("Error creating node: " + stateNodePath + " successfully !");
- }
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
- .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
- } else {
- log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
- removeCancelDeliveryTagNode(oldExperimentPath, curatorClient); // remove previous cancel deliveryTagNode
- if(newExperimentPath.equals(oldExperimentPath)){
- log.info("Re-launch experiment came to the same GFac instance");
- }else {
- log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath,
- curatorClient.getData().storingStatIn(exists1).forPath(oldExperimentPath)); // recursively copy children
- copyChildren(curatorClient, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
- String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
- Stat exists = curatorClient.checkExists().forPath(oldDeliveryTag);
- if(exists!=null) {
- curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
- .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
- curatorClient.getData().storingStatIn(exists).forPath(oldDeliveryTag));
- ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldDeliveryTag, true);
- }
- // After all the files are successfully transfered we delete the // old experiment,otherwise we do
- // not delete a single file
- log.info("After a successful copying of experiment data for an old experiment we delete the old data");
- log.info("Deleting experiment data: " + oldExperimentPath);
- ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldExperimentPath, true);
- }
- }
- return true;
- }
+// public static GfacExperimentState getZKExperimentState(CuratorFramework curatorClient,
+// JobExecutionContext jobExecutionContext)
+// throws Exception {
+// String expState = AiravataZKUtils.getExpState(curatorClient, jobExecutionContext
+// .getExperimentID());
+// if (expState == null || expState.isEmpty()) {
+// return GfacExperimentState.UNKNOWN;
+// }
+// return GfacExperimentState.findByValue(Integer.valueOf(expState));
+// }
+//
+// public static boolean createHandlerZnode(CuratorFramework curatorClient,
+// JobExecutionContext jobExecutionContext, String className)
+// throws Exception {
+// String expState = AiravataZKUtils.getExpZnodeHandlerPath(
+// jobExecutionContext.getExperimentID(), className);
+// Stat exists = curatorClient.checkExists().forPath(expState);
+// if (exists == null) {
+// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(expState, new byte[0]);
+// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+// .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+// } else {
+// exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+// if (exists == null) {
+// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+// .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+// }
+// }
+//
+// exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+// if (exists != null) {
+// curatorClient.setData().withVersion(exists.getVersion())
+// .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+// String.valueOf(GfacHandlerState.INVOKING.getValue()).getBytes());
+// }
+// return true;
+// }
+
+// public static boolean createHandlerZnode(CuratorFramework curatorClient,
+// JobExecutionContext jobExecutionContext, String className,
+// GfacHandlerState state) throws Exception {
+// String expState = AiravataZKUtils.getExpZnodeHandlerPath(
+// jobExecutionContext.getExperimentID(), className);
+// Stat exists = curatorClient.checkExists().forPath(expState);
+// if (exists == null) {
+// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+// .forPath(expState, new byte[0]);
+// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+// .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+// } else {
+// exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+// if (exists == null) {
+// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+// .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
+// }
+// }
+//
+// exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+// if (exists != null) {
+// curatorClient.setData().withVersion(exists.getVersion())
+// .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+// String.valueOf(state.getValue()).getBytes());
+// }
+// return true;
+// }
+
+// public static boolean updateHandlerState(CuratorFramework curatorClient,
+// JobExecutionContext jobExecutionContext, String className,
+// GfacHandlerState state) throws Exception {
+// String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
+// jobExecutionContext.getExperimentID(), className);
+// Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+// if (exists != null) {
+// curatorClient.setData().withVersion(exists.getVersion())
+// .forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes());
+// } else {
+// createHandlerZnode(curatorClient, jobExecutionContext, className, state);
+// }
+// return false;
+// }
+
+// public static GfacHandlerState getHandlerState(CuratorFramework curatorClient,
+// JobExecutionContext jobExecutionContext, String className) {
+// try {
+// String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath( jobExecutionContext.getExperimentID(), className);
+// Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+// if (exists != null) {
+// String stateVal = new String(curatorClient.getData().storingStatIn(exists)
+// .forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE));
+// return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
+// }
+// return GfacHandlerState.UNKNOWN; // if the node doesn't exist or any other error we
+// // return false
+// } catch (Exception e) {
+// log.error("Error occured while getting zk node status", e);
+// return null;
+// }
+// }
+
+// // This method is dangerous because of moving the experiment data
+// public static boolean createExperimentEntryForPassive(String experimentID,
+// String taskID, CuratorFramework curatorClient, String experimentNode,
+// String pickedChild, String tokenId, long deliveryTag) throws Exception {
+// String experimentPath = experimentNode + File.separator + pickedChild;
+// String newExperimentPath = experimentPath + File.separator + experimentID;
+// Stat exists1 = curatorClient.checkExists().forPath(newExperimentPath);
+// String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, curatorClient);
+// if (oldExperimentPath == null) { // this means this is a very new experiment
+// // are going to create a new node
+// log.info("This is a new Job, so creating all the experiment docs from the scratch");
+// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath, new byte[0]);
+// String stateNodePath = curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+// .forPath(newExperimentPath + File.separator + "state",
+// String .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes());
+//
+// if(curatorClient.checkExists().forPath(stateNodePath)!=null) {
+// log.info("Created the node: " + stateNodePath + " successfully !");
+// }else {
+// log.error("Error creating node: " + stateNodePath + " successfully !");
+// }
+// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+// .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
+// } else {
+// log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
+// removeCancelDeliveryTagNode(oldExperimentPath, curatorClient); // remove previous cancel deliveryTagNode
+// if(newExperimentPath.equals(oldExperimentPath)){
+// log.info("Re-launch experiment came to the same GFac instance");
+// }else {
+// log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
+// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath,
+// curatorClient.getData().storingStatIn(exists1).forPath(oldExperimentPath)); // recursively copy children
+// copyChildren(curatorClient, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
+// String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
+// Stat exists = curatorClient.checkExists().forPath(oldDeliveryTag);
+// if(exists!=null) {
+// curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+// .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
+// curatorClient.getData().storingStatIn(exists).forPath(oldDeliveryTag));
+// ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldDeliveryTag, true);
+// }
+// // After all the files are successfully transfered we delete the // old experiment,otherwise we do
+// // not delete a single file
+// log.info("After a successful copying of experiment data for an old experiment we delete the old data");
+// log.info("Deleting experiment data: " + oldExperimentPath);
+// ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldExperimentPath, true);
+// }
+// }
+// return true;
+// }
private static void removeCancelDeliveryTagNode(String experimentPath, CuratorFramework curatorClient) throws Exception {
Stat exists = curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
@@ -564,26 +527,26 @@ public class GFacUtils {
return false;
}
- public static void saveHandlerData(JobExecutionContext jobExecutionContext,
- StringBuffer data, String className) throws GFacHandlerException {
- try {
- CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
- if (curatorClient != null) {
- String expZnodeHandlerPath = AiravataZKUtils
- .getExpZnodeHandlerPath(
- jobExecutionContext.getExperimentID(),
- className);
- Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
- if (exists != null) {
- curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath, data.toString().getBytes());
- } else {
- log.error("Saving Handler data failed, Stat is null");
- }
- }
- } catch (Exception e) {
- throw new GFacHandlerException(e);
- }
- }
+// public static void saveHandlerData(JobExecutionContext jobExecutionContext,
+// StringBuffer data, String className) throws GFacHandlerException {
+// try {
+// CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+// if (curatorClient != null) {
+// String expZnodeHandlerPath = AiravataZKUtils
+// .getExpZnodeHandlerPath(
+// jobExecutionContext.getExperimentID(),
+// className);
+// Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
+// if (exists != null) {
+// curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath, data.toString().getBytes());
+// } else {
+// log.error("Saving Handler data failed, Stat is null");
+// }
+// }
+// } catch (Exception e) {
+// throw new GFacHandlerException(e);
+// }
+// }
public static String getHandlerData(JobExecutionContext jobExecutionContext, String className) throws Exception {
CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
@@ -685,31 +648,31 @@ public class GFacUtils {
public static ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws RegistryException {
ExperimentCatalog airavataExperimentCatalog = RegistryFactory.getDefaultExpCatalog();
- Experiment details = (Experiment) airavataExperimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
+ ExperimentModel details = (ExperimentModel) airavataExperimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
if (details == null) {
- details = new Experiment();
- details.setExperimentID(experimentId);
+ details = new ExperimentModel();
+ details.setExperimentId(experimentId);
}
- org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
- status.setExperimentState(state);
+ ExperimentStatus status = new ExperimentStatus();
+ status.setState(state);
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
- if (!ExperimentState.CANCELED.equals(details.getExperimentStatus().getExperimentState()) &&
- !ExperimentState.CANCELING.equals(details.getExperimentStatus().getExperimentState())) {
- status.setExperimentState(state);
+ if (!ExperimentState.CANCELED.equals(details.getExperimentStatus().getState()) &&
+ !ExperimentState.CANCELING.equals(details.getExperimentStatus().getState())) {
+ status.setState(state);
} else {
- status.setExperimentState(details.getExperimentStatus().getExperimentState());
+ status.setState(details.getExperimentStatus().getState());
}
details.setExperimentStatus(status);
- log.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getExperimentState().toString());
+ log.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getState().toString());
airavataExperimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, experimentId);
- return details.getExperimentStatus().getExperimentState();
+ return details.getExperimentStatus().getState();
}
public static boolean isFailedJob (JobExecutionContext jec) {
- JobStatus jobStatus = jec.getJobDetails().getJobStatus();
- if (jobStatus.getJobState() == JobState.FAILED) {
- return true;
- }
+// JobStatus jobStatus = jec.getJobDetails().getJobStatus();
+// if (jobStatus.getJobState() == JobState.FAILED) {
+// return true;
+// }
return false;
}
@@ -731,15 +694,297 @@ public class GFacUtils {
return false;
}
- public static void publishTaskStatus (JobExecutionContext jobExecutionContext, LocalEventPublisher publisher, TaskState state){
- TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getExperimentID(),
- jobExecutionContext.getGatewayID());
- publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
- }
+// public static void publishTaskStatus (JobExecutionContext jobExecutionContext, LocalEventPublisher publisher, TaskStatus state){
+// TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+// jobExecutionContext.getExperimentID(),
+// jobExecutionContext.getGatewayID());
+// publisher.publish(new TaskStatusChangeRequestEvent(state, taskIdentity));
+// }
public static String getZKGfacServersParentPath() {
return GFacConstants.ZOOKEEPER_SERVERS_NODE + GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE;
}
+
+ public static JobDescriptor createJobDescriptor (ProcessContext processContext) throws GFacException, AppCatalogException, ApplicationSettingsException {
+ JobDescriptor jobDescriptor = new JobDescriptor();
+ String emailIds = null;
+ ProcessModel processModel = processContext.getProcessModel();
+ ResourceJobManager resourceJobManager = getResourceJobManager(processContext);
+ if (isEmailBasedJobMonitor(processContext)){
+ emailIds = ServerSettings.getEmailBasedMonitorAddress();
+ }
+ if (ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_ENABLE).equalsIgnoreCase("true")) {
+ String flags = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_FLAGS);
+ if (flags != null && processContext.getComputeResourceDescription().getHostName().equals("stampede.tacc.xsede.org")) {
+ flags = "ALL";
+ }
+ jobDescriptor.setMailOptions(flags);
+
+ String userJobNotifEmailIds = ServerSettings.getSetting(ServerSettings.JOB_NOTIFICATION_EMAILIDS);
+ if (userJobNotifEmailIds != null && !userJobNotifEmailIds.isEmpty()) {
+ if (emailIds != null && !emailIds.isEmpty()) {
+ emailIds += ("," + userJobNotifEmailIds);
+ } else {
+ emailIds = userJobNotifEmailIds;
+ }
+ }
+ if (processModel.isEnableEmailNotification()) {
+ List<String> emailList = processModel.getEmailAddresses();
+ String elist = GFacUtils.listToCsv(emailList, ',');
+ if (elist != null && !elist.isEmpty()) {
+ if (emailIds != null && !emailIds.isEmpty()) {
+ emailIds = emailIds + "," + elist;
+ } else {
+ emailIds = elist;
+ }
+ }
+ }
+ }
+ if (emailIds != null && !emailIds.isEmpty()) {
+ log.info("Email list: " + emailIds);
+ jobDescriptor.setMailAddress(emailIds);
+ }
+
+ jobDescriptor.setInputDirectory(processContext.getInputDir());
+ jobDescriptor.setOutputDirectory(processContext.getOutputDir());
+ jobDescriptor.setExecutablePath(processContext.getApplicationDeploymentDescription().getExecutablePath());
+ jobDescriptor.setStandardOutFile(processContext.getStdoutLocation());
+ jobDescriptor.setStandardErrorFile(processContext.getStderrLocation());
+ String computationalProjectAccount = getComputeResourcePreference(processContext).getAllocationProjectNumber();
+ if (computationalProjectAccount != null) {
+ jobDescriptor.setAcountString(computationalProjectAccount);
+ }
+ // To make job name alpha numeric
+ jobDescriptor.setJobName("A" + String.valueOf(generateJobName()));
+ jobDescriptor.setWorkingDirectory(processContext.getWorkingDir());
+
+ List<String> inputValues = new ArrayList<String>();
+ List<InputDataObjectType> processInputs = processModel.getProcessInputs();
+
+ // sort the inputs first and then build the command ListR
+ Comparator<InputDataObjectType> inputOrderComparator = new Comparator<InputDataObjectType>() {
+ @Override
+ public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) {
+ return inputDataObjectType.getInputOrder() - t1.getInputOrder();
+ }
+ };
+ Set<InputDataObjectType> sortedInputSet = new TreeSet<InputDataObjectType>(inputOrderComparator);
+ for (InputDataObjectType input : processInputs) {
+ sortedInputSet.add(input);
+ }
+ for (InputDataObjectType inputDataObjectType : sortedInputSet) {
+ if (!inputDataObjectType.isRequiredToAddedToCommandLine()) {
+ continue;
+ }
+ if (inputDataObjectType.getApplicationArgument() != null
+ && !inputDataObjectType.getApplicationArgument().equals("")) {
+ inputValues.add(inputDataObjectType.getApplicationArgument());
+ }
+
+ if (inputDataObjectType.getValue() != null
+ && !inputDataObjectType.getValue().equals("")) {
+ if (inputDataObjectType.getType() == DataType.URI) {
+ // set only the relative path
+ String filePath = inputDataObjectType.getValue();
+ filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+ inputValues.add(filePath);
+ }else {
+ inputValues.add(inputDataObjectType.getValue());
+ }
+
+ }
+ }
+
+ List<OutputDataObjectType> processOutputs = processModel.getProcessOutputs();
+ for (OutputDataObjectType output : processOutputs) {
+ if (output.getApplicationArgument() != null
+ && !output.getApplicationArgument().equals("")) {
+ inputValues.add(output.getApplicationArgument());
+ }
+ if (output.getValue() != null && !output.getValue().equals("") && output.isRequiredToAddedToCommandLine()) {
+ if (output.getType() == DataType.URI) {
+ String filePath = output.getValue();
+ filePath = filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1, filePath.length());
+ inputValues.add(filePath);
+ }
+ }
+ }
+
+ jobDescriptor.setInputValues(inputValues);
+ jobDescriptor.setUserName(processContext.getRemoteCluster().getServerInfo().getUserName());
+ jobDescriptor.setShellName("/bin/bash");
+ jobDescriptor.setAllEnvExport(true);
+ jobDescriptor.setOwner(processContext.getRemoteCluster().getServerInfo().getUserName());
+
+ ComputationalResourceSchedulingModel scheduling = processModel.getResourceSchedule();
+ if (scheduling != null) {
+ int totalNodeCount = scheduling.getNodeCount();
+ int totalCPUCount = scheduling.getTotalCPUCount();
+
+ if (scheduling.getQueueName() != null) {
+ jobDescriptor.setQueueName(scheduling.getQueueName());
+ }
+
+ if (totalNodeCount > 0) {
+ jobDescriptor.setNodes(totalNodeCount);
+ }
+
+ if (scheduling.getQueueName() != null) {
+ jobDescriptor.setQueueName(scheduling.getQueueName());
+ }
+ if (totalCPUCount > 0) {
+ int ppn = totalCPUCount / totalNodeCount;
+ jobDescriptor.setProcessesPerNode(ppn);
+ jobDescriptor.setCPUCount(totalCPUCount);
+ }
+ if (scheduling.getWallTimeLimit() > 0) {
+ jobDescriptor.setMaxWallTime(String.valueOf(scheduling.getWallTimeLimit()));
+ if (resourceJobManager != null){
+ if(resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)){
+ jobDescriptor.setMaxWallTimeForLSF(String.valueOf(scheduling.getWallTimeLimit()));
+ }
+ }
+ }
+ if (scheduling.getTotalPhysicalMemory() > 0) {
+ jobDescriptor.setUsedMemory(scheduling.getTotalPhysicalMemory() + "");
+ }
+ } else {
+ log.error("Task scheduling cannot be null at this point..");
+ }
+ ApplicationDeploymentDescription appDepDescription = processContext.getApplicationDeploymentDescription();
+ List<String> moduleCmds = appDepDescription.getModuleLoadCmds();
+ if (moduleCmds != null) {
+ for (String moduleCmd : moduleCmds) {
+ jobDescriptor.addModuleLoadCommands(moduleCmd);
+ }
+ }
+ List<String> preJobCommands = appDepDescription.getPreJobCommands();
+ if (preJobCommands != null) {
+ for (String preJobCommand : preJobCommands) {
+ jobDescriptor.addPreJobCommand(parseCommand(preJobCommand, processContext));
+ }
+ }
+
+ List<String> postJobCommands = appDepDescription.getPostJobCommands();
+ if (postJobCommands != null) {
+ for (String postJobCommand : postJobCommands) {
+ jobDescriptor.addPostJobCommand(parseCommand(postJobCommand, processContext));
+ }
+ }
+
+ ApplicationParallelismType parallelism = appDepDescription.getParallelism();
+ if (parallelism != null){
+ if (parallelism == ApplicationParallelismType.MPI || parallelism == ApplicationParallelismType.OPENMP || parallelism == ApplicationParallelismType.OPENMP_MPI){
+ if (resourceJobManager != null){
+ Map<JobManagerCommand, String> jobManagerCommands = resourceJobManager.getJobManagerCommands();
+ if (jobManagerCommands != null && !jobManagerCommands.isEmpty()) {
+ for (JobManagerCommand command : jobManagerCommands.keySet()) {
+ if (command == JobManagerCommand.SUBMISSION) {
+ String commandVal = jobManagerCommands.get(command);
+ jobDescriptor.setJobSubmitter(commandVal);
+ }
+ }
+ }
+ }
+ }
+ }
+ return jobDescriptor;
+ }
+
+ private static int generateJobName() {
+ Random random = new Random();
+ int i = random.nextInt(Integer.MAX_VALUE);
+ i = i + 99999999;
+ if(i<0) {
+ i = i * (-1);
+ }
+ return i;
+ }
+
+ private static String parseCommand(String value, ProcessContext context) {
+ String parsedValue = value.replaceAll("\\$workingDir", context.getWorkingDir());
+ parsedValue = parsedValue.replaceAll("\\$inputDir", context.getInputDir());
+ parsedValue = parsedValue.replaceAll("\\$outputDir", context.getOutputDir());
+ return parsedValue;
+ }
+
+ public static ResourceJobManager getResourceJobManager(ProcessContext processContext) {
+ try {
+ JobSubmissionProtocol submissionProtocol = getPreferredJobSubmissionProtocol(processContext);
+ JobSubmissionInterface jobSubmissionInterface = getPreferredJobSubmissionInterface(processContext);
+ if (submissionProtocol == JobSubmissionProtocol.SSH) {
+ SSHJobSubmission sshJobSubmission = GFacUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+ if (sshJobSubmission != null) {
+ return sshJobSubmission.getResourceJobManager();
+ }
+ } else if (submissionProtocol == JobSubmissionProtocol.LOCAL) {
+ LOCALSubmission localJobSubmission = GFacUtils.getLocalJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+ if (localJobSubmission != null) {
+ return localJobSubmission.getResourceJobManager();
+ }
+ }
+ } catch (AppCatalogException e) {
+ log.error("Error occured while retrieving resource job manager", e);
+ }
+ return null;
+ }
+
+ public static boolean isEmailBasedJobMonitor(ProcessContext processContext) throws GFacException, AppCatalogException {
+ JobSubmissionProtocol jobSubmissionProtocol = getPreferredJobSubmissionProtocol(processContext);
+ JobSubmissionInterface jobSubmissionInterface = getPreferredJobSubmissionInterface(processContext);
+ if (jobSubmissionProtocol == JobSubmissionProtocol.SSH) {
+ String jobSubmissionInterfaceId = jobSubmissionInterface.getJobSubmissionInterfaceId();
+ SSHJobSubmission sshJobSubmission = processContext.getAppCatalog().getComputeResource().getSSHJobSubmission(jobSubmissionInterfaceId);
+ MonitorMode monitorMode = sshJobSubmission.getMonitorMode();
+ return monitorMode != null && monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR;
+ } else {
+ return false;
+ }
+ }
+
+ public static JobSubmissionInterface getPreferredJobSubmissionInterface (ProcessContext context) throws AppCatalogException {
+ try {
+ String resourceHostId = context.getComputeResourceDescription().getComputeResourceId();
+ ComputeResourceDescription resourceDescription = context.getAppCatalog().getComputeResource().getComputeResource(resourceHostId);
+ List<JobSubmissionInterface> jobSubmissionInterfaces = resourceDescription.getJobSubmissionInterfaces();
+ if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) {
+ Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
+ @Override
+ public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
+ return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder();
+ }
+ });
+ } else {
+ throw new AppCatalogException("Compute resource should have at least one job submission interface defined...");
+ }
+ return jobSubmissionInterfaces.get(0);
+ }catch (AppCatalogException e){
+ throw new AppCatalogException("Error occurred while retrieving data from app catalog", e);
+ }
+ }
+
+ public static JobSubmissionProtocol getPreferredJobSubmissionProtocol (ProcessContext context) throws AppCatalogException{
+ try {
+ GwyResourceProfile gatewayProfile = context.getAppCatalog().getGatewayProfile();
+ String resourceHostId = context.getComputeResourceDescription().getComputeResourceId();
+ ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(context.getGatewayId(), resourceHostId);
+ return preference.getPreferredJobSubmissionProtocol();
+ } catch (AppCatalogException e) {
+ log.error("Error occurred while initializing app catalog", e);
+ throw new AppCatalogException("Error occurred while initializing app catalog", e);
+ }
+ }
+
+ public static ComputeResourcePreference getComputeResourcePreference(ProcessContext context) throws AppCatalogException {
+ try {
+ GwyResourceProfile gatewayProfile = context.getAppCatalog().getGatewayProfile();
+ String resourceHostId = context.getComputeResourceDescription().getComputeResourceId();
+ return gatewayProfile.getComputeResourcePreference(context.getGatewayId(), resourceHostId);
+ } catch (AppCatalogException e) {
+ log.error("Error occurred while initializing app catalog", e);
+ throw new AppCatalogException("Error occurred while initializing app catalog", e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a2b6bdfd/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 16943fe..013e433 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -24,6 +24,9 @@ package org.apache.airavata.gfac.core.context;
import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.gfac.core.cluster.RemoteCluster;
import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.registry.cpi.AppCatalog;
@@ -44,10 +47,17 @@ public class ProcessContext {
private final String tokenId;
private ProcessModel processModel;
private String workingDir;
+ private String inputDir;
+ private String outputDir;
private List<Task> taskChain;
private GatewayResourceProfile gatewayResourceProfile;
+ private ComputeResourceDescription computeResourceDescription;
+ private ApplicationDeploymentDescription applicationDeploymentDescription;
+ private ApplicationInterfaceDescription applicationInterfaceDescription;
private RemoteCluster remoteCluster;
private Map<String, String> sshProperties;
+ private String stdoutLocation;
+ private String stderrLocation;
public ProcessContext(String processId, String gatewayId, String tokenId) {
this.processId = processId;
@@ -148,4 +158,60 @@ public class ProcessContext {
public void setSshProperties(Map<String, String> sshProperties) {
this.sshProperties = sshProperties;
}
+
+ public ComputeResourceDescription getComputeResourceDescription() {
+ return computeResourceDescription;
+ }
+
+ public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) {
+ this.computeResourceDescription = computeResourceDescription;
+ }
+
+ public ApplicationDeploymentDescription getApplicationDeploymentDescription() {
+ return applicationDeploymentDescription;
+ }
+
+ public void setApplicationDeploymentDescription(ApplicationDeploymentDescription applicationDeploymentDescription) {
+ this.applicationDeploymentDescription = applicationDeploymentDescription;
+ }
+
+ public ApplicationInterfaceDescription getApplicationInterfaceDescription() {
+ return applicationInterfaceDescription;
+ }
+
+ public void setApplicationInterfaceDescription(ApplicationInterfaceDescription applicationInterfaceDescription) {
+ this.applicationInterfaceDescription = applicationInterfaceDescription;
+ }
+
+ public String getStdoutLocation() {
+ return stdoutLocation;
+ }
+
+ public void setStdoutLocation(String stdoutLocation) {
+ this.stdoutLocation = stdoutLocation;
+ }
+
+ public String getStderrLocation() {
+ return stderrLocation;
+ }
+
+ public void setStderrLocation(String stderrLocation) {
+ this.stderrLocation = stderrLocation;
+ }
+
+ public void setOutputDir(String outputDir) {
+ this.outputDir = outputDir;
+ }
+
+ public String getOutputDir() {
+ return outputDir;
+ }
+
+ public String getInputDir() {
+ return inputDir;
+ }
+
+ public void setInputDir(String inputDir) {
+ this.inputDir = inputDir;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a2b6bdfd/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
new file mode 100644
index 0000000..2696236
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/JobSubmissionTaskImpl.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.JobDescriptor;
+import org.apache.airavata.gfac.core.cluster.RemoteCluster;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class JobSubmissionTaskImpl implements JobSubmissionTask {
+ private static final Logger log = LoggerFactory.getLogger(JobSubmissionTaskImpl.class);
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ @Override
+ public TaskState execute(TaskContext taskContext) throws TaskException {
+ try {
+ ProcessContext processContext = taskContext.getParentProcessContext();
+ AppCatalog appCatalog = processContext.getAppCatalog();
+ String resourceHostId = processContext.getProcessModel().getResourceSchedule().getResourceHostId();
+ ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId);
+ LocalEventPublisher publisher = processContext.getLocalEventPublisher();
+ RemoteCluster remoteCluster = processContext.getRemoteCluster();
+ JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext);
+ } catch (AppCatalogException e) {
+ log.error("Error while instatiating app catalog",e);
+ throw new TaskException("Error while instatiating app catalog", e);
+ } catch (ApplicationSettingsException e) {
+ log.error("Error occurred while creating job descriptor", e);
+ throw new TaskException("Error occurred while creating job descriptor", e);
+ } catch (GFacException e) {
+ log.error("Error occurred while creating job descriptor", e);
+ throw new TaskException("Error occurred while creating job descriptor", e);
+ }
+
+
+ return null;
+ }
+
+ @Override
+ public TaskState recover(TaskContext taskContext) throws TaskException {
+ return null;
+ }
+}