You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/13 17:07:34 UTC
[1/2] airavata git commit: Refactored code and Fixed Job statues
update method in GfacUtils
Repository: airavata
Updated Branches:
refs/heads/master ec9b6fe42 -> 7d391f11a
Refactored code and Fixed Job statues update method in GfacUtils
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/0f5ec6f9
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/0f5ec6f9
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/0f5ec6f9
Branch: refs/heads/master
Commit: 0f5ec6f91c8faac16902b05358c773b377341332
Parents: ea93cc1
Author: shamrath <sh...@gmail.com>
Authored: Wed May 13 11:06:47 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Wed May 13 11:06:47 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/core/utils/GFacUtils.java | 660 +------------------
.../gsissh/provider/impl/GSISSHProvider.java | 13 +-
.../gfac/local/provider/impl/LocalProvider.java | 4 +-
.../gfac/ssh/provider/impl/SSHProvider.java | 61 +-
4 files changed, 23 insertions(+), 715 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/0f5ec6f9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 4cd850d..599b1ce 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -26,6 +26,7 @@ import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.DBUtil;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
@@ -40,6 +41,8 @@ import org.apache.airavata.gfac.core.states.GfacHandlerState;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.messaging.event.JobIdentifier;
+import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.ChildDataType;
@@ -186,587 +189,15 @@ public class GFacUtils {
return buf.toString();
}
-// public static ActualParameter getInputActualParameter(Parameter parameter,
-// DataObjectType element) {
-// ActualParameter actualParameter = new ActualParameter();
-// if ("String".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(StringParameterType.type);
-// if (!"".equals(element.getValue())) {
-// ((StringParameterType) actualParameter.getType())
-// .setValue(element.getValue());
-// } else {
-// ((StringParameterType) actualParameter.getType()).setValue("");
-// }
-// } else if ("Double".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(DoubleParameterType.type);
-// if (!"".equals(element.getValue())) {
-// ((DoubleParameterType) actualParameter.getType())
-// .setValue(new Double(element.getValue()));
-// }
-// } else if ("Integer".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(IntegerParameterType.type);
-// if (!"".equals(element.getValue())) {
-// ((IntegerParameterType) actualParameter.getType())
-// .setValue(new Integer(element.getValue()));
-// }
-// } else if ("Float".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(FloatParameterType.type);
-// if (!"".equals(element.getValue())) {
-// ((FloatParameterType) actualParameter.getType())
-// .setValue(new Float(element.getValue()));
-// }
-// } else if ("Boolean".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(BooleanParameterType.type);
-// if (!"".equals(element.getValue())) {
-// ((BooleanParameterType) actualParameter.getType())
-// .setValue(new Boolean(element.getValue()));
-// }
-// } else if ("File".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(FileParameterType.type);
-// if (!"".equals(element.getValue())) {
-// ((FileParameterType) actualParameter.getType())
-// .setValue(element.getValue());
-// }
-// } else if ("URI".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(URIParameterType.type);
-// if (!"".equals(element.getValue())) {
-// ((URIParameterType) actualParameter.getType()).setValue(element
-// .getValue());
-// } else {
-// ((URIParameterType) actualParameter.getType()).setValue("");
-// }
-//
-// } else if ("StdOut".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(StdOutParameterType.type);
-// if (!"".equals(element.getValue())) {
-// ((StdOutParameterType) actualParameter.getType())
-// .setValue(element.getValue());
-// } else {
-// ((StdOutParameterType) actualParameter.getType()).setValue("");
-// }
-//
-// } else if ("StdErr".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(StdErrParameterType.type);
-// if (!"".equals(element.getValue())) {
-// ((StdErrParameterType) actualParameter.getType())
-// .setValue(element.getValue());
-// } else {
-// ((StdErrParameterType) actualParameter.getType()).setValue("");
-// }
-//
-// }
-// return actualParameter;
-// }
-
-// public static ActualParameter getInputActualParameter(Parameter parameter,
-// OMElement element) {
-// OMElement innerelement = null;
-// ActualParameter actualParameter = new ActualParameter();
-// if ("String".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(StringParameterType.type);
-// if (!"".equals(element.getText())) {
-// ((StringParameterType) actualParameter.getType())
-// .setValue(element.getText());
-// } else if (element.getChildrenWithLocalName("value").hasNext()) {
-// innerelement = (OMElement) element.getChildrenWithLocalName(
-// "value").next();
-// ((StringParameterType) actualParameter.getType())
-// .setValue(innerelement.getText());
-// } else {
-// ((StringParameterType) actualParameter.getType()).setValue("");
-// }
-// } else if ("Double".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(DoubleParameterType.type);
-// if (!"".equals(element.getText())) {
-// ((DoubleParameterType) actualParameter.getType())
-// .setValue(new Double(innerelement.getText()));
-// } else {
-// innerelement = (OMElement) element.getChildrenWithLocalName(
-// "value").next();
-// ((DoubleParameterType) actualParameter.getType())
-// .setValue(new Double(innerelement.getText()));
-// }
-// } else if ("Integer".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(IntegerParameterType.type);
-// if (!"".equals(element.getText())) {
-// ((IntegerParameterType) actualParameter.getType())
-// .setValue(new Integer(element.getText()));
-// } else {
-// innerelement = (OMElement) element.getChildrenWithLocalName(
-// "value").next();
-// ((IntegerParameterType) actualParameter.getType())
-// .setValue(new Integer(innerelement.getText()));
-// }
-// } else if ("Float".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(FloatParameterType.type);
-// if (!"".equals(element.getText())) {
-// ((FloatParameterType) actualParameter.getType())
-// .setValue(new Float(element.getText()));
-// } else {
-// innerelement = (OMElement) element.getChildrenWithLocalName(
-// "value").next();
-// ((FloatParameterType) actualParameter.getType())
-// .setValue(new Float(innerelement.getText()));
-// }
-// } else if ("Boolean".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(BooleanParameterType.type);
-// if (!"".equals(element.getText())) {
-// ((BooleanParameterType) actualParameter.getType())
-// .setValue(new Boolean(element.getText()));
-// } else {
-// innerelement = (OMElement) element.getChildrenWithLocalName(
-// "value").next();
-// ((BooleanParameterType) actualParameter.getType())
-// .setValue(Boolean.parseBoolean(innerelement.getText()));
-// }
-// } else if ("File".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(FileParameterType.type);
-// if (!"".equals(element.getText())) {
-// ((FileParameterType) actualParameter.getType())
-// .setValue(element.getText());
-// } else {
-// innerelement = (OMElement) element.getChildrenWithLocalName(
-// "value").next();
-// ((FileParameterType) actualParameter.getType())
-// .setValue(innerelement.getText());
-// }
-// } else if ("URI".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(URIParameterType.type);
-// if (!"".equals(element.getText())) {
-// ((URIParameterType) actualParameter.getType()).setValue(element
-// .getText());
-// } else if (element.getChildrenWithLocalName("value").hasNext()) {
-// innerelement = (OMElement) element.getChildrenWithLocalName(
-// "value").next();
-// System.out.println(actualParameter.getType().toString());
-// log.debug(actualParameter.getType().toString());
-// ((URIParameterType) actualParameter.getType())
-// .setValue(innerelement.getText());
-// } else {
-// ((URIParameterType) actualParameter.getType()).setValue("");
-// }
-// } else if ("StringArray".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(StringArrayType.type);
-// Iterator value = element.getChildrenWithLocalName("value");
-// int i = 0;
-// if (!"".equals(element.getText())) {
-// String[] list = StringUtil.getElementsFromString(element
-// .getText());
-// for (String arrayValue : list) {
-// ((StringArrayType) actualParameter.getType()).insertValue(
-// i++, arrayValue);
-// }
-// } else {
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((StringArrayType) actualParameter.getType()).insertValue(
-// i++, innerelement.getText());
-// }
-// }
-// } else if ("DoubleArray".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(DoubleArrayType.type);
-// Iterator value = element.getChildrenWithLocalName("value");
-// int i = 0;
-// if (!"".equals(element.getText())) {
-// String[] list = StringUtil.getElementsFromString(element
-// .getText());
-// for (String arrayValue : list) {
-// ((DoubleArrayType) actualParameter.getType()).insertValue(
-// i++, new Double(arrayValue));
-// }
-// } else {
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((DoubleArrayType) actualParameter.getType()).insertValue(
-// i++, new Double(innerelement.getText()));
-// }
-// }
-//
-// } else if ("IntegerArray"
-// .equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(IntegerArrayType.type);
-// Iterator value = element.getChildrenWithLocalName("value");
-// int i = 0;
-// if (!"".equals(element.getText())) {
-// String[] list = StringUtil.getElementsFromString(element
-// .getText());
-// for (String arrayValue : list) {
-// ((IntegerArrayType) actualParameter.getType()).insertValue(
-// i++, new Integer(arrayValue));
-// }
-// } else {
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((IntegerArrayType) actualParameter.getType()).insertValue(
-// i++, new Integer(innerelement.getText()));
-// }
-// }
-// } else if ("FloatArray".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(FloatArrayType.type);
-// Iterator value = element.getChildrenWithLocalName("value");
-// int i = 0;
-// if (!"".equals(element.getText())) {
-// String[] list = StringUtil.getElementsFromString(element
-// .getText());
-// for (String arrayValue : list) {
-// ((FloatArrayType) actualParameter.getType()).insertValue(
-// i++, new Float(arrayValue));
-// }
-// } else {
-//
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((FloatArrayType) actualParameter.getType()).insertValue(
-// i++, new Float(innerelement.getText()));
-// }
-// }
-// } else if ("BooleanArray"
-// .equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(BooleanArrayType.type);
-// Iterator value = element.getChildrenWithLocalName("value");
-// int i = 0;
-// if (!"".equals(element.getText())) {
-// String[] list = StringUtil.getElementsFromString(element
-// .getText());
-// for (String arrayValue : list) {
-// ((BooleanArrayType) actualParameter.getType()).insertValue(
-// i++, new Boolean(arrayValue));
-// }
-// } else {
-//
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((BooleanArrayType) actualParameter.getType()).insertValue(
-// i++, new Boolean(innerelement.getText()));
-// }
-// }
-// } else if ("FileArray".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(FileArrayType.type);
-// Iterator value = element.getChildrenWithLocalName("value");
-// int i = 0;
-// if (!"".equals(element.getText())) {
-// String[] list = StringUtil.getElementsFromString(element
-// .getText());
-// for (String arrayValue : list) {
-// ((FileArrayType) actualParameter.getType()).insertValue(
-// i++, arrayValue);
-// }
-// } else {
-//
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((FileArrayType) actualParameter.getType()).insertValue(
-// i++, innerelement.getText());
-// }
-// }
-// } else if ("URIArray".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(URIArrayType.type);
-// Iterator value = element.getChildrenWithLocalName("value");
-// int i = 0;
-// if (!"".equals(element.getText())) {
-// String[] list = StringUtil.getElementsFromString(element
-// .getText());
-// for (String arrayValue : list) {
-// ((URIArrayType) actualParameter.getType()).insertValue(i++,
-// arrayValue);
-// }
-// } else {
-//
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((URIArrayType) actualParameter.getType()).insertValue(i++,
-// innerelement.getText());
-// }
-// }
-// }
-// return actualParameter;
-// }
-
-// public static ActualParameter getInputActualParameter(Parameter parameter,
-// String inputVal) throws GFacException {
-// OMElement innerelement = null;
-// ActualParameter actualParameter = new ActualParameter();
-// if ("String".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(StringParameterType.type);
-// ((StringParameterType) actualParameter.getType())
-// .setValue(inputVal);
-// } else if ("Double".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(DoubleParameterType.type);
-// ((DoubleParameterType) actualParameter.getType())
-// .setValue(new Double(inputVal));
-// } else if ("Integer".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(IntegerParameterType.type);
-// ((IntegerParameterType) actualParameter.getType())
-// .setValue(new Integer(inputVal));
-// } else if ("Float".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(FloatParameterType.type);
-// ((FloatParameterType) actualParameter.getType())
-// .setValue(new Float(inputVal));
-// } else if ("Boolean".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(BooleanParameterType.type);
-// ((BooleanParameterType) actualParameter.getType())
-// .setValue(new Boolean(inputVal));
-// } else if ("File".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(FileParameterType.type);
-// ((FileParameterType) actualParameter.getType()).setValue(inputVal);
-// } else if ("URI".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(URIParameterType.type);
-// ((URIParameterType) actualParameter.getType()).setValue(inputVal);
-// } else if ("StringArray".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(StringArrayType.type);
-// Iterator iterator = Arrays.asList(
-// StringUtil.getElementsFromString(inputVal)).iterator();
-// int i = 0;
-// while (iterator.hasNext()) {
-// innerelement = (OMElement) iterator.next();
-// ((StringArrayType) actualParameter.getType()).insertValue(i++,
-// innerelement.getText());
-// }
-// } else if ("DoubleArray".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(DoubleArrayType.type);
-// Iterator value = Arrays.asList(
-// StringUtil.getElementsFromString(inputVal)).iterator();
-// int i = 0;
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((DoubleArrayType) actualParameter.getType()).insertValue(i++,
-// new Double(innerelement.getText()));
-// }
-// } else if ("IntegerArray"
-// .equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(IntegerArrayType.type);
-// Iterator value = Arrays.asList(
-// StringUtil.getElementsFromString(inputVal)).iterator();
-// int i = 0;
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((IntegerArrayType) actualParameter.getType()).insertValue(i++,
-// new Integer(innerelement.getText()));
-// }
-// } else if ("FloatArray".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(FloatArrayType.type);
-// Iterator value = Arrays.asList(
-// StringUtil.getElementsFromString(inputVal)).iterator();
-// int i = 0;
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((FloatArrayType) actualParameter.getType()).insertValue(i++,
-// new Float(innerelement.getText()));
-// }
-// } else if ("BooleanArray"
-// .equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(BooleanArrayType.type);
-// Iterator value = Arrays.asList(
-// StringUtil.getElementsFromString(inputVal)).iterator();
-// int i = 0;
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((BooleanArrayType) actualParameter.getType()).insertValue(i++,
-// new Boolean(innerelement.getText()));
-// }
-// } else if ("FileArray".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(FileArrayType.type);
-// Iterator value = Arrays.asList(
-// StringUtil.getElementsFromString(inputVal)).iterator();
-// int i = 0;
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((FileArrayType) actualParameter.getType()).insertValue(i++,
-// innerelement.getText());
-// }
-// } else if ("URIArray".equals(parameter.getParameterType().getName())) {
-// actualParameter = new ActualParameter(URIArrayType.type);
-// Iterator value = Arrays.asList(
-// StringUtil.getElementsFromString(inputVal)).iterator();
-// int i = 0;
-// while (value.hasNext()) {
-// innerelement = (OMElement) value.next();
-// ((URIArrayType) actualParameter.getType()).insertValue(i++,
-// innerelement.getText());
-// }
-// } else {
-// throw new GFacException(
-// "Input parameters are not configured properly ");
-// }
-// return actualParameter;
-// }
-
-// public static ApplicationJob createApplicationJob(
-// JobExecutionContext jobExecutionContext) {
-// ApplicationJob appJob = new ApplicationJob();
-// appJob.setExperimentId((String) jobExecutionContext
-// .getProperty(Constants.PROP_TOPIC));
-// appJob.setWorkflowExecutionId(appJob.getExperimentId());
-// appJob.setNodeId((String) jobExecutionContext
-// .getProperty(Constants.PROP_WORKFLOW_NODE_ID));
-// appJob.setServiceDescriptionId(jobExecutionContext
-// .getApplicationContext().getServiceDescription().getType()
-// .getName());
-// appJob.setHostDescriptionId(jobExecutionContext.getApplicationContext()
-// .getHostDescription().getType().getHostName());
-// appJob.setApplicationDescriptionId(jobExecutionContext
-// .getApplicationContext().getApplicationDeploymentDescription()
-// .getType().getApplicationName().getStringValue());
-// return appJob;
-// }
-
-// public static void updateApplicationJobStatusUpdateTime(
-// JobExecutionContext context, String jobId, Date statusUpdateTime) {
-// AiravataAPI airavataAPI = context.getGFacConfiguration()
-// .getAiravataAPI();
-// try {
-// airavataAPI.getProvenanceManager()
-// .updateApplicationJobStatusUpdateTime(jobId,
-// statusUpdateTime);
-// } catch (AiravataAPIInvocationException e) {
-// log.error("Error in updating application job status time "
-// + statusUpdateTime.toString() + " for job Id " + jobId
-// + "!!!", e);
-// }
-// }
-//
-// public static void updateApplicationJobStatus(JobExecutionContext context,
-// String jobId, ApplicationJobStatus status, Date statusUpdateTime) {
-// AiravataAPI airavataAPI = context.getGFacConfiguration()
-// .getAiravataAPI();
-// try {
-// airavataAPI.getProvenanceManager().updateApplicationJobStatus(
-// jobId, status, statusUpdateTime);
-// } catch (AiravataAPIInvocationException e) {
-// log.error(
-// "Error in updating application job status "
-// + status.toString() + " for job Id " + jobId
-// + "!!!", e);
-// }
-// }
-
-// /**
-// * Gets the job ids given experiment id.
-// *
-// * @param context
-// * The job execution context.
-// * @param experimentId
-// * The experiment id.
-// * @return List of job ids relevant to given experiment id.
-// */
-// public static List<ApplicationJob> getJobIds(JobExecutionContext context,
-// String experimentId) {
-//
-// AiravataAPI airavataAPI = context.getGFacConfiguration()
-// .getAiravataAPI();
-// try {
-// return airavataAPI.getProvenanceManager().getApplicationJobs(
-// experimentId, null, null);
-// } catch (AiravataAPIInvocationException e) {
-// log.error("Error retrieving application jobs for experiment id "
-// + experimentId, e);
-// }
-//
-// return new ArrayList<ApplicationJob>(0);
-// }
-
-// /**
-// * Gets the job ids given experiment id and workflow id.
-// *
-// * @param context
-// * The job execution context.
-// * @param experimentId
-// * The experiment id.
-// * @param workflowId
-// * The workflow id
-// * @return List of job ids relevant to given experiment id and workflow id.
-// */
-// public static List<ApplicationJob> getJobIds(JobExecutionContext context,
-// String experimentId, String workflowId) {
-//
-// AiravataAPI airavataAPI = context.getGFacConfiguration()
-// .getAiravataAPI();
-// try {
-// return airavataAPI.getProvenanceManager().getApplicationJobs(
-// experimentId, workflowId, null);
-// } catch (AiravataAPIInvocationException e) {
-// log.error("Error retrieving application jobs for experiment id "
-// + experimentId, " workflow id " + workflowId, e);
-// }
-//
-// return new ArrayList<ApplicationJob>(0);
-// }
-
-// /**
-// * Gets the job ids given experiment id and workflow id.
-// *
-// * @param context
-// * The job execution context.
-// * @param experimentId
-// * The experiment id.
-// * @param workflowId
-// * The workflow id
-// * @return List of job ids relevant to given experiment id and workflow id.
-// */
-// public static List<ApplicationJob> getJobIds(JobExecutionContext context,
-// String experimentId, String workflowId, String nodeId) {
-//
-// AiravataAPI airavataAPI = context.getGFacConfiguration()
-// .getAiravataAPI();
-// try {
-// return airavataAPI.getProvenanceManager().getApplicationJobs(
-// experimentId, workflowId, nodeId);
-// } catch (AiravataAPIInvocationException e) {
-// log.error("Error retrieving application jobs for experiment id "
-// + experimentId, " workflow id " + workflowId, e);
-// }
-//
-// return new ArrayList<ApplicationJob>(0);
-// }
-
- /*
- * public static RequestData getRequestData(Properties
- * configurationProperties) {
- *
- * RequestData requestData = new RequestData();
- *
- * requestData.setMyProxyServerUrl(configurationProperties.getProperty(Constants
- * .MYPROXY_SERVER));
- * requestData.setMyProxyUserName(configurationProperties.
- * getProperty(Constants.MYPROXY_USER));
- * requestData.setMyProxyPassword(configurationProperties
- * .getProperty(Constants.MYPROXY_PASS));
- *
- * int lifeTime; String sLife =
- * configurationProperties.getProperty(Constants.MYPROXY_LIFE); if (sLife !=
- * null) { lifeTime = Integer.parseInt(sLife);
- * requestData.setMyProxyLifeTime(lifeTime); } else {
- * log.info("The configuration does not specify a default life time"); }
- *
- *
- *
- * }
- */
-
-// public static void recordApplicationJob(JobExecutionContext context,
-// ApplicationJob job) {
-// AiravataAPI airavataAPI = context.getGFacConfiguration()
-// .getAiravataAPI();
-// try {
-// airavataAPI.getProvenanceManager().addApplicationJob(job);
-// } catch (AiravataAPIInvocationException e) {
-// log.error(
-// "Error in persisting application job data for application job "
-// + job.getJobId() + "!!!", e);
-// }
-// }
-
public static void saveJobStatus(JobExecutionContext jobExecutionContext,
- JobDetails details, JobState state) throws GFacException {
+ JobDetails details, JobState state, MonitorPublisher monitorPublisher) throws GFacException {
try {
- Registry registry = jobExecutionContext.getRegistry();
- JobStatus status = new JobStatus();
- status.setJobState(state);
- details.setJobStatus(status);
- registry.add(ChildDataType.JOB_DETAIL, details,
- new CompositeIdentifier(jobExecutionContext.getTaskData()
- .getTaskID(), details.getJobID()));
- } catch (Exception e) {
+ JobIdentifier identifier = new JobIdentifier(details.getJobID(), jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ JobStatusChangeRequestEvent jobStatusChangeRequestEvent = new JobStatusChangeRequestEvent(state, identifier);
+ monitorPublisher.publish(jobStatusChangeRequestEvent);
+ } catch (Exception e) {
throw new GFacException("Error persisting job status"
+ e.getLocalizedMessage(), e);
}
@@ -810,42 +241,6 @@ public class GFacUtils {
}
}
-// public static Map<String, Object> getInMessageContext(
-// List<DataObjectType> experimentData, Parameter[] parameters)
-// throws GFacException {
-// HashMap<String, Object> stringObjectHashMap = new HashMap<String, Object>();
-// Map<String, DataObjectType> map = new HashMap<String, DataObjectType>();
-// for (DataObjectType objectType : experimentData) {
-// map.put(objectType.getKey(), objectType);
-// }
-// for (int i = 0; i < parameters.length; i++) {
-// DataObjectType input = map.get(parameters[i].getParameterName());
-// if (input != null) {
-// DataType t = DataType.STRING;
-// String type = parameters[i].getParameterType().getType().toString().toUpperCase();
-// if (type.equals("STRING")){
-// t=DataType.STRING;
-// }else if (type.equals("INTEGER")){
-// t=DataType.INTEGER;
-// }else if (type.equals("FLOAT")){
-// //FIXME
-// t=DataType.INTEGER;
-// }else if (type.equals("URI")){
-// t=DataType.URI;
-// }
-// input.setType(t);
-// stringObjectHashMap
-// .put(parameters[i].getParameterName(), GFacUtils
-// .getInputActualParameter(parameters[i], input));
-// } else {
-// throw new GFacException(
-// "Error finding the parameter: parameter Name"
-// + parameters[i].getParameterName());
-// }
-// }
-// return stringObjectHashMap;
-// }
-
public static Map<String, Object> getInputParamMap(List<InputDataObjectType> experimentData) throws GFacException {
Map<String, Object> map = new HashMap<String, Object>();
for (InputDataObjectType objectType : experimentData) {
@@ -853,41 +248,6 @@ public class GFacUtils {
}
return map;
}
-//
-// public static Map<String, Object> getOutMessageContext(
-// List<DataObjectType> experimentData, Parameter[] parameters)
-// throws GFacException {
-// HashMap<String, Object> stringObjectHashMap = new HashMap<String, Object>();
-// Map<String, DataObjectType> map = new HashMap<String, DataObjectType>();
-// for (DataObjectType objectType : experimentData) {
-// map.put(objectType.getKey(), objectType);
-// }
-// for (int i = 0; i < parameters.length; i++) {
-// DataObjectType output = map.get(parameters[i].getParameterName());
-// if (output==null){
-// output=new DataObjectType();
-// output.setKey(parameters[i].getParameterName());
-// output.setValue("");
-// String type = parameters[i].getParameterType().getType().toString().toUpperCase();
-// DataType t = DataType.STRING;
-// if (type.equals("STRING")){
-// t=DataType.STRING;
-// }else if (type.equals("INTEGER")){
-// t=DataType.INTEGER;
-// }else if (type.equals("FLOAT")){
-// //FIXME
-// t=DataType.INTEGER;
-// }else if (type.equals("URI")){
-// t=DataType.URI;
-// }
-// output.setType(t);
-// }
-// stringObjectHashMap
-// .put(parameters[i].getParameterName(), GFacUtils
-// .getInputActualParameter(parameters[i], output));
-// }
-// return stringObjectHashMap;
-// }
public static Map<String, Object> getOuputParamMap(List<OutputDataObjectType> experimentData) throws GFacException {
Map<String, Object> map = new HashMap<String, Object>();
http://git-wip-us.apache.org/repos/asf/airavata/blob/0f5ec6f9/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 32b3f93..b41ab75 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -20,7 +20,6 @@
*/
package org.apache.airavata.gfac.gsissh.provider.impl;
-import org.airavata.appcatalog.cpi.AppCatalog;
import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -114,10 +113,10 @@ public class GSISSHProvider extends AbstractProvider {
jobExecutionContext.setJobDetails(jobDetails);
if (jobID == null) {
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED, monitorPublisher);
} else {
jobDetails.setJobID(jobID.split("\\.")[0]);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED, monitorPublisher);
}
data.append(",jobId=").append(jobDetails.getJobID());
@@ -129,7 +128,7 @@ public class GSISSHProvider extends AbstractProvider {
String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED, monitorPublisher);
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
@@ -208,20 +207,20 @@ public class GSISSHProvider extends AbstractProvider {
log.error("No Job Id is set, so cannot perform the cancel operation !!!");
return;
}
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED, monitorPublisher);
// we know this host is type GsiSSHHostType
} catch (SSHApiException e) {
String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED, monitorPublisher);
GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
} catch (Exception e) {
String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED, monitorPublisher);
GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/0f5ec6f9/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index 0bdf190..6d84cf2 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -23,7 +23,6 @@ package org.apache.airavata.gfac.local.provider.impl;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -36,7 +35,6 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
import org.apache.airavata.gfac.core.provider.AbstractProvider;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.provider.utils.ProviderUtils;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.core.utils.OutputUtils;
import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter;
@@ -138,7 +136,7 @@ public class LocalProvider extends AbstractProvider {
jobDetails.setJobDescription(jobExecutionContext.getApplicationContext()
.getApplicationDeploymentDescription().getAppDeploymentDescription());
jobExecutionContext.setJobDetails(jobDetails);
- GFacUtils.saveJobStatus(jobExecutionContext,jobDetails, JobState.SETUP);
+ GFacUtils.saveJobStatus(jobExecutionContext,jobDetails, JobState.SETUP, monitorPublisher);
// running cmd
Process process = builder.start();
http://git-wip-us.apache.org/repos/asf/airavata/blob/0f5ec6f9/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index ca24502..1807339 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -103,7 +103,7 @@ public class SSHProvider extends AbstractProvider {
JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, cluster);
details.setJobDescription(jobDescriptor.toXML());
- GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP);
+ GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP, monitorPublisher);
log.info(remoteFile);
File runscript = createShellScript(jobExecutionContext);
cluster.scpTo(remoteFile, runscript.getAbsolutePath());
@@ -127,18 +127,11 @@ public class SSHProvider extends AbstractProvider {
*/
String executable = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME;
details.setJobDescription(executable);
-
-// GFacUtils.updateJobStatus(details, JobState.SUBMITTED);
RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + executable + "; " + executable);
-
StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
-
CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), jobIDReaderCommandOutput);
String stdOutputString = getOutputifAvailable(jobIDReaderCommandOutput, "Error submitting job to resource");
-
log.info("stdout=" + stdOutputString);
-
-// GFacUtils.updateJobStatus(details, JobState.COMPLETE);
} catch (Exception e) {
throw new GFacProviderException(e.getMessage(), e);
}
@@ -167,7 +160,7 @@ public class SSHProvider extends AbstractProvider {
String jobID = cluster.submitBatchJob(jobDescriptor);
if (jobID != null) {
jobDetails.setJobID(jobID);
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED, monitorPublisher);
}
jobExecutionContext.setJobDetails(jobDetails);
String verifyJobId = verifyJobSubmission(cluster, jobDetails);
@@ -177,7 +170,7 @@ public class SSHProvider extends AbstractProvider {
jobID = verifyJobId;
jobDetails.setJobID(jobID);
}
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED, monitorPublisher);
}
if (jobID == null) {
log.error("Couldn't find remote jobId for JobName:" + jobDetails.getJobName() + ", ExperimentId:" + jobExecutionContext.getExperimentID());
@@ -191,14 +184,14 @@ public class SSHProvider extends AbstractProvider {
String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED, monitorPublisher);
GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
} catch (Exception e) {
String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED, monitorPublisher);
GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
} finally {
@@ -227,7 +220,6 @@ public class SSHProvider extends AbstractProvider {
}
-
public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
JobDetails jobDetails = jobExecutionContext.getJobDetails();
StringBuffer data = new StringBuffer();
@@ -249,7 +241,7 @@ public class SSHProvider extends AbstractProvider {
try {
if (jobDetails.getJobID() != null) {
cluster.cancelJob(jobDetails.getJobID());
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED, monitorPublisher);
} else {
log.error("No Job Id is set, so cannot perform the cancel operation !!!");
throw new GFacProviderException("Cancel request failed to cancel job as JobId is null in Job Execution Context");
@@ -273,42 +265,6 @@ public class SSHProvider extends AbstractProvider {
}
}
-// public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException {
-// List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers();
-// if (daemonHandlers == null) {
-// daemonHandlers = BetterGfacImpl.getDaemonHandlers();
-// }
-// ThreadedHandler pullMonitorHandler = null;
-// ThreadedHandler pushMonitorHandler = null;
-// String monitorMode = host.getMonitorMode();
-// for (ThreadedHandler threadedHandler : daemonHandlers) {
-// if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
-// pullMonitorHandler = threadedHandler;
-// if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) {
-// jobExecutionContext.setProperty("cancel","true");
-// pullMonitorHandler.invoke(jobExecutionContext);
-// } else {
-// log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
-// " to handle by the GridPullMonitorHandler");
-// }
-// } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
-// pushMonitorHandler = threadedHandler;
-// if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) {
-// log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID);
-// pushMonitorHandler.invoke(jobExecutionContext);
-// } else {
-// log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
-// " to handle by the GridPushMonitorHandler");
-// }
-// }
-// // have to handle the GridPushMonitorHandler logic
-// }
-// if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
-// log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
-// ", execution is configured as asynchronous, so Outhandler will not be invoked");
-// }
-// }
-
private File createShellScript(JobExecutionContext context) throws IOException {
String uniqueDir = jobExecutionContext.getApplicationName() + System.currentTimeMillis()
+ new Random().nextLong();
@@ -344,11 +300,6 @@ public class SSHProvider extends AbstractProvider {
String value = inputParamType.getValue();
cmd.append(value);
cmd.append(SPACE);
-// String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
-// for (String value : values) {
-// cmd.append(value);
-// cmd.append(SPACE);
-// }
} else {
String paramValue = inputParamType.getValue();
cmd.append(paramValue);
[2/2] airavata git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/airavata
Posted by sh...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7d391f11
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7d391f11
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7d391f11
Branch: refs/heads/master
Commit: 7d391f11a7d660a508febdd76566a486fceb8ff1
Parents: 0f5ec6f ec9b6fe
Author: shamrath <sh...@gmail.com>
Authored: Wed May 13 11:07:12 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Wed May 13 11:07:12 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 39 +++--
.../server/OrchestratorServerHandler.java | 143 +++++++++----------
2 files changed, 85 insertions(+), 97 deletions(-)
----------------------------------------------------------------------