You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/22 16:44:56 UTC
[2/2] git commit: AIRAVATA-433 + fix client sample issue
AIRAVATA-433 + fix client sample issue
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/aed31d3e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/aed31d3e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/aed31d3e
Branch: refs/heads/master
Commit: aed31d3eb08c410611ef4a42205bae89a6de923d
Parents: 6791031
Author: Saminda Wijeratne <sa...@gmail.com>
Authored: Tue Apr 22 07:44:30 2014 -0700
Committer: Saminda Wijeratne <sa...@gmail.com>
Committed: Tue Apr 22 07:44:30 2014 -0700
----------------------------------------------------------------------
.../server/handler/AiravataServerHandler.java | 17 ++--
modules/gfac/gfac-core/pom.xml | 7 +-
.../org/apache/airavata/gfac/Scheduler.java | 2 -
.../org/apache/airavata/gfac/cpi/GFacImpl.java | 66 ++++++++++++---
.../airavata/gfac/provider/GFacProvider.java | 12 ++-
.../gfac/provider/impl/AbstractProvider.java | 51 +++++++++++-
.../gfac/provider/impl/BESProvider.java | 2 +-
.../gfac/provider/impl/GSISSHProvider.java | 17 +---
.../gfac/provider/impl/GramProvider.java | 3 +-
.../gfac/provider/impl/HadoopProvider.java | 4 +-
.../gfac/provider/impl/LocalProvider.java | 9 +-
.../gfac/provider/impl/SSHProvider.java | 3 +-
modules/gfac/gfac-ec2/pom.xml | 6 +-
.../apache/airavata/gfac/ec2/EC2Provider.java | 54 ++++++------
.../AiravataExperimentStatusUpdator.java | 4 +-
.../job/monitor/AiravataJobStatusUpdator.java | 14 ++--
.../job/monitor/AiravataTaskStatusUpdator.java | 12 +--
.../AiravataWorkflowNodeStatusUpdator.java | 86 ++++++++++++++++++++
.../airavata/job/monitor/MonitorManager.java | 22 +++--
.../command/ExperimentCancelRequest.java | 38 +++++++++
.../job/monitor/command/TaskCancelRequest.java | 52 ++++++++++++
.../QstatMonitorTestWithMyProxyAuth.java | 4 +-
.../server/OrchestratorServerHandler.java | 8 +-
.../core/context/OrchestratorContext.java | 18 +++-
.../core/impl/EmbeddedGFACJobSubmitter.java | 9 +-
.../airavata/orchestrator/cpi/Orchestrator.java | 10 ++-
.../cpi/impl/AbstractOrchestrator.java | 2 -
.../cpi/impl/SimpleOrchestratorImpl.java | 37 +++++++--
.../airavata/api/samples/ExperimentSample.java | 5 +-
.../apache/airavata/job/monitor/MonitorID.java | 17 +++-
.../job/monitor/core/MessageParser.java | 2 +-
.../job/monitor/event/MonitorPublisher.java | 27 ++----
.../job/monitor/impl/LocalJobMonitor.java | 4 +-
.../monitor/impl/pull/qstat/QstatMonitor.java | 4 +-
.../job/monitor/impl/push/amqp/AMQPMonitor.java | 4 +-
.../monitor/impl/push/amqp/BasicConsumer.java | 2 +-
.../impl/push/amqp/JSONMessageParser.java | 2 +-
.../impl/push/amqp/UnRegisterWorker.java | 4 +-
.../state/AbstractStateChangeRequest.java | 37 +++++++++
.../job/monitor/state/ExperimentStatus.java | 65 ---------------
.../state/ExperimentStatusChangeRequest.java | 55 +++++++++++++
.../airavata/job/monitor/state/JobStatus.java | 67 ---------------
.../monitor/state/JobStatusChangeRequest.java | 56 +++++++++++++
.../job/monitor/state/PublisherMessage.java | 26 ++++++
.../airavata/job/monitor/state/TaskStatus.java | 65 ---------------
.../monitor/state/TaskStatusChangeRequest.java | 53 ++++++++++++
.../apache/airavata/job/AMQPMonitorTest.java | 4 +-
.../job/QstatMonitorTestWithMyProxyAuth.java | 6 +-
48 files changed, 713 insertions(+), 361 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index bbbf123..1681a35 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -27,6 +27,7 @@ import org.apache.airavata.api.error.*;
import org.apache.airavata.model.workspace.Project;
import org.apache.airavata.orchestrator.client.OrchestratorClientFactory;
import org.apache.airavata.orchestrator.cpi.OrchestratorService;
+import org.apache.airavata.orchestrator.cpi.OrchestratorService.Client;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.cpi.*;
@@ -43,12 +44,12 @@ import java.util.Map;
public class AiravataServerHandler implements Airavata.Iface {
private Registry registry;
+ private OrchestratorService.Client orchestratorClient;
private static final Logger logger = LoggerFactory.getLogger(AiravataServerHandler.class);
//FIXME: these go in a configuration file or a "constants" class.
public static final String ORCHESTRATOR_SERVER_HOST = "localhost";
//FIXME: these go in a configuration file or a "constants" class.
public static final int ORCHESTRATOR_SERVER_PORT = 8940;
- private OrchestratorService.Client orchestratorClient;
/**
* Query Airavata to fetch the API version
*/
@@ -409,9 +410,7 @@ public class AiravataServerHandler implements Airavata.Iface {
*/
@Override
public void launchExperiment(String airavataExperimentId, String airavataCredStoreToken) throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException, AiravataSystemException, TException {
- if(orchestratorClient == null){
- orchestratorClient = OrchestratorClientFactory.createOrchestratorClient(ORCHESTRATOR_SERVER_HOST, ORCHESTRATOR_SERVER_PORT);
- }
+ final OrchestratorService.Client orchestratorClient = getOrchestratorClient();
final String expID = airavataExperimentId;
(new Thread(){
public void run(){
@@ -424,6 +423,13 @@ public class AiravataServerHandler implements Airavata.Iface {
}).start();
}
+ private OrchestratorService.Client getOrchestratorClient() {
+ if(orchestratorClient == null){
+ orchestratorClient = OrchestratorClientFactory.createOrchestratorClient(ORCHESTRATOR_SERVER_HOST, ORCHESTRATOR_SERVER_PORT);
+ }
+ return orchestratorClient;
+ }
+
/**
* Clone an specified experiment with a new name. A copy of the experiment configuration is made and is persisted with new metadata.
* The client has to subsequently update this configuration if needed and launch the cloned experiment.
@@ -483,7 +489,8 @@ public class AiravataServerHandler implements Airavata.Iface {
*/
@Override
public void terminateExperiment(String airavataExperimentId) throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException, AiravataSystemException, TException {
-
+ Client client = getOrchestratorClient();
+ client.terminateExperiment(airavataExperimentId);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index 68b2120..e755c66 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -94,8 +94,11 @@
<artifactId>airavata-workflow-tracking</artifactId>
<version>${project.version}</version>
</dependency>
-
-
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>gfac-monitor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Credential Store -->
<dependency>
<groupId>org.apache.airavata</groupId>
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
index 7f0deb6..1b30240 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
@@ -27,7 +27,6 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
@@ -36,7 +35,6 @@ import javax.xml.xpath.XPathExpressionException;
import org.apache.airavata.client.api.AiravataAPI;
import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
-import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.context.JobExecutionContext;
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
index 6c80009..d6a61a3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
@@ -20,16 +20,24 @@
*/
package org.apache.airavata.gfac.cpi;
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
import org.apache.airavata.client.api.AiravataAPI;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.commons.gfac.type.ServiceDescription;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.credential.store.store.CredentialReaderFactory;
-import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
-import org.apache.airavata.gfac.*;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.Scheduler;
import org.apache.airavata.gfac.context.ApplicationContext;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.MessageContext;
@@ -56,29 +64,36 @@ import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthentica
import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.job.monitor.AbstractActivityListener;
+import org.apache.airavata.job.monitor.MonitorManager;
+import org.apache.airavata.job.monitor.command.ExperimentCancelRequest;
+import org.apache.airavata.job.monitor.command.TaskCancelRequest;
import org.apache.airavata.model.workspace.experiment.DataObjectType;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.persistance.registry.jpa.resources.AbstractResource.TaskDetailConstants;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.cpi.DataType;
import org.apache.airavata.registry.cpi.Registry;
-import org.apache.airavata.schemas.gfac.*;
-import org.apache.airavata.schemas.wec.ContextHeaderDocument;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
+import org.apache.airavata.schemas.gfac.Ec2HostType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.apache.airavata.schemas.gfac.SSHHostType;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
import org.apache.airavata.schemas.wec.SecurityContextDocument;
-import org.apache.airavata.workflow.model.exceptions.WorkflowException;
-import org.apache.xmlbeans.XmlObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.net.URL;
-import java.util.*;
+import com.google.common.eventbus.Subscribe;
/**
* This is the GFac CPI class for external usage, this simply have a single method to submit a job to
* the resource, required data for the job has to be stored in registry prior to invoke this object.
*/
-public class GFacImpl implements GFac {
+public class GFacImpl implements GFac, AbstractActivityListener {
private static final Logger log = LoggerFactory.getLogger(GFacImpl.class);
public static final String ERROR_SENT = "ErrorSent";
public static final String PBS_JOB_MANAGER = "pbs";
@@ -90,6 +105,8 @@ public class GFacImpl implements GFac {
private AiravataAPI airavataAPI;
private AiravataRegistry2 airavataRegistry2;
+
+ private MonitorManager monitorManager;
/**
* Constructor for GFac
@@ -468,4 +485,29 @@ public class GFacImpl implements GFac {
}
}
+ @Override
+ public void setup(Object... configurations) {
+ for (Object configuration : configurations) {
+ if (configuration instanceof MonitorManager){
+ monitorManager=(MonitorManager) configuration;
+ }
+ }
+
+ }
+
+ @Subscribe
+ public void experimentCancelRequested(ExperimentCancelRequest request){
+ try {
+ List<String> nodeIds = registry.getIds(DataType.WORKFLOW_NODE_DETAIL, WorkflowNodeConstants.EXPERIMENT_ID, request.getExperimentId());
+ for (String nodeId : nodeIds) {
+ List<String> taskIds = registry.getIds(DataType.TASK_DETAIL, TaskDetailConstants.NODE_INSTANCE_ID, nodeId);
+ for (String taskId : taskIds) {
+ monitorManager.getMonitorPublisher().publish(new TaskCancelRequest(request.getExperimentId(),nodeId, taskId));
+ }
+ }
+ } catch (RegistryException e) {
+ log.error("Error while attempting to publish task cancel requests!!!",e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProvider.java
index a6d5182..5c33ec1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProvider.java
@@ -21,13 +21,16 @@
package org.apache.airavata.gfac.provider;
+import java.util.Map;
+
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.schemas.gfac.HadoopHostType;
+import org.apache.airavata.job.monitor.AbstractActivityListener;
+import org.apache.airavata.job.monitor.command.TaskCancelRequest;
-import java.util.Map;
+import com.google.common.eventbus.Subscribe;
-public interface GFacProvider {
+public interface GFacProvider extends AbstractActivityListener{
void initProperties(Map<String,String> properties) throws GFacProviderException,GFacException;
/**
@@ -60,5 +63,6 @@ public interface GFacProvider {
*/
public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException;
-
+ @Subscribe
+ public void taskCancelRequested(TaskCancelRequest request);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
index dbbcb62..3ba02b9 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
@@ -21,22 +21,71 @@
package org.apache.airavata.gfac.provider.impl;
+import java.util.List;
+
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.command.TaskCancelRequest;
+import org.apache.airavata.job.monitor.event.MonitorPublisher;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.model.workspace.experiment.JobStatus;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.DataType;
import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.JobDetaisConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractProvider implements GFacProvider{
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
-public abstract class AbstractProvider{
protected Registry registry = null;
protected JobDetails details; //todo we need to remove this and add methods to fill Job details, this is not a property of a provider
protected JobStatus status; //todo we need to remove this and add methods to fill Job details, this is not a property of a provider
+ protected JobExecutionContext jobExecutionContext;
+
+ private MonitorPublisher monitorPublisher;
public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
registry = RegistryFactory.getDefaultRegistry();
details = new JobDetails();
status = new JobStatus();
+ this.jobExecutionContext=jobExecutionContext;
+ }
+
+ @Override
+ public void taskCancelRequested(TaskCancelRequest request) {
+ try {
+ List<Object> jobDetails = registry.get(DataType.JOB_DETAIL, JobDetaisConstants.TASK_ID, request.getTaskId());
+ for (Object o : jobDetails) {
+ JobDetails jd=(JobDetails)o;
+ JobState jobState = jd.getJobStatus().getJobState();
+ if (jobState!=JobState.CANCELED || jobState!=JobState.CANCELING || jobState!=JobState.COMPLETE || jobState!=JobState.FAILED){
+ MonitorID monitorId = new MonitorID(null, jd.getJobID(), request.getTaskId(), request.getExperimentId(), null, null);
+ monitorPublisher.publish(new JobStatusChangeRequest(monitorId, JobState.CANCELING));
+ log.debug("Canceling job "+jd.getJobID());
+ cancelJob(jd.getJobID(), jobExecutionContext);
+ }
+ }
+ } catch (RegistryException e) {
+ log.error("Error retrieving job details for Task "+request.getTaskId(),e);
+ } catch (Exception e) {
+ log.error("Error canceling jobs!!!",e);
+ }
+ }
+
+ @Override
+ public void setup(Object... configurations) {
+ for (Object configuration : configurations) {
+ if (configuration instanceof MonitorPublisher){
+ this.monitorPublisher=(MonitorPublisher) configuration;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
index 03dbf42..654c9ec 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
@@ -92,7 +92,7 @@ import eu.unicore.util.httpclient.DefaultClientConfiguration;
-public class BESProvider extends AbstractProvider implements GFacProvider{
+public class BESProvider extends AbstractProvider{
protected final Logger log = LoggerFactory.getLogger(this.getClass());
private DefaultClientConfiguration secProperties;
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
index 8eaf5d2..69ad519 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
@@ -20,43 +20,28 @@
*/
package org.apache.airavata.gfac.provider.impl;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import org.apache.airavata.common.utils.StringUtil;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.context.security.SSHSecurityContext;
import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.gsi.ssh.impl.PBSCluster;
-import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
import org.apache.airavata.model.workspace.experiment.ErrorCategory;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.schemas.gfac.FileArrayType;
import org.apache.airavata.schemas.gfac.HostDescriptionType;
import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
-import org.apache.airavata.schemas.gfac.StringArrayType;
-import org.apache.airavata.schemas.gfac.URIArrayType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class GSISSHProvider extends AbstractProvider implements GFacProvider{
+public class GSISSHProvider extends AbstractProvider{
private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
index 9654b7d..4066c00 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
@@ -37,7 +37,6 @@ import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.security.GSISecurityContext;
import org.apache.airavata.gfac.notification.events.JobIDEvent;
import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gfac.utils.GramJobSubmissionListener;
@@ -57,7 +56,7 @@ import org.ietf.jgss.GSSException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class GramProvider extends AbstractProvider implements GFacProvider{
+public class GramProvider extends AbstractProvider{
private static final Logger log = LoggerFactory.getLogger(GramJobSubmissionListener.class);
private GramJob job;
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
index a5762a1..c20e2ea 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
@@ -31,7 +31,6 @@ import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.provider.utils.HadoopUtils;
import org.apache.airavata.schemas.gfac.HadoopApplicationDeploymentDescriptionType;
@@ -49,13 +48,14 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
/**
* Executes hadoop job using the cluster configuration provided by handlers in
* in-flow.
*/
-public class HadoopProvider extends AbstractProvider implements GFacProvider{
+public class HadoopProvider extends AbstractProvider{
private static final Logger logger = LoggerFactory.getLogger(HadoopProvider.class);
private boolean isWhirrBasedDeployment = false;
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
index e446614..a12bf5d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
@@ -20,21 +20,16 @@
*/
package org.apache.airavata.gfac.provider.impl;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.List;
import java.util.Map;
-import javax.xml.bind.JAXB;
-
import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.provider.utils.ProviderUtils;
import org.apache.airavata.gfac.utils.GFacUtils;
@@ -44,8 +39,6 @@ import org.apache.airavata.gfac.utils.OutputUtils;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.registry.api.workflow.ApplicationJob;
-import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.NameValuePairType;
import org.apache.xmlbeans.XmlException;
@@ -54,7 +47,7 @@ import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-public class LocalProvider extends AbstractProvider implements GFacProvider{
+public class LocalProvider extends AbstractProvider{
private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
private ProcessBuilder builder;
private List<String> cmdList;
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
index 5914dd3..12e2ad1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
@@ -39,7 +39,6 @@ import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gsi.ssh.api.Cluster;
@@ -61,7 +60,7 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException;
/**
* Execute application using remote SSH
*/
-public class SSHProvider extends AbstractProvider implements GFacProvider{
+public class SSHProvider extends AbstractProvider{
private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
private Cluster cluster;
private String jobID = null;
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-ec2/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ec2/pom.xml b/modules/gfac/gfac-ec2/pom.xml
index 7d696e6..df2d035 100644
--- a/modules/gfac/gfac-ec2/pom.xml
+++ b/modules/gfac/gfac-ec2/pom.xml
@@ -52,7 +52,11 @@
<artifactId>airavata-workflow-execution-context</artifactId>
<version>${project.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>gfac-monitor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Workflow Tracking -->
<dependency>
<groupId>org.apache.airavata</groupId>
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
index 30c8c89..e773fe6 100644
--- a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
+++ b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
@@ -21,22 +21,12 @@
package org.apache.airavata.gfac.ec2;
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.ec2.AmazonEC2Client;
-import com.amazonaws.services.ec2.model.*;
-import com.sshtools.j2ssh.SshClient;
-import com.sshtools.j2ssh.authentication.AuthenticationProtocolState;
-import com.sshtools.j2ssh.authentication.PublicKeyAuthenticationClient;
-import com.sshtools.j2ssh.configuration.SshConnectionProperties;
-import com.sshtools.j2ssh.session.SessionChannelClient;
-import com.sshtools.j2ssh.transport.HostKeyVerification;
-import com.sshtools.j2ssh.transport.TransportProtocolException;
-import com.sshtools.j2ssh.transport.publickey.InvalidSshKeyException;
-import com.sshtools.j2ssh.transport.publickey.SshPrivateKey;
-import com.sshtools.j2ssh.transport.publickey.SshPrivateKeyFile;
-import com.sshtools.j2ssh.transport.publickey.SshPublicKey;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
import org.apache.airavata.client.api.AiravataAPI;
import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
@@ -46,14 +36,11 @@ import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.ec2.util.AmazonEC2Util;
import org.apache.airavata.gfac.ec2.util.EC2ProviderUtil;
-import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.provider.impl.AbstractProvider;
import org.apache.airavata.gfac.provider.utils.ProviderUtils;
import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.registry.api.workflow.ApplicationJob;
-import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.Ec2ApplicationDeploymentType;
import org.apache.airavata.schemas.gfac.OutputParameterType;
@@ -63,14 +50,29 @@ import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.ec2.AmazonEC2Client;
+import com.amazonaws.services.ec2.model.AuthorizeSecurityGroupIngressRequest;
+import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
+import com.amazonaws.services.ec2.model.DescribeInstancesResult;
+import com.amazonaws.services.ec2.model.GroupIdentifier;
+import com.amazonaws.services.ec2.model.Instance;
+import com.amazonaws.services.ec2.model.IpPermission;
+import com.sshtools.j2ssh.SshClient;
+import com.sshtools.j2ssh.authentication.AuthenticationProtocolState;
+import com.sshtools.j2ssh.authentication.PublicKeyAuthenticationClient;
+import com.sshtools.j2ssh.configuration.SshConnectionProperties;
+import com.sshtools.j2ssh.session.SessionChannelClient;
+import com.sshtools.j2ssh.transport.HostKeyVerification;
+import com.sshtools.j2ssh.transport.TransportProtocolException;
+import com.sshtools.j2ssh.transport.publickey.InvalidSshKeyException;
+import com.sshtools.j2ssh.transport.publickey.SshPrivateKey;
+import com.sshtools.j2ssh.transport.publickey.SshPrivateKeyFile;
+import com.sshtools.j2ssh.transport.publickey.SshPublicKey;
-public class EC2Provider extends AbstractProvider implements GFacProvider {
+public class EC2Provider extends AbstractProvider {
private static final Logger log = LoggerFactory.getLogger(EC2Provider.class);
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
index e4d905a..5455f1b 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
@@ -22,7 +22,7 @@ package org.apache.airavata.job.monitor;
import java.util.Calendar;
-import org.apache.airavata.job.monitor.state.ExperimentStatus;
+import org.apache.airavata.job.monitor.state.ExperimentStatusChangeRequest;
import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.registry.cpi.DataType;
@@ -46,7 +46,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
}
@Subscribe
- public void updateRegistry(ExperimentStatus experimentStatus) {
+ public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) {
ExperimentState state = experimentStatus.getState();
if (state != null) {
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index ec03d71..74fdf43 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -24,8 +24,8 @@ import java.util.Calendar;
import java.util.concurrent.BlockingQueue;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.state.JobStatus;
-import org.apache.airavata.job.monitor.state.TaskStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.job.monitor.state.TaskStatusChangeRequest;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.model.workspace.experiment.TaskState;
@@ -63,7 +63,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{
}
@Subscribe
- public void updateRegistry(JobStatus jobStatus) {
+ public void updateRegistry(JobStatusChangeRequest jobStatus) {
/* Here we need to parse the jobStatus message and update
the registry accordingly, for now we are just printing to standard Out
*/
@@ -110,6 +110,8 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{
logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUSPENDED");
jobsToMonitor.remove(jobStatus.getMonitorID());
break;
+ case CANCELING:
+ logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is CENCELING");
default:
break;
}
@@ -117,7 +119,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{
}
@Subscribe
- public void setupTaskStatus(JobStatus jobStatus){
+ public void setupTaskStatus(JobStatusChangeRequest jobStatus){
TaskState state=TaskState.UNKNOWN;
switch(jobStatus.getState()){
case ACTIVE:
@@ -136,11 +138,13 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{
state=TaskState.STARTED; break;
case UN_SUBMITTED:
state=TaskState.CANCELED; break;
+ case CANCELING:
+ state=TaskState.CANCELING; break;
default:
break;
}
logger.debug("Publishing Task Status "+state.toString());
- monitorPublisher.publish(new TaskStatus(jobStatus.getMonitorID(),state));
+ monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getMonitorID(),state));
}
public void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
index 8ba94d6..40b095a 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
@@ -23,8 +23,8 @@ package org.apache.airavata.job.monitor;
import java.util.Calendar;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.state.ExperimentStatus;
-import org.apache.airavata.job.monitor.state.TaskStatus;
+import org.apache.airavata.job.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.job.monitor.state.TaskStatusChangeRequest;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.model.workspace.experiment.TaskState;
@@ -51,7 +51,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener{
}
@Subscribe
- public void updateRegistry(TaskStatus taskStatus) {
+ public void updateRegistry(TaskStatusChangeRequest taskStatus) {
TaskState state = taskStatus.getState();
if (state != null) {
try {
@@ -64,7 +64,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener{
}
@Subscribe
- public void setupExperimentStatus(TaskStatus taskStatus){
+ public void setupExperimentStatus(TaskStatusChangeRequest taskStatus){
ExperimentState state=ExperimentState.UNKNOWN;
switch(taskStatus.getState()){
case CANCELED:
@@ -79,11 +79,13 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener{
state=ExperimentState.EXECUTING; break;
case STARTED:
state=ExperimentState.LAUNCHED; break;
+ case CANCELING:
+ state=ExperimentState.CANCELING; break;
default:
break;
}
logger.debug("Publishing Experiment Status "+state.toString());
- monitorPublisher.publish(new ExperimentStatus(taskStatus.getMonitorID(),state));
+ monitorPublisher.publish(new ExperimentStatusChangeRequest(taskStatus.getMonitorID(),state));
}
public void updateTaskStatus(String taskId, TaskState state) throws Exception {
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
new file mode 100644
index 0000000..f6dc360
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import java.util.Calendar;
+
+import org.apache.airavata.job.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.Subscribe;
+
+public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener{
+ private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
+
+ private Registry airavataRegistry;
+
+// private MonitorPublisher monitorPublisher;
+
+ public Registry getAiravataRegistry() {
+ return airavataRegistry;
+ }
+
+ public void setAiravataRegistry(Registry airavataRegistry) {
+ this.airavataRegistry = airavataRegistry;
+ }
+
+ @Subscribe
+ public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) {
+// ExperimentState state = experimentStatus.getState();
+// if (state != null) {
+// try {
+// String experimentID = experimentStatus.getMonitorID().getExperimentID();
+// updateWorkflowNodeStatus(experimentID, state);
+// } catch (Exception e) {
+// logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+// }
+// }
+ }
+
+ public void updateWorkflowNodeStatus(String experimentId, ExperimentState state) throws Exception {
+ Experiment details = (Experiment)airavataRegistry.get(DataType.EXPERIMENT, experimentId);
+ if(details == null) {
+ details = new Experiment();
+ details.setExperimentID(experimentId);
+ }
+ org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
+ status.setExperimentState(state);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ details.setExperimentStatus(status);
+ airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.EXPERIMENT, details, experimentId);
+ }
+
+ @Override
+ public void setup(Object... configurations) {
+ for (Object configuration : configurations) {
+ if (configuration instanceof Registry){
+ this.airavataRegistry=(Registry)configuration;
+// } else if (configuration instanceof MonitorPublisher){
+// this.monitorPublisher=(MonitorPublisher) configuration;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index ed89230..9a1b68b 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -74,8 +74,6 @@ public class MonitorManager {
private Monitor localJobMonitor;
- private List<AbstractActivityListener> activityListeners;
-
private Registry registry;
/**
@@ -99,7 +97,6 @@ public class MonitorManager {
private void loadActivityMonitors(){
try {
- activityListeners=new ArrayList<AbstractActivityListener>();
String activityListenersString = ServerSettings.getSetting(ACTIVITY_LISTENERS);
if (activityListenersString!=null){
String[] activityListenerClasses = activityListenersString.split(",");
@@ -109,8 +106,6 @@ public class MonitorManager {
Class<?> classInstance = MonitorManager.class
.getClassLoader().loadClass(activityListenerClassName);
AbstractActivityListener monitor=(AbstractActivityListener)classInstance.newInstance();
- monitor.setup(registry, getFinishQueue(), getMonitorPublisher());
- activityListeners.add(monitor);
registerListener(monitor);
} catch (ClassNotFoundException e) {
logger.error("Error while locating activity monitor implementation \""+activityListenerClassName+"\"!!!",e);
@@ -175,6 +170,23 @@ public class MonitorManager {
*/
public void registerListener(Object listener) {
monitorPublisher.registerListener(listener);
+ if (listener instanceof AbstractActivityListener){
+ ((AbstractActivityListener)listener).setup(registry, getFinishQueue(), getMonitorPublisher(), this);
+ }
+ }
+
+ public void registerListener(AbstractActivityListener listener) {
+ registerListener((Object)listener);
+ }
+
+ /**
+ * To remove listeners of changing statuses
+ *
+ * @param listener Any class can be written and if you want the JobStatus object to be taken from the bus, just
+ * have to put @subscribe as an annotation to your method to recieve the JobStatus object from the bus.
+ */
+ public void unregisterListener(Object listener) {
+ monitorPublisher.unregisterListener(listener);
}
/**
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/ExperimentCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/ExperimentCancelRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/ExperimentCancelRequest.java
new file mode 100644
index 0000000..7a8183e
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/ExperimentCancelRequest.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor.command;
+
+public class ExperimentCancelRequest {
+ private String experimentId;
+
+ public ExperimentCancelRequest(String experimentId) {
+ this.experimentId = experimentId;
+ }
+
+ public String getExperimentId() {
+ return experimentId;
+ }
+
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/TaskCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/TaskCancelRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/TaskCancelRequest.java
new file mode 100644
index 0000000..5648265
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/TaskCancelRequest.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor.command;
+
+public class TaskCancelRequest {
+ private String experimentId;
+ private String nodeId;
+ private String taskId;
+
+ public TaskCancelRequest(String experimentId, String nodeId, String taskId) {
+ this.experimentId = experimentId;
+ this.setNodeId(nodeId);
+ this.taskId = taskId;
+ }
+ public String getExperimentId() {
+ return experimentId;
+ }
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+ public String getTaskId() {
+ return taskId;
+ }
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+ public String getNodeId() {
+ return nodeId;
+ }
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
index f8360ba..33ffa09 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
@@ -32,7 +32,7 @@ import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo
import org.apache.airavata.gsi.ssh.util.CommonUtils;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.junit.Assert;
@@ -154,7 +154,7 @@ public class QstatMonitorTestWithMyProxyAuth {
}
@Subscribe
- public void testCaseShutDown(JobStatus status) {
+ public void testCaseShutDown(JobStatusChangeRequest status) {
Assert.assertNotNull(status.getState());
try {
monitorManager.stopMonitor();
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index bfeb058..bb9865d 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -218,6 +218,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
}
public boolean terminateExperiment(String experimentId) throws TException {
- return false;
+ try {
+ orchestrator.cancelExperiment(experimentId);
+ } catch (OrchestratorException e) {
+ log.error("Error canceling experiment "+experimentId,e);
+ return false;
+ }
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
index 47179b1..e81b76e 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
@@ -20,15 +20,15 @@
*/
package org.apache.airavata.orchestrator.core.context;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.airavata.job.monitor.MonitorManager;
import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
import org.apache.airavata.registry.api.AiravataRegistry2;
-import org.apache.airavata.registry.api.OrchestratorRegistry;
import org.apache.airavata.registry.cpi.Registry;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* This is the context object used in orchestrator whic h
*/
@@ -40,6 +40,8 @@ public class OrchestratorContext {
private AiravataRegistry2 registry;
private Registry newRegistry;
+
+ private MonitorManager monitorManager;
public OrchestratorContext(List<GFACInstance> gfacInstanceList) {
this.gfacInstanceList = new ArrayList<GFACInstance>();
@@ -79,4 +81,12 @@ public class OrchestratorContext {
public void setNewRegistry(Registry newRegistry) {
this.newRegistry = newRegistry;
}
+
+ public MonitorManager getMonitorManager() {
+ return monitorManager;
+ }
+
+ public void setMonitorManager(MonitorManager monitorManager) {
+ this.monitorManager = monitorManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
index 5ae92ae..3341d9b 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
@@ -21,18 +21,14 @@
package org.apache.airavata.orchestrator.core.impl;
-import java.util.*;
-
-import org.apache.airavata.common.utils.AiravataJobState;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.cpi.GFac;
+import org.apache.airavata.gfac.cpi.GFacImpl;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.airavata.registry.cpi.Registry;
-import org.apache.airavata.gfac.cpi.GFacImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +48,9 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter {
public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
this.orchestratorContext = orchestratorContext;
gfac = new GFacImpl(orchestratorContext.getNewRegistry(), null, orchestratorContext.getRegistry());
+ if (orchestratorContext.getMonitorManager()!=null) {
+ orchestratorContext.getMonitorManager().registerListener(gfac);
+ }
}
public GFACInstance selectGFACInstance() throws OrchestratorException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
index 21b4866..3fd655f 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
@@ -52,7 +52,15 @@ public interface Orchestrator {
*/
public List<TaskDetails> createTasks(String experimentId) throws OrchestratorException;
-
+ /**
+ * After creating the experiment Data user have the
+ * experimentID as the handler to the experiment, during the launchExperiment
+ * We just have to give the experimentID
+ *
+ * @param experimentID
+ * @throws OrchestratorException
+ */
+ void cancelExperiment(String experimentID) throws OrchestratorException;
//todo have to add another method to handle failed or jobs to be recovered by orchestrator
//todo if you don't add these this is not an orchestrator, its just an intemediate component which invoke gfac
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
index 461bba5..7147ef9 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.UUID;
import org.apache.airavata.client.AiravataAPIFactory;
import org.apache.airavata.client.api.AiravataAPI;
@@ -39,7 +38,6 @@ import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
-import org.apache.airavata.orchestrator.core.model.ExperimentRequest;
import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants;
import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
import org.apache.airavata.orchestrator.cpi.Orchestrator;
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index d83e5c9..463e08e 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -24,10 +24,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import com.google.common.eventbus.Subscribe;
-import org.apache.airavata.gfac.cpi.GFacImpl;
+import org.apache.airavata.job.monitor.AbstractActivityListener;
import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.MonitorManager;
+import org.apache.airavata.job.monitor.command.ExperimentCancelRequest;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.model.util.ExperimentModelUtil;
import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.JobState;
@@ -42,11 +43,12 @@ import org.apache.airavata.registry.cpi.Registry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SimpleOrchestratorImpl extends AbstractOrchestrator {
+import com.google.common.eventbus.Subscribe;
+
+public class SimpleOrchestratorImpl extends AbstractOrchestrator implements AbstractActivityListener{
private final static Logger logger = LoggerFactory.getLogger(SimpleOrchestratorImpl.class);
private ExecutorService executor;
-
-
+
// this is going to be null unless the thread count is 0
private JobSubmitter jobSubmitter = null;
@@ -124,8 +126,14 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator {
return tasks;
}
+ @Override
+ public void cancelExperiment(String experimentID)
+ throws OrchestratorException {
+ orchestratorContext.getMonitorManager().getMonitorPublisher().publish(new ExperimentCancelRequest(experimentID));
+ }
+
@Subscribe
- public void handlePostExperimentTask(JobStatus status) throws OrchestratorException {
+ public void handlePostExperimentTask(JobStatusChangeRequest status) throws OrchestratorException {
if(status.getState() == JobState.COMPLETE){
MonitorID monitorID = status.getMonitorID();
jobSubmitter.runAfterJobTask(monitorID.getExperimentID(), monitorID.getTaskID());
@@ -154,4 +162,19 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator {
public void setJobSubmitter(JobSubmitter jobSubmitter) {
this.jobSubmitter = jobSubmitter;
}
+
+ @Override
+ public void setup(Object... configurations) {
+ for (Object config : configurations) {
+ if (config instanceof MonitorManager){
+ orchestratorContext.setMonitorManager((MonitorManager)config);
+ try {
+ getJobSubmitter().initialize(orchestratorContext);
+ } catch (OrchestratorException e) {
+ logger.error("Error reinitializing the job submitter!!!",e);
+ }
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/samples/java-client/experiment/src/main/java/org/apache/airavata/api/samples/ExperimentSample.java
----------------------------------------------------------------------
diff --git a/samples/java-client/experiment/src/main/java/org/apache/airavata/api/samples/ExperimentSample.java b/samples/java-client/experiment/src/main/java/org/apache/airavata/api/samples/ExperimentSample.java
index 2a99269..5fa7cb4 100644
--- a/samples/java-client/experiment/src/main/java/org/apache/airavata/api/samples/ExperimentSample.java
+++ b/samples/java-client/experiment/src/main/java/org/apache/airavata/api/samples/ExperimentSample.java
@@ -30,6 +30,7 @@ import org.apache.airavata.api.client.AiravataClientFactory;
import org.apache.airavata.api.error.AiravataClientException;
import org.apache.airavata.api.error.AiravataSystemException;
import org.apache.airavata.api.error.ExperimentNotFoundException;
+import org.apache.airavata.api.error.AiravataClientConnectException;
import org.apache.airavata.api.error.InvalidRequestException;
import org.apache.airavata.client.AiravataAPIFactory;
import org.apache.airavata.client.api.AiravataAPI;
@@ -54,13 +55,13 @@ public class ExperimentSample {
return AiravataAPIFactory.getAPI("default", "admin");
}
- protected static Airavata.Client getClient() throws ApplicationSettingsException {
+ protected static Airavata.Client getClient() throws ApplicationSettingsException, AiravataClientConnectException {
String THRIFT_SERVER_HOST = ClientSettings.getSetting("thrift.server.host");
int THRIFT_SERVER_PORT = Integer.parseInt(ClientSettings.getSetting("thrift.server.port"));
return AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
}
- public static void main(String[] args) throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException, AiravataSystemException, ApplicationSettingsException, TException, AiravataAPIInvocationException {
+ public static void main(String[] args) throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException, AiravataSystemException, ApplicationSettingsException, TException, AiravataAPIInvocationException, AiravataClientConnectException {
AiravataAPI airavataAPI = getAiravataAPI();
DocumentCreator documentCreator = new DocumentCreator(airavataAPI);
documentCreator.createLocalHostDocs();
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
index 718177c..bd6bfcb 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
@@ -40,28 +40,29 @@ public class MonitorID {
private String userName;
- private String jobID;
-
private Timestamp jobStartedTime;
private Timestamp lastMonitored;
private HostDescription host;
-
private AuthenticationInfo authenticationInfo = null;
private Map<String, Object> parameters;
private String experimentID;
+// private String workflowNodeID;
+
private String taskID;
+ private String jobID;
+
private int failedCount = 0;
private JobState state;
- public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName) {
+ public MonitorID(HostDescription host, String jobID,String taskID, String experimentID, String userName) {
this.host = host;
this.jobStartedTime = new Timestamp((new Date()).getTime());
this.userName = userName;
@@ -205,4 +206,12 @@ public class MonitorID {
this.state = status;
}
}
+
+// public String getWorkflowNodeID() {
+// return workflowNodeID;
+// }
+//
+// public void setWorkflowNodeID(String workflowNodeID) {
+// this.workflowNodeID = workflowNodeID;
+// }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
index 1a79a17..b4a06c9 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
@@ -24,7 +24,7 @@ import org.apache.airavata.job.monitor.HostMonitorData;
import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.job.monitor.UserMonitorData;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.model.workspace.experiment.JobState;
/**
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
index cc85e58..3064781 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
@@ -20,19 +20,15 @@
*/
package org.apache.airavata.job.monitor.event;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.state.ExperimentStatus;
-import org.apache.airavata.job.monitor.state.JobStatus;
-import org.apache.airavata.job.monitor.state.TaskStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.eventbus.EventBus;
-public class MonitorPublisher {
+public class MonitorPublisher{
private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class);
private EventBus eventBus;
-
+
public MonitorPublisher(EventBus eventBus) {
this.eventBus = eventBus;
}
@@ -40,20 +36,13 @@ public class MonitorPublisher {
public void registerListener(Object listener) {
eventBus.register(listener);
}
-
- public void publish(JobStatus jobStatus) {
- eventBus.post(jobStatus);
- }
-
- public void publish(TaskStatus taskStatus) {
- eventBus.post(taskStatus);
- }
- public void publish(ExperimentStatus experimentStatus) {
- eventBus.post(experimentStatus);
+ public void unregisterListener(Object listener) {
+ eventBus.unregister(listener);
}
-
- public void publish(MonitorID monitorID){
- eventBus.post(monitorID);
+
+ public void publish(Object o) {
+ eventBus.post(o);
}
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
index 8b9ebfd..de7cf90 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
@@ -23,7 +23,7 @@ package org.apache.airavata.job.monitor.impl;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.job.monitor.core.AiravataAbstractMonitor;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.model.workspace.experiment.JobState;
import java.util.concurrent.BlockingQueue;
@@ -41,7 +41,7 @@ public class LocalJobMonitor extends AiravataAbstractMonitor {
do {
try {
MonitorID take = jobQueue.take();
- getPublisher().publish(new JobStatus(take, JobState.COMPLETE));
+ getPublisher().publish(new JobStatusChangeRequest(take, JobState.COMPLETE));
} catch (Exception e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
index 59de0f4..1ec45fe 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
@@ -29,7 +29,7 @@ import org.apache.airavata.job.monitor.UserMonitorData;
import org.apache.airavata.job.monitor.core.PullMonitor;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.job.monitor.util.CommonUtils;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.schemas.gfac.GsisshHostType;
@@ -108,7 +108,7 @@ public class QstatMonitor extends PullMonitor {
//todo this polling will not work with multiple usernames but with single user
// and multiple hosts, currently monitoring will work
UserMonitorData take = null;
- JobStatus jobStatus = new JobStatus();
+ JobStatusChangeRequest jobStatus = new JobStatusChangeRequest();
MonitorID currentMonitorID = null;
HostDescription currentHostDescription = null;
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index addedbb..88a5198 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -34,7 +34,7 @@ import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.job.monitor.core.PushMonitor;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.job.monitor.util.AMQPConnectionUtil;
import org.apache.airavata.job.monitor.util.CommonUtils;
import org.apache.airavata.model.workspace.experiment.JobState;
@@ -201,7 +201,7 @@ public class AMQPMonitor extends PushMonitor {
}
}
next.setStatus(monitorID.getStatus());
- publisher.publish(new JobStatus(next,next.getStatus()));
+ publisher.publish(new JobStatusChangeRequest(next,next.getStatus()));
return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
index 8992f56..53bcc8b 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
@@ -30,7 +30,7 @@ import org.apache.airavata.job.monitor.UserMonitorData;
import org.apache.airavata.job.monitor.core.MessageParser;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
index 1f85fc4..cdff685 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
@@ -27,7 +27,7 @@ import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.job.monitor.UserMonitorData;
import org.apache.airavata.job.monitor.core.MessageParser;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
index a893372..becb4d7 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
@@ -24,7 +24,7 @@ import com.google.common.eventbus.Subscribe;
import com.rabbitmq.client.Channel;
import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.job.monitor.util.CommonUtils;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.slf4j.Logger;
@@ -42,7 +42,7 @@ public class UnRegisterWorker{
}
@Subscribe
- private boolean unRegisterListener(JobStatus jobStatus) throws AiravataMonitorException {
+ private boolean unRegisterListener(JobStatusChangeRequest jobStatus) throws AiravataMonitorException {
MonitorID monitorID = jobStatus.getMonitorID();
String channelID = CommonUtils.getChannelID(monitorID);
if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){
http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
new file mode 100644
index 0000000..909f10e
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor.state;
+
+import org.apache.airavata.job.monitor.MonitorID;
+
+public abstract class AbstractStateChangeRequest implements PublisherMessage{
+ private MonitorID monitorID;
+
+ public MonitorID getMonitorID() {
+ return monitorID;
+ }
+
+ public void setMonitorID(MonitorID monitorID) {
+ this.monitorID = monitorID;
+ }
+
+}