You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/04/23 21:42:35 UTC

[8/8] git commit: merging monitoring with gfac-core, later this will be separated

merging monitoring with gfac-core, later this will be separated


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b0230849
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b0230849
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b0230849

Branch: refs/heads/master
Commit: b02308493b0d9c9497ce25fac70da0aef910a822
Parents: 0364b65
Author: lahiru <la...@apache.org>
Authored: Wed Apr 23 15:42:00 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Apr 23 15:42:00 2014 -0400

----------------------------------------------------------------------
 .../main/resources/airavata-server.properties   |   6 +-
 modules/distribution/server/pom.xml             |  10 -
 .../org/apache/airavata/gfac/cpi/GFacImpl.java  |  41 +--
 .../gfac/monitor/AbstractActivityListener.java  |  27 ++
 .../AiravataExperimentStatusUpdator.java        |  81 +++++
 .../gfac/monitor/AiravataJobStatusUpdator.java  | 145 ++++++++
 .../gfac/monitor/AiravataTaskStatusUpdator.java | 113 ++++++
 .../AiravataWorkflowNodeStatusUpdator.java      | 112 ++++++
 .../gfac/monitor/ExperimentIdentity.java        |  36 ++
 .../airavata/gfac/monitor/HostMonitorData.java  |  69 ++++
 .../airavata/gfac/monitor/JobIdentity.java      |  39 +++
 .../apache/airavata/gfac/monitor/MonitorID.java | 225 ++++++++++++
 .../airavata/gfac/monitor/MonitorManager.java   | 347 +++++++++++++++++++
 .../airavata/gfac/monitor/TaskIdentity.java     |  38 ++
 .../airavata/gfac/monitor/UserMonitorData.java  |  76 ++++
 .../gfac/monitor/WorkflowNodeIdentity.java      |  37 ++
 .../command/ExperimentCancelRequest.java        |  38 ++
 .../gfac/monitor/command/TaskCancelRequest.java |  52 +++
 .../monitor/core/AiravataAbstractMonitor.java   |  46 +++
 .../gfac/monitor/core/MessageParser.java        |  43 +++
 .../airavata/gfac/monitor/core/Monitor.java     |  30 ++
 .../airavata/gfac/monitor/core/PullMonitor.java |  64 ++++
 .../airavata/gfac/monitor/core/PushMonitor.java |  60 ++++
 .../gfac/monitor/event/MonitorPublisher.java    |  47 +++
 .../exception/AiravataMonitorException.java     |  37 ++
 .../gfac/monitor/impl/LocalJobMonitor.java      |  59 ++++
 .../monitor/impl/pull/qstat/QstatMonitor.java   | 262 ++++++++++++++
 .../impl/pull/qstat/ResourceConnection.java     | 151 ++++++++
 .../monitor/impl/push/amqp/AMQPMonitor.java     | 263 ++++++++++++++
 .../monitor/impl/push/amqp/BasicConsumer.java   |  86 +++++
 .../impl/push/amqp/JSONMessageParser.java       |  78 +++++
 .../impl/push/amqp/UnRegisterWorker.java        |  68 ++++
 .../state/AbstractStateChangeRequest.java       |  27 ++
 .../state/ExperimentStatusChangeRequest.java    |  63 ++++
 .../monitor/state/JobStatusChangeRequest.java   |  74 ++++
 .../gfac/monitor/state/JobStatusInfo.java       |  48 +++
 .../gfac/monitor/state/PublisherMessage.java    |  26 ++
 .../monitor/state/TaskStatusChangeRequest.java  |  61 ++++
 .../state/WorkflowNodeStatusChangeRequest.java  |  63 ++++
 .../monitor/state/impl/AmazonJobStatusInfo.java |  39 +++
 .../monitor/state/impl/GridJobStatusInfo.java   |  40 +++
 .../gfac/monitor/util/AMQPConnectionUtil.java   |  77 ++++
 .../airavata/gfac/monitor/util/CommonUtils.java | 172 +++++++++
 .../airavata/gfac/monitor/util/X509Helper.java  | 161 +++++++++
 .../airavata/gfac/provider/GFacProvider.java    |   4 +-
 .../gfac/provider/impl/AbstractProvider.java    |  10 +-
 .../gfac/provider/impl/GSISSHProvider.java      |   3 -
 .../src/main/resources/schema/AccessPolicy.json |  13 +
 .../src/main/resources/schema/Activity.json     |  31 ++
 .../src/main/resources/schema/AdminDomain.json  |  51 +++
 .../schema/ApplicationEnvironment.json          |  86 +++++
 .../resources/schema/ApplicationHandle.json     |  21 ++
 .../src/main/resources/schema/Benchmark.json    |  21 ++
 .../resources/schema/ComputingActivity.json     | 165 +++++++++
 .../resources/schema/ComputingEndpoint.json     |  44 +++
 .../main/resources/schema/ComputingManager.json | 117 +++++++
 .../main/resources/schema/ComputingService.json |  32 ++
 .../main/resources/schema/ComputingShare.json   | 182 ++++++++++
 .../src/main/resources/schema/Contact.json      |  32 ++
 .../src/main/resources/schema/DataStore.json    |  30 ++
 .../src/main/resources/schema/Domain.json       |  30 ++
 .../src/main/resources/schema/Endpoint.json     | 147 ++++++++
 .../src/main/resources/schema/Entity.json       |  35 ++
 .../resources/schema/ExecutionEnvironment.json  | 115 ++++++
 .../src/main/resources/schema/Glue2.json        | 246 +++++++++++++
 .../src/main/resources/schema/Location.json     |  47 +++
 .../src/main/resources/schema/Manager.json      |  28 ++
 .../main/resources/schema/MappingPolicy.json    |  13 +
 .../src/main/resources/schema/Policy.json       |  27 ++
 .../src/main/resources/schema/Resource.json     |  27 ++
 .../src/main/resources/schema/Service.json      |  75 ++++
 .../src/main/resources/schema/Share.json        |  45 +++
 .../resources/schema/StorageAccessProtocol.json |  32 ++
 .../main/resources/schema/StorageEndpoint.json  |   8 +
 .../main/resources/schema/StorageManager.json   |   8 +
 .../main/resources/schema/StorageService.json   |  22 ++
 .../schema/StorageServiceCapacity.json          |  33 ++
 .../src/main/resources/schema/StorageShare.json |  65 ++++
 .../resources/schema/StorageShareCapacity.json  |  33 ++
 .../resources/schema/ToComputingService.json    |  32 ++
 .../main/resources/schema/ToStorageService.json |  25 ++
 .../src/main/resources/schema/UserDomain.json   |  58 ++++
 .../apache/airavata/job/AMQPMonitorTest.java    | 175 ++++++++++
 .../job/QstatMonitorTestWithMyProxyAuth.java    | 167 +++++++++
 modules/gfac/gfac-monitor/pom.xml               |  80 -----
 .../job/monitor/AbstractActivityListener.java   |  27 --
 .../airavata/job/monitor/MonitorManager.java    | 347 -------------------
 .../command/ExperimentCancelRequest.java        |  38 --
 .../job/monitor/command/TaskCancelRequest.java  |  52 ---
 .../src/main/resources/PBSTemplate.xslt         |  77 ----
 .../src/main/resources/gsissh.properties        |  26 --
 .../airavata/job/monitor/AMQPMonitorTest.java   | 144 --------
 .../QstatMonitorTestWithMyProxyAuth.java        | 165 ---------
 .../src/test/resources/gsissh.properties        |  26 --
 .../src/test/resources/monitor.properties       |   3 -
 modules/gfac/gfac-ssh/pom.xml                   |   2 +-
 .../gfac/handler/AdvancedSCPInputHandler.java   |  13 +-
 .../gfac/handler/AdvancedSCPOutputHandler.java  |  19 +-
 .../gfac/handler/SCPDirectorySetupHandler.java  |  10 +
 .../airavata/gfac/handler/SCPInputHandler.java  |  11 +-
 .../airavata/gfac/handler/SCPOutputHandler.java |  11 +-
 .../gfac/provider/impl/SSHProvider.java         |  13 +-
 .../apache/airavata/gfac/util/GFACSSHUtils.java |  88 +++++
 modules/gfac/pom.xml                            |   1 -
 .../airavata-orchestrator-service/pom.xml       |   7 +-
 .../server/OrchestratorServerHandler.java       | 136 ++------
 .../src/test/resources/monitor.properties       |   4 +-
 modules/orchestrator/orchestrator-core/pom.xml  |   5 -
 .../core/context/OrchestratorContext.java       |   2 +-
 .../airavata/orchestrator/cpi/Orchestrator.java |   9 +-
 .../cpi/impl/SimpleOrchestratorImpl.java        | 142 +++++++-
 .../orchestrator/core/NewOrchestratorTest.java  |  20 +-
 .../core/OrchestratorTestWithMyProxyAuth.java   | 100 +++---
 tools/job-monitor/pom.xml                       | 162 ---------
 .../job/monitor/ExperimentIdentity.java         |  36 --
 .../airavata/job/monitor/HostMonitorData.java   |  69 ----
 .../airavata/job/monitor/JobIdentity.java       |  39 ---
 .../airavata/job/monitor/TaskIdentity.java      |  38 --
 .../airavata/job/monitor/UserMonitorData.java   |  76 ----
 .../job/monitor/WorkflowNodeIdentity.java       |  37 --
 .../monitor/core/AiravataAbstractMonitor.java   |  46 ---
 .../job/monitor/core/MessageParser.java         |  47 ---
 .../airavata/job/monitor/core/Monitor.java      |  30 --
 .../airavata/job/monitor/core/PullMonitor.java  |  64 ----
 .../airavata/job/monitor/core/PushMonitor.java  |  61 ----
 .../job/monitor/event/MonitorPublisher.java     |  48 ---
 .../exception/AiravataMonitorException.java     |  37 --
 .../job/monitor/impl/LocalJobMonitor.java       |  59 ----
 .../monitor/impl/pull/qstat/QstatMonitor.java   | 262 --------------
 .../impl/pull/qstat/ResourceConnection.java     | 152 --------
 .../job/monitor/impl/push/amqp/AMQPMonitor.java | 268 --------------
 .../monitor/impl/push/amqp/BasicConsumer.java   |  93 -----
 .../impl/push/amqp/JSONMessageParser.java       |  82 -----
 .../impl/push/amqp/UnRegisterWorker.java        |  68 ----
 .../state/AbstractStateChangeRequest.java       |  27 --
 .../state/ExperimentStatusChangeRequest.java    |  63 ----
 .../monitor/state/JobStatusChangeRequest.java   |  74 ----
 .../job/monitor/state/JobStatusInfo.java        |  48 ---
 .../job/monitor/state/PublisherMessage.java     |  26 --
 .../monitor/state/TaskStatusChangeRequest.java  |  61 ----
 .../state/WorkflowNodeStatusChangeRequest.java  |  63 ----
 .../monitor/state/impl/AmazonJobStatusInfo.java |  39 ---
 .../monitor/state/impl/GridJobStatusInfo.java   |  40 ---
 .../job/monitor/util/AMQPConnectionUtil.java    |  78 -----
 .../airavata/job/monitor/util/CommonUtils.java  | 174 ----------
 .../airavata/job/monitor/util/X509Helper.java   | 170 ---------
 .../src/main/resources/PBSTemplate.xslt         |  77 ----
 .../src/main/resources/gsissh.properties        |  26 --
 .../src/main/resources/schema/AccessPolicy.json |  13 -
 .../src/main/resources/schema/Activity.json     |  31 --
 .../src/main/resources/schema/AdminDomain.json  |  51 ---
 .../schema/ApplicationEnvironment.json          |  86 -----
 .../resources/schema/ApplicationHandle.json     |  21 --
 .../src/main/resources/schema/Benchmark.json    |  21 --
 .../resources/schema/ComputingActivity.json     | 165 ---------
 .../resources/schema/ComputingEndpoint.json     |  44 ---
 .../main/resources/schema/ComputingManager.json | 117 -------
 .../main/resources/schema/ComputingService.json |  32 --
 .../main/resources/schema/ComputingShare.json   | 182 ----------
 .../src/main/resources/schema/Contact.json      |  32 --
 .../src/main/resources/schema/DataStore.json    |  30 --
 .../src/main/resources/schema/Domain.json       |  30 --
 .../src/main/resources/schema/Endpoint.json     | 147 --------
 .../src/main/resources/schema/Entity.json       |  35 --
 .../resources/schema/ExecutionEnvironment.json  | 115 ------
 .../src/main/resources/schema/Glue2.json        | 246 -------------
 .../src/main/resources/schema/Location.json     |  47 ---
 .../src/main/resources/schema/Manager.json      |  28 --
 .../main/resources/schema/MappingPolicy.json    |  13 -
 .../src/main/resources/schema/Policy.json       |  27 --
 .../src/main/resources/schema/Resource.json     |  27 --
 .../src/main/resources/schema/Service.json      |  75 ----
 .../src/main/resources/schema/Share.json        |  45 ---
 .../resources/schema/StorageAccessProtocol.json |  32 --
 .../main/resources/schema/StorageEndpoint.json  |   8 -
 .../main/resources/schema/StorageManager.json   |   8 -
 .../main/resources/schema/StorageService.json   |  22 --
 .../schema/StorageServiceCapacity.json          |  33 --
 .../src/main/resources/schema/StorageShare.json |  65 ----
 .../resources/schema/StorageShareCapacity.json  |  33 --
 .../resources/schema/ToComputingService.json    |  32 --
 .../main/resources/schema/ToStorageService.json |  25 --
 .../src/main/resources/schema/UserDomain.json   |  58 ----
 .../apache/airavata/job/AMQPMonitorTest.java    | 179 ----------
 .../job/QstatMonitorTestWithMyProxyAuth.java    | 168 ---------
 .../src/test/resources/gsissh.properties        |  26 --
 .../src/test/resources/monitor.properties       |   3 -
 tools/pom.xml                                   |   1 -
 188 files changed, 6288 insertions(+), 6355 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 4a65a71..7169ea8 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -256,13 +256,13 @@ TwoPhase=true
 ###---------------------------Monitoring module Configurations---------------------------###
 #This will be the primary monitoring tool which runs in airavata, in future there will be multiple monitoring
 #mechanisms and one would be able to start a monitor
-monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.job.monitor.impl.LocalJobMonitor
-#,org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor
+monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.gfac.monitor.impl.LocalJobMonitor
+#,org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor
 #This is the amqp related configuration and this lists down the Rabbitmq host, this is an xsede specific configuration
 amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
 proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
 connection.name=xsede
-activity.listeners=org.apache.airavata.job.monitor.AiravataJobStatusUpdator,org.apache.airavata.job.monitor.AiravataTaskStatusUpdator,org.apache.airavata.job.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.job.monitor.AiravataExperimentStatusUpdator
+activity.listeners=org.apache.airavata.gfac.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.monitor.AiravataExperimentStatusUpdator
 
 ###---------------------------Orchestrator module Configurations---------------------------###
 job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 6629860..0ff7f4e 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -331,16 +331,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>job-monitor-tool</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>gfac-monitor</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-api-server</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
index 064c9ba..14ea519 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
@@ -51,7 +51,6 @@ import org.apache.airavata.gfac.notification.listeners.WorkflowTrackingListener;
 import org.apache.airavata.gfac.provider.GFacProvider;
 import org.apache.airavata.gfac.scheduler.HostScheduler;
 import org.apache.airavata.gfac.utils.GFacUtils;
-import org.apache.airavata.gfac.context.security.SSHSecurityContext;
 import org.apache.airavata.gsi.ssh.api.Cluster;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.ServerInfo;
@@ -64,10 +63,10 @@ import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthentica
 import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
 import org.apache.airavata.gsi.ssh.util.CommonUtils;
-import org.apache.airavata.job.monitor.AbstractActivityListener;
-import org.apache.airavata.job.monitor.MonitorManager;
-import org.apache.airavata.job.monitor.command.ExperimentCancelRequest;
-import org.apache.airavata.job.monitor.command.TaskCancelRequest;
+import org.apache.airavata.gfac.monitor.AbstractActivityListener;
+import org.apache.airavata.gfac.monitor.MonitorManager;
+import org.apache.airavata.gfac.monitor.command.ExperimentCancelRequest;
+import org.apache.airavata.gfac.monitor.command.TaskCancelRequest;
 import org.apache.airavata.model.workspace.experiment.DataObjectType;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
@@ -451,41 +450,9 @@ public class GFacImpl implements GFac, AbstractActivityListener {
 //               if (this.configuration.getAmazonSecurityContext() != null) {
 //                   jobExecutionContext.addSecurityContext(AmazonSecurityContext.AMAZON_SECURITY_CONTEXT,
 //                           this.configuration.getAmazonSecurityContext());
-        } else if (registeredHost.getType() instanceof SSHHostType) {
-            String sshUserName = configurationProperties.getProperty(Constants.SSH_USER_NAME);
-            String sshPrivateKey = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY);
-            String sshPrivateKeyPass = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY_PASS);
-            String sshPassword = configurationProperties.getProperty(Constants.SSH_PASSWORD);
-            String sshPublicKey = configurationProperties.getProperty(Constants.SSH_PUBLIC_KEY);
-            SSHSecurityContext sshSecurityContext = new SSHSecurityContext();
-                AuthenticationInfo authenticationInfo = null;
-                // we give higher preference to the password over keypair ssh authentication
-                if (sshPassword != null) {
-                    authenticationInfo = new DefaultPasswordAuthenticationInfo(sshPassword);
-                } else {
-                    authenticationInfo = new DefaultPublicKeyFileAuthentication(sshPublicKey, sshPrivateKey, sshPrivateKeyPass);
-                }
-                ServerInfo serverInfo = new ServerInfo(sshUserName, registeredHost.getType().getHostAddress());
-
-                Cluster pbsCluster = null;
-                try {
-                    String installedParentPath = "/";
-                    if(((SSHHostType) registeredHost.getType()).getHpcResource()){
-                    	installedParentPath = ((HpcApplicationDeploymentType)
-                                jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath();
-                    }
-                    pbsCluster = new PBSCluster(serverInfo, authenticationInfo,
-                            CommonUtils.getPBSJobManager(installedParentPath));
-                } catch (SSHApiException e) {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                }
-                sshSecurityContext.setPbsCluster(pbsCluster);
-                sshSecurityContext.setUsername(sshUserName);
-           jobExecutionContext.addSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT, sshSecurityContext);
         }
     }
 
-	@Override
 	public void setup(Object... configurations) {
 		for (Object configuration : configurations) {
 			if (configuration instanceof MonitorManager){

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java
new file mode 100644
index 0000000..63f89df
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+
+public interface AbstractActivityListener { // Contract for listeners that are handed their collaborators after construction.
+	public void setup(Object... configurations); // Receives an untyped list of objects; the implementations shown with this interface keep the ones they recognise by type.
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java
new file mode 100644
index 0000000..a70b14f
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+public class AiravataExperimentStatusUpdator implements AbstractActivityListener { // Writes experiment state changes into the registry.
+    private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
+
+    private Registry airavataRegistry; // Set through setup(...) or setAiravataRegistry(...); null until one of them is called.
+
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    @Subscribe // Guava event-bus subscriber; presumably registered with the bus elsewhere - registration is not shown here.
+    public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) { // Persists the state carried by the request.
+        ExperimentState state = experimentStatus.getState();
+        if (state != null) { // Requests without a state are ignored.
+            try {
+                String experimentID = experimentStatus.getIdentity().getExperimentID();
+                updateExperimentStatus(experimentID, state);
+            } catch (Exception e) { // Failures are logged and not rethrown.
+                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+            }
+        }
+    }
+
+    public  void updateExperimentStatus(String experimentId, ExperimentState state) throws Exception { // Stores a new status (state + current time) on the experiment.
+    	Experiment details = (Experiment)airavataRegistry.get(DataType.EXPERIMENT, experimentId);
+        if(details == null) { // Nothing returned by the registry: start from an empty Experiment that only carries the id.
+            details = new Experiment();
+            details.setExperimentID(experimentId);
+        }
+        org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
+        status.setExperimentState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); // Timestamp is "now" in epoch milliseconds.
+        details.setExperimentStatus(status);
+        airavataRegistry.update(DataType.EXPERIMENT, details, experimentId);
+    }
+
+	@Override
+	public void setup(Object... configurations) { // Keeps any Registry found among the arguments (the last one wins); other objects are ignored.
+		for (Object configuration : configurations) {
+			if (configuration instanceof Registry){
+				this.airavataRegistry=(Registry)configuration;
+			} 
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java
new file mode 100644
index 0000000..99c8733
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.registry.cpi.CompositeIdentifier;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.concurrent.BlockingQueue;
+
+public class AiravataJobStatusUpdator implements AbstractActivityListener { // Writes job state changes into the registry and republishes them as task state changes.
+    private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
+
+    private Registry airavataRegistry; // Set through setup(...) or its setter; null until then.
+
+    private MonitorPublisher monitorPublisher; // Only assigned in setup(...); used to publish TaskStatusChangeRequest.
+
+    private BlockingQueue<MonitorID> jobsToMonitor; // Queue from which finished jobs are removed; set through setup(...) or its setter.
+
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    public BlockingQueue<MonitorID> getJobsToMonitor() {
+        return jobsToMonitor;
+    }
+
+    public void setJobsToMonitor(BlockingQueue<MonitorID> jobsToMonitor) {
+        this.jobsToMonitor = jobsToMonitor;
+    }
+
+    @Subscribe // Guava event-bus subscriber; presumably registered with the bus elsewhere - registration is not shown here.
+    public void updateRegistry(JobStatusChangeRequest jobStatus) {
+        /* Persist the job state carried by the request in the registry, log it,
+                and drop the job from the monitoring queue once it reaches one of the
+                 states listed in the switch below. Requests without a state are ignored. */
+        JobState state = jobStatus.getState();
+        if (state != null) {
+            try {
+                String taskID = jobStatus.getIdentity().getTaskId();
+                String jobID = jobStatus.getIdentity().getJobId();
+                updateJobStatus(taskID, jobID, state);
+            } catch (Exception e) { // Failures are logged and not rethrown, so the code below still runs.
+                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+            }
+            logger.info("Job ID:" + jobStatus.getIdentity().getJobId() + " is "+state.toString());
+            switch (state) {
+                case COMPLETE: case UNKNOWN: case CANCELED:case FAILED:case SUSPENDED:
+                    jobsToMonitor.remove(jobStatus.getMonitorID()); // NOTE(review): throws if jobsToMonitor was never set - confirm wiring.
+                    break;
+			default: // Every other state keeps the job in the queue.
+				break;
+            }
+        }
+    }
+
+    @Subscribe // Second subscriber for the same event type as updateRegistry.
+    public void setupTaskStatus(JobStatusChangeRequest jobStatus){ // Translates the job state into a TaskState and publishes it.
+    	TaskState state=TaskState.UNKNOWN; // Used when the job state matches none of the cases below.
+    	switch(jobStatus.getState()){ // NOTE(review): unlike updateRegistry there is no null check; a null state would throw here.
+    	case ACTIVE:
+    		state=TaskState.EXECUTING; break;
+    	case CANCELED:
+    		state=TaskState.CANCELED; break;
+    	case COMPLETE:
+    		state=TaskState.COMPLETED; break;
+    	case FAILED:
+    		state=TaskState.FAILED; break;
+    	case HELD: case SUSPENDED: case QUEUED:
+    		state=TaskState.WAITING; break;
+    	case SETUP:
+    		state=TaskState.PRE_PROCESSING; break;
+    	case SUBMITTED:
+    		state=TaskState.STARTED; break;
+    	case UN_SUBMITTED:
+    		state=TaskState.CANCELED; break; // NOTE(review): UN_SUBMITTED is reported as CANCELED - confirm this is intended.
+    	case CANCELING:
+    		state=TaskState.CANCELING; break;
+		default:
+			break;
+    	}
+    	logger.debug("Publishing Task Status "+state.toString());
+    	monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getIdentity(),state)); // The job identity is passed on unchanged.
+    }
+
+    public  void updateJobStatus(String taskId, String jobID, JobState state) throws Exception { // Stores a new status (state + current time) on the job.
+        CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID); // Registry key built from task id and job id.
+        JobDetails details = (JobDetails)airavataRegistry.get(DataType.JOB_DETAIL, ids);
+        if(details == null) { // Nothing returned by the registry: start from an empty JobDetails.
+            details = new JobDetails();
+        }
+        org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus();
+        status.setJobState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); // Timestamp is "now" in epoch milliseconds.
+        details.setJobStatus(status);
+        details.setJobID(jobID);
+        airavataRegistry.update(DataType.JOB_DETAIL, details, ids);
+    }
+
+	@SuppressWarnings("unchecked") // Covers the BlockingQueue<MonitorID> cast below, which cannot be checked at runtime.
+	@Override
+	public void setup(Object... configurations) { // Keeps the Registry, BlockingQueue and MonitorPublisher found among the arguments; later matches overwrite earlier ones.
+		for (Object configuration : configurations) {
+			if (configuration instanceof Registry){
+				this.airavataRegistry=(Registry)configuration;
+			} else if (configuration instanceof BlockingQueue<?>){ // Any BlockingQueue is accepted; its element type is not verified.
+				this.jobsToMonitor=(BlockingQueue<MonitorID>) configuration;
+			} else if (configuration instanceof MonitorPublisher){
+				this.monitorPublisher=(MonitorPublisher) configuration;
+			} 
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java
new file mode 100644
index 0000000..e8dd7a0
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.state.WorkflowNodeStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+/**
+ * Activity listener that reacts to {@link TaskStatusChangeRequest} events in two ways:
+ * it writes the new task state to the registry, and it publishes the corresponding
+ * {@link WorkflowNodeStatusChangeRequest} so the change propagates to the workflow-node level.
+ * <p>
+ * The registry and the publisher are supplied through {@link #setup(Object...)}.
+ */
+public class AiravataTaskStatusUpdator implements AbstractActivityListener {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
+
+    private Registry airavataRegistry;
+
+    private MonitorPublisher monitorPublisher;
+
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    /**
+     * Persists the task state carried by the event. Events without a state are ignored and
+     * persistence failures are logged rather than rethrown.
+     *
+     * @param taskStatus the task status change event
+     */
+    @Subscribe
+    public void updateRegistry(TaskStatusChangeRequest taskStatus) {
+        TaskState state = taskStatus.getState();
+        if (state != null) {
+            try {
+                String taskID = taskStatus.getIdentity().getTaskId();
+                updateTaskStatus(taskID, state);
+            } catch (Exception e) {
+                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Translates the task state carried by the event into a workflow-node state and publishes it.
+     * Events without a state are ignored, matching {@link #updateRegistry(TaskStatusChangeRequest)}.
+     *
+     * @param taskStatus the task status change event
+     */
+    @Subscribe
+    public void setupWorkflowNodeStatus(TaskStatusChangeRequest taskStatus){
+        TaskState taskState = taskStatus.getState();
+        if (taskState == null) {
+            // Switching on a null enum value throws NullPointerException; there is nothing to map.
+            return;
+        }
+        WorkflowNodeState state = toWorkflowNodeState(taskState);
+        logger.debug("Publishing Workflow Node Status "+state.toString());
+        monitorPublisher.publish(new WorkflowNodeStatusChangeRequest(taskStatus.getIdentity(),state));
+    }
+
+    /**
+     * Maps a task state onto the workflow-node state published for it.
+     * Task states without an explicit mapping yield {@link WorkflowNodeState#UNKNOWN}.
+     */
+    private static WorkflowNodeState toWorkflowNodeState(TaskState taskState) {
+        switch (taskState) {
+            case CANCELED:
+                return WorkflowNodeState.CANCELED;
+            case COMPLETED:
+                return WorkflowNodeState.COMPLETED;
+            case CONFIGURING_WORKSPACE:
+            case STARTED:
+                return WorkflowNodeState.INVOKED;
+            case FAILED:
+                return WorkflowNodeState.FAILED;
+            case EXECUTING:
+            case WAITING:
+            case PRE_PROCESSING:
+            case POST_PROCESSING:
+            case OUTPUT_DATA_STAGING:
+            case INPUT_DATA_STAGING:
+                return WorkflowNodeState.EXECUTING;
+            case CANCELING:
+                return WorkflowNodeState.CANCELING;
+            default:
+                return WorkflowNodeState.UNKNOWN;
+        }
+    }
+
+    /**
+     * Writes a new status for the given task to the registry.
+     * <p>
+     * When the registry lookup returns {@code null}, a new {@link TaskDetails} carrying the task ID
+     * is used. The status is stamped with the current time.
+     *
+     * @param taskId ID of the task
+     * @param state  the state to record
+     * @throws Exception whatever the registry lookup or update throws
+     */
+    public  void updateTaskStatus(String taskId, TaskState state) throws Exception {
+        TaskDetails details = (TaskDetails)airavataRegistry.get(DataType.TASK_DETAIL, taskId);
+        if(details == null) {
+            details = new TaskDetails();
+            details.setTaskID(taskId);
+        }
+        org.apache.airavata.model.workspace.experiment.TaskStatus status = new org.apache.airavata.model.workspace.experiment.TaskStatus();
+        status.setExecutionState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setTaskStatus(status);
+        airavataRegistry.update(DataType.TASK_DETAIL, details, taskId);
+    }
+
+    /**
+     * Takes the {@link Registry} and {@link MonitorPublisher} out of the supplied arguments,
+     * matching by runtime type; arguments of other types are ignored.
+     *
+     * @param configurations candidate collaborators, in any order
+     */
+    @Override
+    public void setup(Object... configurations) {
+        for (Object configuration : configurations) {
+            if (configuration instanceof Registry){
+                this.airavataRegistry=(Registry)configuration;
+            } else if (configuration instanceof MonitorPublisher){
+                this.monitorPublisher=(MonitorPublisher) configuration;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java
new file mode 100644
index 0000000..2375d72
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.state.WorkflowNodeStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+/**
+ * Activity listener that reacts to {@link WorkflowNodeStatusChangeRequest} events in two ways:
+ * it writes the new workflow-node state to the registry, and it publishes the corresponding
+ * {@link ExperimentStatusChangeRequest} so the change propagates to the experiment level.
+ * <p>
+ * The registry and the publisher are supplied through {@link #setup(Object...)}.
+ */
+public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
+
+    private Registry airavataRegistry;
+
+    private MonitorPublisher monitorPublisher;
+
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    /**
+     * Persists the workflow-node state carried by the event. Events without a state are ignored
+     * and persistence failures are logged rather than rethrown.
+     *
+     * @param workflowNodeStatus the workflow-node status change event
+     */
+    @Subscribe
+    public void updateRegistry(WorkflowNodeStatusChangeRequest workflowNodeStatus) {
+        WorkflowNodeState state = workflowNodeStatus.getState();
+        if (state != null) {
+            try {
+                String workflowNodeID = workflowNodeStatus.getIdentity().getWorkflowNodeID();
+                updateWorkflowNodeStatus(workflowNodeID, state);
+            } catch (Exception e) {
+                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Translates the workflow-node state carried by the event into an experiment state and
+     * publishes it. Events without a state are ignored, matching
+     * {@link #updateRegistry(WorkflowNodeStatusChangeRequest)}.
+     *
+     * @param nodeStatus the workflow-node status change event
+     */
+    @Subscribe
+    public void setupExperimentStatus(WorkflowNodeStatusChangeRequest nodeStatus){
+        WorkflowNodeState nodeState = nodeStatus.getState();
+        if (nodeState == null) {
+            // Switching on a null enum value throws NullPointerException; there is nothing to map.
+            return;
+        }
+        ExperimentState state = toExperimentState(nodeState);
+        logger.debug("Publishing Experiment Status "+state.toString());
+        monitorPublisher.publish(new ExperimentStatusChangeRequest(nodeStatus.getIdentity(),state));
+    }
+
+    /**
+     * Maps a workflow-node state onto the experiment state published for it.
+     * Node states without an explicit mapping yield {@link ExperimentState#UNKNOWN}.
+     */
+    private static ExperimentState toExperimentState(WorkflowNodeState nodeState) {
+        switch (nodeState) {
+            case CANCELED:
+                return ExperimentState.CANCELED;
+            case COMPLETED:
+                return ExperimentState.COMPLETED;
+            case INVOKED:
+                return ExperimentState.LAUNCHED;
+            case FAILED:
+                return ExperimentState.FAILED;
+            case EXECUTING:
+                return ExperimentState.EXECUTING;
+            case CANCELING:
+                return ExperimentState.CANCELING;
+            default:
+                return ExperimentState.UNKNOWN;
+        }
+    }
+
+    /**
+     * Writes a new status for the given workflow node to the registry.
+     * <p>
+     * When the registry lookup returns {@code null}, a new {@link WorkflowNodeDetails} carrying the
+     * node instance ID is used. The status is stamped with the current time.
+     *
+     * @param workflowNodeId ID of the workflow node
+     * @param state          the state to record
+     * @throws Exception whatever the registry lookup or update throws
+     */
+    public  void updateWorkflowNodeStatus(String workflowNodeId, WorkflowNodeState state) throws Exception {
+        WorkflowNodeDetails details = (WorkflowNodeDetails)airavataRegistry.get(DataType.WORKFLOW_NODE_DETAIL, workflowNodeId);
+        if(details == null) {
+            details = new WorkflowNodeDetails();
+            details.setNodeInstanceId(workflowNodeId);
+        }
+        WorkflowNodeStatus status = new WorkflowNodeStatus();
+        status.setWorkflowNodeState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setWorkflowNodeStatus(status);
+        airavataRegistry.update(DataType.WORKFLOW_NODE_DETAIL, details, workflowNodeId);
+    }
+
+    /**
+     * Takes the {@link Registry} and {@link MonitorPublisher} out of the supplied arguments,
+     * matching by runtime type; arguments of other types are ignored.
+     *
+     * @param configurations candidate collaborators, in any order
+     */
+    @Override
+    public void setup(Object... configurations) {
+        for (Object configuration : configurations) {
+            if (configuration instanceof Registry){
+                this.airavataRegistry=(Registry)configuration;
+            } else if (configuration instanceof MonitorPublisher){
+                this.monitorPublisher=(MonitorPublisher) configuration;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java
new file mode 100644
index 0000000..ba8efeb
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+/**
+ * Identifies an experiment by its ID.
+ */
+public class ExperimentIdentity {
+    /** ID of the experiment this identity refers to. */
+    private String experimentID;
+
+    /**
+     * Creates an identity for the given experiment.
+     *
+     * @param experimentId ID of the experiment; stored through {@link #setExperimentID(String)}
+     */
+    public ExperimentIdentity(String experimentId) {
+        this.setExperimentID(experimentId);
+    }
+
+    /**
+     * @return the experiment ID
+     */
+    public String getExperimentID() {
+        return this.experimentID;
+    }
+
+    /**
+     * @param experimentID the experiment ID to store
+     */
+    public void setExperimentID(String experimentID) {
+        this.experimentID = experimentID;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
new file mode 100644
index 0000000..e57087d
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Pairs a host description with the list of {@link MonitorID}s tracked for that host.
+ */
+public class HostMonitorData {
+    /** The host the monitor IDs belong to. */
+    private HostDescription host;
+
+    /** Monitor IDs registered for {@link #host}. */
+    private List<MonitorID> monitorIDs;
+
+    /**
+     * Creates an entry for the host with an empty, mutable list of monitor IDs.
+     *
+     * @param host the host description
+     */
+    public HostMonitorData(HostDescription host) {
+        this(host, new ArrayList<MonitorID>());
+    }
+
+    /**
+     * Creates an entry for the host using the supplied list; the list is stored as given, not copied.
+     *
+     * @param host       the host description
+     * @param monitorIDs the monitor IDs for this host
+     */
+    public HostMonitorData(HostDescription host, List<MonitorID> monitorIDs) {
+        this.host = host;
+        this.monitorIDs = monitorIDs;
+    }
+
+    /**
+     * @return the host description
+     */
+    public HostDescription getHost() {
+        return this.host;
+    }
+
+    /**
+     * @param host the host description to store
+     */
+    public void setHost(HostDescription host) {
+        this.host = host;
+    }
+
+    /**
+     * @return the stored list of monitor IDs (the internal list itself, not a copy)
+     */
+    public List<MonitorID> getMonitorIDs() {
+        return this.monitorIDs;
+    }
+
+    /**
+     * @param monitorIDs the list of monitor IDs to store; kept as given, not copied
+     */
+    public void setMonitorIDs(List<MonitorID> monitorIDs) {
+        this.monitorIDs = monitorIDs;
+    }
+
+    /**
+     * Appends a monitor ID to this host's list.
+     * <p>
+     * No check is made here that the monitor ID's host matches this entry's host. According to
+     * the original author's note, the caller (CommonUtils) selects the right entry before
+     * calling this method.
+     *
+     * @param monitorID the monitor ID to add
+     * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException declared for
+     *         callers; this implementation does not throw it itself
+     */
+    public void addMonitorIDForHost(MonitorID monitorID)throws AiravataMonitorException {
+        this.monitorIDs.add(monitorID);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java
new file mode 100644
index 0000000..84c4a55
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+/**
+ * A {@link TaskIdentity} that additionally carries the ID of a job.
+ */
+public class JobIdentity extends TaskIdentity {
+    /** ID of the job this identity refers to. */
+    private String jobId;
+
+    /**
+     * Creates a job identity.
+     *
+     * @param experimentId   passed on to {@link TaskIdentity}
+     * @param workflowNodeId passed on to {@link TaskIdentity}
+     * @param taskId         passed on to {@link TaskIdentity}
+     * @param jobId          ID of the job; stored through {@link #setJobId(String)}
+     */
+    public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) {
+        super(experimentId, workflowNodeId, taskId);
+        this.setJobId(jobId);
+    }
+
+    /**
+     * @return the job ID
+     */
+    public String getJobId() {
+        return this.jobId;
+    }
+
+    /**
+     * @param jobId the job ID to store
+     */
+    public void setJobId(String jobId) {
+        this.jobId = jobId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
new file mode 100644
index 0000000..c022bef
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Holds the data needed to identify a particular job and to start monitoring it:
+ * the host, the user, the experiment / workflow node / task / job IDs, optional
+ * authentication information, free-form parameters and the last known job state.
+ */
+public class MonitorID {
+    private final static Logger logger = LoggerFactory.getLogger(MonitorID.class);
+
+    /** How long {@link #setStatus(JobState)} pauses after an UNKNOWN report, in milliseconds. */
+    private static final long UNKNOWN_STATE_WAIT_MILLIS = 10000L;
+
+    /** Failed count above which an UNKNOWN report is treated as "job finished". */
+    private static final int MAX_UNKNOWN_REPORTS = 2;
+
+    private String userName;
+
+    private Timestamp jobStartedTime;
+
+    private Timestamp lastMonitored;
+
+    private HostDescription host;
+
+    private AuthenticationInfo authenticationInfo = null;
+
+    // Created lazily by addParameter so getParameters() keeps returning null until something is stored.
+    private Map<String, Object> parameters;
+
+    private String experimentID;
+
+    private String workflowNodeID;
+
+    private String taskID;
+
+    private String jobID;
+
+    private int failedCount = 0;
+
+    private JobState state;
+
+    /**
+     * Creates a monitor ID without authentication information.
+     * The job start time is set to the moment of construction.
+     *
+     * @param host           host the job runs on
+     * @param jobID          ID of the job
+     * @param taskID         ID of the task the job belongs to
+     * @param workflowNodeID ID of the workflow node the task belongs to
+     * @param experimentID   ID of the experiment
+     * @param userName       user name used for monitoring
+     */
+    public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) {
+        this(host, jobID, taskID, workflowNodeID, experimentID, userName, null);
+    }
+
+    /**
+     * Creates a monitor ID with authentication information.
+     * The job start time is set to the moment of construction. When the authentication
+     * information is a {@link MyProxyAuthenticationInfo}, its user name replaces {@code userName}.
+     *
+     * @param host               host the job runs on
+     * @param jobID              ID of the job
+     * @param taskID             ID of the task the job belongs to
+     * @param workflowNodeID     ID of the workflow node the task belongs to
+     * @param experimentID       ID of the experiment
+     * @param userName           user name used for monitoring
+     * @param authenticationInfo authentication information, may be {@code null}
+     */
+    public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName,AuthenticationInfo authenticationInfo) {
+        this.host = host;
+        this.jobStartedTime = new Timestamp((new Date()).getTime());
+        this.authenticationInfo = authenticationInfo;
+        this.userName = userName;
+        // If MyProxy authentication is given, use the MyProxy user as the user.
+        // instanceof is false for null, so no separate null check is needed.
+        if (this.authenticationInfo instanceof MyProxyAuthenticationInfo) {
+            this.userName = ((MyProxyAuthenticationInfo)this.authenticationInfo).getUserName();
+        }
+        this.jobID = jobID;
+        this.taskID = taskID;
+        // Previously this argument was accepted but never stored.
+        this.workflowNodeID = workflowNodeID;
+        this.experimentID = experimentID;
+    }
+
+    public HostDescription getHost() {
+        return host;
+    }
+
+    public void setHost(HostDescription host) {
+        this.host = host;
+    }
+
+    public Timestamp getLastMonitored() {
+        return lastMonitored;
+    }
+
+    public void setLastMonitored(Timestamp lastMonitored) {
+        this.lastMonitored = lastMonitored;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getJobID() {
+        return jobID;
+    }
+
+    public void setJobID(String jobID) {
+        this.jobID = jobID;
+    }
+
+    public Timestamp getJobStartedTime() {
+        return jobStartedTime;
+    }
+
+    public void setJobStartedTime(Timestamp jobStartedTime) {
+        this.jobStartedTime = jobStartedTime;
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+
+    /**
+     * Stores a parameter, creating the parameter map first when none exists yet.
+     *
+     * @param key   parameter name
+     * @param value parameter value
+     */
+    public void addParameter(String key,Object value) {
+        if (this.parameters == null) {
+            this.parameters = new java.util.HashMap<String, Object>();
+        }
+        this.parameters.put(key, value);
+    }
+
+    /**
+     * Looks up a parameter.
+     *
+     * @param key parameter name
+     * @return the stored value, or {@code null} when the key is absent or no parameters exist
+     */
+    public Object getParameter(String key) {
+        return this.parameters == null ? null : this.parameters.get(key);
+    }
+
+    public Map<String, Object> getParameters() {
+        return parameters;
+    }
+
+    public void setParameters(Map<String, Object> parameters) {
+        this.parameters = parameters;
+    }
+
+    public String getExperimentID() {
+        return experimentID;
+    }
+
+    public void setExperimentID(String experimentID) {
+        this.experimentID = experimentID;
+    }
+
+    public String getTaskID() {
+        return taskID;
+    }
+
+    public void setTaskID(String taskID) {
+        this.taskID = taskID;
+    }
+
+    public int getFailedCount() {
+        return failedCount;
+    }
+
+    public void setFailedCount(int failedCount) {
+        this.failedCount = failedCount;
+    }
+
+    public JobState getStatus() {
+        return state;
+    }
+
+    /**
+     * Updates the job state, with special handling for UNKNOWN reports.
+     * <p>
+     * On some machines the job state vanishes quickly once a job is done and is then reported
+     * as UNKNOWN. When a state is already known and UNKNOWN is reported:
+     * <ul>
+     *   <li>while the failed count is at most 2, the call blocks for 10 seconds and then
+     *       increments the failed count, leaving the state unchanged;</li>
+     *   <li>once the failed count exceeds 2, a state of ACTIVE or QUEUED becomes COMPLETE;
+     *       any other state is left unchanged.</li>
+     * </ul>
+     * In every other case the given status simply replaces the current one. If the waiting
+     * thread is interrupted, its interrupt flag is restored so callers can observe it.
+     *
+     * @param status the newly reported state
+     */
+    public void setStatus(JobState status) {
+        // Constant-first equals: a null status takes the normal path instead of throwing.
+        if (this.state == null || !JobState.UNKNOWN.equals(status)) {
+            // normal scenario
+            this.state = status;
+            return;
+        }
+
+        if (getFailedCount() > MAX_UNKNOWN_REPORTS) {
+            switch (this.state) {
+                case ACTIVE:
+                case QUEUED:
+                    this.state = JobState.COMPLETE;
+                    break;
+                default:
+                    break;
+            }
+            return;
+        }
+
+        try {
+            // when state becomes unknown we sleep for a while
+            Thread.sleep(UNKNOWN_STATE_WAIT_MILLIS);
+        } catch (InterruptedException e) {
+            logger.warn("Interrupted while waiting after an UNKNOWN state for job " + jobID, e);
+            Thread.currentThread().interrupt();
+        }
+        setFailedCount(getFailedCount() + 1);
+    }
+
+    public String getWorkflowNodeID() {
+        return workflowNodeID;
+    }
+
+    public void setWorkflowNodeID(String workflowNodeID) {
+        this.workflowNodeID = workflowNodeID;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorManager.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorManager.java
new file mode 100644
index 0000000..b703a0a
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/MonitorManager.java
@@ -0,0 +1,347 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.monitor.core.Monitor;
+import org.apache.airavata.gfac.monitor.core.PullMonitor;
+import org.apache.airavata.gfac.monitor.core.PushMonitor;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.impl.LocalJobMonitor;
+import org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor;
+import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.airavata.schemas.gfac.SSHHostType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/*
+This is the manager class for the monitoring system of Airavata.
+It simply handles the monitoring flow of the system:
+it keeps the jobs available to monitor in a queue and, once they are done,
+they are removed from the queue; the removal is done by AiravataJobStatusUpdator.
+ */
+public class MonitorManager {
+    private final static Logger logger = LoggerFactory.getLogger(MonitorManager.class);
+    
+	private final static String ACTIVITY_LISTENERS = "activity.listeners";
+
+    private List<PullMonitor> pullMonitors;    //todo though we have a List we only support one at a time
+
+    private List<PushMonitor> pushMonitors;   //todo we need to support multiple monitors dynamically
+
+    private BlockingQueue<UserMonitorData> pullQueue;
+
+    private BlockingQueue<MonitorID> pushQueue;
+
+    private BlockingQueue<MonitorID> localJobQueue;
+
+    private BlockingQueue<MonitorID> finishQueue;
+
+    private MonitorPublisher monitorPublisher;
+
+    private Monitor localJobMonitor;
+
+    private Registry registry;
+
+    /**
+     * Initializes the monitoring system on top of a newly created {@link RegistryImpl}.
+     */
+    public MonitorManager() {
+        this(new RegistryImpl());
+    }
+
+    /**
+     * Initializes the monitoring system on top of the given registry.
+     * <p>
+     * Creates empty monitor lists and job queues and a publisher backed by a new
+     * {@link EventBus}, then loads the configured activity listeners.
+     *
+     * @param registry the registry handed to activity listeners when they are registered
+     */
+    public MonitorManager(Registry registry) {
+        this.registry = registry;
+        this.monitorPublisher = new MonitorPublisher(new EventBus());
+
+        this.pullMonitors = new ArrayList<PullMonitor>();
+        this.pushMonitors = new ArrayList<PushMonitor>();
+
+        this.pullQueue = new LinkedBlockingQueue<UserMonitorData>();
+        this.pushQueue = new LinkedBlockingQueue<MonitorID>();
+        this.localJobQueue = new LinkedBlockingQueue<MonitorID>();
+        this.finishQueue = new LinkedBlockingQueue<MonitorID>();
+
+        // Must come last: listener registration uses the registry and publisher set above.
+        loadActivityMonitors();
+    }
+
+    /**
+     * Loads the activity listeners named in the {@code activity.listeners} server setting.
+     * <p>
+     * The setting is read as a comma-separated list of class names. Each name is trimmed, loaded
+     * through this class's class loader, instantiated and registered via
+     * {@link #registerListener(AbstractActivityListener)}. A class that cannot be found,
+     * instantiated or cast is logged and skipped, so the remaining entries are still processed.
+     * Blank entries (an empty setting, a trailing or doubled comma) are ignored instead of being
+     * reported as missing classes. A failure to read the setting is logged as a warning.
+     */
+    private void loadActivityMonitors(){
+        try {
+            String activityListenersString = ServerSettings.getSetting(ACTIVITY_LISTENERS);
+            if (activityListenersString == null) {
+                return;
+            }
+            for (String activityListenerClassName : activityListenersString.split(",")) {
+                activityListenerClassName = activityListenerClassName.trim();
+                if (activityListenerClassName.length() == 0) {
+                    // Nothing to load; avoids a spurious ClassNotFoundException for "".
+                    continue;
+                }
+                try {
+                    Class<?> classInstance = MonitorManager.class
+                            .getClassLoader().loadClass(activityListenerClassName);
+                    AbstractActivityListener monitor = (AbstractActivityListener) classInstance.newInstance();
+                    registerListener(monitor);
+                } catch (ClassNotFoundException e) {
+                    logger.error("Error while locating activity monitor implementation \""+activityListenerClassName+"\"!!!",e);
+                } catch (InstantiationException e) {
+                    logger.error("Error while initiating activity monitor instance \""+activityListenerClassName+"\"!!!",e);
+                } catch (IllegalAccessException e) {
+                    logger.error("Error while initiating activity monitor instance \""+activityListenerClassName+"\"!!!",e);
+                } catch (ClassCastException e){
+                    logger.error("Invalid activity monitor \""+activityListenerClassName+"\"!!!",e);
+                }
+            }
+        } catch (ApplicationSettingsException e1) {
+            logger.warn("Error in reading activity monitors!!!", e1);
+        }
+    }
+    /**
+     * Adds an uninitialized {@link AMQPMonitor} to the monitoring system.
+     * <p>
+     * This method takes care of the initialization: the monitor receives this manager's
+     * publisher, finish queue and push queue (as its running queue) and is then added to the
+     * push monitors.
+     * todo maybe we need to move this to some other class
+     *
+     * @param monitor the AMQP monitor to initialize and add
+     */
+    public void addAMQPMonitor(AMQPMonitor monitor) {
+        monitor.setPublisher(getMonitorPublisher());
+        monitor.setFinishQueue(getFinishQueue());
+        monitor.setRunningQueue(getPushQueue());
+        this.addPushMonitor(monitor);
+    }
+
+
+    /**
+     * Installs an uninitialized {@link LocalJobMonitor} as the local job monitor.
+     * <p>
+     * This method takes care of the initialization: the monitor receives this manager's
+     * publisher and local job queue before it is stored.
+     * todo maybe we need to move this to some other class
+     *
+     * @param monitor the local job monitor to initialize and install
+     */
+    public void addLocalMonitor(LocalJobMonitor monitor) {
+        monitor.setPublisher(getMonitorPublisher());
+        monitor.setJobQueue(getLocalJobQueue());
+        this.localJobMonitor = monitor;
+    }
+
+    /**
+     * Adds a {@link QstatMonitor} to the monitoring system.
+     * <p>
+     * This method takes care of the initialization: the monitor receives this manager's
+     * publisher and pull queue and is then added to the pull monitors.
+     * todo maybe we need to move this to some other class
+     *
+     * @param qstatMonitor the qstat monitor to initialize and add
+     */
+    public void addQstatMonitor(QstatMonitor qstatMonitor) {
+        qstatMonitor.setPublisher(getMonitorPublisher());
+        qstatMonitor.setQueue(getPullQueue());
+        this.addPullMonitor(qstatMonitor);
+    }
+
+    /**
+     * To deal with the statuses users can write their own listener and implement their own logic
+     *
+     * @param listener Any class can be written and if you want the JobStatus object to be taken from the bus, just
+     *                 have to put @subscribe as an annotation to your method to recieve the JobStatus object from the bus.
+     */
+    public void registerListener(Object listener) {
+        monitorPublisher.registerListener(listener);
+        if (listener instanceof AbstractActivityListener){
+        	((AbstractActivityListener)listener).setup(registry, getFinishQueue(), getMonitorPublisher(), this);
+        }
+    }
+
+    public void registerListener(AbstractActivityListener listener) {
+    	registerListener((Object)listener);
+    }
+
+    /**
+     * Removes a previously registered status-change listener from the monitor publisher.
+     *
+     * @param listener the listener object to unregister
+     */
+    public void unregisterListener(Object listener) {
+        this.monitorPublisher.unregisterListener(listener);
+    }
+
+    /**
+     * Appends a push monitor to the list of push monitors held by this manager.
+     *
+     * @param monitor the push monitor to add
+     */
+    public void addPushMonitor(PushMonitor monitor) {
+        this.pushMonitors.add(monitor);
+    }
+
+    /**
+     * Appends a pull monitor to the list of pull monitors held by this manager.
+     *
+     * @param monitor the pull monitor to add
+     */
+    public void addPullMonitor(PullMonitor monitor) {
+        this.pullMonitors.add(monitor);
+    }
+
+
+    /**
+     * Hands a job to the queue that matches the type of the host it runs on.
+     * <ul>
+     *   <li>GsisshHostType with no monitor mode, an empty one, or PULL: the job is
+     *       added to the pull queue through CommonUtils.</li>
+     *   <li>GsisshHostType with PUSH: the job is put on the push queue and on the
+     *       finish queue.</li>
+     *   <li>GsisshHostType with any other mode: the job is not queued and a
+     *       warning is logged.</li>
+     *   <li>GlobusHostType: not supported; an error is logged and the job is not queued.</li>
+     *   <li>Every other host type (including SSHHostType): the job is added to the
+     *       local job queue.</li>
+     * </ul>
+     *
+     * @param monitorID the job to monitor; its host description selects the queue
+     * @throws AiravataMonitorException declared for callers; presumably raised by the
+     *                                  pull-queue helper (not visible here)
+     * @throws InterruptedException     if the thread is interrupted while waiting to put
+     *                                  the job on a blocking queue
+     */
+    public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException, InterruptedException {
+        // Resolve the host type once; instanceof and the cast work on the Object view.
+        final Object hostType = monitorID.getHost().getType();
+
+        if (hostType instanceof GsisshHostType) {
+            GsisshHostType host = (GsisshHostType) hostType;
+            if (host.getMonitorMode() == null || "".equals(host.getMonitorMode())
+                    || Constants.PULL.equals(host.getMonitorMode())) {
+                // Pull is the default when no mode is configured.
+                CommonUtils.addMonitortoQueue(pullQueue, monitorID);
+            } else if (Constants.PUSH.equals(host.getMonitorMode())) {
+                pushQueue.put(monitorID);
+                finishQueue.put(monitorID);
+            } else {
+                // Previously such a job was dropped without any trace.
+                logger.warn("Unknown monitor mode '" + host.getMonitorMode()
+                        + "', the job will not be monitored");
+            }
+        } else if (hostType instanceof GlobusHostType) {
+            logger.error("Monitoring does not support GlobusHostType resources");
+        } else {
+            // SSHHostType and any remaining type are treated as local jobs. The SSH case
+            // used to log a copy-pasted "not supported" error before being queued anyway.
+            localJobQueue.add(monitorID);
+        }
+    }
+
+    /**
+     * Threads started by {@link #launchMonitor()}. They are remembered so that
+     * {@link #stopMonitor()} can interrupt the threads that are really running.
+     * Guarded by its own intrinsic lock.
+     */
+    private final List<Thread> monitorThreads = new java.util.ArrayList<Thread>();
+
+    /**
+     * Starts one thread for the local job monitor (when one has been set) and
+     * one thread for every registered pull monitor and push monitor.
+     * This method should be invoked before adding any elements to the monitor
+     * queues.
+     *
+     * @throws AiravataMonitorException declared for callers; not thrown by the code in this method
+     */
+    public void launchMonitor() throws AiravataMonitorException {
+        if (localJobMonitor != null) {
+            startMonitorThread(localJobMonitor);
+        }
+
+        for (PullMonitor monitor : pullMonitors) {
+            startMonitorThread(monitor);
+        }
+
+        //todo fix this
+        for (PushMonitor monitor : pushMonitors) {
+            startMonitorThread(monitor);
+        }
+    }
+
+    /**
+     * Creates and starts a thread for the given monitor and records it for stopMonitor.
+     */
+    private void startMonitorThread(Runnable monitor) {
+        Thread thread = new Thread(monitor);
+        synchronized (monitorThreads) {
+            monitorThreads.add(thread);
+        }
+        thread.start();
+    }
+
+    /**
+     * Interrupts every thread that was started by {@link #launchMonitor()} and
+     * forgets them. Earlier this method interrupted freshly created, never
+     * started threads, which did not reach the running monitors at all.
+     * NOTE(review): whether a monitor actually terminates depends on how its
+     * run method reacts to interruption - confirm in the monitor implementations.
+     *
+     * @throws AiravataMonitorException declared for callers; not thrown by the code in this method
+     */
+    public void stopMonitor() throws AiravataMonitorException {
+        synchronized (monitorThreads) {
+            for (Thread thread : monitorThreads) {
+                thread.interrupt();
+            }
+            monitorThreads.clear();
+        }
+    }
+    /* getter setters for the private variables */
+
+    /**
+     * @return the list of pull monitors held by this manager (the internal list, not a copy)
+     */
+    public List<PullMonitor> getPullMonitors() {
+        return this.pullMonitors;
+    }
+
+    /**
+     * Replaces the list of pull monitors held by this manager.
+     *
+     * @param pullMonitors the new list; stored as given, not copied
+     */
+    public void setPullMonitors(final List<PullMonitor> pullMonitors) {
+        this.pullMonitors = pullMonitors;
+    }
+
+    /**
+     * @return the list of push monitors held by this manager (the internal list, not a copy)
+     */
+    public List<PushMonitor> getPushMonitors() {
+        return this.pushMonitors;
+    }
+
+    /**
+     * Replaces the list of push monitors held by this manager.
+     *
+     * @param pushMonitors the new list; stored as given, not copied
+     */
+    public void setPushMonitors(final List<PushMonitor> pushMonitors) {
+        this.pushMonitors = pushMonitors;
+    }
+
+    /**
+     * @return the queue of per-user monitoring data used for pull monitoring
+     */
+    public BlockingQueue<UserMonitorData> getPullQueue() {
+        return this.pullQueue;
+    }
+
+    /**
+     * Replaces the queue used for pull monitoring.
+     *
+     * @param pullQueue the new pull queue
+     */
+    public void setPullQueue(final BlockingQueue<UserMonitorData> pullQueue) {
+        this.pullQueue = pullQueue;
+    }
+
+    /**
+     * @return the publisher that monitors and listeners of this manager are attached to
+     */
+    public MonitorPublisher getMonitorPublisher() {
+        return this.monitorPublisher;
+    }
+
+    /**
+     * Replaces the publisher used by this manager.
+     *
+     * @param monitorPublisher the new publisher
+     */
+    public void setMonitorPublisher(final MonitorPublisher monitorPublisher) {
+        this.monitorPublisher = monitorPublisher;
+    }
+
+    /**
+     * @return the finish queue of this manager
+     */
+    public BlockingQueue<MonitorID> getFinishQueue() {
+        return this.finishQueue;
+    }
+
+    /**
+     * Replaces the finish queue of this manager.
+     *
+     * @param finishQueue the new finish queue
+     */
+    public void setFinishQueue(final BlockingQueue<MonitorID> finishQueue) {
+        this.finishQueue = finishQueue;
+    }
+
+    /**
+     * @return the queue of jobs handed to push monitoring
+     */
+    public BlockingQueue<MonitorID> getPushQueue() {
+        return this.pushQueue;
+    }
+
+    /**
+     * Replaces the queue used for push monitoring.
+     *
+     * @param pushQueue the new push queue
+     */
+    public void setPushQueue(final BlockingQueue<MonitorID> pushQueue) {
+        this.pushQueue = pushQueue;
+    }
+
+    /**
+     * @return the queue of jobs handed to the local job monitor
+     */
+    public BlockingQueue<MonitorID> getLocalJobQueue() {
+        return this.localJobQueue;
+    }
+
+    /**
+     * Replaces the queue used for local job monitoring.
+     *
+     * @param localJobQueue the new local job queue
+     */
+    public void setLocalJobQueue(final BlockingQueue<MonitorID> localJobQueue) {
+        this.localJobQueue = localJobQueue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java
new file mode 100644
index 0000000..c6d386e
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+/**
+ * Identifies a task by experiment id, workflow node id and task id.
+ * The experiment and workflow node ids are kept by the superclass.
+ */
+public class TaskIdentity extends WorkflowNodeIdentity {
+	private String taskId;
+
+	/**
+	 * @param experimentId   id passed on to the superclass
+	 * @param workflowNodeId id passed on to the superclass
+	 * @param taskId         id of the task
+	 */
+	public TaskIdentity(String experimentId, String workflowNodeId, String taskId) {
+		super(experimentId,workflowNodeId);
+		// Assign the field directly: calling the overridable setter from a constructor
+		// would run subclass code before the subclass is initialized.
+		this.taskId = taskId;
+	}
+
+	/**
+	 * @return the task id
+	 */
+	public String getTaskId() {
+		return taskId;
+	}
+
+	/**
+	 * @param taskId the new task id
+	 */
+	public void setTaskId(String taskId) {
+		this.taskId = taskId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
new file mode 100644
index 0000000..022d17c
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * User-centric holder of job monitoring data: instead of tracking individual
+ * jobs, the data is grouped per user as a list of {@link HostMonitorData}
+ * entries.
+ */
+public class UserMonitorData {
+    private final static Logger logger = LoggerFactory.getLogger(UserMonitorData.class);
+
+    private String  userName;
+
+    private List<HostMonitorData> hostMonitorData;
+
+
+    /**
+     * Creates the data holder for a user with an empty, mutable host list.
+     *
+     * @param userName name of the user the data belongs to
+     */
+    public UserMonitorData(String userName) {
+        this(userName, new ArrayList<HostMonitorData>());
+    }
+
+    /**
+     * Creates the data holder for a user with the given host list.
+     *
+     * @param userName            name of the user the data belongs to
+     * @param hostMonitorDataList host entries; the list is stored as given, not copied
+     */
+    public UserMonitorData(String userName, List<HostMonitorData> hostMonitorDataList) {
+        this.userName = userName;
+        this.hostMonitorData = hostMonitorDataList;
+    }
+
+    /**
+     * @return the host entries of this user (the internal list, not a copy)
+     */
+    public List<HostMonitorData> getHostMonitorData() {
+        return this.hostMonitorData;
+    }
+
+    /**
+     * @param hostMonitorData the new host entries; stored as given
+     */
+    public void setHostMonitorData(List<HostMonitorData> hostMonitorData) {
+        this.hostMonitorData = hostMonitorData;
+    }
+
+    /**
+     * @return the name of the user this data belongs to
+     */
+    public String getUserName() {
+        return this.userName;
+    }
+
+    /**
+     * @param userName the new user name
+     */
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    /**
+     * Appends a host entry to the list of this user. No duplicate check is
+     * done, so callers must not add the same host twice.
+     *
+     * @param hostMonitorData the host entry to append
+     * @throws AiravataMonitorException declared for callers; not thrown by the code in this method
+     */
+    public void addHostMonitorData(HostMonitorData hostMonitorData) throws AiravataMonitorException {
+        final List<HostMonitorData> hosts = this.hostMonitorData;
+        hosts.add(hostMonitorData);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java
new file mode 100644
index 0000000..e569c52
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+/**
+ * Identifies a workflow node by experiment id and workflow node id.
+ * The experiment id is kept by the superclass.
+ */
+public class WorkflowNodeIdentity extends ExperimentIdentity {
+	private String workflowNodeID;
+
+	/**
+	 * @param experimentId   id passed on to the superclass
+	 * @param workflowNodeId id of the workflow node
+	 */
+	public WorkflowNodeIdentity(String experimentId, String workflowNodeId) {
+		super(experimentId);
+		// Assign the field directly: calling the overridable setter from a constructor
+		// would run subclass code before the subclass is initialized.
+		this.workflowNodeID = workflowNodeId;
+	}
+
+	/**
+	 * @return the workflow node id
+	 */
+	public String getWorkflowNodeID() {
+		return workflowNodeID;
+	}
+
+	/**
+	 * @param workflowNodeID the new workflow node id
+	 */
+	public void setWorkflowNodeID(String workflowNodeID) {
+		this.workflowNodeID = workflowNodeID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
new file mode 100644
index 0000000..f19decf
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.command;
+
+/**
+ * Command object that carries the id of the experiment to cancel.
+ */
+public class ExperimentCancelRequest {
+	private String experimentId;
+
+	/**
+	 * @param experimentId id of the experiment this request refers to
+	 */
+	public ExperimentCancelRequest(final String experimentId) {
+		this.experimentId = experimentId;
+	}
+
+	/**
+	 * @return id of the experiment this request refers to
+	 */
+	public String getExperimentId() {
+		return this.experimentId;
+	}
+
+	/**
+	 * @param experimentId the new experiment id
+	 */
+	public void setExperimentId(final String experimentId) {
+		this.experimentId = experimentId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
new file mode 100644
index 0000000..b45e01c
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.command;
+
+/**
+ * Command object that carries the experiment id, node id and task id of the
+ * task to cancel.
+ */
+public class TaskCancelRequest {
+	private String experimentId;
+	private String nodeId;
+	private String taskId;
+
+	/**
+	 * @param experimentId id of the experiment the task belongs to
+	 * @param nodeId       id of the node the task belongs to
+	 * @param taskId       id of the task this request refers to
+	 */
+	public TaskCancelRequest(String experimentId, String nodeId, String taskId) {
+		this.experimentId = experimentId;
+		// Assigned directly like the sibling fields; calling the overridable setter
+		// from a constructor would run subclass code before the subclass is initialized.
+		this.nodeId = nodeId;
+		this.taskId = taskId;
+	}
+
+	/**
+	 * @return id of the experiment the task belongs to
+	 */
+	public String getExperimentId() {
+		return experimentId;
+	}
+
+	/**
+	 * @param experimentId the new experiment id
+	 */
+	public void setExperimentId(String experimentId) {
+		this.experimentId = experimentId;
+	}
+
+	/**
+	 * @return id of the task this request refers to
+	 */
+	public String getTaskId() {
+		return taskId;
+	}
+
+	/**
+	 * @param taskId the new task id
+	 */
+	public void setTaskId(String taskId) {
+		this.taskId = taskId;
+	}
+
+	/**
+	 * @return id of the node the task belongs to
+	 */
+	public String getNodeId() {
+		return nodeId;
+	}
+
+	/**
+	 * @param nodeId the new node id
+	 */
+	public void setNodeId(String nodeId) {
+		this.nodeId = nodeId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
new file mode 100644
index 0000000..1f85921
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base for Monitor implementations that publish the monitoring
+ * statuses they observe through a {@link MonitorPublisher}, i.e. to the
+ * event bus. According to the original design note, statuses published to
+ * the event bus are saved to the Registry; that part is not visible in this
+ * class.
+ */
+public abstract class AiravataAbstractMonitor implements Monitor {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataAbstractMonitor.class);
+    protected MonitorPublisher publisher;
+
+    /**
+     * @return the publisher this monitor publishes its statuses with
+     */
+    public MonitorPublisher getPublisher() {
+        return this.publisher;
+    }
+
+    /**
+     * @param publisher the publisher this monitor should publish its statuses with
+     */
+    public void setPublisher(final MonitorPublisher publisher) {
+        this.publisher = publisher;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
new file mode 100644
index 0000000..a003f55
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+/**
+ * Interface for message parsers. Whether monitoring is pull based or push
+ * based, the monitor has to parse the content of the message it gets from
+ * the remote monitoring system and map it to an internal job state.
+ * Ex: a JSON parser for AMQP and a Qstat reader for the pull based monitor.
+ */
+public interface MessageParser {
+    /**
+     * Parses an incoming message and derives the status of the job from it.
+     * The caller has to make sure the correct message is given to this
+     * method; it will not do any filtering.
+     *
+     * @param message content of the message
+     * @return the job state derived from the message
+     * @throws AiravataMonitorException presumably when the message cannot be parsed;
+     *                                  the exact conditions are up to the implementation
+     */
+    JobState parseMessage(String message)throws AiravataMonitorException;
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
new file mode 100644
index 0000000..d2fede5
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+
+/**
+ * This is the primary interface for Monitors.
+ * It can be used to implement different methods of monitoring. The interface
+ * declares no methods of its own; because it extends {@link Runnable}, every
+ * monitor can be handed to a thread for execution.
+ */
+public interface Monitor extends Runnable {
+
+}