You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/04/23 21:42:34 UTC

[7/8] merging monitoring with gfac-core, later this will be separated

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
new file mode 100644
index 0000000..efdf89c
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PullMonitor.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+/**
+ * PullMonitors can implement this interface
+ * Since the pull and push based monitoring required different
+ * operations, PullMonitor will be useful.
+ * This will allow users to program Pull monitors separately
+ */
+public abstract class PullMonitor extends AiravataAbstractMonitor {
+
+    private int pollingFrequence;
+    /**
+     * This method will can invoke when PullMonitor needs to start
+     * and it has to invoke in the frequency specified below,
+     * @return if the start process is successful return true else false
+     */
+    public abstract boolean startPulling() throws AiravataMonitorException;
+
+    /**
+     * This is the method to stop the polling process
+     * @return if the stopping process is successful return true else false
+     */
+    public abstract boolean stopPulling()throws AiravataMonitorException;
+
+    /**
+     * this method can be used to set the polling frequencey or otherwise
+     * can implement a polling mechanism, and implement how to do
+     * @param frequence
+     */
+    public void setPollingFrequence(int frequence){
+        this.pollingFrequence = frequence;
+    }
+
+    /**
+     * this method can be used to get the polling frequencey or otherwise
+     * can implement a polling mechanism, and implement how to do
+     * @return
+     */
+    public int getPollingFrequence(){
+        return this.pollingFrequence;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
new file mode 100644
index 0000000..8e13252
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/core/PushMonitor.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.core;
+
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+
+/**
+ * PushMonitors can implement this interface
+ * Since the pull and push based monitoring required different
+ * operations, PullMonitor will be useful.
+ * This interface will allow users to program Push monitors separately
+ */
+public abstract class PushMonitor extends AiravataAbstractMonitor {
+    /**
+     * This method can be invoked to register a listener with the
+     * remote monitoring system, ideally inside this method users will be
+     * writing some client listener code for the remote monitoring system,
+     * this will be a simple wrapper around any client for the remote Monitor.
+     * @param monitorID
+     * @return
+     */
+    public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException;
+
+    /**
+     * This method can be invoked to unregister a listener with the
+     * remote monitoring system, ideally inside this method users will be
+     * writing some client listener code for the remote monitoring system,
+     * this will be a simple wrapper around any client for the remote Monitor.
+     * @param monitorID
+     * @return
+     */
+    public abstract boolean unRegisterListener(MonitorID monitorID)throws AiravataMonitorException;
+
+    /**
+     * This can be used to stop the registration thread
+     * @return
+     * @throws org.apache.airavata.gfac.monitor.exception.AiravataMonitorException
+     */
+    public abstract boolean stopRegister()throws AiravataMonitorException;
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/event/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/event/MonitorPublisher.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/event/MonitorPublisher.java
new file mode 100644
index 0000000..52487fe
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/event/MonitorPublisher.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.event;
+
+import com.google.common.eventbus.EventBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MonitorPublisher{
+    private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class);
+    private EventBus eventBus;
+    
+    public MonitorPublisher(EventBus eventBus) {
+        this.eventBus = eventBus;
+    }
+
+    public void registerListener(Object listener) {
+        eventBus.register(listener);
+    }
+    
+    public void unregisterListener(Object listener) {
+        eventBus.unregister(listener);
+    }
+
+    public void publish(Object o) {
+        eventBus.post(o);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
new file mode 100644
index 0000000..3acef66
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/exception/AiravataMonitorException.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.exception;
+
+public class AiravataMonitorException extends Exception {
+    private static final long serialVersionUID = -2849422320139467602L;
+
+    public AiravataMonitorException(Throwable e) {
+        super(e);
+    }
+
+    public AiravataMonitorException(String message) {
+        super(message, null);
+    }
+
+    public AiravataMonitorException(String message, Throwable e) {
+        super(message, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
new file mode 100644
index 0000000..a64b484
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl;
+
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.monitor.JobIdentity;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.core.AiravataAbstractMonitor;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * This monitor can be used to monitor a job which runs locally,
+ * Since its a local job job doesn't have states, once it get executed
+ * then the job starts running
+ */
+public class LocalJobMonitor extends AiravataAbstractMonitor {
+    // Though we have a qeuue here, it not going to be used in local jobs
+    BlockingQueue<MonitorID> jobQueue;
+
+    public void run() {
+        do {
+            try {
+                MonitorID take = jobQueue.take();
+                getPublisher().publish(new JobStatusChangeRequest(take, new JobIdentity(take.getExperimentID(), take.getWorkflowNodeID(), take.getTaskID(), take.getJobID()), JobState.COMPLETE));
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        } while (!ServerSettings.isStopAllThreads());
+    }
+
+    public BlockingQueue<MonitorID> getJobQueue() {
+        return jobQueue;
+    }
+
+    public void setJobQueue(BlockingQueue<MonitorID> jobQueue) {
+        this.jobQueue = jobQueue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java
new file mode 100644
index 0000000..4e2e18e
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/QstatMonitor.java
@@ -0,0 +1,262 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.pull.qstat;
+
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.UserMonitorData;
+import org.apache.airavata.gfac.monitor.core.PullMonitor;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * This monitor is based on qstat command which can be run
+ * in grid resources and retrieve the job status.
+ */
+public class QstatMonitor extends PullMonitor {
+    private final static Logger logger = LoggerFactory.getLogger(QstatMonitor.class);
+
+    // I think this should use DelayedBlocking Queue to do the monitoring*/
+    private BlockingQueue<UserMonitorData> queue;
+
+    private boolean startPulling = false;
+
+    private Map<String, ResourceConnection> connections;
+
+    private MonitorPublisher publisher;
+
+    public QstatMonitor(){
+        connections = new HashMap<String, ResourceConnection>();
+    }
+    public QstatMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
+        this.queue = queue;
+        this.publisher = publisher;
+        connections = new HashMap<String, ResourceConnection>();
+    }
+
+    public void run() {
+        /* implement a logic to pick each monitorID object from the queue and do the
+        monitoring
+         */
+        this.startPulling = true;
+        while (this.startPulling && !ServerSettings.isStopAllThreads()) {
+            try {
+                startPulling();
+                // After finishing one iteration of the full queue this thread sleeps 1 second
+                Thread.sleep(10000);
+            } catch (Exception e){
+                // we catch all the exceptions here because no matter what happens we do not stop running this
+                // thread, but ideally we should report proper error messages, but this is handled in startPulling
+                // method, incase something happen in Thread.sleep we handle it with this catch block.
+                e.printStackTrace();
+                logger.error(e.getMessage());
+            }
+        }
+        // thread is going to return so we close all the connections
+        Iterator<String> iterator = connections.keySet().iterator();
+        while(iterator.hasNext()){
+            String next = iterator.next();
+            ResourceConnection resourceConnection = connections.get(next);
+            try {
+                resourceConnection.getCluster().disconnect();
+            } catch (SSHApiException e) {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+        }
+    }
+
+    /**
+     * This method will can invoke when PullMonitor needs to start
+     * and it has to invoke in the frequency specified below,
+     *
+     * @return if the start process is successful return true else false
+     */
+    public boolean startPulling() throws AiravataMonitorException {
+        // take the top element in the queue and pull the data and put that element
+        // at the tail of the queue
+        //todo this polling will not work with multiple usernames but with single user
+        // and multiple hosts, currently monitoring will work
+        UserMonitorData take = null;
+        JobStatusChangeRequest jobStatus = new JobStatusChangeRequest();
+        MonitorID currentMonitorID = null;
+        HostDescription currentHostDescription = null;
+        try {
+            take = this.queue.take();
+            List<MonitorID> completedJobs = new ArrayList<MonitorID>();
+            List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
+            for (HostMonitorData iHostMonitorData : hostMonitorData) {
+                if (iHostMonitorData.getHost().getType() instanceof GsisshHostType) {
+                    currentHostDescription = iHostMonitorData.getHost();
+                    GsisshHostType gsisshHostType = (GsisshHostType) iHostMonitorData.getHost().getType();
+                    String hostName = gsisshHostType.getHostAddress();
+                    ResourceConnection connection = null;
+                    if (connections.containsKey(hostName)) {
+                        logger.debug("We already have this connection so not going to create one");
+                        connection = connections.get(hostName);
+                    } else {
+                        connection = new ResourceConnection(take.getUserName(), iHostMonitorData, gsisshHostType.getInstalledPath());
+                        connections.put(hostName, connection);
+                    }
+                    List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
+                    Map<String, JobState> jobStatuses = connection.getJobStatuses(take.getUserName(), monitorID);
+                    for (MonitorID iMonitorID : monitorID) {
+                        currentMonitorID = iMonitorID;
+                        iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()));
+                        jobStatus.setMonitorID(iMonitorID);
+                        jobStatus.setState(iMonitorID.getStatus());
+                        // we have this JobStatus class to handle amqp monitoring
+
+                        publisher.publish(jobStatus);
+                        // if the job is completed we do not have to put the job to the queue again
+                        iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+
+                        // After successful monitoring perform following actions to cleanup the queue, if necessary
+                        if (jobStatus.getState().equals(JobState.COMPLETE)) {
+                            completedJobs.add(iMonitorID);
+                        } else if (iMonitorID.getFailedCount() > 2 && iMonitorID.getStatus().equals(JobState.UNKNOWN)) {
+                            logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed 3 times, so skip this Job from Monitor");
+                            iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+                            completedJobs.add(iMonitorID);
+                        } else {
+                            // Evey
+                            iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+                            // if the job is complete we remove it from the Map, if any of these maps
+                            // get empty this userMonitorData will get delete from the queue
+                        }
+                    }
+                } else {
+                    logger.debug("Qstat Monitor doesn't handle non-gsissh hosts");
+                }
+            }
+            // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back
+            // now the userMonitorData goes back to the tail of the queue
+            queue.put(take);
+            // cleaning up the completed jobs, this method will remove some of the userMonitorData from the queue if
+            // they become empty
+            for(MonitorID completedJob:completedJobs){
+                CommonUtils.removeMonitorFromQueue(queue, completedJob);
+            }
+        } catch (InterruptedException e) {
+            if (!this.queue.contains(take)) {
+                try {
+                    this.queue.put(take);
+                } catch (InterruptedException e1) {
+                    e1.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+            }
+            logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID());
+            throw new AiravataMonitorException(e);
+        } catch (SSHApiException e) {
+            logger.error(e.getMessage());
+            if (e.getMessage().contains("Unknown Job Id Error")) {
+                // in this case job is finished or may be the given job ID is wrong
+                jobStatus.setState(JobState.UNKNOWN);
+                publisher.publish(jobStatus);
+            } else if (e.getMessage().contains("illegally formed job identifier")) {
+                logger.error("Wrong job ID is given so dropping the job from monitoring system");
+            } else if (!this.queue.contains(take)) {   // we put the job back to the queue only if its state is not unknown
+                if (currentMonitorID == null) {
+                    logger.error("Monitoring the jobs failed, for user: " + take.getUserName()
+                            + " in Host: " + currentHostDescription.getType().getHostAddress());
+                } else {
+                    if (currentMonitorID != null) {
+                        if (currentMonitorID.getFailedCount() < 2) {
+                            try {
+                                currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1);
+                                this.queue.put(take);
+                            } catch (InterruptedException e1) {
+                                e1.printStackTrace();
+                            }
+                        } else {
+                            logger.error(e.getMessage());
+                            logger.error("Tried to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID());
+                        }
+                    }
+                }
+            }
+            throw new AiravataMonitorException("Error retrieving the job status", e);
+        } catch (Exception e) {
+            if (currentMonitorID != null) {
+                if (currentMonitorID.getFailedCount() < 3) {
+                    try {
+                        currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1);
+                        this.queue.put(take);
+                        // if we get a wrong status we wait for a while and request again
+                        Thread.sleep(10000);
+                    } catch (InterruptedException e1) {
+                        e1.printStackTrace();
+                    }
+                } else {
+                    logger.error(e.getMessage());
+                    logger.error("Tryied to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID());
+                }
+            }
+            throw new AiravataMonitorException("Error retrieving the job status", e);
+        }
+
+
+        return true;
+    }
+
+
+    /**
+     * This is the method to stop the polling process
+     *
+     * @return if the stopping process is successful return true else false
+     */
+    public boolean stopPulling() {
+        this.startPulling = false;
+        return true;
+    }
+
+    public MonitorPublisher getPublisher() {
+        return publisher;
+    }
+
+    public void setPublisher(MonitorPublisher publisher) {
+        this.publisher = publisher;
+    }
+
+    public BlockingQueue<UserMonitorData> getQueue() {
+        return queue;
+    }
+
+    public void setQueue(BlockingQueue<UserMonitorData> queue) {
+        this.queue = queue;
+    }
+
+    public boolean authenticate() {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
new file mode 100644
index 0000000..7a37b88
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.pull.qstat;
+
+import org.apache.airavata.gfac.monitor.HostMonitorData;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+public class ResourceConnection {
+    private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class);
+
+    private PBSCluster cluster;
+
+    public ResourceConnection(MonitorID monitorID, String installedPath) throws SSHApiException {
+        AuthenticationInfo authenticationInfo = monitorID.getAuthenticationInfo();
+        String hostAddress = monitorID.getHost().getType().getHostAddress();
+        String userName = monitorID.getUserName();
+        String jobManager = ((GsisshHostType)monitorID.getHost().getType()).getJobManager();
+        JobManagerConfiguration jConfig = null;
+        if (jobManager == null) {
+            log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+            jConfig = CommonUtils.getPBSJobManager(installedPath);
+        } else {
+            if (org.apache.airavata.gfac.monitor.util.CommonUtils.isPBSHost(monitorID.getHost())) {
+                jConfig = CommonUtils.getPBSJobManager(installedPath);
+            } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSlurm(monitorID.getHost())) {
+                jConfig = CommonUtils.getSLURMJobManager(installedPath);
+            } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSGE(monitorID.getHost())) {
+                jConfig = CommonUtils.getSGEJobManager(installedPath);
+            }
+            //todo support br2 etc
+        }
+        ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)monitorID.getHost().getType()).getPort());
+        cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
+    }
+
+    public ResourceConnection(String userName, HostMonitorData hostMonitorData, String installedPath) throws SSHApiException {
+        AuthenticationInfo authenticationInfo = hostMonitorData.getMonitorIDs().get(0).getAuthenticationInfo();
+        String hostAddress = hostMonitorData.getHost().getType().getHostAddress();
+        String jobManager = ((GsisshHostType)hostMonitorData.getHost().getType()).getJobManager();
+        JobManagerConfiguration jConfig = null;
+        if (jobManager == null) {
+            log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+            jConfig = CommonUtils.getPBSJobManager(installedPath);
+        } else {
+            if (org.apache.airavata.gfac.monitor.util.CommonUtils.isPBSHost(hostMonitorData.getHost())) {
+                jConfig = CommonUtils.getPBSJobManager(installedPath);
+            } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSlurm(hostMonitorData.getHost())) {
+                jConfig = CommonUtils.getSLURMJobManager(installedPath);
+            }else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSGE(hostMonitorData.getHost())) {
+                jConfig = CommonUtils.getSGEJobManager(installedPath);
+            }
+            //todo support br2 etc
+        }
+        ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)hostMonitorData.getHost().getType()).getPort());
+        cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
+    }
+    public JobState getJobStatus(MonitorID monitorID) throws SSHApiException {
+        String jobID = monitorID.getJobID();
+        //todo so currently we execute the qstat for each job but we can use user based monitoring
+        //todo or we should concatenate all the commands and execute them in one go and parse the response
+        return getStatusFromString(cluster.getJobStatus(jobID).toString());
+    }
+
+    public Map<String,JobState> getJobStatuses(String userName,List<MonitorID> monitorIDs) throws SSHApiException {
+        Map<String,JobStatus> treeMap = new TreeMap<String,JobStatus>();
+        Map<String,JobState> treeMap1 = new TreeMap<String,JobState>();
+        // creating a sorted map with all the jobIds and with the predefined
+        // status as UNKNOWN
+        for (MonitorID monitorID : monitorIDs) {
+            treeMap.put(monitorID.getJobID(), JobStatus.U);
+        }
+        //todo so currently we execute the qstat for each job but we can use user based monitoring
+        //todo or we should concatenate all the commands and execute them in one go and parse the response
+        cluster.getJobStatuses(userName,treeMap);
+        for(String key:treeMap.keySet()){
+            treeMap1.put(key,getStatusFromString(treeMap.get(key).toString()));
+        }
+        return treeMap1;
+    }
+    private JobState getStatusFromString(String status) {
+        log.info("parsing the job status returned : " + status);
+        if(status != null){
+            if("C".equals(status) || "CD".equals(status)|| "E".equals(status) || "CG".equals(status)){
+                return JobState.COMPLETE;
+            }else if("H".equals(status) || "h".equals(status)){
+                return JobState.HELD;
+            }else if("Q".equals(status) || "qw".equals(status)){
+                return JobState.QUEUED;
+            }else if("R".equals(status)  || "CF".equals(status) || "r".equals(status)){
+                return JobState.ACTIVE;
+            }else if ("T".equals(status)) {
+                return JobState.HELD;
+            } else if ("W".equals(status) || "PD".equals(status)) {
+                return JobState.QUEUED;
+            } else if ("S".equals(status)) {
+                return JobState.SUSPENDED;
+            }else if("CA".equals(status)){
+                return JobState.CANCELED;
+            }else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status)) {
+                return JobState.FAILED;
+            }else if ("PR".equals(status) || "Er".equals(status)) {
+                return JobState.FAILED;
+            }else if ("U".equals(status)){
+                return JobState.UNKNOWN;
+            }
+        }
+        return JobState.UNKNOWN;
+    }
+
+    public PBSCluster getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(PBSCluster cluster) {
+        this.cluster = cluster;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
new file mode 100644
index 0000000..fbf6e21
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
@@ -0,0 +1,263 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.gfac.monitor.JobIdentity;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.core.PushMonitor;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * This is the implementation for AMQP based finishQueue, this uses
+ * rabbitmq client to recieve AMQP based monitoring data from
+ * mostly excede resources.
+ */
+public class AMQPMonitor extends PushMonitor {
+    private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
+
+
+    /* this will keep all the channels available in the system, we do not create
+      channels for all the jobs submitted, but we create channels for each user for each
+      host.
+    */
+    private Map<String, Channel> availableChannels;
+
+    private MonitorPublisher publisher;
+
+    private MonitorPublisher localPublisher;
+
+    private BlockingQueue<MonitorID> runningQueue;
+
+    private BlockingQueue<MonitorID> finishQueue;
+
+    private String connectionName;
+
+    private String proxyPath;
+
+    private List<String> amqpHosts;
+
+    private boolean startRegister;
+
+    public AMQPMonitor(){
+
+    }
+    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue,
+                       BlockingQueue<MonitorID> finishQueue,
+                       String proxyPath,String connectionName,List<String> hosts) {
+        this.publisher = publisher;
+        this.runningQueue = runningQueue;        // these will be initialized by the MonitorManager
+        this.finishQueue = finishQueue;          // these will be initialized by the MonitorManager
+        this.availableChannels = new HashMap<String, Channel>();
+        this.connectionName = connectionName;
+        this.proxyPath = proxyPath;
+        this.amqpHosts = hosts;
+        this.localPublisher = new MonitorPublisher(new EventBus());
+        this.localPublisher.registerListener(this);
+    }
+
+    public void initialize(String proxyPath, String connectionName, List<String> hosts) {
+        this.availableChannels = new HashMap<String, Channel>();
+        this.connectionName = connectionName;
+        this.proxyPath = proxyPath;
+        this.amqpHosts = hosts;
+        this.localPublisher = new MonitorPublisher(new EventBus());
+        this.localPublisher.registerListener(this);
+    }
+
+    @Override
+    public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
+        // we subscribe to read user-host based subscription
+        HostDescription host = monitorID.getHost();
+        String hostAddress = host.getType().getHostAddress();
+        // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
+        // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue
+        String channelID = CommonUtils.getChannelID(monitorID);
+        if(availableChannels.get(channelID) == null){
+        try {
+            //todo need to fix this rather getting it from a file
+            Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
+            Channel channel = null;
+            channel = connection.createChannel();
+            availableChannels.put(channelID, channel);
+            String queueName = channel.queueDeclare().getQueue();
+
+            BasicConsumer consumer = new
+                    BasicConsumer(new JSONMessageParser(), localPublisher);          // here we use local publisher
+            channel.basicConsume(queueName, true, consumer);
+            String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
+            // here we queuebind to a particular user in a particular machine
+            channel.queueBind(queueName, "glue2.computing_activity", filterString);
+            logger.info("Using filtering string to monitor: " + filterString);
+        } catch (IOException e) {
+            logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
+        }
+        }
+        return true;
+    }
+
+    public void run() {
+        // before going to the while true mode we start unregister thread
+        startRegister = true; // this will be unset by someone else
+        while (startRegister || !ServerSettings.isStopAllThreads()) {
+            try {
+                MonitorID take = runningQueue.take();
+                this.registerListener(take);
+            } catch (AiravataMonitorException e) { // catch any exceptino inside the loop
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            } catch (Exception e){
+                e.printStackTrace();
+            }
+        }
+        Set<String> strings = availableChannels.keySet();
+        for(String key:strings) {
+            Channel channel = availableChannels.get(key);
+            try {
+                channel.close();
+            } catch (IOException e) {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+        }
+    }
+
+    @Subscribe
+    public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
+        Iterator<MonitorID> iterator = finishQueue.iterator();
+        MonitorID next = null;
+        while(iterator.hasNext()){
+            next = iterator.next();
+            if(next.getJobID().endsWith(monitorID.getJobID())){
+                break;
+            }
+        }
+        if(next == null) {
+            logger.error("Job has removed from the queue, old obsolete message recieved");
+            return false;
+        }
+        String channelID = CommonUtils.getChannelID(next);
+        if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) {
+            finishQueue.remove(next);
+
+            // if this is the last job in the queue at this point with the same username and same host we
+            // close the channel and close the connection and remove it from availableChannels
+            if (CommonUtils.isTheLastJobInQueue(finishQueue, next)) {
+                logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it" +
+                        ", incase new job created we do subscribe again");
+                Channel channel = availableChannels.get(channelID);
+                if (channel == null) {
+                    logger.error("Already Unregistered the listener");
+                    throw new AiravataMonitorException("Already Unregistered the listener");
+                } else {
+                    try {
+                        channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(next));
+                        channel.close();
+                        channel.getConnection().close();
+                        availableChannels.remove(channelID);
+                    } catch (IOException e) {
+                        logger.error("Error unregistering the listener");
+                        throw new AiravataMonitorException("Error unregistering the listener");
+                    }
+                }
+            }
+        }
+        next.setStatus(monitorID.getStatus());
+        publisher.publish(new JobStatusChangeRequest(next, new JobIdentity(next.getExperimentID(), next.getWorkflowNodeID(), next.getTaskID(), next.getJobID()),next.getStatus()));
+        return true;
+    }
+    @Override
+    public boolean stopRegister() throws AiravataMonitorException {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public Map<String, Channel> getAvailableChannels() {
+        return availableChannels;
+    }
+
+    public void setAvailableChannels(Map<String, Channel> availableChannels) {
+        this.availableChannels = availableChannels;
+    }
+
+    public MonitorPublisher getPublisher() {
+        return publisher;
+    }
+
+    public void setPublisher(MonitorPublisher publisher) {
+        this.publisher = publisher;
+    }
+
+    public BlockingQueue<MonitorID> getRunningQueue() {
+        return runningQueue;
+    }
+
+    public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
+        this.runningQueue = runningQueue;
+    }
+
+    public BlockingQueue<MonitorID> getFinishQueue() {
+        return finishQueue;
+    }
+
+    public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
+        this.finishQueue = finishQueue;
+    }
+
+    public String getProxyPath() {
+        return proxyPath;
+    }
+
+    public void setProxyPath(String proxyPath) {
+        this.proxyPath = proxyPath;
+    }
+
+    public List<String> getAmqpHosts() {
+        return amqpHosts;
+    }
+
+    public void setAmqpHosts(List<String> amqpHosts) {
+        this.amqpHosts = amqpHosts;
+    }
+
+    public boolean isStartRegister() {
+        return startRegister;
+    }
+
+    public void setStartRegister(boolean startRegister) {
+        this.startRegister = startRegister;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
new file mode 100644
index 0000000..1d60c45
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.core.MessageParser;
+import org.apache.airavata.gfac.monitor.event.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BasicConsumer implements Consumer {
+    private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
+
+    private MessageParser parser;
+
+    private MonitorPublisher publisher;
+
+    public BasicConsumer(MessageParser parser, MonitorPublisher publisher) {
+        this.parser = parser;
+        this.publisher = publisher;
+    }
+
+    public void handleCancel(String consumerTag) {
+    }
+
+    public void handleCancelOk(String consumerTag) {
+    }
+
+    public void handleConsumeOk(String consumerTag) {
+    }
+
+    public void handleDelivery(String consumerTag,
+                               Envelope envelope,
+                               AMQP.BasicProperties properties,
+                               byte[] body) {
+
+        logger.debug("job update for: " + envelope.getRoutingKey());
+        String message = new String(body);
+        message = message.replaceAll("(?m)^", "    ");
+        // Here we parse the message and get the job status and push it
+        // to the Event bus, this will be picked by
+//        AiravataJobStatusUpdator and store in to registry
+
+        logger.debug("************************************************************");
+        logger.debug("AMQP Message recieved \n" + message);
+        logger.debug("************************************************************");
+        try {
+            String jobID = envelope.getRoutingKey().split("\\.")[0];
+            MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null);
+            monitorID.setStatus(parser.parseMessage(message));
+            publisher.publish(monitorID);
+        } catch (AiravataMonitorException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void handleRecoverOk(String consumerTag) {
+    }
+
+    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
new file mode 100644
index 0000000..72c77d5
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.airavata.ComputingActivity;
+import org.apache.airavata.gfac.monitor.core.MessageParser;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class JSONMessageParser implements MessageParser {
+    private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class);
+
+    public JobState parseMessage(String message)throws AiravataMonitorException {
+        /*todo write a json message parser here*/
+        logger.debug(message);
+        ObjectMapper objectMapper = new ObjectMapper();
+        try {
+            ComputingActivity computingActivity = objectMapper.readValue(message.getBytes(), ComputingActivity.class);
+            logger.info(computingActivity.getIDFromEndpoint());
+            List<String> stateList = computingActivity.getState();
+            JobState jobState = null;
+            for (String aState : stateList) {
+                jobState = getStatusFromString(aState);
+            }
+            // we get the last value of the state array
+            return jobState;
+        } catch (IOException e) {
+            throw new AiravataMonitorException(e);
+        }
+    }
+
+private JobState getStatusFromString(String status) {
+        logger.info("parsing the job status returned : " + status);
+        if(status != null){
+            if("ipf:finished".equals(status)){
+                return JobState.COMPLETE;
+            }else if("ipf:pending".equals(status)|| "ipf:starting".equals(status)){
+                return JobState.QUEUED;
+            }else if("ipf:running".equals(status) || "ipf:finishing".equals(status)){
+                return JobState.ACTIVE;
+            }else if ("ipf:held".equals(status) || "ipf:teminating".equals(status) || "ipf:teminated".equals(status)) {
+                return JobState.HELD;
+            } else if ("ipf:suspending".equals(status)) {
+                return JobState.SUSPENDED;
+            }else if ("ipf:failed".equals(status)) {
+                return JobState.FAILED;
+            }else if ("ipf:unknown".equals(status)){
+                return JobState.UNKNOWN;
+            }
+        }
+        return JobState.UNKNOWN;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
new file mode 100644
index 0000000..c6e1378
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.impl.push.amqp;
+
+import com.google.common.eventbus.Subscribe;
+import com.rabbitmq.client.Channel;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class UnRegisterWorker{
+    private final static Logger logger = LoggerFactory.getLogger(UnRegisterWorker.class);
+    private Map<String, Channel> availableChannels;
+
+    public UnRegisterWorker(Map<String, Channel> channels) {
+        this.availableChannels = channels;
+    }
+
+    @Subscribe
+    private boolean unRegisterListener(JobStatusChangeRequest jobStatus) throws AiravataMonitorException {
+        MonitorID monitorID = jobStatus.getMonitorID();
+        String channelID = CommonUtils.getChannelID(monitorID);
+        if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){
+            Channel channel = availableChannels.get(channelID);
+            if (channel == null) {
+                logger.error("Already Unregistered the listener");
+                throw new AiravataMonitorException("Already Unregistered the listener");
+            } else {
+                try {
+                    channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
+                    channel.close();
+                    channel.getConnection().close();
+                    availableChannels.remove(channelID);
+                } catch (IOException e) {
+                    logger.error("Error unregistering the listener");
+                    throw new AiravataMonitorException("Error unregistering the listener");
+                }
+            }
+        }
+        return true;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/AbstractStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/AbstractStateChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/AbstractStateChangeRequest.java
new file mode 100644
index 0000000..10048b0
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/AbstractStateChangeRequest.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.state;
+
+
+public abstract class AbstractStateChangeRequest implements PublisherMessage {
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/ExperimentStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/ExperimentStatusChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/ExperimentStatusChangeRequest.java
new file mode 100644
index 0000000..eecf88d
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/ExperimentStatusChangeRequest.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state;
+
+import org.apache.airavata.gfac.monitor.ExperimentIdentity;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class ExperimentStatusChangeRequest extends AbstractStateChangeRequest {
+    private ExperimentState state;
+    private ExperimentIdentity identity;
+
+    // this constructor can be used in Qstat monitor to handle errors
+    public ExperimentStatusChangeRequest() {
+    }
+
+    public ExperimentStatusChangeRequest(ExperimentIdentity experimentIdentity, ExperimentState state) {
+        this.state = state;
+        setIdentity(experimentIdentity);
+    }
+
+    public ExperimentState getState() {
+        return state;
+    }
+
+    public void setState(ExperimentState state) {
+       this.state = state;
+    }
+
+	public ExperimentIdentity getIdentity() {
+		return identity;
+	}
+
+	public void setIdentity(ExperimentIdentity identity) {
+		this.identity = identity;
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java
new file mode 100644
index 0000000..5ea9eb5
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state;
+
+import org.apache.airavata.gfac.monitor.JobIdentity;
+import org.apache.airavata.gfac.monitor.MonitorID;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class JobStatusChangeRequest  extends AbstractStateChangeRequest {
+    private JobState state;
+    private JobIdentity identity;
+
+    private MonitorID monitorID;
+    
+    // this constructor can be used in Qstat monitor to handle errors
+    public JobStatusChangeRequest() {
+    }
+
+    public JobStatusChangeRequest(MonitorID monitorID, JobIdentity jobId, JobState state) {
+    	setIdentity(jobId);
+    	setMonitorID(monitorID);
+    	this.state = state;
+    }
+
+    public JobState getState() {
+        return state;
+    }
+
+    public void setState(JobState state) {
+       this.state = state;
+    }
+
+	public JobIdentity getIdentity() {
+		return identity;
+	}
+
+	public void setIdentity(JobIdentity identity) {
+		this.identity = identity;
+	}
+
+	public MonitorID getMonitorID() {
+		return monitorID;
+	}
+
+	public void setMonitorID(MonitorID monitorID) {
+		this.monitorID = monitorID;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusInfo.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusInfo.java
new file mode 100644
index 0000000..9a59b50
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusInfo.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state;
+
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+
+/**
+ * Based on the job status monitoring we can gather
+ * different informaation about the job, its not simply
+ * the job status, so we need a way to implement
+ * different job statusinfo object to keep job status
+ */
+public interface JobStatusInfo {
+
+    /**
+     * This method can be used to get JobStatusInfo data and
+     * decide the finalJobState
+     *
+     * @param jobState
+     */
+    void setJobStatus(JobStatus jobState);
+
+    /**
+     * After setting the jobState by processing jobinformation
+     * this method can be used to get the JobStatus
+     * @return
+     */
+    JobStatus getJobStatus();
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/PublisherMessage.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/PublisherMessage.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/PublisherMessage.java
new file mode 100644
index 0000000..cbfcb5a
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/PublisherMessage.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.gfac.monitor.state;
+
+public interface PublisherMessage {
+//	public String getType();
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/TaskStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/TaskStatusChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/TaskStatusChangeRequest.java
new file mode 100644
index 0000000..af20707
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/TaskStatusChangeRequest.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state;
+
+import org.apache.airavata.gfac.monitor.TaskIdentity;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class TaskStatusChangeRequest extends AbstractStateChangeRequest {
+    private TaskState state;
+    private TaskIdentity identity;
+    // this constructor can be used in Qstat monitor to handle errors
+    public TaskStatusChangeRequest() {
+    }
+
+    public TaskStatusChangeRequest(TaskIdentity taskIdentity, TaskState state) {
+        this.state = state;
+        setIdentity(taskIdentity);
+    }
+
+    public TaskState getState() {
+        return state;
+    }
+
+    public void setState(TaskState state) {
+       this.state = state;
+    }
+
+	public TaskIdentity getIdentity() {
+		return identity;
+	}
+
+	public void setIdentity(TaskIdentity identity) {
+		this.identity = identity;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/WorkflowNodeStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/WorkflowNodeStatusChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/WorkflowNodeStatusChangeRequest.java
new file mode 100644
index 0000000..632f2e3
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/WorkflowNodeStatusChangeRequest.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state;
+
+import org.apache.airavata.gfac.monitor.WorkflowNodeIdentity;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class WorkflowNodeStatusChangeRequest extends AbstractStateChangeRequest {
+    private WorkflowNodeState state;
+    private WorkflowNodeIdentity identity;
+
+    // this constructor can be used in Qstat monitor to handle errors
+    public WorkflowNodeStatusChangeRequest() {
+    }
+
+    public WorkflowNodeStatusChangeRequest(WorkflowNodeIdentity identity, WorkflowNodeState state) {
+        this.state = state;
+        setIdentity(identity);
+    }
+
+    public WorkflowNodeState getState() {
+        return state;
+    }
+
+    public void setState(WorkflowNodeState state) {
+       this.state = state;
+    }
+
+	public WorkflowNodeIdentity getIdentity() {
+		return identity;
+	}
+
+	public void setIdentity(WorkflowNodeIdentity identity) {
+		this.identity = identity;
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/impl/AmazonJobStatusInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/impl/AmazonJobStatusInfo.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/impl/AmazonJobStatusInfo.java
new file mode 100644
index 0000000..19b051a
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/impl/AmazonJobStatusInfo.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state.impl;
+
+import org.apache.airavata.gfac.monitor.state.JobStatusInfo;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+
+/**
+ * This can be used to store job status information about
+ * amazon jobs, this data could be very different from
+ * a typical grid job
+ */
+public class AmazonJobStatusInfo implements JobStatusInfo {
+    public void setJobStatus(JobStatus jobState) {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public JobStatus getJobStatus() {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/impl/GridJobStatusInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/impl/GridJobStatusInfo.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/impl/GridJobStatusInfo.java
new file mode 100644
index 0000000..4612c3c
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/state/impl/GridJobStatusInfo.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.state.impl;
+
+import org.apache.airavata.gfac.monitor.state.JobStatusInfo;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
+
+
+/**
+ * This can be used to keep information about a Grid job
+ * which we can get from qstat polling or from amqp based
+ * monitoring in Grid machines
+ */
+public class GridJobStatusInfo implements JobStatusInfo {
+    public void setJobStatus(JobStatus jobState) {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public JobStatus getJobStatus() {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/b0230849/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java
new file mode 100644
index 0000000..b69cf52
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.monitor.util;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultSaslConfig;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.security.KeyStore;
+import java.util.Collections;
+import java.util.List;
+
+public class AMQPConnectionUtil {
+    public static Connection connect(List<String>hosts,String vhost, String proxyFile) {
+        Collections.shuffle(hosts);
+        for (String host : hosts) {
+            Connection connection = connect(host, vhost, proxyFile);
+            if (host != null) {
+                System.out.println("connected to " + host);
+                return connection;
+            }
+        }
+        return null;
+    }
+
+    public static Connection connect(String host, String vhost, String proxyFile) {
+        Connection connection;
+        try {
+            String keyPassPhrase = "test123";
+            KeyStore ks = X509Helper.keyStoreFromPEM(proxyFile, keyPassPhrase);
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+            kmf.init(ks, keyPassPhrase.toCharArray());
+
+            KeyStore tks = X509Helper.trustKeyStoreFromCertDir();
+            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+            tmf.init(tks);
+
+            SSLContext c = SSLContext.getInstance("SSLv3");
+            c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+            ConnectionFactory factory = new ConnectionFactory();
+            factory.setHost(host);
+            factory.setPort(5671);
+            factory.useSslProtocol(c);
+            factory.setVirtualHost(vhost);
+            factory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
+
+            connection = factory.newConnection();
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+        return connection;
+    }
+
+}