You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/22 16:44:56 UTC

[2/2] git commit: AIRAVATA-433 + fix client sample issue

AIRAVATA-433 + fix client sample issue


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/aed31d3e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/aed31d3e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/aed31d3e

Branch: refs/heads/master
Commit: aed31d3eb08c410611ef4a42205bae89a6de923d
Parents: 6791031
Author: Saminda Wijeratne <sa...@gmail.com>
Authored: Tue Apr 22 07:44:30 2014 -0700
Committer: Saminda Wijeratne <sa...@gmail.com>
Committed: Tue Apr 22 07:44:30 2014 -0700

----------------------------------------------------------------------
 .../server/handler/AiravataServerHandler.java   | 17 ++--
 modules/gfac/gfac-core/pom.xml                  |  7 +-
 .../org/apache/airavata/gfac/Scheduler.java     |  2 -
 .../org/apache/airavata/gfac/cpi/GFacImpl.java  | 66 ++++++++++++---
 .../airavata/gfac/provider/GFacProvider.java    | 12 ++-
 .../gfac/provider/impl/AbstractProvider.java    | 51 +++++++++++-
 .../gfac/provider/impl/BESProvider.java         |  2 +-
 .../gfac/provider/impl/GSISSHProvider.java      | 17 +---
 .../gfac/provider/impl/GramProvider.java        |  3 +-
 .../gfac/provider/impl/HadoopProvider.java      |  4 +-
 .../gfac/provider/impl/LocalProvider.java       |  9 +-
 .../gfac/provider/impl/SSHProvider.java         |  3 +-
 modules/gfac/gfac-ec2/pom.xml                   |  6 +-
 .../apache/airavata/gfac/ec2/EC2Provider.java   | 54 ++++++------
 .../AiravataExperimentStatusUpdator.java        |  4 +-
 .../job/monitor/AiravataJobStatusUpdator.java   | 14 ++--
 .../job/monitor/AiravataTaskStatusUpdator.java  | 12 +--
 .../AiravataWorkflowNodeStatusUpdator.java      | 86 ++++++++++++++++++++
 .../airavata/job/monitor/MonitorManager.java    | 22 +++--
 .../command/ExperimentCancelRequest.java        | 38 +++++++++
 .../job/monitor/command/TaskCancelRequest.java  | 52 ++++++++++++
 .../QstatMonitorTestWithMyProxyAuth.java        |  4 +-
 .../server/OrchestratorServerHandler.java       |  8 +-
 .../core/context/OrchestratorContext.java       | 18 +++-
 .../core/impl/EmbeddedGFACJobSubmitter.java     |  9 +-
 .../airavata/orchestrator/cpi/Orchestrator.java | 10 ++-
 .../cpi/impl/AbstractOrchestrator.java          |  2 -
 .../cpi/impl/SimpleOrchestratorImpl.java        | 37 +++++++--
 .../airavata/api/samples/ExperimentSample.java  |  5 +-
 .../apache/airavata/job/monitor/MonitorID.java  | 17 +++-
 .../job/monitor/core/MessageParser.java         |  2 +-
 .../job/monitor/event/MonitorPublisher.java     | 27 ++----
 .../job/monitor/impl/LocalJobMonitor.java       |  4 +-
 .../monitor/impl/pull/qstat/QstatMonitor.java   |  4 +-
 .../job/monitor/impl/push/amqp/AMQPMonitor.java |  4 +-
 .../monitor/impl/push/amqp/BasicConsumer.java   |  2 +-
 .../impl/push/amqp/JSONMessageParser.java       |  2 +-
 .../impl/push/amqp/UnRegisterWorker.java        |  4 +-
 .../state/AbstractStateChangeRequest.java       | 37 +++++++++
 .../job/monitor/state/ExperimentStatus.java     | 65 ---------------
 .../state/ExperimentStatusChangeRequest.java    | 55 +++++++++++++
 .../airavata/job/monitor/state/JobStatus.java   | 67 ---------------
 .../monitor/state/JobStatusChangeRequest.java   | 56 +++++++++++++
 .../job/monitor/state/PublisherMessage.java     | 26 ++++++
 .../airavata/job/monitor/state/TaskStatus.java  | 65 ---------------
 .../monitor/state/TaskStatusChangeRequest.java  | 53 ++++++++++++
 .../apache/airavata/job/AMQPMonitorTest.java    |  4 +-
 .../job/QstatMonitorTestWithMyProxyAuth.java    |  6 +-
 48 files changed, 713 insertions(+), 361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index bbbf123..1681a35 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -27,6 +27,7 @@ import org.apache.airavata.api.error.*;
 import org.apache.airavata.model.workspace.Project;
 import org.apache.airavata.orchestrator.client.OrchestratorClientFactory;
 import org.apache.airavata.orchestrator.cpi.OrchestratorService;
+import org.apache.airavata.orchestrator.cpi.OrchestratorService.Client;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.cpi.*;
@@ -43,12 +44,12 @@ import java.util.Map;
 public class AiravataServerHandler implements Airavata.Iface {
 
     private Registry registry;
+	private OrchestratorService.Client orchestratorClient;
     private static final Logger logger = LoggerFactory.getLogger(AiravataServerHandler.class);
 	 //FIXME: these go in a configuration file or a "constants" class. 
     public static final String ORCHESTRATOR_SERVER_HOST = "localhost";
 	 //FIXME: these go in a configuration file or a "constants" class. 
     public static final int ORCHESTRATOR_SERVER_PORT = 8940;
-    private OrchestratorService.Client orchestratorClient;
     /**
      * Query Airavata to fetch the API version
      */
@@ -409,9 +410,7 @@ public class AiravataServerHandler implements Airavata.Iface {
      */
     @Override
     public void launchExperiment(String airavataExperimentId, String airavataCredStoreToken) throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException, AiravataSystemException, TException {
-        if(orchestratorClient == null){
-            orchestratorClient = OrchestratorClientFactory.createOrchestratorClient(ORCHESTRATOR_SERVER_HOST, ORCHESTRATOR_SERVER_PORT);
-        }
+        final OrchestratorService.Client orchestratorClient = getOrchestratorClient();
         final String expID = airavataExperimentId;
         (new Thread(){
             public void run(){
@@ -424,6 +423,13 @@ public class AiravataServerHandler implements Airavata.Iface {
         }).start();
     }
 
+	private OrchestratorService.Client getOrchestratorClient() { // returns the cached orchestrator client, creating it on first use
+		if(orchestratorClient == null){ // NOTE(review): lazy init is unsynchronized - confirm this handler is never called concurrently
+            orchestratorClient = OrchestratorClientFactory.createOrchestratorClient(ORCHESTRATOR_SERVER_HOST, ORCHESTRATOR_SERVER_PORT);
+        }
+		return orchestratorClient;
+	}
+
     /**
      * Clone an specified experiment with a new name. A copy of the experiment configuration is made and is persisted with new metadata.
      * The client has to subsequently update this configuration if needed and launch the cloned experiment.
@@ -483,7 +489,8 @@ public class AiravataServerHandler implements Airavata.Iface {
      */
     @Override
     public void terminateExperiment(String airavataExperimentId) throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException, AiravataSystemException, TException {
-
+    	Client client = getOrchestratorClient();
+    	client.terminateExperiment(airavataExperimentId);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index 68b2120..e755c66 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -94,8 +94,11 @@
             <artifactId>airavata-workflow-tracking</artifactId>
             <version>${project.version}</version>
         </dependency>
-
-   
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+       		<artifactId>gfac-monitor</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!-- Credential Store -->
         <dependency>
             <groupId>org.apache.airavata</groupId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
index 7f0deb6..1b30240 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
@@ -27,7 +27,6 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -36,7 +35,6 @@ import javax.xml.xpath.XPathExpressionException;
 
 import org.apache.airavata.client.api.AiravataAPI;
 import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
-import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.ApplicationDescription;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.context.JobExecutionContext;

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
index 6c80009..d6a61a3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
@@ -20,16 +20,24 @@
 */
 package org.apache.airavata.gfac.cpi;
 
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.airavata.client.api.AiravataAPI;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.ApplicationDescription;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.commons.gfac.type.ServiceDescription;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.credential.store.store.CredentialReaderFactory;
-import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl;
-import org.apache.airavata.gfac.*;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.Scheduler;
 import org.apache.airavata.gfac.context.ApplicationContext;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.context.MessageContext;
@@ -56,29 +64,36 @@ import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthentica
 import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
 import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.job.monitor.AbstractActivityListener;
+import org.apache.airavata.job.monitor.MonitorManager;
+import org.apache.airavata.job.monitor.command.ExperimentCancelRequest;
+import org.apache.airavata.job.monitor.command.TaskCancelRequest;
 import org.apache.airavata.model.workspace.experiment.DataObjectType;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.persistance.registry.jpa.resources.AbstractResource.TaskDetailConstants;
 import org.apache.airavata.registry.api.AiravataRegistry2;
 import org.apache.airavata.registry.cpi.DataType;
 import org.apache.airavata.registry.cpi.Registry;
-import org.apache.airavata.schemas.gfac.*;
-import org.apache.airavata.schemas.wec.ContextHeaderDocument;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
+import org.apache.airavata.schemas.gfac.Ec2HostType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.apache.airavata.schemas.gfac.SSHHostType;
+import org.apache.airavata.schemas.gfac.UnicoreHostType;
 import org.apache.airavata.schemas.wec.SecurityContextDocument;
-import org.apache.airavata.workflow.model.exceptions.WorkflowException;
-import org.apache.xmlbeans.XmlObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.net.URL;
-import java.util.*;
+import com.google.common.eventbus.Subscribe;
 
 /**
  * This is the GFac CPI class for external usage, this simply have a single method to submit a job to
  * the resource, required data for the job has to be stored in registry prior to invoke this object.
  */
-public class GFacImpl implements GFac {
+public class GFacImpl implements GFac, AbstractActivityListener {
     private static final Logger log = LoggerFactory.getLogger(GFacImpl.class);
     public static final String ERROR_SENT = "ErrorSent";
     public static final String PBS_JOB_MANAGER = "pbs";
@@ -90,6 +105,8 @@ public class GFacImpl implements GFac {
     private AiravataAPI airavataAPI;
 
     private AiravataRegistry2 airavataRegistry2;
+    
+    private MonitorManager monitorManager;
 
     /**
      * Constructor for GFac
@@ -468,4 +485,29 @@ public class GFacImpl implements GFac {
         }
     }
 
+	@Override
+	public void setup(Object... configurations) { // keeps the MonitorManager found among the supplied configuration objects
+		for (Object configuration : configurations) {
+			if (configuration instanceof MonitorManager){ // any other configuration type is ignored
+				monitorManager=(MonitorManager) configuration;
+			} 	
+		}
+			
+	}
+	
+	@Subscribe
+	public void experimentCancelRequested(ExperimentCancelRequest request){ // publishes one TaskCancelRequest per task id found for the experiment
+		try {
+			List<String> nodeIds = registry.getIds(DataType.WORKFLOW_NODE_DETAIL, WorkflowNodeConstants.EXPERIMENT_ID, request.getExperimentId()); // node ids looked up by experiment id
+			for (String nodeId : nodeIds) {
+				List<String> taskIds = registry.getIds(DataType.TASK_DETAIL, TaskDetailConstants.NODE_INSTANCE_ID, nodeId); // task ids looked up by node id
+				for (String taskId : taskIds) {
+					monitorManager.getMonitorPublisher().publish(new TaskCancelRequest(request.getExperimentId(),nodeId, taskId)); // NOTE(review): monitorManager is only assigned in setup(...) - confirm setup runs before any event arrives
+				}
+			}
+		} catch (RegistryException e) { // registry failures are logged and not rethrown
+			log.error("Error while attempting to publish task cancel requests!!!",e);
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProvider.java
index a6d5182..5c33ec1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProvider.java
@@ -21,13 +21,16 @@
 
 package org.apache.airavata.gfac.provider;
 
+import java.util.Map;
+
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.schemas.gfac.HadoopHostType;
+import org.apache.airavata.job.monitor.AbstractActivityListener;
+import org.apache.airavata.job.monitor.command.TaskCancelRequest;
 
-import java.util.Map;
+import com.google.common.eventbus.Subscribe;
 
-public interface GFacProvider {
+public interface GFacProvider extends AbstractActivityListener{
 
     void initProperties(Map<String,String> properties) throws GFacProviderException,GFacException;
     /**
@@ -60,5 +63,6 @@ public interface GFacProvider {
      */
     public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException;
 
-
+    @Subscribe
+    public void taskCancelRequested(TaskCancelRequest request);
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
index dbbcb62..3ba02b9 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
@@ -21,22 +21,71 @@
 
 package org.apache.airavata.gfac.provider.impl;
 
+import java.util.List;
+
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.command.TaskCancelRequest;
+import org.apache.airavata.job.monitor.event.MonitorPublisher;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.model.workspace.experiment.JobStatus;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.DataType;
 import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.JobDetaisConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractProvider implements GFacProvider{
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
 
-public abstract class AbstractProvider{
     protected Registry registry = null;
 	protected JobDetails details;     //todo we need to remove this and add methods to fill Job details, this is not a property of a provider
 	protected JobStatus status;   //todo we need to remove this and add methods to fill Job details, this is not a property of a provider
+	protected JobExecutionContext jobExecutionContext;
+
+	private MonitorPublisher monitorPublisher;
 
 	public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
 		registry = RegistryFactory.getDefaultRegistry();
 		details = new JobDetails();
 		status = new JobStatus();
+		this.jobExecutionContext=jobExecutionContext;
+	}
+	
+	@Override
+	public void taskCancelRequested(TaskCancelRequest request) { // cancels every job of the requested task that is not already finished or being cancelled
+		try {
+			List<Object> jobDetails = registry.get(DataType.JOB_DETAIL, JobDetaisConstants.TASK_ID, request.getTaskId());
+			for (Object o : jobDetails) {
+				JobDetails jd=(JobDetails)o;
+				JobState jobState = jd.getJobStatus().getJobState();
+				if (jobState!=JobState.CANCELED && jobState!=JobState.CANCELING && jobState!=JobState.COMPLETE && jobState!=JobState.FAILED){ // must be && - with || the test is always true and finished jobs get cancelled again
+					MonitorID monitorId = new MonitorID(null, jd.getJobID(), request.getTaskId(), request.getExperimentId(), null, null);
+					monitorPublisher.publish(new JobStatusChangeRequest(monitorId, JobState.CANCELING)); // announce CANCELING before asking the provider to cancel
+					log.debug("Canceling job "+jd.getJobID());
+					cancelJob(jd.getJobID(), jobExecutionContext);
+				}
+			}
+		} catch (RegistryException e) { // job lookup failed: logged, nothing is cancelled
+			log.error("Error retrieving job details for Task "+request.getTaskId(),e);
+		} catch (Exception e) { // any failure while publishing or cancelling is logged and stops the loop
+			log.error("Error canceling jobs!!!",e);
+		}
+	}
+	
+	@Override
+	public void setup(Object... configurations) { // keeps the MonitorPublisher found among the supplied configuration objects
+		for (Object configuration : configurations) {
+			if (configuration instanceof MonitorPublisher){ // any other configuration type is ignored
+				this.monitorPublisher=(MonitorPublisher) configuration;
+			} 
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
index 03dbf42..654c9ec 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
@@ -92,7 +92,7 @@ import eu.unicore.util.httpclient.DefaultClientConfiguration;
 
 
 
-public class BESProvider extends AbstractProvider implements GFacProvider{
+public class BESProvider extends AbstractProvider{
     protected final Logger log = LoggerFactory.getLogger(this.getClass());
 
     private DefaultClientConfiguration secProperties;

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
index 8eaf5d2..69ad519 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
@@ -20,43 +20,28 @@
 */
 package org.apache.airavata.gfac.provider.impl;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
 
-import org.apache.airavata.common.utils.StringUtil;
-import org.apache.airavata.commons.gfac.type.ActualParameter;
-import org.apache.airavata.commons.gfac.type.MappingFactory;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.gfac.context.MessageContext;
 import org.apache.airavata.gfac.context.security.GSISecurityContext;
 import org.apache.airavata.gfac.context.security.SSHSecurityContext;
 import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
 import org.apache.airavata.gfac.utils.GFacUtils;
 import org.apache.airavata.gsi.ssh.api.Cluster;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
-import org.apache.airavata.gsi.ssh.impl.PBSCluster;
-import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
 import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
 import org.apache.airavata.model.workspace.experiment.ErrorCategory;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.schemas.gfac.FileArrayType;
 import org.apache.airavata.schemas.gfac.HostDescriptionType;
 import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
-import org.apache.airavata.schemas.gfac.StringArrayType;
-import org.apache.airavata.schemas.gfac.URIArrayType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class GSISSHProvider extends AbstractProvider implements GFacProvider{
+public class GSISSHProvider extends AbstractProvider{
     private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
 
     public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
index 9654b7d..4066c00 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
@@ -37,7 +37,6 @@ import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.context.security.GSISecurityContext;
 import org.apache.airavata.gfac.notification.events.JobIDEvent;
 import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
 import org.apache.airavata.gfac.utils.GFacUtils;
 import org.apache.airavata.gfac.utils.GramJobSubmissionListener;
@@ -57,7 +56,7 @@ import org.ietf.jgss.GSSException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class GramProvider extends AbstractProvider implements GFacProvider{
+public class GramProvider extends AbstractProvider{
     private static final Logger log = LoggerFactory.getLogger(GramJobSubmissionListener.class);
 
     private GramJob job;

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
index a5762a1..c20e2ea 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
@@ -31,7 +31,6 @@ import org.apache.airavata.commons.gfac.type.ActualParameter;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
 import org.apache.airavata.gfac.provider.utils.HadoopUtils;
 import org.apache.airavata.schemas.gfac.HadoopApplicationDeploymentDescriptionType;
@@ -49,13 +48,14 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
 /**
  * Executes hadoop job using the cluster configuration provided by handlers in
  * in-flow.
  */
-public class HadoopProvider extends AbstractProvider implements GFacProvider{
+public class HadoopProvider extends AbstractProvider{
     private static final Logger logger = LoggerFactory.getLogger(HadoopProvider.class);
 
     private boolean isWhirrBasedDeployment = false;

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
index e446614..a12bf5d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
@@ -20,21 +20,16 @@
 */
 package org.apache.airavata.gfac.provider.impl;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
 
-import javax.xml.bind.JAXB;
-
 import org.apache.airavata.gfac.Constants;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
 import org.apache.airavata.gfac.provider.utils.ProviderUtils;
 import org.apache.airavata.gfac.utils.GFacUtils;
@@ -44,8 +39,6 @@ import org.apache.airavata.gfac.utils.OutputUtils;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.registry.api.workflow.ApplicationJob;
-import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
 import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
 import org.apache.airavata.schemas.gfac.NameValuePairType;
 import org.apache.xmlbeans.XmlException;
@@ -54,7 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
-public class LocalProvider extends AbstractProvider implements GFacProvider{
+public class LocalProvider extends AbstractProvider{
     private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
     private ProcessBuilder builder;
     private List<String> cmdList;

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
index 5914dd3..12e2ad1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
@@ -39,7 +39,6 @@ import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.context.MessageContext;
 import org.apache.airavata.gfac.context.security.SSHSecurityContext;
-import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
 import org.apache.airavata.gfac.utils.GFacUtils;
 import org.apache.airavata.gsi.ssh.api.Cluster;
@@ -61,7 +60,7 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 /**
  * Execute application using remote SSH
  */
-public class SSHProvider extends AbstractProvider implements GFacProvider{
+public class SSHProvider extends AbstractProvider{
     private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
     private Cluster cluster;
     private String jobID = null;

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-ec2/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ec2/pom.xml b/modules/gfac/gfac-ec2/pom.xml
index 7d696e6..df2d035 100644
--- a/modules/gfac/gfac-ec2/pom.xml
+++ b/modules/gfac/gfac-ec2/pom.xml
@@ -52,7 +52,11 @@
             <artifactId>airavata-workflow-execution-context</artifactId>
             <version>${project.version}</version>
         </dependency>
-
+		<dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>gfac-monitor</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!-- Workflow Tracking -->
         <dependency>
             <groupId>org.apache.airavata</groupId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
index 30c8c89..e773fe6 100644
--- a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
+++ b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java
@@ -21,22 +21,12 @@
 
 package org.apache.airavata.gfac.ec2;
 
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.ec2.AmazonEC2Client;
-import com.amazonaws.services.ec2.model.*;
-import com.sshtools.j2ssh.SshClient;
-import com.sshtools.j2ssh.authentication.AuthenticationProtocolState;
-import com.sshtools.j2ssh.authentication.PublicKeyAuthenticationClient;
-import com.sshtools.j2ssh.configuration.SshConnectionProperties;
-import com.sshtools.j2ssh.session.SessionChannelClient;
-import com.sshtools.j2ssh.transport.HostKeyVerification;
-import com.sshtools.j2ssh.transport.TransportProtocolException;
-import com.sshtools.j2ssh.transport.publickey.InvalidSshKeyException;
-import com.sshtools.j2ssh.transport.publickey.SshPrivateKey;
-import com.sshtools.j2ssh.transport.publickey.SshPrivateKeyFile;
-import com.sshtools.j2ssh.transport.publickey.SshPublicKey;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.airavata.client.api.AiravataAPI;
 import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
@@ -46,14 +36,11 @@ import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.ec2.util.AmazonEC2Util;
 import org.apache.airavata.gfac.ec2.util.EC2ProviderUtil;
-import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
 import org.apache.airavata.gfac.provider.impl.AbstractProvider;
 import org.apache.airavata.gfac.provider.utils.ProviderUtils;
 import org.apache.airavata.gfac.utils.GFacUtils;
 import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.registry.api.workflow.ApplicationJob;
-import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
 import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
 import org.apache.airavata.schemas.gfac.Ec2ApplicationDeploymentType;
 import org.apache.airavata.schemas.gfac.OutputParameterType;
@@ -63,14 +50,29 @@ import org.slf4j.LoggerFactory;
 
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.ec2.AmazonEC2Client;
+import com.amazonaws.services.ec2.model.AuthorizeSecurityGroupIngressRequest;
+import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
+import com.amazonaws.services.ec2.model.DescribeInstancesResult;
+import com.amazonaws.services.ec2.model.GroupIdentifier;
+import com.amazonaws.services.ec2.model.Instance;
+import com.amazonaws.services.ec2.model.IpPermission;
+import com.sshtools.j2ssh.SshClient;
+import com.sshtools.j2ssh.authentication.AuthenticationProtocolState;
+import com.sshtools.j2ssh.authentication.PublicKeyAuthenticationClient;
+import com.sshtools.j2ssh.configuration.SshConnectionProperties;
+import com.sshtools.j2ssh.session.SessionChannelClient;
+import com.sshtools.j2ssh.transport.HostKeyVerification;
+import com.sshtools.j2ssh.transport.TransportProtocolException;
+import com.sshtools.j2ssh.transport.publickey.InvalidSshKeyException;
+import com.sshtools.j2ssh.transport.publickey.SshPrivateKey;
+import com.sshtools.j2ssh.transport.publickey.SshPrivateKeyFile;
+import com.sshtools.j2ssh.transport.publickey.SshPublicKey;
 
-public class EC2Provider extends AbstractProvider implements GFacProvider {
+public class EC2Provider extends AbstractProvider {
 
     private static final Logger log = LoggerFactory.getLogger(EC2Provider.class);
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
index e4d905a..5455f1b 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
@@ -22,7 +22,7 @@ package org.apache.airavata.job.monitor;
 
 import java.util.Calendar;
 
-import org.apache.airavata.job.monitor.state.ExperimentStatus;
+import org.apache.airavata.job.monitor.state.ExperimentStatusChangeRequest;
 import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.registry.cpi.DataType;
@@ -46,7 +46,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
     }
 
     @Subscribe
-    public void updateRegistry(ExperimentStatus experimentStatus) {
+    public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) {
         ExperimentState state = experimentStatus.getState();
         if (state != null) {
             try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index ec03d71..74fdf43 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -24,8 +24,8 @@ import java.util.Calendar;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.state.JobStatus;
-import org.apache.airavata.job.monitor.state.TaskStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.job.monitor.state.TaskStatusChangeRequest;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.model.workspace.experiment.TaskState;
@@ -63,7 +63,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{
     }
 
     @Subscribe
-    public void updateRegistry(JobStatus jobStatus) {
+    public void updateRegistry(JobStatusChangeRequest jobStatus) {
         /* Here we need to parse the jobStatus message and update
                 the registry accordingly, for now we are just printing to standard Out
                  */
@@ -110,6 +110,8 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{
                     logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUSPENDED");
                     jobsToMonitor.remove(jobStatus.getMonitorID());
                     break;
+                case CANCELING:
+                    logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is CENCELING");
 			default:
 				break;
             }
@@ -117,7 +119,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{
     }
     
     @Subscribe
-    public void setupTaskStatus(JobStatus jobStatus){
+    public void setupTaskStatus(JobStatusChangeRequest jobStatus){
     	TaskState state=TaskState.UNKNOWN;
     	switch(jobStatus.getState()){
     	case ACTIVE:
@@ -136,11 +138,13 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{
     		state=TaskState.STARTED; break;
     	case UN_SUBMITTED:
     		state=TaskState.CANCELED; break;
+    	case CANCELING:
+    		state=TaskState.CANCELING; break;
 		default:
 			break;
     	}
     	logger.debug("Publishing Task Status "+state.toString());
-    	monitorPublisher.publish(new TaskStatus(jobStatus.getMonitorID(),state));
+    	monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getMonitorID(),state));
     }
     
     public  void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
index 8ba94d6..40b095a 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
@@ -23,8 +23,8 @@ package org.apache.airavata.job.monitor;
 import java.util.Calendar;
 
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.state.ExperimentStatus;
-import org.apache.airavata.job.monitor.state.TaskStatus;
+import org.apache.airavata.job.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.job.monitor.state.TaskStatusChangeRequest;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.model.workspace.experiment.TaskState;
@@ -51,7 +51,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener{
     }
 
     @Subscribe
-    public void updateRegistry(TaskStatus taskStatus) {
+    public void updateRegistry(TaskStatusChangeRequest taskStatus) {
         TaskState state = taskStatus.getState();
         if (state != null) {
             try {
@@ -64,7 +64,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener{
     }
     
     @Subscribe
-    public void setupExperimentStatus(TaskStatus taskStatus){
+    public void setupExperimentStatus(TaskStatusChangeRequest taskStatus){
     	ExperimentState state=ExperimentState.UNKNOWN;
     	switch(taskStatus.getState()){
     	case CANCELED:
@@ -79,11 +79,13 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener{
     		state=ExperimentState.EXECUTING; break;
     	case STARTED:
     		state=ExperimentState.LAUNCHED; break;
+    	case CANCELING:
+    		state=ExperimentState.CANCELING; break;
 		default:
 			break;
     	}
     	logger.debug("Publishing Experiment Status "+state.toString());
-    	monitorPublisher.publish(new ExperimentStatus(taskStatus.getMonitorID(),state));
+    	monitorPublisher.publish(new ExperimentStatusChangeRequest(taskStatus.getMonitorID(),state));
     }
     
     public  void updateTaskStatus(String taskId, TaskState state) throws Exception {

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
new file mode 100644
index 0000000..f6dc360
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import java.util.Calendar;
+
+import org.apache.airavata.job.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.Subscribe;
+
+public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener{
+    private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
+
+    private Registry airavataRegistry;
+    
+//    private MonitorPublisher monitorPublisher;
+
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    @Subscribe
+    public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) {
+//        ExperimentState state = experimentStatus.getState();
+//        if (state != null) {
+//            try {
+//                String experimentID = experimentStatus.getMonitorID().getExperimentID();
+//                updateWorkflowNodeStatus(experimentID, state);
+//            } catch (Exception e) {
+//                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+//            }
+//        }
+    }
+    
+    public  void updateWorkflowNodeStatus(String experimentId, ExperimentState state) throws Exception {
+    	Experiment details = (Experiment)airavataRegistry.get(DataType.EXPERIMENT, experimentId);
+        if(details == null) {
+            details = new Experiment();
+            details.setExperimentID(experimentId);
+        }
+        org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
+        status.setExperimentState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setExperimentStatus(status);
+        airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.EXPERIMENT, details, experimentId);
+    }
+
+	@Override
+	public void setup(Object... configurations) {
+		for (Object configuration : configurations) {
+			if (configuration instanceof Registry){
+				this.airavataRegistry=(Registry)configuration;
+//			} else if (configuration instanceof MonitorPublisher){
+//				this.monitorPublisher=(MonitorPublisher) configuration;
+			} 
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index ed89230..9a1b68b 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -74,8 +74,6 @@ public class MonitorManager {
 
     private Monitor localJobMonitor;
     
-    private List<AbstractActivityListener> activityListeners;
-
     private Registry registry;
 
     /**
@@ -99,7 +97,6 @@ public class MonitorManager {
     
     private void loadActivityMonitors(){
 		try {
-			activityListeners=new ArrayList<AbstractActivityListener>();
 			String activityListenersString = ServerSettings.getSetting(ACTIVITY_LISTENERS);
 			if (activityListenersString!=null){
 				String[] activityListenerClasses = activityListenersString.split(",");
@@ -109,8 +106,6 @@ public class MonitorManager {
 						Class<?>  classInstance = MonitorManager.class
 						        .getClassLoader().loadClass(activityListenerClassName);
 						AbstractActivityListener monitor=(AbstractActivityListener)classInstance.newInstance();
-						monitor.setup(registry, getFinishQueue(), getMonitorPublisher());
-						activityListeners.add(monitor);
 						registerListener(monitor);
 					} catch (ClassNotFoundException e) {
 						logger.error("Error while locating activity monitor implementation \""+activityListenerClassName+"\"!!!",e);
@@ -175,6 +170,23 @@ public class MonitorManager {
      */
     public void registerListener(Object listener) {
         monitorPublisher.registerListener(listener);
+        if (listener instanceof AbstractActivityListener){
+        	((AbstractActivityListener)listener).setup(registry, getFinishQueue(), getMonitorPublisher(), this);
+        }
+    }
+    
+    public void registerListener(AbstractActivityListener listener) {
+    	registerListener((Object)listener);
+    }
+
+    /**
+     * To remove a listener so that it no longer receives changing statuses from the bus
+     *
+     * @param listener The listener object to remove; the methods it annotated with @Subscribe in order to
+     *                 receive status objects (e.g. JobStatusChangeRequest) from the bus are unregistered from the publisher.
+     */
+    public void unregisterListener(Object listener) {
+        monitorPublisher.unregisterListener(listener);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/ExperimentCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/ExperimentCancelRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/ExperimentCancelRequest.java
new file mode 100644
index 0000000..7a8183e
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/ExperimentCancelRequest.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor.command;
+
+public class ExperimentCancelRequest {
+	private String experimentId;
+
+	public ExperimentCancelRequest(String experimentId) {
+		this.experimentId = experimentId;
+	}
+
+	public String getExperimentId() {
+		return experimentId;
+	}
+
+	public void setExperimentId(String experimentId) {
+		this.experimentId = experimentId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/TaskCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/TaskCancelRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/TaskCancelRequest.java
new file mode 100644
index 0000000..5648265
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/command/TaskCancelRequest.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor.command;
+
+public class TaskCancelRequest {
+	private String experimentId;
+	private String nodeId;
+	private String taskId;
+	
+	public TaskCancelRequest(String experimentId, String nodeId, String taskId) {
+		this.experimentId = experimentId;
+		this.setNodeId(nodeId);
+		this.taskId = taskId;
+	}
+	public String getExperimentId() {
+		return experimentId;
+	}
+	public void setExperimentId(String experimentId) {
+		this.experimentId = experimentId;
+	}
+	public String getTaskId() {
+		return taskId;
+	}
+	public void setTaskId(String taskId) {
+		this.taskId = taskId;
+	}
+	public String getNodeId() {
+		return nodeId;
+	}
+	public void setNodeId(String nodeId) {
+		this.nodeId = nodeId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
index f8360ba..33ffa09 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
@@ -32,7 +32,7 @@ import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo
 import org.apache.airavata.gsi.ssh.util.CommonUtils;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.junit.Assert;
@@ -154,7 +154,7 @@ public class QstatMonitorTestWithMyProxyAuth {
     }
 
     @Subscribe
-    public void testCaseShutDown(JobStatus status) {
+    public void testCaseShutDown(JobStatusChangeRequest status) {
         Assert.assertNotNull(status.getState());
         try {
             monitorManager.stopMonitor();

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index bfeb058..bb9865d 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -218,6 +218,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
     }
 
     public boolean terminateExperiment(String experimentId) throws TException {
-        return false;
+    	try {
+			orchestrator.cancelExperiment(experimentId);
+		} catch (OrchestratorException e) {
+			log.error("Error canceling experiment "+experimentId,e);
+			return false;
+		}
+        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
index 47179b1..e81b76e 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
@@ -20,15 +20,15 @@
 */
 package org.apache.airavata.orchestrator.core.context;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.airavata.job.monitor.MonitorManager;
 import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
 import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
 import org.apache.airavata.registry.api.AiravataRegistry2;
-import org.apache.airavata.registry.api.OrchestratorRegistry;
 import org.apache.airavata.registry.cpi.Registry;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * This is the context object used in orchestrator whic h
  */
@@ -40,6 +40,8 @@ public class OrchestratorContext {
     private AiravataRegistry2 registry;
 
     private Registry newRegistry;
+    
+    private MonitorManager monitorManager;
 
     public OrchestratorContext(List<GFACInstance> gfacInstanceList) {
         this.gfacInstanceList = new ArrayList<GFACInstance>();
@@ -79,4 +81,12 @@ public class OrchestratorContext {
     public void setNewRegistry(Registry newRegistry) {
         this.newRegistry = newRegistry;
     }
+
+	public MonitorManager getMonitorManager() {
+		return monitorManager;
+	}
+
+	public void setMonitorManager(MonitorManager monitorManager) {
+		this.monitorManager = monitorManager;
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
index 5ae92ae..3341d9b 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java
@@ -21,18 +21,14 @@
 package org.apache.airavata.orchestrator.core.impl;
 
 
-import java.util.*;
-
-import org.apache.airavata.common.utils.AiravataJobState;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.cpi.GFac;
+import org.apache.airavata.gfac.cpi.GFacImpl;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
 import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.airavata.registry.cpi.Registry;
-import org.apache.airavata.gfac.cpi.GFacImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +48,9 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter {
     public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
         this.orchestratorContext = orchestratorContext;
         gfac = new GFacImpl(orchestratorContext.getNewRegistry(), null, orchestratorContext.getRegistry());
+        if (orchestratorContext.getMonitorManager()!=null) {
+			orchestratorContext.getMonitorManager().registerListener(gfac);
+		}
     }
 
     public GFACInstance selectGFACInstance() throws OrchestratorException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
index 21b4866..3fd655f 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
@@ -52,7 +52,15 @@ public interface Orchestrator {
      */
     public List<TaskDetails> createTasks(String experimentId) throws OrchestratorException;
 
-
+    /**
+     * After creating the experiment data, the user has the
+     * experimentID as the handle to the experiment; to cancel the experiment
+     * we just have to give the experimentID.
+     *
+     * @param experimentID
+     * @throws OrchestratorException
+     */
+    void cancelExperiment(String experimentID) throws OrchestratorException;
     //todo have to add another method to handle failed or jobs to be recovered by orchestrator
     //todo if you don't add these this is not an orchestrator, its just an intemediate component which invoke gfac
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
index 461bba5..7147ef9 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.UUID;
 
 import org.apache.airavata.client.AiravataAPIFactory;
 import org.apache.airavata.client.api.AiravataAPI;
@@ -39,7 +38,6 @@ import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
-import org.apache.airavata.orchestrator.core.model.ExperimentRequest;
 import org.apache.airavata.orchestrator.core.utils.OrchestratorConstants;
 import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
 import org.apache.airavata.orchestrator.cpi.Orchestrator;

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index d83e5c9..463e08e 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -24,10 +24,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
-import com.google.common.eventbus.Subscribe;
-import org.apache.airavata.gfac.cpi.GFacImpl;
+import org.apache.airavata.job.monitor.AbstractActivityListener;
 import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.MonitorManager;
+import org.apache.airavata.job.monitor.command.ExperimentCancelRequest;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.model.util.ExperimentModelUtil;
 import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.model.workspace.experiment.JobState;
@@ -42,11 +43,12 @@ import org.apache.airavata.registry.cpi.Registry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SimpleOrchestratorImpl extends AbstractOrchestrator {
+import com.google.common.eventbus.Subscribe;
+
+public class SimpleOrchestratorImpl extends AbstractOrchestrator implements AbstractActivityListener{
     private final static Logger logger = LoggerFactory.getLogger(SimpleOrchestratorImpl.class);
     private ExecutorService executor;
-
-
+    
     // this is going to be null unless the thread count is 0
     private JobSubmitter jobSubmitter = null;
 
@@ -124,8 +126,14 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator {
         return tasks;
     }
 
+	@Override
+	public void cancelExperiment(String experimentID)
+			throws OrchestratorException {
+		orchestratorContext.getMonitorManager().getMonitorPublisher().publish(new ExperimentCancelRequest(experimentID));
+	}
+
     @Subscribe
-    public void handlePostExperimentTask(JobStatus status) throws OrchestratorException {
+    public void handlePostExperimentTask(JobStatusChangeRequest status) throws OrchestratorException {
         if(status.getState() == JobState.COMPLETE){
             MonitorID monitorID = status.getMonitorID();
             jobSubmitter.runAfterJobTask(monitorID.getExperimentID(), monitorID.getTaskID());
@@ -154,4 +162,19 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator {
     public void setJobSubmitter(JobSubmitter jobSubmitter) {
         this.jobSubmitter = jobSubmitter;
     }
+
+	@Override
+	public void setup(Object... configurations) {
+		for (Object config : configurations) {
+			if (config instanceof MonitorManager){
+				orchestratorContext.setMonitorManager((MonitorManager)config);
+				try {
+					getJobSubmitter().initialize(orchestratorContext);
+				} catch (OrchestratorException e) {
+					logger.error("Error reinitializing the job submitter!!!",e);
+				}
+			}
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/samples/java-client/experiment/src/main/java/org/apache/airavata/api/samples/ExperimentSample.java
----------------------------------------------------------------------
diff --git a/samples/java-client/experiment/src/main/java/org/apache/airavata/api/samples/ExperimentSample.java b/samples/java-client/experiment/src/main/java/org/apache/airavata/api/samples/ExperimentSample.java
index 2a99269..5fa7cb4 100644
--- a/samples/java-client/experiment/src/main/java/org/apache/airavata/api/samples/ExperimentSample.java
+++ b/samples/java-client/experiment/src/main/java/org/apache/airavata/api/samples/ExperimentSample.java
@@ -30,6 +30,7 @@ import org.apache.airavata.api.client.AiravataClientFactory;
 import org.apache.airavata.api.error.AiravataClientException;
 import org.apache.airavata.api.error.AiravataSystemException;
 import org.apache.airavata.api.error.ExperimentNotFoundException;
+import org.apache.airavata.api.error.AiravataClientConnectException;
 import org.apache.airavata.api.error.InvalidRequestException;
 import org.apache.airavata.client.AiravataAPIFactory;
 import org.apache.airavata.client.api.AiravataAPI;
@@ -54,13 +55,13 @@ public class ExperimentSample {
         return  AiravataAPIFactory.getAPI("default", "admin");
     }
 	
-	protected static Airavata.Client getClient() throws ApplicationSettingsException {
+	protected static Airavata.Client getClient() throws ApplicationSettingsException, AiravataClientConnectException {
         String THRIFT_SERVER_HOST = ClientSettings.getSetting("thrift.server.host");
         int THRIFT_SERVER_PORT = Integer.parseInt(ClientSettings.getSetting("thrift.server.port"));
         return AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
 
     }
-	public static void main(String[] args) throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException, AiravataSystemException, ApplicationSettingsException, TException, AiravataAPIInvocationException {
+	public static void main(String[] args) throws InvalidRequestException, ExperimentNotFoundException, AiravataClientException, AiravataSystemException, ApplicationSettingsException, TException, AiravataAPIInvocationException, AiravataClientConnectException {
 	AiravataAPI airavataAPI = getAiravataAPI();
 	DocumentCreator documentCreator = new DocumentCreator(airavataAPI);
         documentCreator.createLocalHostDocs();

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
index 718177c..bd6bfcb 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
@@ -40,28 +40,29 @@ public class MonitorID {
 
     private String userName;
 
-    private String jobID;
-
     private Timestamp jobStartedTime;
 
     private Timestamp lastMonitored;
 
     private HostDescription host;
 
-
     private AuthenticationInfo authenticationInfo = null;
 
     private Map<String, Object> parameters;
 
     private String experimentID;
 
+//    private String workflowNodeID;
+
     private String taskID;
 
+    private String jobID;
+
     private int failedCount = 0;
 
     private JobState state;
 
-    public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName) {
+    public MonitorID(HostDescription host, String jobID,String taskID, String experimentID, String userName) {
         this.host = host;
         this.jobStartedTime = new Timestamp((new Date()).getTime());
         this.userName = userName;
@@ -205,4 +206,12 @@ public class MonitorID {
                 this.state = status;
             }
     }
+
+//	public String getWorkflowNodeID() {
+//		return workflowNodeID;
+//	}
+//
+//	public void setWorkflowNodeID(String workflowNodeID) {
+//		this.workflowNodeID = workflowNodeID;
+//	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
index 1a79a17..b4a06c9 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
@@ -24,7 +24,7 @@ import org.apache.airavata.job.monitor.HostMonitorData;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.model.workspace.experiment.JobState;
 
 /**

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
index cc85e58..3064781 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
@@ -20,19 +20,15 @@
 */
 package org.apache.airavata.job.monitor.event;
 
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.state.ExperimentStatus;
-import org.apache.airavata.job.monitor.state.JobStatus;
-import org.apache.airavata.job.monitor.state.TaskStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.eventbus.EventBus;
 
-public class MonitorPublisher {
+public class MonitorPublisher{
     private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class);
     private EventBus eventBus;
-
+    
     public MonitorPublisher(EventBus eventBus) {
         this.eventBus = eventBus;
     }
@@ -40,20 +36,13 @@ public class MonitorPublisher {
     public void registerListener(Object listener) {
         eventBus.register(listener);
     }
-
-    public void publish(JobStatus jobStatus) {
-        eventBus.post(jobStatus);
-    }
-
-    public void publish(TaskStatus taskStatus) {
-        eventBus.post(taskStatus);
-    }
     
-    public void publish(ExperimentStatus experimentStatus) {
-        eventBus.post(experimentStatus);
+    public void unregisterListener(Object listener) {
+        eventBus.unregister(listener);
     }
-    
-    public void publish(MonitorID monitorID){
-        eventBus.post(monitorID);
+
+    public void publish(Object o) {
+        eventBus.post(o);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
index 8b9ebfd..de7cf90 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
@@ -23,7 +23,7 @@ package org.apache.airavata.job.monitor.impl;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.core.AiravataAbstractMonitor;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.model.workspace.experiment.JobState;
 
 import java.util.concurrent.BlockingQueue;
@@ -41,7 +41,7 @@ public class LocalJobMonitor extends AiravataAbstractMonitor {
         do {
             try {
                 MonitorID take = jobQueue.take();
-                getPublisher().publish(new JobStatus(take, JobState.COMPLETE));
+                getPublisher().publish(new JobStatusChangeRequest(take, JobState.COMPLETE));
             } catch (Exception e) {
                 e.printStackTrace();
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
index 59de0f4..1ec45fe 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
@@ -29,7 +29,7 @@ import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.core.PullMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.job.monitor.util.CommonUtils;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
@@ -108,7 +108,7 @@ public class QstatMonitor extends PullMonitor {
         //todo this polling will not work with multiple usernames but with single user
         // and multiple hosts, currently monitoring will work
         UserMonitorData take = null;
-        JobStatus jobStatus = new JobStatus();
+        JobStatusChangeRequest jobStatus = new JobStatusChangeRequest();
         MonitorID currentMonitorID = null;
         HostDescription currentHostDescription = null;
         try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index addedbb..88a5198 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -34,7 +34,7 @@ import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.core.PushMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.job.monitor.util.AMQPConnectionUtil;
 import org.apache.airavata.job.monitor.util.CommonUtils;
 import org.apache.airavata.model.workspace.experiment.JobState;
@@ -201,7 +201,7 @@ public class AMQPMonitor extends PushMonitor {
             }
         }
         next.setStatus(monitorID.getStatus());
-        publisher.publish(new JobStatus(next,next.getStatus()));
+        publisher.publish(new JobStatusChangeRequest(next,next.getStatus()));
         return true;
     }
     @Override

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
index 8992f56..53bcc8b 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
@@ -30,7 +30,7 @@ import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.core.MessageParser;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
index 1f85fc4..cdff685 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
@@ -27,7 +27,7 @@ import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.core.MessageParser;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
index a893372..becb4d7 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
@@ -24,7 +24,7 @@ import com.google.common.eventbus.Subscribe;
 import com.rabbitmq.client.Channel;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.job.monitor.util.CommonUtils;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.slf4j.Logger;
@@ -42,7 +42,7 @@ public class UnRegisterWorker{
     }
 
     @Subscribe
-    private boolean unRegisterListener(JobStatus jobStatus) throws AiravataMonitorException {
+    private boolean unRegisterListener(JobStatusChangeRequest jobStatus) throws AiravataMonitorException {
         MonitorID monitorID = jobStatus.getMonitorID();
         String channelID = CommonUtils.getChannelID(monitorID);
         if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){

http://git-wip-us.apache.org/repos/asf/airavata/blob/aed31d3e/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
new file mode 100644
index 0000000..909f10e
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor.state;
+
+import org.apache.airavata.job.monitor.MonitorID;
+
+public abstract class AbstractStateChangeRequest implements PublisherMessage{
+    private MonitorID monitorID;
+	
+	public MonitorID getMonitorID() {
+	    return monitorID;
+	}
+	
+	public void setMonitorID(MonitorID monitorID) {
+	    this.monitorID = monitorID;
+	}
+
+}