You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/05/21 19:56:19 UTC

[05/51] [abbrv] Separating gfac-monitoring implementation

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
index a12bf5d..6a92e53 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
@@ -30,13 +30,13 @@ import org.apache.airavata.gfac.Constants;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
+import org.apache.airavata.gfac.provider.AbstractProvider;
 import org.apache.airavata.gfac.provider.GFacProviderException;
 import org.apache.airavata.gfac.provider.utils.ProviderUtils;
 import org.apache.airavata.gfac.utils.GFacUtils;
 import org.apache.airavata.gfac.utils.InputStreamToFileWriter;
 import org.apache.airavata.gfac.utils.InputUtils;
 import org.apache.airavata.gfac.utils.OutputUtils;
-import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
-public class LocalProvider extends AbstractProvider{
+public class LocalProvider extends AbstractProvider {
     private static final Logger log = LoggerFactory.getLogger(LocalProvider.class);
     private ProcessBuilder builder;
     private List<String> cmdList;
@@ -123,13 +123,12 @@ public class LocalProvider extends AbstractProvider{
                  getApplicationContext().getApplicationDeploymentDescription().getType();
         JobDetails jobDetails = new JobDetails();
         try {
-        	jobId= jobExecutionContext.getTaskData().getTaskID();
+        	jobId = jobExecutionContext.getTaskData().getTaskID();
             jobDetails.setJobID(jobId);
             jobDetails.setJobDescription(app.toString());
             jobExecutionContext.setJobDetails(jobDetails);
-            JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(jobExecutionContext, app, null);
-            jobDetails.setJobDescription(jobDescriptor.toXML());
-            GFacUtils.saveJobStatus(jobDetails, JobState.SETUP, jobExecutionContext.getTaskData().getTaskID());
+            jobDetails.setJobDescription(app.toString());
+            GFacUtils.saveJobStatus(jobExecutionContext,jobDetails, JobState.SETUP);
         	// running cmd
             Process process = builder.start();
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
index c33a8bc..bade5f0 100644
--- a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
+++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
@@ -26,15 +26,18 @@ import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.ApplicationContext;
 import org.apache.airavata.gfac.context.JobExecutionContext;
 import org.apache.airavata.gfac.context.MessageContext;
-import org.apache.airavata.gfac.cpi.GFacImpl;
+import org.apache.airavata.gfac.handler.LocalDirectorySetupHandler;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.provider.impl.LocalProvider;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.persistance.registry.jpa.impl.LoggingRegistryImpl;
 import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
 import org.apache.airavata.schemas.gfac.InputParameterType;
 import org.apache.airavata.schemas.gfac.OutputParameterType;
 import org.apache.airavata.schemas.gfac.StringParameterType;
 import org.apache.commons.lang.SystemUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
 
 import java.io.File;
 import java.net.URL;
@@ -43,7 +46,7 @@ import java.util.List;
 
 public class LocalProviderTest {
     private JobExecutionContext jobExecutionContext;
-    @Before
+    @BeforeTest
     public void setUp() throws Exception {
 
         URL resource = this.getClass().getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
@@ -133,18 +136,37 @@ public class LocalProviderTest {
 
         MessageContext outMessage = new MessageContext();
         ActualParameter echo_out = new ActualParameter();
-//		((StringParameterType)echo_input.getType()).setValue("echo_output=hello");
         outMessage.addParameter("echo_output", echo_out);
 
         jobExecutionContext.setOutMessageContext(outMessage);
 
+        jobExecutionContext.setExperimentID("test123");
+        jobExecutionContext.setTaskData(new TaskDetails(jobExecutionContext.getExperimentID()));
+        jobExecutionContext.setRegistry(new LoggingRegistryImpl());
+
+
+    }
+
+    @Test
+    public void testLocalDirectorySetupHandler() throws GFacException {
+        LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler();
+        localDirectorySetupHandler.invoke(jobExecutionContext);
+
+        ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+        ApplicationDeploymentDescriptionType app = applicationDeploymentDescription.getType();
+        junit.framework.Assert.assertTrue(new File(app.getStaticWorkingDirectory()).exists());
+        junit.framework.Assert.assertTrue(new File(app.getScratchWorkingDirectory()).exists());
+        junit.framework.Assert.assertTrue(new File(app.getInputDataDirectory()).exists());
+        junit.framework.Assert.assertTrue(new File(app.getOutputDataDirectory()).exists());
     }
 
     @Test
-    public void testLocalProvider() throws GFacException {
-        GFacImpl gFacAPI = new GFacImpl();
-        gFacAPI.submitJob(jobExecutionContext);
-        MessageContext outMessageContext = jobExecutionContext.getOutMessageContext();
-        Assert.assertEquals(MappingFactory.toString((ActualParameter)outMessageContext.getParameter("echo_output")), "hello");
+    public void testLocalProvider() throws GFacException,GFacProviderException{
+        LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler();
+        localDirectorySetupHandler.invoke(jobExecutionContext);
+        LocalProvider localProvider = new LocalProvider();
+        localProvider.initialize(jobExecutionContext);
+        localProvider.execute(jobExecutionContext);
+        localProvider.dispose(jobExecutionContext);
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/resources/gfac-config.xml b/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
index 61dca4f..f16460f 100644
--- a/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
+++ b/modules/gfac/gfac-local/src/test/resources/gfac-config.xml
@@ -24,67 +24,4 @@
             <Handler class="org.apache.airavata.gfac.handler.LocalDirectorySetupHandler"/>
         </InHandlers>
     </Provider>
-    <Provider class="org.apache.airavata.gfac.provider.impl.GramProvider" host="org.apache.airavata.schemas.gfac.impl.GlobusHostTypeImpl">
-        <property name="name" value="value"/>
-        <InHandlers>
-            <Handler class="org.apache.airavata.gfac.handler.GramDirectorySetupHandler">
-                    <property name="name" value="value"/>
-            </Handler>
-            <Handler class="org.apache.airavata.gfac.handler.GridFTPInputHandler"/>
-        </InHandlers>
-        <OutHandlers>
-            <Handler class="org.apache.airavata.gfac.handler.GridFTPOutputHandler"/>
-        </OutHandlers>
-    </Provider>
-      <Provider class="org.apache.airavata.gfac.provider.impl.BESProvider" host="org.apache.airavata.schemas.gfac.impl.UnicoreHostTypeImpl">
-        <InHandlers>
-        	<Handler class="org.apache.airavata.gfac.handler.GramDirectorySetupHandler"/>
-            <Handler class="org.apache.airavata.gfac.handler.GridFTPInputHandler"/>
-        </InHandlers>
-        <OutHandlers>
-            <Handler class="org.apache.airavata.gfac.handler.GridFTPOutputHandler"/>
-        </OutHandlers>
-    </Provider>
-
-    <Provider class="org.apache.airavata.gfac.ec2.EC2Provider" host="org.apache.airavata.schemas.gfac.impl.Ec2HostTypeImpl">
-        <InHandlers/>
-        <OutHandlers/>
-    </Provider>
-
-    <Provider class="org.apache.airavata.gfac.provider.impl.HadoopProvider" host="org.apache.airavata.schemas.gfac.impl.HadoopHostTypeImpl">
-        <InHandlers>
-        	<Handler class="org.apache.airavata.gfac.handler.HadoopDeploymentHandler"/>
-            <Handler class="org.apache.airavata.gfac.handler.HDFSDataMovementHandler"/>
-        </InHandlers>
-        <OutHandlers/>
-    </Provider>
-
-    <Application name="UltraScan">
-        <InHandlers>
-            <Handler class="org.apache.airavata.gfac.handler.GramDirectorySetupHandler"/>
-            <Handler class="org.apache.airavata.gfac.handler.GridFTPInputHandler"/>
-        </InHandlers>
-        <OutHandlers>
-            <Handler class="org.apache.airavata.gfac.handler.GridFTPOutputHandler"/>
-        </OutHandlers>
-    </Application>
-
-     <Provider class="org.apache.airavata.gfac.provider.impl.SSHProvider" host="org.apache.airavata.schemas.gfac.impl.SSHHostTypeImpl">
-         <InHandlers>
-            <Handler class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/>
-            <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/>
-        </InHandlers>
-        <OutHandlers>
-            <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/>
-        </OutHandlers>
-    </Provider>
-     <Provider class="org.apache.airavata.gfac.provider.impl.GSISSHProvider" host="org.apache.airavata.schemas.gfac.impl.GsisshHostTypeImpl">
-         <InHandlers>
-            <Handler class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/>
-            <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/>
-        </InHandlers>
-        <OutHandlers>
-            <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/>
-        </OutHandlers>
-    </Provider>
 </GFac>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/pom.xml b/modules/gfac/gfac-monitor/pom.xml
new file mode 100644
index 0000000..8991dcd
--- /dev/null
+++ b/modules/gfac/gfac-monitor/pom.xml
@@ -0,0 +1,181 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file 
+    distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under 
+    the Apache License, Version 2.0 (theƏ "License"); you may not use this file except in compliance with the License. You may 
+    obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to 
+    in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 
+    ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under 
+    the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.airavata</groupId>
+        <artifactId>gfac</artifactId>
+        <version>0.12-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>airavata-gfac-hpc-monitor</artifactId>
+    <name>Airavata GFac Grid Job Monitor</name>
+    <description>The Grid related monitoring implementation</description>
+    <url>http://airavata.apache.org/</url>
+
+    <dependencies>
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <!-- GFAC schemas -->
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-client-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+	<dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-gfac-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-workflow-execution-context</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-registry-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-registry-cpi</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+         <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-jpa-registry</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- Workflow Tracking -->
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-workflow-tracking</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- Credential Store -->
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-credential-store</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+
+        <!-- Test -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>6.1.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-server-configuration</artifactId>
+	    <scope>test</scope>
+        </dependency>
+	    <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-client-configuration</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Guava -->
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>12.0</version>
+        </dependency>
+        <!-- gsi-ssh api dependencies -->
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>gsissh</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-data-models</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+            <version>0.1.50</version>
+        </dependency>
+        <dependency>
+            <groupId>org.ogce</groupId>
+            <artifactId>bcgss</artifactId>
+            <version>146</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.xmlbeans</groupId>
+            <artifactId>xmlbeans</artifactId>
+            <version>${xmlbeans.version}</version>
+        </dependency>
+        <!-- this is the dependency for amqp implementation -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.0.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skip>false</skip>
+                    <forkMode>always</forkMode>
+                    <failIfNoTests>false</failIfNoTests>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <version>0.4.0</version>
+                <configuration>
+                    <sourceDirectory>${basedir}/src/main/resources/schema</sourceDirectory>
+                    <targetPackage>org.apache.airavata</targetPackage>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java
new file mode 100644
index 0000000..63f89df
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AbstractActivityListener.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+
+public interface AbstractActivityListener {
+	public void setup(Object... configurations);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java
new file mode 100644
index 0000000..a70b14f
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataExperimentStatusUpdator.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
+
+    private Registry airavataRegistry;
+
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    @Subscribe
+    public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) {
+        ExperimentState state = experimentStatus.getState();
+        if (state != null) {
+            try {
+                String experimentID = experimentStatus.getIdentity().getExperimentID();
+                updateExperimentStatus(experimentID, state);
+            } catch (Exception e) {
+                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+            }
+        }
+    }
+
+    public  void updateExperimentStatus(String experimentId, ExperimentState state) throws Exception {
+    	Experiment details = (Experiment)airavataRegistry.get(DataType.EXPERIMENT, experimentId);
+        if(details == null) {
+            details = new Experiment();
+            details.setExperimentID(experimentId);
+        }
+        org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
+        status.setExperimentState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setExperimentStatus(status);
+        airavataRegistry.update(DataType.EXPERIMENT, details, experimentId);
+    }
+
+	@Override
+	public void setup(Object... configurations) {
+		for (Object configuration : configurations) {
+			if (configuration instanceof Registry){
+				this.airavataRegistry=(Registry)configuration;
+			} 
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java
new file mode 100644
index 0000000..99c8733
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataJobStatusUpdator.java
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.registry.cpi.CompositeIdentifier;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.concurrent.BlockingQueue;
+
+public class AiravataJobStatusUpdator implements AbstractActivityListener {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
+
+    private Registry airavataRegistry;
+
+    private MonitorPublisher monitorPublisher;
+
+    private BlockingQueue<MonitorID> jobsToMonitor;
+
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    public BlockingQueue<MonitorID> getJobsToMonitor() {
+        return jobsToMonitor;
+    }
+
+    public void setJobsToMonitor(BlockingQueue<MonitorID> jobsToMonitor) {
+        this.jobsToMonitor = jobsToMonitor;
+    }
+
+    @Subscribe
+    public void updateRegistry(JobStatusChangeRequest jobStatus) {
+        /* Here we need to parse the jobStatus message and update
+                the registry accordingly, for now we are just printing to standard Out
+                 */
+        JobState state = jobStatus.getState();
+        if (state != null) {
+            try {
+                String taskID = jobStatus.getIdentity().getTaskId();
+                String jobID = jobStatus.getIdentity().getJobId();
+                updateJobStatus(taskID, jobID, state);
+            } catch (Exception e) {
+                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+            }
+            logger.info("Job ID:" + jobStatus.getIdentity().getJobId() + " is "+state.toString());
+            switch (state) {
+                case COMPLETE: case UNKNOWN: case CANCELED:case FAILED:case SUSPENDED:
+                    jobsToMonitor.remove(jobStatus.getMonitorID());
+                    break;
+			default:
+				break;
+            }
+        }
+    }
+
+    @Subscribe
+    public void setupTaskStatus(JobStatusChangeRequest jobStatus){
+    	TaskState state=TaskState.UNKNOWN;
+    	switch(jobStatus.getState()){
+    	case ACTIVE:
+    		state=TaskState.EXECUTING; break;
+    	case CANCELED:
+    		state=TaskState.CANCELED; break;
+    	case COMPLETE:
+    		state=TaskState.COMPLETED; break;
+    	case FAILED:
+    		state=TaskState.FAILED; break;
+    	case HELD: case SUSPENDED: case QUEUED:
+    		state=TaskState.WAITING; break;
+    	case SETUP:
+    		state=TaskState.PRE_PROCESSING; break;
+    	case SUBMITTED:
+    		state=TaskState.STARTED; break;
+    	case UN_SUBMITTED:
+    		state=TaskState.CANCELED; break;
+    	case CANCELING:
+    		state=TaskState.CANCELING; break;
+		default:
+			break;
+    	}
+    	logger.debug("Publishing Task Status "+state.toString());
+    	monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getIdentity(),state));
+    }
+
+    public  void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {
+        CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
+        JobDetails details = (JobDetails)airavataRegistry.get(DataType.JOB_DETAIL, ids);
+        if(details == null) {
+            details = new JobDetails();
+        }
+        org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus();
+        status.setJobState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setJobStatus(status);
+        details.setJobID(jobID);
+        airavataRegistry.update(DataType.JOB_DETAIL, details, ids);
+    }
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void setup(Object... configurations) {
+		for (Object configuration : configurations) {
+			if (configuration instanceof Registry){
+				this.airavataRegistry=(Registry)configuration;
+			} else if (configuration instanceof BlockingQueue<?>){
+				this.jobsToMonitor=(BlockingQueue<MonitorID>) configuration;
+			} else if (configuration instanceof MonitorPublisher){
+				this.monitorPublisher=(MonitorPublisher) configuration;
+			} 
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java
new file mode 100644
index 0000000..e8dd7a0
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataTaskStatusUpdator.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.state.WorkflowNodeStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+public class AiravataTaskStatusUpdator implements AbstractActivityListener {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
+
+    private Registry airavataRegistry;
+
+    private MonitorPublisher monitorPublisher;
+    
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    @Subscribe
+    public void updateRegistry(TaskStatusChangeRequest taskStatus) {
+        TaskState state = taskStatus.getState();
+        if (state != null) {
+            try {
+                String taskID = taskStatus.getIdentity().getTaskId();
+                updateTaskStatus(taskID, state);
+            } catch (Exception e) {
+                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+            }
+        }
+    }
+    
+    @Subscribe
+    public void setupWorkflowNodeStatus(TaskStatusChangeRequest taskStatus){
+    	WorkflowNodeState state=WorkflowNodeState.UNKNOWN;
+    	switch(taskStatus.getState()){
+    	case CANCELED:
+    		state=WorkflowNodeState.CANCELED; break;
+    	case COMPLETED:
+    		state=WorkflowNodeState.COMPLETED; break;
+    	case CONFIGURING_WORKSPACE:
+    		state=WorkflowNodeState.INVOKED; break;
+    	case FAILED:
+    		state=WorkflowNodeState.FAILED; break;
+    	case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING:
+    		state=WorkflowNodeState.EXECUTING; break;
+    	case STARTED:
+    		state=WorkflowNodeState.INVOKED; break;
+    	case CANCELING:
+    		state=WorkflowNodeState.CANCELING; break;
+		default:
+			break;
+    	}
+    	logger.debug("Publishing Experiment Status "+state.toString());
+    	monitorPublisher.publish(new WorkflowNodeStatusChangeRequest(taskStatus.getIdentity(),state));
+    }
+    
+    public  void updateTaskStatus(String taskId, TaskState state) throws Exception {
+    	TaskDetails details = (TaskDetails)airavataRegistry.get(DataType.TASK_DETAIL, taskId);
+        if(details == null) {
+            details = new TaskDetails();
+            details.setTaskID(taskId);
+        }
+        org.apache.airavata.model.workspace.experiment.TaskStatus status = new org.apache.airavata.model.workspace.experiment.TaskStatus();
+        status.setExecutionState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setTaskStatus(status);
+        airavataRegistry.update(DataType.TASK_DETAIL, details, taskId);
+    }
+
+	@Override
+	public void setup(Object... configurations) {
+		for (Object configuration : configurations) {
+			if (configuration instanceof Registry){
+				this.airavataRegistry=(Registry)configuration;
+			} else if (configuration instanceof MonitorPublisher){
+				this.monitorPublisher=(MonitorPublisher) configuration;
+			} 
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java
new file mode 100644
index 0000000..2375d72
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.state.WorkflowNodeStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+
+public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
+
+    private Registry airavataRegistry;
+
+    private MonitorPublisher monitorPublisher;
+
+    public Registry getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public void setAiravataRegistry(Registry airavataRegistry) {
+        this.airavataRegistry = airavataRegistry;
+    }
+
+    @Subscribe
+    public void updateRegistry(WorkflowNodeStatusChangeRequest workflowNodeStatus) {
+        WorkflowNodeState state = workflowNodeStatus.getState();
+        if (state != null) {
+            try {
+                String workflowNodeID = workflowNodeStatus.getIdentity().getWorkflowNodeID();
+                updateWorkflowNodeStatus(workflowNodeID, state);
+            } catch (Exception e) {
+                logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+            }
+        }
+    }
+
+    @Subscribe
+    public void setupExperimentStatus(WorkflowNodeStatusChangeRequest nodeStatus){
+    	ExperimentState state=ExperimentState.UNKNOWN;
+    	switch(nodeStatus.getState()){
+    	case CANCELED:
+    		state=ExperimentState.CANCELED; break;
+    	case COMPLETED:
+    		state=ExperimentState.COMPLETED; break;
+    	case INVOKED:
+    		state=ExperimentState.LAUNCHED; break;
+    	case FAILED:
+    		state=ExperimentState.FAILED; break;
+    	case EXECUTING:
+    		state=ExperimentState.EXECUTING; break;
+    	case CANCELING:
+    		state=ExperimentState.CANCELING; break;
+		default:
+			break;
+    	}
+    	logger.debug("Publishing Experiment Status "+state.toString());
+    	monitorPublisher.publish(new ExperimentStatusChangeRequest(nodeStatus.getIdentity(),state));
+    }
+
+    public  void updateWorkflowNodeStatus(String workflowNodeId, WorkflowNodeState state) throws Exception {
+    	WorkflowNodeDetails details = (WorkflowNodeDetails)airavataRegistry.get(DataType.WORKFLOW_NODE_DETAIL, workflowNodeId);
+        if(details == null) {
+            details = new WorkflowNodeDetails();
+            details.setNodeInstanceId(workflowNodeId);
+        }
+        WorkflowNodeStatus status = new WorkflowNodeStatus();
+        status.setWorkflowNodeState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+        details.setWorkflowNodeStatus(status);
+        airavataRegistry.update(DataType.WORKFLOW_NODE_DETAIL, details, workflowNodeId);
+    }
+
+	@Override
+	public void setup(Object... configurations) {
+		for (Object configuration : configurations) {
+			if (configuration instanceof Registry){
+				this.airavataRegistry=(Registry)configuration;
+			} else if (configuration instanceof MonitorPublisher){
+				this.monitorPublisher=(MonitorPublisher) configuration;
+			} 
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java
new file mode 100644
index 0000000..ba8efeb
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/ExperimentIdentity.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+public class ExperimentIdentity {
+	private String experimentID;
+	public ExperimentIdentity(String experimentId) {
+		setExperimentID(experimentId);
+	}
+	public String getExperimentID() {
+		return experimentID;
+	}
+
+	public void setExperimentID(String experimentID) {
+		this.experimentID = experimentID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
new file mode 100644
index 0000000..e57087d
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HostMonitorData {
+    private HostDescription host;
+
+    private List<MonitorID> monitorIDs;
+
+    public HostMonitorData(HostDescription host) {
+        this.host = host;
+        monitorIDs = new ArrayList<MonitorID>();
+    }
+
+    public HostMonitorData(HostDescription host, List<MonitorID> monitorIDs) {
+        this.host = host;
+        this.monitorIDs = monitorIDs;
+    }
+
+    public HostDescription getHost() {
+        return host;
+    }
+
+    public void setHost(HostDescription host) {
+        this.host = host;
+    }
+
+    public List<MonitorID> getMonitorIDs() {
+        return monitorIDs;
+    }
+
+    public void setMonitorIDs(List<MonitorID> monitorIDs) {
+        this.monitorIDs = monitorIDs;
+    }
+
+    /**
+     * this method get called by CommonUtils and it will check the right place before adding
+     * so there will not be a mismatch between this.host and monitorID.host
+     * @param monitorID
+     * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException
+     */
+    public void addMonitorIDForHost(MonitorID monitorID)throws AiravataMonitorException {
+        monitorIDs.add(monitorID);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java
new file mode 100644
index 0000000..84c4a55
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/JobIdentity.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+public class JobIdentity extends TaskIdentity {
+	private String jobId;
+	
+	public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) {
+		super(experimentId,workflowNodeId,taskId);
+		setJobId(jobId);
+	}
+
+	public String getJobId() {
+		return jobId;
+	}
+
+	public void setJobId(String jobId) {
+		this.jobId = jobId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
new file mode 100644
index 0000000..e21160a
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/MonitorID.java
@@ -0,0 +1,251 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.Map;
+
+/*
+This is the object which contains the data to identify a particular
+Job to start the monitoring
+*/
+public class MonitorID {
+    private final static Logger logger = LoggerFactory.getLogger(MonitorID.class);
+
+    private String userName;
+
+    private Timestamp jobStartedTime;
+
+    private Timestamp lastMonitored;
+
+    private HostDescription host;
+
+    private AuthenticationInfo authenticationInfo = null;
+
+    private Map<String, Object> parameters;
+
+    private String experimentID;
+
+    private String workflowNodeID;
+
+    private String taskID;
+
+    private String jobID;
+
+    private int failedCount = 0;
+
+    private JobState state;
+
+    private JobExecutionContext jobExecutionContext;
+
+    public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) {
+        this.host = host;
+        this.jobStartedTime = new Timestamp((new Date()).getTime());
+        this.userName = userName;
+        this.jobID = jobID;
+        this.taskID = taskID;
+        this.experimentID = experimentID;
+        this.workflowNodeID = workflowNodeID;
+    }
+
+    public MonitorID(AuthenticationInfo authenticationInfo, JobExecutionContext jobExecutionContext) {
+        this.authenticationInfo = authenticationInfo;
+        this.jobExecutionContext = jobExecutionContext;
+        host = jobExecutionContext.getApplicationContext().getHostDescription();
+        userName = jobExecutionContext.getExperiment().getUserName();
+        jobID = jobExecutionContext.getJobDetails().getJobID();
+        taskID = jobExecutionContext.getTaskData().getTaskID();
+        experimentID = jobExecutionContext.getExperiment().getExperimentID();
+        workflowNodeID =  jobExecutionContext.getExperiment().getWorkflowNodeDetailsList().get(0).getNodeInstanceId();// at this point we only have one node todo: fix this
+
+    }
+
+    public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName,AuthenticationInfo authenticationInfo) {
+        this.host = host;
+        this.jobStartedTime = new Timestamp((new Date()).getTime());
+        this.authenticationInfo = authenticationInfo;
+        this.userName = userName;
+        // if we give myproxyauthenticationInfo, so we try to use myproxy user as the user
+        if(this.authenticationInfo != null){
+            if(this.authenticationInfo instanceof MyProxyAuthenticationInfo){
+                this.userName = ((MyProxyAuthenticationInfo)this.authenticationInfo).getUserName();
+            }
+        }
+        this.workflowNodeID = workflowNodeID;
+        this.jobID = jobID;
+        this.taskID = taskID;
+        this.experimentID = experimentID;
+    }
+    public HostDescription getHost() {
+        return host;
+    }
+
+    public void setHost(HostDescription host) {
+        this.host = host;
+    }
+
+    public Timestamp getLastMonitored() {
+        return lastMonitored;
+    }
+
+    public void setLastMonitored(Timestamp lastMonitored) {
+        this.lastMonitored = lastMonitored;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getJobID() {
+        return jobID;
+    }
+
+    public void setJobID(String jobID) {
+        this.jobID = jobID;
+    }
+
+    public Timestamp getJobStartedTime() {
+        return jobStartedTime;
+    }
+
+    public void setJobStartedTime(Timestamp jobStartedTime) {
+        this.jobStartedTime = jobStartedTime;
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+
+    public void addParameter(String key,Object value) {
+        this.parameters.put(key, value);
+    }
+
+    public Object getParameter(String key) {
+        return this.parameters.get(key);
+    }
+
+    public Map<String, Object> getParameters() {
+        return parameters;
+    }
+
+    public void setParameters(Map<String, Object> parameters) {
+        this.parameters = parameters;
+    }
+
+    public String getExperimentID() {
+        return experimentID;
+    }
+
+    public void setExperimentID(String experimentID) {
+        this.experimentID = experimentID;
+    }
+
+    public String getTaskID() {
+        return taskID;
+    }
+
+    public void setTaskID(String taskID) {
+        this.taskID = taskID;
+    }
+
+    public int getFailedCount() {
+        return failedCount;
+    }
+
+    public void setFailedCount(int failedCount) {
+        this.failedCount = failedCount;
+    }
+
+    public JobState getStatus() {
+        return state;
+    }
+
+    public void setStatus(JobState status) {
+        // this logic is going to be useful for fast finishing jobs
+        // because in some machines job state vanishes quicckly when the job is done
+        // during that case job state comes as unknown.so we handle it here.
+            if (this.state != null && status.equals(JobState.UNKNOWN)) {
+                if (getFailedCount() > 2) {
+                    switch (this.state) {
+                        case ACTIVE:
+                            this.state = JobState.COMPLETE;
+                            break;
+                        case QUEUED:
+                            this.state = JobState.COMPLETE;
+                            break;
+                    }
+                } else {
+                    try {
+                        // when state becomes unknown we sleep for a while
+                        Thread.sleep(10000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    }
+                    setFailedCount(getFailedCount() + 1);
+                }
+            } else {
+                // normal scenario
+                this.state = status;
+            }
+    }
+
+	public String getWorkflowNodeID() {
+		return workflowNodeID;
+	}
+
+	public void setWorkflowNodeID(String workflowNodeID) {
+		this.workflowNodeID = workflowNodeID;
+	}
+
+    public JobExecutionContext getJobExecutionContext() {
+        return jobExecutionContext;
+    }
+
+    public void setJobExecutionContext(JobExecutionContext jobExecutionContext) {
+        this.jobExecutionContext = jobExecutionContext;
+    }
+
+    //	public String getWorkflowNodeID() {
+//		return workflowNodeID;
+//	}
+//
+//	public void setWorkflowNodeID(String workflowNodeID) {
+//		this.workflowNodeID = workflowNodeID;
+//	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java
new file mode 100644
index 0000000..c6d386e
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/TaskIdentity.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+public class TaskIdentity extends WorkflowNodeIdentity {
+	private String taskId;
+
+	public TaskIdentity(String experimentId, String workflowNodeId, String taskId) {
+		super(experimentId,workflowNodeId);
+		setTaskId(taskId);
+	}
+	public String getTaskId() {
+		return taskId;
+	}
+
+	public void setTaskId(String taskId) {
+		this.taskId = taskId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
new file mode 100644
index 0000000..022d17c
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/UserMonitorData.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is the datastructure to keep the user centric job data, rather keeping
+ * the individual jobs we keep the jobs based on the each user
+ */
+public class UserMonitorData {
+    private final static Logger logger = LoggerFactory.getLogger(UserMonitorData.class);
+
+    private String  userName;
+
+    private List<HostMonitorData> hostMonitorData;
+
+
+    public UserMonitorData(String userName) {
+        this.userName = userName;
+        hostMonitorData = new ArrayList<HostMonitorData>();
+    }
+
+    public UserMonitorData(String userName, List<HostMonitorData> hostMonitorDataList) {
+        this.hostMonitorData = hostMonitorDataList;
+        this.userName = userName;
+    }
+
+    public List<HostMonitorData> getHostMonitorData() {
+        return hostMonitorData;
+    }
+
+    public void setHostMonitorData(List<HostMonitorData> hostMonitorData) {
+        this.hostMonitorData = hostMonitorData;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    /*
+    This method will add element to the MonitorID list, user should not
+    duplicate it, we do not check it because its going to be used by airavata
+    so we have to use carefully and this method will add a host if its a new host
+     */
+    public void addHostMonitorData(HostMonitorData hostMonitorData) throws AiravataMonitorException {
+        this.hostMonitorData.add(hostMonitorData);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java
new file mode 100644
index 0000000..e569c52
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/WorkflowNodeIdentity.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor;
+
+public class WorkflowNodeIdentity extends ExperimentIdentity {
+	private String workflowNodeID;
+	public WorkflowNodeIdentity(String experimentId, String workflowNodeId) {
+		super(experimentId);
+		setWorkflowNodeID(workflowNodeId);
+	}
+	public String getWorkflowNodeID() {
+		return workflowNodeID;
+	}
+
+	public void setWorkflowNodeID(String workflowNodeID) {
+		this.workflowNodeID = workflowNodeID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
new file mode 100644
index 0000000..f19decf
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/ExperimentCancelRequest.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.command;
+
+public class ExperimentCancelRequest {
+	private String experimentId;
+
+	public ExperimentCancelRequest(String experimentId) {
+		this.experimentId = experimentId;
+	}
+
+	public String getExperimentId() {
+		return experimentId;
+	}
+
+	public void setExperimentId(String experimentId) {
+		this.experimentId = experimentId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
new file mode 100644
index 0000000..b45e01c
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/command/TaskCancelRequest.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.command;
+
+public class TaskCancelRequest {
+	private String experimentId;
+	private String nodeId;
+	private String taskId;
+	
+	public TaskCancelRequest(String experimentId, String nodeId, String taskId) {
+		this.experimentId = experimentId;
+		this.setNodeId(nodeId);
+		this.taskId = taskId;
+	}
+	public String getExperimentId() {
+		return experimentId;
+	}
+	public void setExperimentId(String experimentId) {
+		this.experimentId = experimentId;
+	}
+	public String getTaskId() {
+		return taskId;
+	}
+	public void setTaskId(String taskId) {
+		this.taskId = taskId;
+	}
+	public String getNodeId() {
+		return nodeId;
+	}
+	public void setNodeId(String nodeId) {
+		this.nodeId = nodeId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
new file mode 100644
index 0000000..da6baf8
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the abstract Monitor which needs to be used by
+ * any Monitoring implementation which expect nto consume
+ * to store the status to registry. Because they have to
+ * use the MonitorPublisher to publish the monitoring statuses
+ * to the Event Bus. All the Monitor statuses publish to the eventbus
+ * will be saved to the Registry.
+ */
+public abstract class AiravataAbstractMonitor implements Monitor {
+    private final static Logger logger = LoggerFactory.getLogger(AiravataAbstractMonitor.class);
+    protected MonitorPublisher publisher;
+
+    public MonitorPublisher getPublisher() {
+        return publisher;
+    }
+
+    public void setPublisher(MonitorPublisher publisher) {
+        this.publisher = publisher;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
new file mode 100644
index 0000000..a003f55
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/MessageParser.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+/**
+ * This is an interface to implement messageparser, it could be
+ * pull based or push based still monitor has to parse the content of
+ * the message it gets from remote monitoring system and finalize
+ * them to internal job state, Ex: JSON parser for AMQP and Qstat reader
+ * for pull based monitor.
+ */
+public interface MessageParser {
+    /**
+     * This method is to implement how to parse the incoming message
+     * and implement a logic to finalize the status of the job,
+     * we have to makesure the correct message is given to the messageparser
+     * parse method, it will not do any filtering
+     * @param message content of the message
+     * @return
+     */
+    JobState parseMessage(String message)throws AiravataMonitorException;
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
new file mode 100644
index 0000000..614d606
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/Monitor.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+
+/**
+ * This is the primary interface for Monitors,
+ * This can be used to implement different methods of monitoring
+ */
+public interface Monitor {
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
new file mode 100644
index 0000000..efdf89c
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+/**
+ * PullMonitors can implement this interface
+ * Since the pull and push based monitoring required different
+ * operations, PullMonitor will be useful.
+ * This will allow users to program Pull monitors separately
+ */
+public abstract class PullMonitor extends AiravataAbstractMonitor {
+
+    private int pollingFrequence;
+    /**
+     * This method will can invoke when PullMonitor needs to start
+     * and it has to invoke in the frequency specified below,
+     * @return if the start process is successful return true else false
+     */
+    public abstract boolean startPulling() throws AiravataMonitorException;
+
+    /**
+     * This is the method to stop the polling process
+     * @return if the stopping process is successful return true else false
+     */
+    public abstract boolean stopPulling()throws AiravataMonitorException;
+
+    /**
+     * this method can be used to set the polling frequencey or otherwise
+     * can implement a polling mechanism, and implement how to do
+     * @param frequence
+     */
+    public void setPollingFrequence(int frequence){
+        this.pollingFrequence = frequence;
+    }
+
+    /**
+     * this method can be used to get the polling frequencey or otherwise
+     * can implement a polling mechanism, and implement how to do
+     * @return
+     */
+    public int getPollingFrequence(){
+        return this.pollingFrequence;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
new file mode 100644
index 0000000..8e13252
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+/**
+ * PushMonitors can implement this interface
+ * Since the pull and push based monitoring required different
+ * operations, PullMonitor will be useful.
+ * This interface will allow users to program Push monitors separately
+ */
+public abstract class PushMonitor extends AiravataAbstractMonitor {
+    /**
+     * This method can be invoked to register a listener with the
+     * remote monitoring system, ideally inside this method users will be
+     * writing some client listener code for the remote monitoring system,
+     * this will be a simple wrapper around any client for the remote Monitor.
+     * @param monitorID
+     * @return
+     */
+    public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException;
+
+    /**
+     * This method can be invoked to unregister a listener with the
+     * remote monitoring system, ideally inside this method users will be
+     * writing some client listener code for the remote monitoring system,
+     * this will be a simple wrapper around any client for the remote Monitor.
+     * @param monitorID
+     * @return
+     */
+    public abstract boolean unRegisterListener(MonitorID monitorID)throws AiravataMonitorException;
+
+    /**
+     * This can be used to stop the registration thread
+     * @return
+     * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException
+     */
+    public abstract boolean stopRegister()throws AiravataMonitorException;
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/event/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/event/MonitorPublisher.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/event/MonitorPublisher.java
new file mode 100644
index 0000000..52487fe
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/event/MonitorPublisher.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.event;
+
+import com.google.common.eventbus.EventBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MonitorPublisher{
+    private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class);
+    private EventBus eventBus;
+    
+    public MonitorPublisher(EventBus eventBus) {
+        this.eventBus = eventBus;
+    }
+
+    public void registerListener(Object listener) {
+        eventBus.register(listener);
+    }
+    
+    public void unregisterListener(Object listener) {
+        eventBus.unregister(listener);
+    }
+
+    public void publish(Object o) {
+        eventBus.post(o);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
new file mode 100644
index 0000000..3acef66
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.exception;
+
+public class AiravataMonitorException extends Exception {
+    private static final long serialVersionUID = -2849422320139467602L;
+
+    public AiravataMonitorException(Throwable e) {
+        super(e);
+    }
+
+    public AiravataMonitorException(String message) {
+        super(message, null);
+    }
+
+    public AiravataMonitorException(String message, Throwable e) {
+        super(message, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
new file mode 100644
index 0000000..b3420b8
--- /dev/null
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.handlers;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.handler.GFacHandlerException;
+import org.apache.airavata.gfac.handler.ThreadedHandler;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * this handler is responsible for monitoring jobs in a pull mode
+ * and currently this support multiple pull monitoring in grid resource and uses
+ * commands like qstat,squeue and this supports sun grid enging monitoring too
+ * which is a slight variation of qstat monitoring.
+ */
+public class GridPullMonitorHandler extends ThreadedHandler {
+    private final static Logger logger = LoggerFactory.getLogger(GridPullMonitorHandler.class);
+
+    private HPCPullMonitor hpcPullMonitor;
+
+    private AuthenticationInfo authenticationInfo;
+
+    public void initProperties(Map<String, String> properties) throws GFacHandlerException {
+        String myProxyUser = null;
+        try {
+            myProxyUser = ServerSettings.getSetting("myproxy.username");
+            String myProxyPass = ServerSettings.getSetting("myproxy.password");
+            String certPath = ServerSettings.getSetting("trusted.cert.location");
+            String myProxyServer = ServerSettings.getSetting("myproxy.server");
+            setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
+                    7512, 17280000, certPath));
+            hpcPullMonitor = new HPCPullMonitor();
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+    }
+
+    public void run() {
+        hpcPullMonitor.run();
+    }
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException{
+        super.invoke(jobExecutionContext);
+        MonitorID monitorID = new MonitorID(authenticationInfo, jobExecutionContext);
+        try {
+            CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID);
+        } catch (AiravataMonitorException e) {
+            logger.error("Error adding monitorID object to the queue with experiment ", monitorID.getExperimentID());
+        }
+    }
+
+    public AuthenticationInfo getAuthenticationInfo() {
+        return authenticationInfo;
+    }
+
+    public HPCPullMonitor getHpcPullMonitor() {
+        return hpcPullMonitor;
+    }
+
+    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
+        this.authenticationInfo = authenticationInfo;
+    }
+
+    public void setHpcPullMonitor(HPCPullMonitor hpcPullMonitor) {
+        this.hpcPullMonitor = hpcPullMonitor;
+    }
+}