You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/02/27 23:09:28 UTC
[1/2] git commit: Implementing initial version of orchestrator-service
Repository: airavata
Updated Branches:
refs/heads/master 2d2be1469 -> c9b9575a0
Implementing initial version of orchestrator-service
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/fa2601a7
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/fa2601a7
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/fa2601a7
Branch: refs/heads/master
Commit: fa2601a7d34e5f643ece14c5baa7b236497936f3
Parents: 7322247
Author: lahiru <la...@apache.org>
Authored: Thu Feb 27 17:08:54 2014 -0500
Committer: lahiru <la...@apache.org>
Committed: Thu Feb 27 17:08:54 2014 -0500
----------------------------------------------------------------------
.../apache/airavata/client/AiravataClient.java | 8 --
.../apache/airavata/job/monitor/MonitorID.java | 53 +++++++-
.../airavata/job/monitor/state/JobStatus.java | 3 +
.../gfac/context/JobExecutionContext.java | 10 ++
.../java/org/apache/airavata/gfac/cpi/GFac.java | 7 +-
.../org/apache/airavata/gfac/cpi/GFacImpl.java | 94 +++++++-------
.../airavata-orchestrator-service/pom.xml | 5 +
.../server/OrchestratorServerHandler.java | 81 +++++++++---
.../src/main/resources/gfac-config.xml | 90 ++++++++++++++
.../src/main/resources/gsissh.properties | 26 ++++
.../src/main/resources/monitor.properties | 10 ++
.../src/main/resources/orchestrator.properties | 26 ++++
modules/orchestrator/orchestrator-core/pom.xml | 97 ++++++++-------
.../orchestrator/core/HangedJobWorker.java | 123 -------------------
.../orchestrator/core/NewJobWorker.java | 117 ------------------
.../core/impl/EmbeddedGFACJobSubmitter.java | 58 ++++-----
.../orchestrator/core/job/JobSubmitter.java | 18 +--
.../core/utils/OrchestratorUtils.java | 28 ++++-
.../validator/impl/SimpleAppDataValidator.java | 22 ++++
.../airavata/orchestrator/cpi/Orchestrator.java | 16 ++-
.../cpi/impl/SimpleOrchestratorImpl.java | 94 +++++++++++---
.../xbaya/invoker/EmbeddedGFacInvoker.java | 4 +-
22 files changed, 571 insertions(+), 419 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/airavata-client/src/main/java/org/apache/airavata/client/AiravataClient.java
----------------------------------------------------------------------
diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/AiravataClient.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/AiravataClient.java
index daee364..1fa4909 100644
--- a/modules/airavata-client/src/main/java/org/apache/airavata/client/AiravataClient.java
+++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/AiravataClient.java
@@ -63,18 +63,10 @@ public class AiravataClient extends Observable implements AiravataAPI {
.getLogger(AiravataClient.class);
public static final String REGISTRY = "JCR";
public static final String GFAC = "gfac";
- public static final String PROXYSERVER = "proxyserver";
public static final String MSGBOX = "msgbox";
public static final String BROKER = "broker";
- public static final String DEFAULT_GFAC_URL = "gfac.url";
- public static final String DEFAULT_MYPROXY_SERVER = "myproxy.url";
- public static final String DEFAULT_MESSAGE_BOX_URL = "messagebox.url";
- public static final String DEFAULT_BROKER_URL = "messagebroker.url";
- public static final String MYPROXYUSERNAME = "myproxy.username";
- public static final String MYPROXYPASS = "myproxy.password";
public static final String WITHLISTENER = "with.Listener";
public static final String WORKFLOWSERVICEURL = "xbaya.service.url";
- public static final String TRUSTED_CERT_LOCATION = "trusted.cert.location";
private AiravataClientConfiguration clientConfiguration;
private String currentUser;
private URI regitryURI;
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
index eae078a..5dee039 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
import java.sql.Timestamp;
import java.util.Date;
+import java.util.Map;
+import java.util.Properties;
/*
This is the object which contains the data to identify a particular
@@ -49,19 +51,30 @@ public class MonitorID {
private AuthenticationInfo authenticationInfo = null;
- public MonitorID(HostDescription host, String jobID, String userName) {
+ private Map<String, Object> parameters;
+
+ private String experimentID;
+
+ private String taskID;
+
+
+ public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName) {
this.host = host;
- this.jobID = jobID;
this.jobStartedTime = new Timestamp((new Date()).getTime());
this.userName = userName;
+ this.jobID = jobID;
+ this.taskID = taskID;
+ this.experimentID = experimentID;
}
- public MonitorID(HostDescription host, String jobID, String userName,AuthenticationInfo authenticationInfo) {
+ public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName,AuthenticationInfo authenticationInfo) {
this.host = host;
- this.jobID = jobID;
this.jobStartedTime = new Timestamp((new Date()).getTime());
this.authenticationInfo = authenticationInfo;
this.userName = userName;
+ this.jobID = jobID;
+ this.taskID = taskID;
+ this.experimentID = experimentID;
}
public HostDescription getHost() {
return host;
@@ -118,4 +131,36 @@ public class MonitorID {
public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
this.authenticationInfo = authenticationInfo;
}
+
+ public void addParameter(String key,Object value) {
+ this.parameters.put(key, value);
+ }
+
+ public Object getParameter(String key) {
+ return this.parameters.get(key);
+ }
+
+ public Map<String, Object> getParameters() {
+ return parameters;
+ }
+
+ public void setParameters(Map<String, Object> parameters) {
+ this.parameters = parameters;
+ }
+
+ public String getExperimentID() {
+ return experimentID;
+ }
+
+ public void setExperimentID(String experimentID) {
+ this.experimentID = experimentID;
+ }
+
+ public String getTaskID() {
+ return taskID;
+ }
+
+ public void setTaskID(String taskID) {
+ this.taskID = taskID;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
index 957e168..e062757 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
@@ -23,6 +23,8 @@ package org.apache.airavata.job.monitor.state;
import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.model.workspace.experiment.JobState;
+import java.util.Properties;
+
/**
* This is the primary job state object used in
* through out the monitor module. This use airavata-data-model JobState enum
@@ -35,6 +37,7 @@ public class JobStatus {
private MonitorID monitorID;
+
public JobState getState() {
return state;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java
index 8f9e8d8..784d689 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java
@@ -31,6 +31,7 @@ import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.SecurityContext;
import org.apache.airavata.gfac.notification.GFacNotifier;
import org.apache.airavata.gfac.provider.GFacProvider;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
public class JobExecutionContext extends AbstractContext{
@@ -47,6 +48,8 @@ public class JobExecutionContext extends AbstractContext{
private TaskDetails taskData;
+ private JobDetails jobDetails;
+
// private ContextHeaderDocument.ContextHeader contextHeader;
// Keep track of the current path of the message. Before hitting provider its in-path.
@@ -219,4 +222,11 @@ public class JobExecutionContext extends AbstractContext{
this.status = status;
}
+ public JobDetails getJobDetails() {
+ return jobDetails;
+ }
+
+ public void setJobDetails(JobDetails jobDetails) {
+ this.jobDetails = jobDetails;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java
index 2358735..d18df07 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java
@@ -40,10 +40,11 @@ public interface GFac {
public JobExecutionContext submitJob(String experimentID,String taskID) throws GFacException;
/**
- * This method has to be invoked after submitting the job and have to make sure job is properly finished
- * @param jobExecutionContext
+ * This method has to be invoked after submitting the job and have to make sure job is properly finished
+ * @param experimentID
+ * @param taskID
* @throws GFacException
*/
- public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException;
+ public void invokeOutFlowHandlers(String experimentID,String taskID) throws GFacException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
index 4dbca11..b728868 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
@@ -114,58 +114,63 @@ public class GFacImpl implements GFac {
public JobExecutionContext submitJob(String experimentID,String taskID) throws GFacException {
JobExecutionContext jobExecutionContext = null;
try {
- TaskDetails taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, taskID);
- // this is wear our new model and old model is mapping (so serviceName in ExperimentData and service name in ServiceDescriptor
- // has to be same.
+ jobExecutionContext = createJEC(experimentID, taskID);
- // 1. Get the Task from the task ID and construct the Job object and save it in to registry
- // 2. Add another property to jobExecutionContext and read them inside the provider and use it.
- String serviceName = taskData.getApplicationId();
+ submitJob(jobExecutionContext);
+ } catch (Exception e) {
+ log.error("Error inovoking the job with experiment ID: " + experimentID);
+ throw new GFacException(e);
+ }
+ return jobExecutionContext;
+ }
+
+ private JobExecutionContext createJEC(String experimentID, String taskID) throws Exception {
+ JobExecutionContext jobExecutionContext;TaskDetails taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, taskID);
+ // this is wear our new model and old model is mapping (so serviceName in ExperimentData and service name in ServiceDescriptor
+ // has to be same.
+
+ // 1. Get the Task from the task ID and construct the Job object and save it in to registry
+ // 2. Add another property to jobExecutionContext and read them inside the provider and use it.
+ String serviceName = taskData.getApplicationId();
if (serviceName == null) {
throw new GFacException("Error executing the job because there is not Application Name in this Experiment");
}
- List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
- Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName);
- for (String hostDescName : applicationDescriptors.keySet()) {
- registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName));
- }
- Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
- HostScheduler hostScheduler = aClass.newInstance();
- HostDescription hostDescription = hostScheduler.schedule(registeredHosts);
+ List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
+ Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName);
+ for (String hostDescName : applicationDescriptors.keySet()) {
+ registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName));
+ }
+ Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
+ HostScheduler hostScheduler = aClass.newInstance();
+ HostDescription hostDescription = hostScheduler.schedule(registeredHosts);
- ServiceDescription serviceDescription = airavataRegistry2.getServiceDescriptor(serviceName);
+ ServiceDescription serviceDescription = airavataRegistry2.getServiceDescriptor(serviceName);
- ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostDescription.getType().getHostName());
- URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
- Properties configurationProperties = ServerSettings.getProperties();
- GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), airavataAPI, configurationProperties);
+ ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostDescription.getType().getHostName());
+ URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+ Properties configurationProperties = ServerSettings.getProperties();
+ GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), airavataAPI, configurationProperties);
- jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName);
- jobExecutionContext.setTaskData(taskData);
-
- ApplicationContext applicationContext = new ApplicationContext();
- applicationContext.setApplicationDeploymentDescription(applicationDescription);
- applicationContext.setHostDescription(hostDescription);
- applicationContext.setServiceDescription(serviceDescription);
- jobExecutionContext.setApplicationContext(applicationContext);
+ jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName);
+ jobExecutionContext.setTaskData(taskData);
- List<DataObjectType> experimentInputs = taskData.getApplicationInputs();
- jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getMessageContext(experimentInputs,
- serviceDescription.getType().getInputParametersArray())));
+ ApplicationContext applicationContext = new ApplicationContext();
+ applicationContext.setApplicationDeploymentDescription(applicationDescription);
+ applicationContext.setHostDescription(hostDescription);
+ applicationContext.setServiceDescription(serviceDescription);
+ jobExecutionContext.setApplicationContext(applicationContext);
- HashMap<String, Object> outputData = new HashMap<String, Object>();
- jobExecutionContext.setOutMessageContext(new MessageContext(outputData));
+ List<DataObjectType> experimentInputs = taskData.getApplicationInputs();
+ jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getMessageContext(experimentInputs,
+ serviceDescription.getType().getInputParametersArray())));
- jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
- jobExecutionContext.setExperimentID(experimentID);
+ HashMap<String, Object> outputData = new HashMap<String, Object>();
+ jobExecutionContext.setOutMessageContext(new MessageContext(outputData));
- addSecurityContext(hostDescription, configurationProperties, jobExecutionContext);
+ jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
+ jobExecutionContext.setExperimentID(experimentID);
- submitJob(jobExecutionContext);
- } catch (Exception e) {
- log.error("Error inovoking the job with experiment ID: " + experimentID);
- throw new GFacException(e);
- }
+ addSecurityContext(hostDescription, configurationProperties, jobExecutionContext);
return jobExecutionContext;
}
@@ -271,7 +276,14 @@ public class GFacImpl implements GFac {
}
}
- public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ public void invokeOutFlowHandlers(String experimentID,String taskID) throws GFacException {
+ JobExecutionContext jobExecutionContext = null;
+ try {
+ jobExecutionContext = createJEC(experimentID, taskID);
+ Scheduler.schedule(jobExecutionContext);
+ } catch (Exception e) {
+ throw new GFacException(e);
+ }
List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
for (GFacHandlerConfig handlerClassName : handlers) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/airavata-orchestrator-service/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/pom.xml b/modules/orchestrator/airavata-orchestrator-service/pom.xml
index 24f4793..c1af70b 100644
--- a/modules/orchestrator/airavata-orchestrator-service/pom.xml
+++ b/modules/orchestrator/airavata-orchestrator-service/pom.xml
@@ -51,6 +51,11 @@
<artifactId>airavata-gfac-core</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-model-utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<properties>
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index de5b47d..1ef609a 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -21,25 +21,38 @@
package org.apache.airavata.orchestrator.server;
+import org.apache.airavata.common.utils.ApplicationSettings;
import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.job.monitor.MonitorManager;
import org.apache.airavata.job.monitor.core.Monitor;
import org.apache.airavata.job.monitor.core.PullMonitor;
import org.apache.airavata.job.monitor.core.PushMonitor;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
import org.apache.airavata.orchestrator.cpi.OrchestratorService;
import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl;
import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
+import org.apache.airavata.persistance.registry.jpa.model.TaskDetail;
+import org.apache.airavata.registry.cpi.DataType;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.validation.constraints.Null;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
+import java.util.List;
import java.util.Properties;
public class OrchestratorServerHandler implements OrchestratorService.Iface {
@@ -51,6 +64,10 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
private Registry registry;
+ private boolean pushMode = true;
+
+ GSIAuthenticationInfo authenticationInfo = null;
+
/**
* Query orchestrator server to fetch the CPI version
*/
@@ -68,12 +85,23 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
// first constructing the monitorManager and orchestrator, then fill the required properties
monitorManager = new MonitorManager();
orchestrator = new SimpleOrchestratorImpl();
- registry = new RegistryImpl();
+ registry = RegistryFactory.getDefaultRegistry();
// Filling monitorManager properties
properties.load(monitorUrl.openStream());
+ // we can keep a single user to do all the monitoring authentication for required machine..
+ String myProxyUser = properties.getProperty("myproxy.user");
+ String myProxyPass = properties.getProperty("myproxy.password");
+ String certPath = properties.getProperty("certificate.path");
+ String myProxyServer = properties.getProperty("myproxy.server");
+ authenticationInfo = new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
+ 7512, 17280000, certPath);
+
+ // loading Monitor configuration
String primaryMonitor = properties.getProperty("primaryMonitor");
String secondaryMonitor = properties.getProperty("secondaryMonitor");
+
+
if (primaryMonitor == null) {
log.error("Error loading primaryMonitor and there has to be a primary monitor");
} else {
@@ -81,6 +109,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
Monitor monitor = aClass.newInstance();
if (monitor instanceof PullMonitor) {
monitorManager.addPullMonitor((PullMonitor) monitor);
+ pushMode = false;
} else if (monitor instanceof PushMonitor) {
monitorManager.addPushMonitor((PushMonitor) monitor);
} else {
@@ -93,6 +122,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
// todo we do not support a secondary Monitor at this point
}
+ monitorManager.registerListener(orchestrator);
// Now Monitor Manager is properly configured, now we have to start the monitoring system.
// This will initialize all the required threads and required queues
monitorManager.launchMonitor();
@@ -128,23 +158,40 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
@Override
public boolean launchExperiment(String experimentId) throws TException {
//TODO: Write the Orchestrator implementaion
-
- /*
- Use the registry to take the Experiment Model object
- check the ExperimentModel object to check airavataAutoSchedule property
- if its set give an error telling that we do not support it
- else create a Task and save to the registry
- This should return the task ID
- if monitoring is in push mode, add the job to monitor queue, i hope by this time the host has finalized
- Get the task ID and invoke GFAC
- GFac will submit the job and return the jobID
- submit the job to minitor to monitoring queue after the submission if the monitoring is in pull mode
- RETURN;
- */
try {
- orchestrator.launchExperiment(experimentId);
- } catch (OrchestratorException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ List<TaskDetails> tasks = orchestrator.createTasks(experimentId);
+ MonitorID monitorID = null;
+ if(tasks.size() > 1){
+ log.info("There are multiple tasks for this experiment, So Orchestrator will launch multiple Jobs");
+ }
+ for(TaskDetails taskID:tasks) {
+ //iterate through all the generated tasks and performs the job submisssion+monitoring
+
+ Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentId);
+ String userName = experiment.getUserName();
+
+ HostDescription hostDescription = OrchestratorUtils.getHostDescription(orchestrator, taskID);
+
+ // creating monitorID to register with monitoring queue
+
+ if(pushMode){
+ // during the pull we need the monitorID in the queue inadvance
+ // For this we have enough data at this point
+ monitorID = new MonitorID(hostDescription, null,taskID.getTaskID(),experimentId, userName);
+ monitorManager.addAJobToMonitor(monitorID);
+ }
+ // Launching job for each task
+ String jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
+ log.debug("Job Launched to the resource by GFAC and jobID returned : " + jobID);
+ // if the monitoring is pull mode then we add the monitorID for each task after submitting
+ // the job with the jobID, otherwise we don't need the jobID
+ if(!pushMode) {
+ monitorID = new MonitorID(hostDescription, jobID,taskID.getTaskID(),experimentId, userName, authenticationInfo);
+ monitorManager.addAJobToMonitor(monitorID);
+ }
+ }
+ } catch (Exception e) {
+ throw new TException(e);
}
return false;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
new file mode 100644
index 0000000..61dca4f
--- /dev/null
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml
@@ -0,0 +1,90 @@
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one ~ or more
+ contributor license agreements. See the NOTICE file ~ distributed with this
+ work for additional information ~ regarding copyright ownership. The ASF
+ licenses this file ~ to you under the Apache License, Version 2.0 (the ~
+ "License"); you may not use this file except in compliance ~ with the License.
+ You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ ~ Unless required by applicable law or agreed to in writing, ~ software
+ distributed under the License is distributed on an ~ "AS IS" BASIS, WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY ~ KIND, either express or implied. See the
+ License for the ~ specific language governing permissions and limitations
+ ~ under the License. -->
+
+<GFac>
+ <GlobalHandlers>
+ <InHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.AppDescriptorCheckHandler">
+ <property name="name" value="value"/>
+ </Handler>
+ </InHandlers>
+ <OutHandlers></OutHandlers>
+ </GlobalHandlers>
+ <Provider class="org.apache.airavata.gfac.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl">
+ <InHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.LocalDirectorySetupHandler"/>
+ </InHandlers>
+ </Provider>
+ <Provider class="org.apache.airavata.gfac.provider.impl.GramProvider" host="org.apache.airavata.schemas.gfac.impl.GlobusHostTypeImpl">
+ <property name="name" value="value"/>
+ <InHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.GramDirectorySetupHandler">
+ <property name="name" value="value"/>
+ </Handler>
+ <Handler class="org.apache.airavata.gfac.handler.GridFTPInputHandler"/>
+ </InHandlers>
+ <OutHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.GridFTPOutputHandler"/>
+ </OutHandlers>
+ </Provider>
+ <Provider class="org.apache.airavata.gfac.provider.impl.BESProvider" host="org.apache.airavata.schemas.gfac.impl.UnicoreHostTypeImpl">
+ <InHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.GramDirectorySetupHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.GridFTPInputHandler"/>
+ </InHandlers>
+ <OutHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.GridFTPOutputHandler"/>
+ </OutHandlers>
+ </Provider>
+
+ <Provider class="org.apache.airavata.gfac.ec2.EC2Provider" host="org.apache.airavata.schemas.gfac.impl.Ec2HostTypeImpl">
+ <InHandlers/>
+ <OutHandlers/>
+ </Provider>
+
+ <Provider class="org.apache.airavata.gfac.provider.impl.HadoopProvider" host="org.apache.airavata.schemas.gfac.impl.HadoopHostTypeImpl">
+ <InHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.HadoopDeploymentHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.HDFSDataMovementHandler"/>
+ </InHandlers>
+ <OutHandlers/>
+ </Provider>
+
+ <Application name="UltraScan">
+ <InHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.GramDirectorySetupHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.GridFTPInputHandler"/>
+ </InHandlers>
+ <OutHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.GridFTPOutputHandler"/>
+ </OutHandlers>
+ </Application>
+
+ <Provider class="org.apache.airavata.gfac.provider.impl.SSHProvider" host="org.apache.airavata.schemas.gfac.impl.SSHHostTypeImpl">
+ <InHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/>
+ </InHandlers>
+ <OutHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/>
+ </OutHandlers>
+ </Provider>
+ <Provider class="org.apache.airavata.gfac.provider.impl.GSISSHProvider" host="org.apache.airavata.schemas.gfac.impl.GsisshHostTypeImpl">
+ <InHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/>
+ <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/>
+ </InHandlers>
+ <OutHandlers>
+ <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/>
+ </OutHandlers>
+ </Provider>
+</GFac>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gsissh.properties
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gsissh.properties b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gsissh.properties
new file mode 100644
index 0000000..3fdf76d
--- /dev/null
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gsissh.properties
@@ -0,0 +1,26 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+###########################################################################
+# Specifies system level configurations as a key/value pairs.
+###########################################################################
+
+StrictHostKeyChecking=no
+ssh.session.timeout=360000
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties
new file mode 100644
index 0000000..3353e7b
--- /dev/null
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/monitor.properties
@@ -0,0 +1,10 @@
+primaryMonitor=org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor
+secondaryMonitor=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor
+amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
+connection.name=xsede_private
+trusted.certificate.location
+certificate.path=
+myproxy.server=myproxy.teragrid.org
+myproxy.user=ogce
+myproxy.password=
+myproxy.life=3600
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/airavata-orchestrator-service/src/main/resources/orchestrator.properties
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/orchestrator.properties b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/orchestrator.properties
new file mode 100644
index 0000000..54147e9
--- /dev/null
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/orchestrator.properties
@@ -0,0 +1,26 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter
+job.validator=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator
+submitter.interval=10000
+threadpool.size=10
+start.submitter=true
+embedded.mode=true
+enable.validation=false
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index cbc4f80..e5081d6 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -8,14 +8,15 @@ in writing, software distributed under the License is distributed on an "AS IS"
ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under
the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
- <groupId>org.apache.airavata</groupId>
- <artifactId>orchestrator</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>orchestrator</artifactId>
<version>0.12-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
+ <relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>airavata-orchestrator-core</artifactId>
@@ -61,50 +62,60 @@ the License. -->
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
- <artifactId>airavata-data-models</artifactId>
+ <artifactId>airavata-data-models</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-model-utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-job-monitor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
- <groupId>org.testng</groupId>
- <artifactId>testng</artifactId>
- <version>6.1.1</version>
- <scope>test</scope>
- </dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>6.1.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>**/ssh/**</exclude>
- </excludes>
- <forkMode>always</forkMode>
- <failIfNoTests>false</failIfNoTests>
- </configuration>
- </plugin>
- </plugins>
- <testSourceDirectory>${project.basedir}/src/test/java</testSourceDirectory>
- <testOutputDirectory>${project.build.directory}/test-classes</testOutputDirectory>
- <testResources>
- <testResource>
- <directory>${project.basedir}/src/test/resources</directory>
- </testResource>
- </testResources>
- </build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/ssh/**</exclude>
+ </excludes>
+ <forkMode>always</forkMode>
+ <failIfNoTests>false</failIfNoTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ <testSourceDirectory>${project.basedir}/src/test/java</testSourceDirectory>
+ <testOutputDirectory>${project.build.directory}/test-classes</testOutputDirectory>
+ <testResources>
+ <testResource>
+ <directory>${project.basedir}/src/test/resources</directory>
+ </testResource>
+ </testResources>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java
deleted file mode 100644
index b3d2878..0000000
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/HangedJobWorker.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.orchestrator.core;
-
-import java.util.List;
-
-import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
-import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
-import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * this worker is handling hanged jobs and invoke the submitter
- * after finding hanged jobs
- */
-public class HangedJobWorker implements Runnable{
- private final static Logger logger = LoggerFactory.getLogger(HangedJobWorker.class);
-
- private OrchestratorContext orchestratorContext;
-
- private JobSubmitter jobSubmitter;
-
- // Set the default submit interval value
- private int submitInterval = 1000;
-
-
- public HangedJobWorker(OrchestratorContext orchestratorContext) throws OrchestratorException {
- this.orchestratorContext = orchestratorContext;
- try {
- String submitterClass = this.orchestratorContext.getOrchestratorConfiguration().getNewJobSubmitterClass();
- //FIXME: (MEP) Do you want to use the same submit interval for hung jobs as newly submitted jobs? Suggest separate parameters.
- submitInterval = this.orchestratorContext.getOrchestratorConfiguration().getSubmitterInterval();
- //FIXME: (MEP) It is possible that you want to have a different JobSubmitter for hung jobs and for new jobs, so the property file needs to have separate name/value pairs for these.
- Class<? extends JobSubmitter> aClass = Class.forName(submitterClass.trim()).asSubclass(JobSubmitter.class);
- jobSubmitter = aClass.newInstance();
- jobSubmitter.initialize(this.orchestratorContext);
- } catch (ClassNotFoundException e) {
- logger.error("Error while loading Job Submitter");
- } catch (InstantiationException e) {
- logger.error("Error while loading Job Submitter");
- throw new OrchestratorException(e);
- } catch (IllegalAccessException e) {
- logger.error("Error while loading Job Submitter");
- throw new OrchestratorException(e);
- }
-
- }
-
- public void run() {
- /* implement logic to submit job batches time to time */
- int idleCount = 0;
- while (true) {
- try {
- Thread.sleep(submitInterval);
- } catch (InterruptedException e) {
- logger.error("Error in JobSubmitter during sleeping process before submit jobs");
- e.printStackTrace();
- }
- /* Here the worker pick bunch of jobs available to submit and submit that to a single
- GFAC instance, we do not handle job by job submission to each gfac instance
- */
- try {
-
- GFACInstance gfacInstance = jobSubmitter.selectGFACInstance();
-
- // Now we have picked a gfac instance to submit set of jobs at this time, now its time to
- // select what are the jobs available to submit
-
- List<String> allHangedJobs = orchestratorContext.getRegistry().getAllHangedJobs();
- //FIXME: (MEP) Suggest putting this in a separate method, and you'll need a method to also decrease the submitInterval if you are busy. This submitInterval adjustment seems to be too fined grained of a detail to worry about now.
- if (allHangedJobs.size() == 0) {
- idleCount++;
-
- if (idleCount == 10) {
- try {
- Thread.sleep(submitInterval*2);
- } catch (InterruptedException e) {
- logger.error("Error in JobSubmitter during sleeping process before submit jobs");
- e.printStackTrace();
- }
- idleCount=0;
- }
- continue;
- }
-
- jobSubmitter.submitJob(gfacInstance,allHangedJobs);
-
- /* After submitting available jobs try to schedule again and then submit*/
- jobSubmitter.submitJob(jobSubmitter.selectGFACInstance(),allHangedJobs);
- } catch (Exception e) {
- logger.error("Error while trying to retrieve available ");
- }
- }
- }
-
- public OrchestratorContext getOrchestratorContext() {
- return orchestratorContext;
- }
-
- public void setOrchestratorContext(OrchestratorContext orchestratorContext) {
- this.orchestratorContext = orchestratorContext;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java
deleted file mode 100644
index 1b22702..0000000
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/NewJobWorker.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.orchestrator.core;
-
-import java.util.List;
-
-import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
-import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
-import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/*
-This is the worker class to handle the jobs stored in to registry as
-fresh and this will pick those jobs and invoke the defined submitter
- */
-public class NewJobWorker implements Runnable {
- private final static Logger logger = LoggerFactory.getLogger(NewJobWorker.class);
-
- private OrchestratorContext orchestratorContext;
-
- private JobSubmitter jobSubmitter;
-
- // Set the default submit interval value
- private int submitInterval = 1000;
-
-
- public NewJobWorker(OrchestratorContext orchestratorContext) throws OrchestratorException {
- this.orchestratorContext = orchestratorContext;
- try {
- String submitterClass = this.orchestratorContext.getOrchestratorConfiguration().getNewJobSubmitterClass();
- submitInterval = this.orchestratorContext.getOrchestratorConfiguration().getSubmitterInterval();
- Class<? extends JobSubmitter> aClass = Class.forName(submitterClass.trim()).asSubclass(JobSubmitter.class);
- jobSubmitter = aClass.newInstance();
- jobSubmitter.initialize(this.orchestratorContext);
- } catch (ClassNotFoundException e) {
- logger.error("Error while loading Job Submitter");
- } catch (InstantiationException e) {
- logger.error("Error while loading Job Submitter");
- throw new OrchestratorException(e);
- } catch (IllegalAccessException e) {
- logger.error("Error while loading Job Submitter");
- throw new OrchestratorException(e);
- }
-
- }
-
- public void run() {
- /* implement logic to submit job batches time to time */
- int idleCount = 0;
- while (true) {
- try {
- Thread.sleep(submitInterval);
- } catch (InterruptedException e) {
- logger.error("Error in JobSubmitter during sleeping process before submit jobs");
- e.printStackTrace();
- }
- /* Here the worker pick bunch of jobs available to submit and submit that to a single
- GFAC instance, we do not handle job by job submission to each gfac instance
-
- */
- try {
- GFACInstance gfacInstance = jobSubmitter.selectGFACInstance();
-
- List<String> allAcceptedJobs = orchestratorContext.getRegistry().getAllAcceptedJobs();
- if (allAcceptedJobs.size() == 0) {
- //FIXME: (MEP) this stuff should be in a separate method, and I'm not sure it is necessary. You have no way to decrease the submit interval if busy.
- idleCount++;
-
- if (idleCount == 10) {
- try {
- Thread.sleep(submitInterval * 2);
- } catch (InterruptedException e) {
- logger.error("Error in JobSubmitter during sleeping process before submit jobs");
- e.printStackTrace();
- }
- idleCount = 0;
- }
- continue;
- }
- jobSubmitter.submitJob(gfacInstance, allAcceptedJobs);
- } catch (Exception e) {
- logger.error("Error while trying to retrieve available ");
- }
- // Now we have picked a gfac instance to submit set of jobs at this time, now its time to
- // select what are the jobs available to submit
-
- }
- }
-
- public OrchestratorContext getOrchestratorContext() {
- return orchestratorContext;
- }
-
- public void setOrchestratorContext(OrchestratorContext orchestratorContext) {
- this.orchestratorContext = orchestratorContext;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
index 27109e9..783f2f8 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
@@ -24,6 +24,7 @@ package org.apache.airavata.orchestrator.core.impl;
import java.util.*;
import org.apache.airavata.common.utils.AiravataJobState;
+import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.cpi.GFac;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
@@ -61,44 +62,39 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter {
}
- public boolean submitJob(GFACInstance gfac, List<String> experimentIDList) throws OrchestratorException {
-
- for (int i = 0; i < experimentIDList.size(); i++) {
- try {
- // once its fetched it's status will changed to fetched state
- launchGfacWithExperimentID(experimentIDList.get(i));
- } catch (Exception e) {
- logger.error("Error getting job related information");
- throw new OrchestratorException(e);
- }
+ public String submit(String experimentID, String taskID) throws OrchestratorException {
+ JobExecutionContext jobExecutionContext;
+ try {
+ jobExecutionContext = gfac.submitJob(experimentID, taskID);
+ } catch (Exception e) {
+ String error = "Error launching the job : " + experimentID;
+ logger.error(error);
+ throw new OrchestratorException(error);
}
- return true;
+ return jobExecutionContext.getJobDetails().getJobID();
}
- //FIXME: (MEP) This method is pretty gruesome. If we really expect multiple implementations of the JobSubmitter
- // interface and at least some of them will need to do the stuff in this method, then we need a parent class
- // GenericJobSubmitterImpl.java (maybe abstract) that includes launchGfacWithExperimentID() so that subclasses can inherit it.
- private void launchGfacWithExperimentID(String experimentID) throws OrchestratorException {
- Registry newRegistry = orchestratorContext.getNewRegistry();
- try {
- //todo init this during submitter init
- JobExecutionContext jobExecutionContext = gfac.submitJob(experimentID,null);
- orchestratorContext.getRegistry().changeStatus(experimentID, AiravataJobState.State.SUBMITTED);
- } catch (Exception e)
- {
- throw new OrchestratorException("Error launching the Job", e);
- }
+ public GFac getGfac() {
+ return gfac;
+ }
+ public void setGfac(GFac gfac) {
+ this.gfac = gfac;
}
- public boolean directJobSubmit(String experimentID) throws OrchestratorException {
+ public OrchestratorContext getOrchestratorContext() {
+ return orchestratorContext;
+ }
+
+ public void setOrchestratorContext(OrchestratorContext orchestratorContext) {
+ this.orchestratorContext = orchestratorContext;
+ }
+
+ public void runAfterJobTask(String experimentID, String taskID) throws OrchestratorException {
try {
- launchGfacWithExperimentID(experimentID);
- } catch (Exception e) {
- String error = "Error launching the job : " + experimentID;
- logger.error(error);
- throw new OrchestratorException(error);
+ gfac.invokeOutFlowHandlers(experimentID,taskID);
+ } catch (GFacException e) {
+ throw new OrchestratorException(e);
}
- return true;
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
index 6405c4b..6790643 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
@@ -44,17 +44,21 @@ public interface JobSubmitter {
*/
GFACInstance selectGFACInstance() throws OrchestratorException;
- /**
- * @param gfac
- * @return
- */
- boolean submitJob(GFACInstance gfac,List<String> experimentIDList) throws OrchestratorException;
/**
* This can be used when user doesn't want to run in a threaded pull mode
* just get the request data and do the submission
+ * @param experimentID experimentID cannot be null
+ * @param taskID taskID cannot be null
+ * @return jobID return the jobID from GFac
+ */
+ String submit(String experimentID, String taskID) throws OrchestratorException;
+
+ /**
+ * This can be use to handle any after Jobsubmission task
* @param experimentID
- * @return
+ * @param taskID
+ * @throws OrchestratorException
*/
- boolean directJobSubmit(String experimentID) throws OrchestratorException;
+ void runAfterJobTask(String experimentID,String taskID) throws OrchestratorException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
index 7ff07e0..b5166fe 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
@@ -20,11 +20,18 @@
*/
package org.apache.airavata.orchestrator.core.utils;
+import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.orchestrator.core.NewJobWorker;
+import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.schemas.gfac.Parameter;
+import org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter;
+import org.apache.airavata.orchestrator.core.job.JobSubmitter;
+import org.apache.airavata.orchestrator.cpi.Orchestrator;
+import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl;
+import org.apache.airavata.registry.api.AiravataRegistry2;
+import org.apache.airavata.registry.api.exception.RegistryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,9 +46,8 @@ public class OrchestratorUtils {
private final static Logger logger = LoggerFactory.getLogger(OrchestratorUtils.class);
public static OrchestratorConfiguration loadOrchestratorConfiguration() throws OrchestratorException, IOException {
- //FIXME: (MEP) why are you using the NewJobWorker class to get the properties file here?
URL resource =
- NewJobWorker.class.getClassLoader().getResource(OrchestratorConstants.ORCHESTRATOR_PROPERTIES);
+ OrchestratorUtils.class.getClassLoader().getResource(OrchestratorConstants.ORCHESTRATOR_PROPERTIES);
if (resource == null) {
String error = "orchestrator.properties cannot be found, Failed to initialize Orchestrator";
logger.error(error);
@@ -56,10 +62,22 @@ public class OrchestratorUtils {
orchestratorConfiguration.setStartSubmitter(Boolean.valueOf(orchestratorProps.getProperty(OrchestratorConstants.START_SUBMITTER)));
orchestratorConfiguration.setEmbeddedMode(Boolean.valueOf(orchestratorProps.getProperty(OrchestratorConstants.EMBEDDED_MODE)));
orchestratorConfiguration.setEnableValidation(Boolean.valueOf(orchestratorProps.getProperty(OrchestratorConstants.ENABLE_VALIDATION)));
- if(orchestratorConfiguration.isEnableValidation()){
+ if (orchestratorConfiguration.isEnableValidation()) {
orchestratorConfiguration.setValidatorClass((String) orchestratorProps.get(OrchestratorConstants.JOB_VALIDATOR));
}
return orchestratorConfiguration;
}
+ public static HostDescription getHostDescription(Orchestrator orchestrator, TaskDetails taskDetails)throws OrchestratorException {
+ JobSubmitter jobSubmitter = ((SimpleOrchestratorImpl) orchestrator).getJobSubmitter();
+ AiravataRegistry2 registry = ((EmbeddedGFACJobSubmitter) jobSubmitter).getOrchestratorContext().getRegistry();
+ ComputationalResourceScheduling taskScheduling = taskDetails.getTaskScheduling();
+ String resourceHostId = taskScheduling.getResourceHostId();
+ try {
+ return registry.getHostDescriptor(resourceHostId);
+ } catch (RegistryException e) {
+ throw new OrchestratorException(e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/SimpleAppDataValidator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/SimpleAppDataValidator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/SimpleAppDataValidator.java
index 0d16062..0f83e25 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/SimpleAppDataValidator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/SimpleAppDataValidator.java
@@ -20,13 +20,22 @@
*/
package org.apache.airavata.orchestrator.core.validator.impl;
+import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleAppDataValidator extends AbstractJobMetadataValidator {
private final static Logger logger = LoggerFactory.getLogger(SimpleAppDataValidator.class);
+ private Registry registry;
+
+ public SimpleAppDataValidator() {
+ this.registry = RegistryFactory.getDefaultRegistry();
+ }
+
public boolean runAppSpecificValidation(String experimentID) throws OrchestratorException{
// implement simple application specific validator to be used for
// all the applications.
@@ -37,6 +46,19 @@ public class SimpleAppDataValidator extends AbstractJobMetadataValidator {
boolean result = false;
if (super.runBasicValidation(experimentID)) {
+ Experiment experiment = null;
+ try {
+ experiment = (Experiment) registry.get(org.apache.airavata.registry.cpi.DataType.EXPERIMENT, experimentID);
+ } catch (Exception e) {
+ throw new OrchestratorException(e);
+ }
+ if (experiment.getUserConfigurationData().isAiravataAutoSchedule()) {
+ logger.error("We dont' support auto scheduling at this point, We will simply use user data as it is");
+ }
+
+ /* todo like this do more validation and if they are suppose to fail return false otherwise give some
+ log messages in server side logs
+ */
if (runAppSpecificValidation(experimentID)) {
return true;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
index 5adde42..21b4866 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
@@ -20,8 +20,11 @@
*/
package org.apache.airavata.orchestrator.cpi;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import java.util.List;
+
/*
This is the interface for orchestrator functionality exposed to the out side of the
module
@@ -34,10 +37,21 @@ public interface Orchestrator {
* We just have to give the experimentID
*
* @param experimentID
+ * @return jobID
+ * @throws OrchestratorException
+ */
+ String launchExperiment(String experimentID, String taskID) throws OrchestratorException;
+
+
+ /**
+ * This method will parse the ExperimentConfiguration and based on the configuration
+ * we create a single or multiple tasks for the experiment.
+ * @param experimentId
* @return
* @throws OrchestratorException
*/
- boolean launchExperiment(String experimentID) throws OrchestratorException;
+ public List<TaskDetails> createTasks(String experimentId) throws OrchestratorException;
+
//todo have to add another method to handle failed or jobs to be recovered by orchestrator
//todo if you don't add these this is not an orchestrator, its just an intemediate component which invoke gfac
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 25b0a04..f8887d8 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -20,15 +20,24 @@
*/
package org.apache.airavata.orchestrator.cpi.impl;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.airavata.common.utils.AiravataJobState;
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.cpi.GFacImpl;
+import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.model.util.ExperimentModelUtil;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
import org.apache.airavata.orchestrator.core.validator.JobMetadataValidator;
-import org.apache.airavata.registry.api.exception.RegistryException;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,26 +79,77 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator {
}
}
- public boolean launchExperiment(String experimentID) throws OrchestratorException {
+ public String launchExperiment(String experimentID, String taskID) throws OrchestratorException {
// we give higher priority to userExperimentID
//todo support multiple validators
+ String jobID = null;
if (this.orchestratorConfiguration.isEnableValidation()) {
- if(jobMetadataValidator.validate(experimentID)){
- logger.info("validation Successful for the experiment: " + experimentID);
- }else {
+ if (jobMetadataValidator.validate(experimentID)) {
+ logger.info("validation Successful for the experiment: " + experimentID);
+ } else {
throw new OrchestratorException("Validation Failed, so Job will not be submitted to GFAC");
}
}
+ if (orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize() == 0) {
+ jobID = jobSubmitter.submit(experimentID, taskID);
+ }
+ return jobID;
+ }
+
+ /**
+ * This method will parse the ExperimentConfiguration and based on the configuration
+ * we create a single or multiple tasks for the experiment.
+ *
+ * @param experimentId
+ * @return
+ * @throws OrchestratorException
+ */
+ public List<TaskDetails> createTasks(String experimentId) throws OrchestratorException {
+ Experiment experiment = null;
+ List<TaskDetails> tasks = new ArrayList<TaskDetails>();
try {
- airavataRegistry.changeStatus(experimentID, AiravataJobState.State.ACCEPTED);
- if (orchestratorContext.getOrchestratorConfiguration().getThreadPoolSize() == 0) {
- jobSubmitter.directJobSubmit(experimentID);
- }
- } catch (RegistryException e) {
- //todo put more meaningful error message
- logger.error("Failed to create experiment for the request from " + experimentID);
- return false;
+ Registry newRegistry = orchestratorContext.getNewRegistry();
+ experiment = (Experiment) newRegistry.get(DataType.EXPERIMENT, experimentId);
+
+
+ WorkflowNodeDetails iDontNeedaNode = ExperimentModelUtil.createWorkflowNode("IDontNeedaNode", null);
+ String nodeID = (String) newRegistry.add(ChildDataType.WORKFLOW_NODE_DETAIL, iDontNeedaNode, experimentId);
+
+ TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromExperiment(experiment);
+ taskDetails.setTaskID((String) newRegistry.add(ChildDataType.TASK_DETAIL, taskDetails, nodeID));
+ tasks.add(taskDetails);
+ } catch (Exception e) {
+ throw new OrchestratorException("Error during creating a task");
}
- return true;
+ return tasks;
+ }
+
+ @Subscribe
+ public void handlePostExperimentTask(JobStatus status) throws OrchestratorException {
+ MonitorID monitorID = status.getMonitorID();
+ jobSubmitter.runAfterJobTask(monitorID.getExperimentID(),monitorID.getTaskID());
+ }
+ public ExecutorService getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(ExecutorService executor) {
+ this.executor = executor;
+ }
+
+ public JobMetadataValidator getJobMetadataValidator() {
+ return jobMetadataValidator;
+ }
+
+ public void setJobMetadataValidator(JobMetadataValidator jobMetadataValidator) {
+ this.jobMetadataValidator = jobMetadataValidator;
+ }
+
+ public JobSubmitter getJobSubmitter() {
+ return jobSubmitter;
+ }
+
+ public void setJobSubmitter(JobSubmitter jobSubmitter) {
+ this.jobSubmitter = jobSubmitter;
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/fa2601a7/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java
index 16724e7..564eb2b 100644
--- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java
@@ -627,10 +627,10 @@ public class EmbeddedGFacInvoker implements Invoker {
XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(XMLUtil.xmlElementToString((XmlElement) value)));
StAXOMBuilder builder = new StAXOMBuilder(reader);
OMElement input = builder.getDocumentElement();
- actualParameters.put(parameter.getParameterName(), GFacUtils.getInputActualParameter(parameter, input));
+// actualParameters.put(parameter.getParameterName(), GFacUtils.getInputActualParameter(parameter, input));
} else if (value instanceof String) {
omElement.setText((String) value);
- actualParameters.put(parameter.getParameterName(), GFacUtils.getInputActualParameter(parameter, AXIOMUtil.stringToOM("<value>" + value + "</value>")));
+// actualParameters.put(parameter.getParameterName(), GFacUtils.getInputActualParameter(parameter, AXIOMUtil.stringToOM("<value>" + value + "</value>")));
}
invoke_inputParams.addChild(omElement);
}
[2/2] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/airavata
Posted by la...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/c9b9575a
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/c9b9575a
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/c9b9575a
Branch: refs/heads/master
Commit: c9b9575a077a1b41f24475c919273e923f00e963
Parents: fa2601a 2d2be14
Author: lahiru <la...@apache.org>
Authored: Thu Feb 27 17:09:20 2014 -0500
Committer: lahiru <la...@apache.org>
Committed: Thu Feb 27 17:09:20 2014 -0500
----------------------------------------------------------------------
airavata-api/airavata-api-server/pom.xml | 7 +-
airavata-api/airavata-api-stubs/pom.xml | 1 -
.../api/client/AiravataClientFactory.java | 2 +-
.../experiment/AdvancedInputDataHandling.java | 706 -----
.../experiment/AdvancedOutputDataHandling.java | 609 -----
.../model/experiment/BasicMetadata.java | 914 -------
.../ComputationalResourceScheduling.java | 1375 ----------
.../model/experiment/ConfigurationData.java | 1402 ----------
.../experiment/QualityOfServiceParams.java | 607 -----
.../airavata/model/workspace/Gateway.java | 496 ----
.../apache/airavata/model/workspace/Group.java | 503 ----
.../airavata/model/workspace/Project.java | 1099 --------
.../apache/airavata/model/workspace/User.java | 557 ----
.../workspace/experiment/ActionableGroup.java | 70 -
.../experiment/AdvancedInputDataHandling.java | 706 -----
.../experiment/AdvancedOutputDataHandling.java | 609 -----
.../workspace/experiment/ApplicationStatus.java | 500 ----
.../ComputationalResourceScheduling.java | 1184 ---------
.../workspace/experiment/CorrectiveAction.java | 64 -
.../workspace/experiment/DataObjectType.java | 711 ------
.../experiment/DataTransferDetails.java | 706 -----
.../workspace/experiment/ErrorCategory.java | 79 -
.../workspace/experiment/ErrorDetails.java | 1309 ----------
.../model/workspace/experiment/Experiment.java | 2416 ------------------
.../workspace/experiment/ExperimentState.java | 82 -
.../workspace/experiment/ExperimentStatus.java | 516 ----
.../model/workspace/experiment/JobDetails.java | 1071 --------
.../model/workspace/experiment/JobState.java | 82 -
.../model/workspace/experiment/JobStatus.java | 516 ----
.../experiment/QualityOfServiceParams.java | 607 -----
.../model/workspace/experiment/TaskDetails.java | 1921 --------------
.../model/workspace/experiment/TaskState.java | 91 -
.../model/workspace/experiment/TaskStatus.java | 516 ----
.../workspace/experiment/TransferState.java | 82 -
.../workspace/experiment/TransferStatus.java | 516 ----
.../experiment/UserConfigurationData.java | 1025 --------
.../experiment/WorkflowNodeDetails.java | 1337 ----------
.../workspace/experiment/WorkflowNodeState.java | 73 -
.../experiment/WorkflowNodeStatus.java | 516 ----
.../experiment/experimentModelConstants.java | 59 -
.../java-client-samples/pom.xml | 10 +
.../client/samples/CreateLaunchExperiment.java | 32 +-
airavata-api/generate-thrift-files.sh | 3 +-
airavata-api/pom.xml | 2 +-
modules/gfac/gfac-core/pom.xml | 5 -
.../org/apache/airavata/gfac/cpi/GFacImpl.java | 1 -
.../gfac/provider/utils/DataTransferrer.java | 1 -
.../apache/airavata/gfac/utils/GFacUtils.java | 178 ++
.../airavata/gfac/utils/GramRSLGenerator.java | 1 -
.../orchestrator/core/NewOrchestratorTest.java | 53 +-
.../core/OrchestratorTestWithGRAM.java | 51 +-
.../core/OrchestratorTestWithGSISSH.java | 51 +-
.../persistance/registry/jpa/ResourceUtils.java | 7 +-
53 files changed, 307 insertions(+), 25730 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/c9b9575a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
----------------------------------------------------------------------