You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/06/16 21:37:41 UTC

[2/7] airavata git commit: Removed gsi related code

http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
deleted file mode 100644
index d6da22a..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.handlers;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.airavata.gfac.monitor.HPCMonitorID;
-import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *   This handler is responsible for monitoring jobs in push mode;
- *   it currently supports push monitoring of multiple grid resources.
- */
-public class GridPushMonitorHandler extends ThreadedHandler {
-    private final static Logger logger= LoggerFactory.getLogger(GridPushMonitorHandler.class);
-
-    private AMQPMonitor amqpMonitor; // created in initProperties, or injected through setAmqpMonitor
-
-    private AuthenticationInfo authenticationInfo; // set in initProperties from the myproxy.* server settings
-
-    @Override
-    public void initProperties(Properties properties) throws GFacHandlerException { // builds the credentials and the AMQPMonitor from server settings plus the "hosts" property
-        String myProxyUser=null;
-        try{
-            myProxyUser = ServerSettings.getSetting("myproxy.username");
-            String myProxyPass = ServerSettings.getSetting("myproxy.password");
-            String certPath = ServerSettings.getSetting("trusted.cert.location");
-            String myProxyServer = ServerSettings.getSetting("myproxy.server");
-            setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer,
-                    7512, 17280000, certPath)); // presumably MyProxy port and credential lifetime - TODO confirm against MyProxyAuthenticationInfo
-
-            String hostList=(String)properties.get("hosts"); // NOTE(review): null when "hosts" is absent, so the split below would throw NullPointerException
-            String proxyFilePath = ServerSettings.getSetting("proxy.file.path");
-            String connectionName=ServerSettings.getSetting("connection.name");
-            LinkedBlockingQueue<MonitorID> pushQueue = new LinkedBlockingQueue<MonitorID>();
-            LinkedBlockingQueue<MonitorID> finishQueue = new LinkedBlockingQueue<MonitorID>();
-            List<String> hosts= Arrays.asList(hostList.split(",")); // "hosts" is a comma-separated list
-            amqpMonitor=new AMQPMonitor(null,pushQueue,finishQueue,proxyFilePath,connectionName,hosts);
-        }catch (ApplicationSettingsException e){
-            logger.error(e.getMessage(), e);
-            throw new GFacHandlerException(e.getMessage(), e); // settings failures surface as handler exceptions, cause preserved
-        }
-    }
-
-    @Override
-    public void run() { // delegates to the AMQP monitor's own run()
-        amqpMonitor.run();
-    }
-
-    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException{ // wraps the job context in an HPCMonitorID and adds it to the monitor's running queue
-        super.invoke(jobExecutionContext);
-        MonitorID monitorID=new HPCMonitorID(getAuthenticationInfo(),jobExecutionContext);
-        amqpMonitor.getRunningQueue().add(monitorID);
-    }
-
-    @Override
-    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
-        // TODO: not implemented; recover currently does nothing.
-    }
-
-    public AMQPMonitor getAmqpMonitor() {
-        return amqpMonitor;
-    }
-
-    public void setAmqpMonitor(AMQPMonitor amqpMonitor) {
-        this.amqpMonitor = amqpMonitor;
-    }
-
-    public AuthenticationInfo getAuthenticationInfo() {
-        return authenticationInfo;
-    }
-
-    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
-        this.authenticationInfo = authenticationInfo;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
deleted file mode 100644
index 79a4a8e..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.pull.qstat;
-
-import com.google.common.eventbus.EventBus;
-import org.apache.airavata.common.logger.AiravataLogger;
-import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.LocalEventPublisher;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.impl.OutHandlerWorker;
-import org.apache.airavata.gfac.monitor.util.CommonUtils;
-import org.apache.airavata.gfac.core.GFac;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
-import org.apache.airavata.gfac.monitor.HostMonitorData;
-import org.apache.airavata.gfac.monitor.UserMonitorData;
-import org.apache.airavata.gfac.monitor.core.PullMonitor;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.gfac.monitor.impl.push.amqp.SimpleJobFinishConsumer;
-import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-import org.apache.airavata.model.messaging.event.JobIdentifier;
-import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
-import org.apache.airavata.model.experiment.JobState;
-
-import java.sql.Timestamp;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * This monitor is based on qstat command which can be run
- * in grid resources and retrieve the job status.
- */
-public class HPCPullMonitor extends PullMonitor {
-
-    private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(HPCPullMonitor.class);
-    public static final int FAILED_COUNT = 5;
-
-    // I think this should use DelayedBlocking Queue to do the monitoring*/
-    private BlockingQueue<UserMonitorData> queue;
-
-    private boolean startPulling = false;
-
-    private Map<String, ResourceConnection> connections;
-
-    private LocalEventPublisher publisher;
-
-    private LinkedBlockingQueue<String> cancelJobList;
-
-    private List<String> completedJobsFromPush;
-
-    private GFac gfac;
-
-    private AuthenticationInfo authenticationInfo;
-
-    private ArrayList<MonitorID> removeList;
-
-    public HPCPullMonitor() { // default setup: own queue and a LocalEventPublisher over a fresh EventBus; authenticationInfo and gfac stay unset
-        connections = new HashMap<String, ResourceConnection>();
-        queue = new LinkedBlockingDeque<UserMonitorData>();
-        publisher = new LocalEventPublisher(new EventBus());
-        cancelJobList = new LinkedBlockingQueue<String>();
-        completedJobsFromPush = new ArrayList<String>();
-        (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); // the consumer is handed the same list that startPulling later reads under synchronized
-        removeList = new ArrayList<MonitorID>();
-    }
-
-    public HPCPullMonitor(LocalEventPublisher localEventPublisher, AuthenticationInfo authInfo) { // uses the caller's publisher and credentials with a private queue; gfac stays unset
-        connections = new HashMap<String, ResourceConnection>();
-        queue = new LinkedBlockingDeque<UserMonitorData>();
-        publisher = localEventPublisher;
-        authenticationInfo = authInfo;
-        cancelJobList = new LinkedBlockingQueue<String>();
-        this.completedJobsFromPush = new ArrayList<String>();
-        (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); // the consumer is handed the same list that startPulling later reads under synchronized
-        removeList = new ArrayList<MonitorID>();
-    }
-
-    public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, LocalEventPublisher publisher) { // uses the caller's queue and publisher; authenticationInfo and gfac stay unset
-        this.queue = queue;
-        this.publisher = publisher;
-        connections = new HashMap<String, ResourceConnection>();
-        cancelJobList = new LinkedBlockingQueue<String>();
-        this.completedJobsFromPush = new ArrayList<String>();
-        (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen(); // the consumer is handed the same list that startPulling later reads under synchronized
-        removeList = new ArrayList<MonitorID>();
-    }
-
-
-    public void run() {
-        /* Polling loop: while the startPulling flag is set and ServerSettings.isStopAllThreads()
-        is false, call startPulling() whenever the queue is non-empty
-         */
-        this.startPulling = true;
-        while (this.startPulling && !ServerSettings.isStopAllThreads()) {
-            try {
-                // After each pass (whether or not startPulling ran) this thread sleeps 10 seconds
-                synchronized (this.queue) {
-                    if (this.queue.size() > 0) {
-                        startPulling();
-                }
-            }
-                Thread.sleep(10000);
-            } catch (Exception e) {
-                // We catch all exceptions here because, no matter what happens, this thread must keep running.
-                // startPulling reports its own errors; this block also covers an interruption of Thread.sleep.
-                // NOTE(review): an InterruptedException is only logged here; the interrupt status is not restored.
-                logger.error(e.getMessage(),e);
-            }
-        }
-        // thread is going to return so we close all the connections
-        Iterator<String> iterator = connections.keySet().iterator();
-        while (iterator.hasNext()) {
-            String next = iterator.next();
-            ResourceConnection resourceConnection = connections.get(next);
-            try {
-                resourceConnection.getCluster().disconnect();
-            } catch (SSHApiException e) {
-                logger.error("Erro while connecting to the cluster", e); // NOTE(review): logged text says "connecting" although this is the disconnect path
-            }
-        }
-    }
-
-    /**
-     * Performs one polling pass: takes the head UserMonitorData from the queue, updates the
-     * status of its jobs host by host, and re-queues it if any host still has jobs to monitor.
-     *
-     * @return true when the pass completes; failures are thrown as AiravataMonitorException, never returned as false
-     */
-     public boolean startPulling() throws AiravataMonitorException {
-        // take the top element in the queue and pull the data and put that element
-        // at the tail of the queue
-        //todo this polling will not work with multiple usernames but with single user
-        // and multiple hosts, currently monitoring will work
-        UserMonitorData take = null;
-        JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
-        MonitorID currentMonitorID = null;
-        try {
-            take = this.queue.take();
-            List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
-            for (ListIterator<HostMonitorData> hostIterator = hostMonitorData.listIterator(); hostIterator.hasNext();) {
-                HostMonitorData iHostMonitorData = hostIterator.next();
-                if (iHostMonitorData.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
-                    String hostName = iHostMonitorData.getComputeResourceDescription().getHostName();
-                    ResourceConnection connection = null;
-                    if (connections.containsKey(hostName)) {
-                        if (!connections.get(hostName).isConnected()) {
-                            connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo());
-                            connections.put(hostName, connection);
-                        } else {
-                            logger.debug("We already have this connection so not going to create one");
-                            connection = connections.get(hostName);
-                        }
-                    } else {
-                        connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo());
-                        connections.put(hostName, connection);
-                    }
-
-                    // before we get the statuses, we check the cancel job list and remove them permanently
-                    List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
-                    Iterator<String> iterator1 = cancelJobList.iterator();
-                    ListIterator<MonitorID> monitorIDListIterator = monitorID.listIterator();
-                    while (monitorIDListIterator.hasNext()) {
-                        MonitorID iMonitorID = monitorIDListIterator.next();
-                        while (iterator1.hasNext()) {
-                            String cancelMId = iterator1.next();
-                            if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) {
-                                iMonitorID.setStatus(JobState.CANCELED);
-//                                CommonUtils.removeMonitorFromQueue(take, iMonitorID);
-                                removeList.add(iMonitorID);
-                                logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence moved to the " +
-                                                "completed job queue, experiment {}, task {} , job {}",
-                                        iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobID());
-                                logger.info("Job cancelled: marking the Job as ************CANCELLED************ experiment {}, task {}, job name {} .",
-                                        iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
-                                sendNotification(iMonitorID);
-                                logger.info("To avoid timing issues we sleep sometime and try to retrieve output files");
-                                Thread.sleep(10000);
-                                GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
-                                break;
-                            }
-                        }
-                        iterator1 = cancelJobList.iterator();
-                    }
-
-                    cleanup(take);
-
-                    synchronized (completedJobsFromPush) {
-                        for (ListIterator<String> iterator = completedJobsFromPush.listIterator(); iterator.hasNext(); ) {
-                            String completeId = iterator.next();
-                            for (monitorIDListIterator = monitorID.listIterator(); monitorIDListIterator.hasNext(); ) {
-                                MonitorID iMonitorID = monitorIDListIterator.next();
-                                if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
-                                    logger.info("This job is finished because push notification came with <username,jobName> " + completeId);
-                                    iMonitorID.setStatus(JobState.COMPLETE);
-//                                    CommonUtils.removeMonitorFromQueue(take, iMonitorID);//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak
-                                    removeList.add(iMonitorID);
-                                    logger.debugId(completeId, "Push notification updated job {} status to {}. " +
-                                                    "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(),
-                                            iMonitorID.getExperimentID(), iMonitorID.getTaskID());
-                                    logger.info("AMQP message recieved: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
-                                            iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
-
-                                    sendNotification(iMonitorID);
-                                    logger.info("To avoid timing issues we sleep sometime and try to retrieve output files");
-                                    Thread.sleep(10000); // NOTE(review): this sleep happens while the completedJobsFromPush monitor is held
-                                    GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
-                                    break;
-                                }
-                            }
-                        }
-                    }
-
-                    cleanup(take);
-
-                    // we have to get this again because we removed the already completed jobs with amqp messages
-                    monitorID = iHostMonitorData.getMonitorIDs();
-                    Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
-                    for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) {
-                        MonitorID iMonitorID = iterator.next();
-                        currentMonitorID = iMonitorID;
-                        if (!JobState.CANCELED.equals(iMonitorID.getStatus()) &&
-                                !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
-                            iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName()));    //IMPORTANT this is NOT a simple setter we have a logic
-                        } else if (JobState.COMPLETE.equals(iMonitorID.getStatus())) {
-                            logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " +
-                                    "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID());
-//                            CommonUtils.removeMonitorFromQueue(take, iMonitorID);
-                            removeList.add(iMonitorID);
-                            logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
-                                    iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
-                            GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
-                        }
-                        iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName()));    //IMPORTANT this is not a simple setter we have a logic; NOTE(review): runs unconditionally, so it is also applied to CANCELED/COMPLETE jobs despite the guard above
-                        iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
-                        sendNotification(iMonitorID);
-                        // if the job is completed we do not have to put the job to the queue again
-                        iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
-                    }
-
-                    cleanup(take);
-
-
-                    for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) {
-                        MonitorID iMonitorID = iterator.next();
-                        if (iMonitorID.getFailedCount() > FAILED_COUNT) {
-                            iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
-                            String outputDir = iMonitorID.getJobExecutionContext().getOutputDir();
-                            List<String> stdOut = null;
-                            try {
-                                stdOut = connection.getCluster().listDirectory(outputDir); // check the outputs directory
-                            } catch (SSHApiException e) {
-                                if (e.getMessage().contains("No such file or directory")) {
-                                    // this is because while we run output handler something failed and during exception
-                                    // we store all the jobs in the monitor queue again
-                                    logger.error("We know this  job is already attempted to run out-handlers");
-//                                    CommonUtils.removeMonitorFromQueue(queue, iMonitorID);
-                                }
-                            }
-                            if (stdOut != null && stdOut.size() > 0 && !stdOut.get(0).isEmpty()) { // have to be careful with this
-                                iMonitorID.setStatus(JobState.COMPLETE);
-                                logger.errorId(iMonitorID.getJobID(), "Job monitoring failed {} times, " +
-                                                " Experiment {} , task {}", iMonitorID.getFailedCount(),
-                                        iMonitorID.getExperimentID(), iMonitorID.getTaskID());
-                                logger.info("Listing directory came as complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .",
-                                        iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName());
-                                sendNotification(iMonitorID);
-//                                CommonUtils.removeMonitorFromQueue(take, iMonitorID);
-                                removeList.add(iMonitorID);
-                                GFacThreadPoolExecutor.getCachedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher));
-                            } else {
-                                iMonitorID.setFailedCount(0);
-                            }
-                        } else {
-                            // failed count is within FAILED_COUNT: only refresh the last-monitored timestamp
-                            iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
-                            // if the job is complete we remove it from the Map, if any of these maps
-                            // get empty this userMonitorData will get delete from the queue
-                        }
-                    }
-
-                    cleanup(take);
-
-
-                } else {
-                    logger.debug("Qstat Monitor doesn't handle non-gsissh hosts , host {}", iHostMonitorData.
-                            getComputeResourceDescription().getHostName());
-                }
-            }
-            // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back
-            // now the userMonitorData goes back to the tail of the queue
-            // during individual monitorID removal we remove the HostMonitorData object if it become empty
-            // so if all the jobs are finished for all the hostMOnitorId objects in userMonitorData object
-            // we should remove it from the queue so here we do not put it back.
-            for (ListIterator<HostMonitorData> iterator1 = take.getHostMonitorData().listIterator(); iterator1.hasNext(); ) {
-                HostMonitorData iHostMonitorID = iterator1.next();
-                if (iHostMonitorID.getMonitorIDs().size() == 0) {
-                    iterator1.remove();
-                    logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getComputeResourceDescription().getHostName());
-                }
-            }
-            if(take.getHostMonitorData().size()!=0) {
-                queue.put(take);
-            }
-        } catch (InterruptedException e) {
-            if (!this.queue.contains(take)) {
-                try {
-                    this.queue.put(take); // NOTE(review): take is still null if queue.take() itself was interrupted; BlockingQueue.put(null) throws NullPointerException
-                } catch (InterruptedException e1) {
-                    e1.printStackTrace();  // NOTE(review): the re-queue was interrupted; only a stack trace is printed
-                }
-            }
-            logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID()); // NOTE(review): currentMonitorID is null until the status loop assigns it, so an earlier interrupt would throw NullPointerException here
-            throw new AiravataMonitorException(e);
-        } catch (SSHApiException e) {
-            logger.error(e.getMessage());
-            if (e.getMessage().contains("Unknown Job Id Error")) {
-                // in this case job is finished or may be the given job ID is wrong
-                jobStatus.setState(JobState.UNKNOWN);
-                JobIdentifier jobIdentifier = new JobIdentifier("UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN");
-                if (currentMonitorID != null){
-                    jobIdentifier.setExperimentId(currentMonitorID.getExperimentID());
-                    jobIdentifier.setTaskId(currentMonitorID.getTaskID());
-                    jobIdentifier.setWorkflowNodeId(currentMonitorID.getWorkflowNodeID());
-                    jobIdentifier.setJobId(currentMonitorID.getJobID());
-                    jobIdentifier.setGatewayId(currentMonitorID.getJobExecutionContext().getGatewayID());
-                }
-                jobStatus.setJobIdentity(jobIdentifier);
-                publisher.publish(jobStatus);
-            } else if (e.getMessage().contains("illegally formed job identifier")) {
-                logger.error("Wrong job ID is given so dropping the job from monitoring system");
-            } else if (!this.queue.contains(take)) {
-                try {
-                    queue.put(take);
-                } catch (InterruptedException e1) {
-                    e1.printStackTrace();
-                }
-            }
-            throw new AiravataMonitorException("Error retrieving the job status", e);
-        } catch (Exception e) {
-            try {
-                queue.put(take);
-            } catch (InterruptedException e1) {
-                e1.printStackTrace();
-            }
-            throw new AiravataMonitorException("Error retrieving the job status", e);
-        }
-        return true;
-    }
-
-    private void sendNotification(MonitorID iMonitorID) { // publishes the monitor's current status as a JobStatusChangeRequestEvent
-        JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent();
-        JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(),
-                iMonitorID.getTaskID(),
-                iMonitorID.getWorkflowNodeID(),
-                iMonitorID.getExperimentID(),
-                iMonitorID.getJobExecutionContext().getGatewayID());
-        jobStatus.setJobIdentity(jobIdentity);
-        jobStatus.setState(iMonitorID.getStatus());
-        // the debug line is written before the event is actually handed to the publisher below
-        logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " +
-                "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
-        jobStatus.getJobIdentity().getTaskId());
-
-        publisher.publish(jobStatus);
-    }
-
-    /**
-     * Requests that polling stop by clearing the startPulling flag that the run() loop checks.
-     *
-     * @return always true
-     */
-    public boolean stopPulling() {
-        this.startPulling = false;
-        return true;
-    }
-
-    public LocalEventPublisher getPublisher() {
-        return publisher;
-    }
-
-    public void setPublisher(LocalEventPublisher publisher) {
-        this.publisher = publisher;
-    }
-
-    public BlockingQueue<UserMonitorData> getQueue() {
-        return queue;
-    }
-
-    public void setQueue(BlockingQueue<UserMonitorData> queue) {
-        this.queue = queue;
-    }
-
-    public boolean authenticate() {
-        return false;  // stub: no authentication is performed here, the method always returns false
-    }
-
-    public Map<String, ResourceConnection> getConnections() {
-        return connections;
-    }
-
-    public boolean isStartPulling() {
-        return startPulling;
-    }
-
-    public void setConnections(Map<String, ResourceConnection> connections) {
-        this.connections = connections;
-    }
-
-    public void setStartPulling(boolean startPulling) {
-        this.startPulling = startPulling;
-    }
-
-    public GFac getGfac() {
-        return gfac;
-    }
-
-    public void setGfac(GFac gfac) {
-        this.gfac = gfac;
-    }
-
-    public AuthenticationInfo getAuthenticationInfo() {
-        return authenticationInfo;
-    }
-
-    public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
-        this.authenticationInfo = authenticationInfo;
-    }
-
-    public LinkedBlockingQueue<String> getCancelJobList() {
-        return cancelJobList;
-    }
-
-    public void setCancelJobList(LinkedBlockingQueue<String> cancelJobList) {
-        this.cancelJobList = cancelJobList;
-    }
-
-
-    private void cleanup(UserMonitorData userMonitorData){ // removes every MonitorID collected in removeList from userMonitorData, then empties removeList
-        for(MonitorID iMonitorId:removeList){
-            try {
-                CommonUtils.removeMonitorFromQueue(userMonitorData, iMonitorId);
-            } catch (AiravataMonitorException e) {
-                logger.error(e.getMessage(), e); // a failed removal is logged and the loop continues with the next entry
-                logger.error("Error deleting the monitor data: " + iMonitorId.getJobID());
-            }
-        }
-        removeList.clear(); // cleared even if some removals failed
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
deleted file mode 100644
index 26add1f..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.pull.qstat;
-
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.SecurityContext;
-import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.gsi.ssh.impl.HPCRemoteCluster;
-import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
-import org.apache.airavata.gfac.monitor.HostMonitorData;
-import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.cluster.JobStatus;
-import org.apache.airavata.model.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-
-public class ResourceConnection {
-    private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class);
-
-    private HPCRemoteCluster cluster;
-
-    private AuthenticationInfo authenticationInfo;
-
-
-    public ResourceConnection(HostMonitorData hostMonitorData,AuthenticationInfo authInfo) throws SSHApiException {
-        MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0);
-        try {
-            SecurityContext securityContext = monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
-            if(securityContext != null) {
-                if (securityContext instanceof GSISecurityContext) {
-                    GSISecurityContext gsiSecurityContext = (GSISecurityContext) securityContext;
-                    cluster = (HPCRemoteCluster) gsiSecurityContext.getRemoteCluster();
-                } else if (securityContext instanceof  SSHSecurityContext) {
-                    SSHSecurityContext sshSecurityContext = (SSHSecurityContext)
-                            securityContext;
-                    cluster = (HPCRemoteCluster) sshSecurityContext.getRemoteCluster();
-                }
-            }
-            // we just use cluster configuration from the incoming request and construct a new cluster because for monitoring
-            // we are using our own credentials and not using one users account to do everything.
-            authenticationInfo = authInfo;
-        } catch (GFacException e) {
-            log.error("Error reading data from job ExecutionContext");
-        }
-    }
-
-    public ResourceConnection(HostMonitorData hostMonitorData) throws SSHApiException {
-        MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0);
-        try {
-            GSISecurityContext securityContext = (GSISecurityContext)
-                    monitorID.getJobExecutionContext().getSecurityContext(monitorID.getComputeResourceDescription().getHostName());
-            cluster = (HPCRemoteCluster) securityContext.getRemoteCluster();
-
-            // we just use cluster configuration from the incoming request and construct a new cluster because for monitoring
-            // we are using our own credentials and not using one users account to do everything.
-            cluster = new HPCRemoteCluster(cluster.getServerInfo(), authenticationInfo, cluster.getJobManagerConfiguration());
-        } catch (GFacException e) {
-            log.error("Error reading data from job ExecutionContext");
-        }
-    }
-
-    public JobState getJobStatus(MonitorID monitorID) throws SSHApiException {
-        String jobID = monitorID.getJobID();
-        //todo so currently we execute the qstat for each job but we can use user based monitoring
-        //todo or we should concatenate all the commands and execute them in one go and parseSingleJob the response
-        return getStatusFromString(cluster.getJobStatus(jobID).toString());
-    }
-
-    public Map<String, JobState> getJobStatuses(List<MonitorID> monitorIDs) throws SSHApiException {
-        Map<String, JobStatus> treeMap = new TreeMap<String, JobStatus>();
-        Map<String, JobState> treeMap1 = new TreeMap<String, JobState>();
-        // creating a sorted map with all the jobIds and with the predefined
-        // status as UNKNOWN
-        for (MonitorID monitorID : monitorIDs) {
-            treeMap.put(monitorID.getJobID()+","+monitorID.getJobName(), JobStatus.U);
-        }
-        String userName = cluster.getServerInfo().getUserName();
-        //todo so currently we execute the qstat for each job but we can use user based monitoring
-        //todo or we should concatenate all the commands and execute them in one go and parseSingleJob the response
-        //
-        cluster.getJobStatuses(userName, treeMap);
-        for (String key : treeMap.keySet()) {
-            treeMap1.put(key, getStatusFromString(treeMap.get(key).toString()));
-        }
-        return treeMap1;
-    }
-
-    private JobState getStatusFromString(String status) {
-        log.info("parsing the job status returned : " + status);
-        if (status != null) {
-            if ("C".equals(status) || "CD".equals(status) || "E".equals(status) || "CG".equals(status) || "DONE".equals(status)) {
-                return JobState.COMPLETE;
-            } else if ("H".equals(status) || "h".equals(status)) {
-                return JobState.HELD;
-            } else if ("Q".equals(status) || "qw".equals(status) || "PEND".equals(status)) {
-                return JobState.QUEUED;
-            } else if ("R".equals(status) || "CF".equals(status) || "r".equals(status) || "RUN".equals(status)) {
-                return JobState.ACTIVE;
-            } else if ("T".equals(status)) {
-                return JobState.HELD;
-            } else if ("W".equals(status) || "PD".equals(status)) {
-                return JobState.QUEUED;
-            } else if ("S".equals(status) || "PSUSP".equals(status) || "USUSP".equals(status) || "SSUSP".equals(status)) {
-                return JobState.SUSPENDED;
-            } else if ("CA".equals(status)) {
-                return JobState.CANCELED;
-            } else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status) || "EXIT".equals(status)) {
-                return JobState.FAILED;
-            } else if ("PR".equals(status) || "Er".equals(status)) {
-                return JobState.FAILED;
-            } else if ("U".equals(status) || ("UNKWN".equals(status))) {
-                return JobState.UNKNOWN;
-            }
-        }
-        return JobState.UNKNOWN;
-    }
-
-    public HPCRemoteCluster getCluster() {
-        return cluster;
-    }
-
-    public void setCluster(HPCRemoteCluster cluster) {
-        this.cluster = cluster;
-    }
-
-    public boolean isConnected(){
-        return this.cluster.getSession().isConnected();
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
deleted file mode 100644
index a946075..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.airavata.common.utils.LocalEventPublisher;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.core.PushMonitor;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
-import org.apache.airavata.gfac.monitor.util.CommonUtils;
-import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.messaging.event.JobIdentifier;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.model.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-
-/**
- * This is the implementation for AMQP based finishQueue, this uses
- * rabbitmq client to recieve AMQP based monitoring data from
- * mostly excede resources.
- */
-public class AMQPMonitor extends PushMonitor {
-    private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
-
-
-    /* this will keep all the channels available in the system, we do not create
-      channels for all the jobs submitted, but we create channels for each user for each
-      host.
-    */
-    private Map<String, Channel> availableChannels;
-
-    private LocalEventPublisher publisher;
-
-    private LocalEventPublisher localPublisher;
-
-    private BlockingQueue<MonitorID> runningQueue;
-
-    private BlockingQueue<MonitorID> finishQueue;
-
-    private String connectionName;
-
-    private String proxyPath;
-
-    private List<String> amqpHosts;
-
-    private boolean startRegister;
-
-    public AMQPMonitor(){
-
-    }
-    public AMQPMonitor(LocalEventPublisher publisher, BlockingQueue<MonitorID> runningQueue,
-                       BlockingQueue<MonitorID> finishQueue,
-                       String proxyPath,String connectionName,List<String> hosts) {
-        this.publisher = publisher;
-        this.runningQueue = runningQueue;        // these will be initialized by the MonitorManager
-        this.finishQueue = finishQueue;          // these will be initialized by the MonitorManager
-        this.availableChannels = new HashMap<String, Channel>();
-        this.connectionName = connectionName;
-        this.proxyPath = proxyPath;
-        this.amqpHosts = hosts;
-        this.localPublisher = new LocalEventPublisher(new EventBus());
-        this.localPublisher.registerListener(this);
-    }
-
-    public void initialize(String proxyPath, String connectionName, List<String> hosts) {
-        this.availableChannels = new HashMap<String, Channel>();
-        this.connectionName = connectionName;
-        this.proxyPath = proxyPath;
-        this.amqpHosts = hosts;
-        this.localPublisher = new LocalEventPublisher(new EventBus());
-        this.localPublisher.registerListener(this);
-    }
-
-    @Override
-    public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
-        // we subscribe to read user-host based subscription
-        ComputeResourceDescription computeResourceDescription = monitorID.getComputeResourceDescription();
-        if (computeResourceDescription.isSetIpAddresses() && computeResourceDescription.getIpAddresses().size() > 0) {
-            // we get first ip address for the moment
-            String hostAddress = computeResourceDescription.getIpAddresses().get(0);
-            // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
-            // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue
-            String channelID = CommonUtils.getChannelID(monitorID);
-            if (availableChannels.get(channelID) == null) {
-                try {
-                    //todo need to fix this rather getting it from a file
-                    Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
-                    Channel channel = null;
-                    channel = connection.createChannel();
-                    availableChannels.put(channelID, channel);
-                    String queueName = channel.queueDeclare().getQueue();
-
-                    BasicConsumer consumer = new
-                            BasicConsumer(new JSONMessageParser(), localPublisher);          // here we use local publisher
-                    channel.basicConsume(queueName, true, consumer);
-                    String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
-                    // here we queuebind to a particular user in a particular machine
-                    channel.queueBind(queueName, "glue2.computing_activity", filterString);
-                    logger.info("Using filtering string to monitor: " + filterString);
-                } catch (IOException e) {
-                    logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
-                }
-            }
-        } else {
-            throw new AiravataMonitorException("Couldn't register monitor for jobId :" + monitorID.getJobID() +
-                    " , ComputeResourceDescription " + computeResourceDescription.getHostName() + " doesn't has an " +
-                    "IpAddress with it");
-        }
-        return true;
-    }
-
-    public void run() {
-        // before going to the while true mode we start unregister thread
-        startRegister = true; // this will be unset by someone else
-        while (startRegister || !ServerSettings.isStopAllThreads()) {
-            try {
-                MonitorID take = runningQueue.take();
-                this.registerListener(take);
-            } catch (AiravataMonitorException e) { // catch any exceptino inside the loop
-                logger.error(e.getMessage(), e);
-            } catch (InterruptedException e) {
-                logger.error(e.getMessage(), e);
-            } catch (Exception e){
-                logger.error(e.getMessage(), e);
-            }
-        }
-        Set<String> strings = availableChannels.keySet();
-        for(String key:strings) {
-            Channel channel = availableChannels.get(key);
-            try {
-                channel.close();
-            } catch (IOException e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-    }
-
-    @Subscribe
-    public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
-        Iterator<MonitorID> iterator = finishQueue.iterator();
-        MonitorID next = null;
-        while(iterator.hasNext()){
-            next = iterator.next();
-            if(next.getJobID().endsWith(monitorID.getJobID())){
-                break;
-            }
-        }
-        if(next == null) {
-            logger.error("Job has removed from the queue, old obsolete message recieved");
-            return false;
-        }
-        String channelID = CommonUtils.getChannelID(next);
-        if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) {
-            finishQueue.remove(next);
-
-            // if this is the last job in the queue at this point with the same username and same host we
-            // close the channel and close the connection and remove it from availableChannels
-            if (CommonUtils.isTheLastJobInQueue(finishQueue, next)) {
-                logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it" +
-                        ", incase new job created we do subscribe again");
-                Channel channel = availableChannels.get(channelID);
-                if (channel == null) {
-                    logger.error("Already Unregistered the listener");
-                    throw new AiravataMonitorException("Already Unregistered the listener");
-                } else {
-                    try {
-                        channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(next));
-                        channel.close();
-                        channel.getConnection().close();
-                        availableChannels.remove(channelID);
-                    } catch (IOException e) {
-                        logger.error("Error unregistering the listener");
-                        throw new AiravataMonitorException("Error unregistering the listener");
-                    }
-                }
-            }
-        }
-        next.setStatus(monitorID.getStatus());
-        JobIdentifier jobIdentity = new JobIdentifier(next.getJobID(),
-                                                     next.getTaskID(),
-                                                     next.getWorkflowNodeID(),
-                                                     next.getExperimentID(),
-                                                     next.getJobExecutionContext().getGatewayID());
-        publisher.publish(new JobStatusChangeEvent(next.getStatus(),jobIdentity));
-        return true;
-    }
-    @Override
-    public boolean stopRegister() throws AiravataMonitorException {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public Map<String, Channel> getAvailableChannels() {
-        return availableChannels;
-    }
-
-    public void setAvailableChannels(Map<String, Channel> availableChannels) {
-        this.availableChannels = availableChannels;
-    }
-
-    public LocalEventPublisher getPublisher() {
-        return publisher;
-    }
-
-    public void setPublisher(LocalEventPublisher publisher) {
-        this.publisher = publisher;
-    }
-
-    public BlockingQueue<MonitorID> getRunningQueue() {
-        return runningQueue;
-    }
-
-    public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
-        this.runningQueue = runningQueue;
-    }
-
-    public BlockingQueue<MonitorID> getFinishQueue() {
-        return finishQueue;
-    }
-
-    public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
-        this.finishQueue = finishQueue;
-    }
-
-    public String getProxyPath() {
-        return proxyPath;
-    }
-
-    public void setProxyPath(String proxyPath) {
-        this.proxyPath = proxyPath;
-    }
-
-    public List<String> getAmqpHosts() {
-        return amqpHosts;
-    }
-
-    public void setAmqpHosts(List<String> amqpHosts) {
-        this.amqpHosts = amqpHosts;
-    }
-
-    public boolean isStartRegister() {
-        return startRegister;
-    }
-
-    public void setStartRegister(boolean startRegister) {
-        this.startRegister = startRegister;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
deleted file mode 100644
index 4247524..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import org.apache.airavata.common.utils.LocalEventPublisher;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.core.MessageParser;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Consumer;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.ShutdownSignalException;
-
-public class BasicConsumer implements Consumer {
-    private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
-
-    private MessageParser parser;
-
-    private LocalEventPublisher publisher;
-
-    public BasicConsumer(MessageParser parser, LocalEventPublisher publisher) {
-        this.parser = parser;
-        this.publisher = publisher;
-    }
-
-    public void handleCancel(String consumerTag) {
-    }
-
-    public void handleCancelOk(String consumerTag) {
-    }
-
-    public void handleConsumeOk(String consumerTag) {
-    }
-
-    public void handleDelivery(String consumerTag,
-                               Envelope envelope,
-                               AMQP.BasicProperties properties,
-                               byte[] body) {
-
-        logger.debug("job update for: " + envelope.getRoutingKey());
-        String message = new String(body);
-        message = message.replaceAll("(?m)^", "    ");
-        // Here we parse the message and get the job status and push it
-        // to the Event bus, this will be picked by
-//        AiravataJobStatusUpdator and store in to registry
-
-        logger.debug("************************************************************");
-        logger.debug("AMQP Message recieved \n" + message);
-        logger.debug("************************************************************");
-        try {
-            String jobID = envelope.getRoutingKey().split("\\.")[0];
-            MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null,null);
-            monitorID.setStatus(parser.parseMessage(message));
-            publisher.publish(monitorID);
-        } catch (AiravataMonitorException e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
-
-    public void handleRecoverOk(String consumerTag) {
-    }
-
-    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/ComputingActivity.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/ComputingActivity.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/ComputingActivity.java
deleted file mode 100644
index 5a36b4a..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/ComputingActivity.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import java.util.List;
-
-/**
- * Created by syodage on 6/3/15.
- */
-public class ComputingActivity {
-    String idFromEndpoint;
-    private List<String> state;
-
-    public String getIDFromEndpoint() {
-        return idFromEndpoint;
-    }
-
-    public List<String> getState() {
-        return state;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
deleted file mode 100644
index 79cc417..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.airavata.gfac.monitor.core.MessageParser;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.model.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-public class JSONMessageParser implements MessageParser {
-    private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class);
-
-    public JobState parseMessage(String message)throws AiravataMonitorException {
-        /*todo write a json message parser here*/
-        logger.debug(message);
-        ObjectMapper objectMapper = new ObjectMapper();
-        try {
-            ComputingActivity computingActivity = objectMapper.readValue(message.getBytes(), ComputingActivity.class);
-            logger.info(computingActivity.getIDFromEndpoint());
-            List<String> stateList = computingActivity.getState();
-            JobState jobState = null;
-            for (String aState : stateList) {
-                jobState = getStatusFromString(aState);
-            }
-            // we get the last value of the state array
-            return jobState;
-        } catch (IOException e) {
-            throw new AiravataMonitorException(e);
-        }
-    }
-
-private JobState getStatusFromString(String status) {
-        logger.info("parsing the job status returned : " + status);
-        if(status != null){
-            if("ipf:finished".equals(status)){
-                return JobState.COMPLETE;
-            }else if("ipf:pending".equals(status)|| "ipf:starting".equals(status)){
-                return JobState.QUEUED;
-            }else if("ipf:running".equals(status) || "ipf:finishing".equals(status)){
-                return JobState.ACTIVE;
-            }else if ("ipf:held".equals(status) || "ipf:teminating".equals(status) || "ipf:teminated".equals(status)) {
-                return JobState.HELD;
-            } else if ("ipf:suspending".equals(status)) {
-                return JobState.SUSPENDED;
-            }else if ("ipf:failed".equals(status)) {
-                return JobState.FAILED;
-            }else if ("ipf:unknown".equals(status)){
-                return JobState.UNKNOWN;
-            }
-        }
-        return JobState.UNKNOWN;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
deleted file mode 100644
index c4275f1..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class SimpleJobFinishConsumer {
-    private final static Logger logger = LoggerFactory.getLogger(SimpleJobFinishConsumer.class);
-
-    private List<String> completedJobsFromPush;
-
-    public SimpleJobFinishConsumer(List<String> completedJobsFromPush) {
-        this.completedJobsFromPush = completedJobsFromPush;
-    }
-
-    public void listen() {
-        try {
-            String queueName = ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950");
-            String uri = "amqp://localhost";
-
-            ConnectionFactory connFactory = new ConnectionFactory();
-            connFactory.setUri(uri);
-            Connection conn = connFactory.newConnection();
-            logger.info("--------Created the connection to Rabbitmq server successfully-------");
-
-            final Channel ch = conn.createChannel();
-
-            logger.info("--------Created the channel with Rabbitmq server successfully-------");
-
-            ch.queueDeclare(queueName, false, false, false, null);
-
-            logger.info("--------Declare the queue " + queueName + " in Rabbitmq server successfully-------");
-
-            final QueueingConsumer consumer = new QueueingConsumer(ch);
-            ch.basicConsume(queueName, consumer);
-            (new Thread() {
-                public void run() {
-                    try {
-                        while (true) {
-                            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
-                            String message = new String(delivery.getBody());
-                            logger.info("---------------- Job Finish message received:" + message + " --------------");
-                            synchronized (completedJobsFromPush) {
-                                completedJobsFromPush.add(message);
-                            }
-                            ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
-                        }
-                    } catch (Exception ex) {
-                        logger.error("--------Cannot connect to a RabbitMQ Server--------" , ex);
-                    }
-                }
-
-            }).start();
-        } catch (Exception ex) {
-            logger.error("Cannot connect to a RabbitMQ Server: " , ex);
-            logger.info("------------- Push monitoring for HPC jobs is disabled -------------");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
deleted file mode 100644
index 41fd096..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.impl.push.amqp;
-
-import com.google.common.eventbus.Subscribe;
-import com.rabbitmq.client.Channel;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.gfac.monitor.util.CommonUtils;
-import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
-import org.apache.airavata.model.experiment.JobState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Event-bus subscriber that releases the AMQP channel kept for a monitored job once that job
- * reports a terminal state ({@code FAILED} or {@code COMPLETE}).
- *
- * <p>Channels are looked up in the map handed to the constructor, keyed by
- * {@link CommonUtils#getChannelID(MonitorID)}.</p>
- */
-public class UnRegisterWorker{
-    private final static Logger logger = LoggerFactory.getLogger(UnRegisterWorker.class);
-    private Map<String, Channel> availableChannels;
-
-    /**
-     * @param channels map of channel id to channel; it is stored by reference and entries are
-     *                 removed from it when a listener is unregistered
-     */
-    public UnRegisterWorker(Map<String, Channel> channels) {
-        this.availableChannels = channels;
-    }
-
-    /**
-     * Unbinds and closes the channel registered for the given monitor entry when the job has
-     * failed or completed; any other job state is ignored.
-     *
-     * <p>NOTE(review): Guava's EventBus is documented to accept only single-argument
-     * {@code @Subscribe} methods, so this two-argument method may never be dispatched to -
-     * confirm how it is registered.</p>
-     *
-     * @param jobStatus status change event whose state decides whether to unregister
-     * @param monitorID monitor entry used to derive the channel id and the routing key
-     * @return always {@code true} when no exception is thrown
-     * @throws AiravataMonitorException if no channel is registered under the channel id, or if
-     *                                  unbinding/closing the channel fails with an I/O error
-     */
-    @Subscribe
-    private boolean unRegisterListener(JobStatusChangeEvent jobStatus, MonitorID monitorID) throws AiravataMonitorException {
-        String channelID = CommonUtils.getChannelID(monitorID);
-        if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){
-            Channel channel = availableChannels.get(channelID);
-            if (channel == null) {
-                logger.error("Already Unregistered the listener");
-                throw new AiravataMonitorException("Already Unregistered the listener");
-            } else {
-                try {
-                    // NOTE(review): the no-arg queueDeclare() presumably declares a brand-new
-                    // server-named queue, so this may not unbind the queue that was bound at
-                    // registration time - verify against the registering code.
-                    channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
-                    channel.close();
-                    channel.getConnection().close();
-                    availableChannels.remove(channelID);
-                } catch (IOException e) {
-                    // The rethrown exception carries only a message, so the underlying cause
-                    // is recorded here; otherwise the stack trace would be lost.
-                    logger.error("Error unregistering the listener", e);
-                    throw new AiravataMonitorException("Error unregistering the listener");
-                }
-            }
-        }
-        return true;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java
deleted file mode 100644
index 6a4ed3b..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.util;
-
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.DefaultSaslConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-import java.security.KeyStore;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Helpers for opening SSL-secured AMQP connections that authenticate with a client
- * certificate (SASL {@code EXTERNAL}).
- */
-public class AMQPConnectionUtil {
-    private final static Logger logger = LoggerFactory.getLogger(AMQPConnectionUtil.class);
-
-    /**
-     * Tries the given hosts in random order and returns the first connection that opens.
-     *
-     * @param hosts     candidate broker hosts; the list itself is left untouched
-     * @param vhost     AMQP virtual host to connect to
-     * @param proxyFile PEM file handed to {@code X509Helper.keyStoreFromPEM}
-     * @return an open connection, or {@code null} when none of the hosts could be reached
-     */
-    public static Connection connect(List<String>hosts,String vhost, String proxyFile) {
-        // Shuffle a private copy so the caller's list keeps its order and may be unmodifiable.
-        List<String> candidates = new java.util.ArrayList<String>(hosts);
-        Collections.shuffle(candidates);
-        for (String host : candidates) {
-            Connection connection = connect(host, vhost, proxyFile);
-            // The single-host overload signals failure by returning null, so the connection
-            // (not the host name) decides whether to stop here or fall through to the next host.
-            if (connection != null) {
-                logger.info("connected to " + host);
-                return connection;
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Opens a connection to a single host on port 5671, using a key store built from the
-     * proxy file and a trust store built by {@code X509Helper.trustKeyStoreFromCertDir()}.
-     *
-     * @param host      broker host name
-     * @param vhost     AMQP virtual host to connect to
-     * @param proxyFile PEM file handed to {@code X509Helper.keyStoreFromPEM}
-     * @return the new connection, or {@code null} if any step failed (the failure is logged)
-     */
-    public static Connection connect(String host, String vhost, String proxyFile) {
-        Connection connection;
-        try {
-            // NOTE(review): hard-coded key store pass phrase - consider moving it to configuration.
-            String keyPassPhrase = "test123";
-            KeyStore ks = X509Helper.keyStoreFromPEM(proxyFile, keyPassPhrase);
-            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
-            kmf.init(ks, keyPassPhrase.toCharArray());
-
-            KeyStore tks = X509Helper.trustKeyStoreFromCertDir();
-            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
-            tmf.init(tks);
-
-            // NOTE(review): "SSLv3" is an obsolete protocol name; switching to a TLS context
-            // needs to be confirmed against what the broker accepts, so it is left as is.
-            SSLContext c = SSLContext.getInstance("SSLv3");
-            c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
-
-            ConnectionFactory factory = new ConnectionFactory();
-            factory.setHost(host);
-            factory.setPort(5671);
-            factory.useSslProtocol(c);
-            factory.setVirtualHost(vhost);
-            factory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
-
-            connection = factory.newConnection();
-        } catch (Exception e) {
-            // Best effort by design: callers treat null as "this host is unavailable".
-            logger.error(e.getMessage(), e);
-            return null;
-        }
-        return connection;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d9b2df03/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
deleted file mode 100644
index a0b922d..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.monitor.util;
-
-import org.apache.airavata.common.logger.AiravataLogger;
-import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.GFacHandler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.monitor.HostMonitorData;
-import org.apache.airavata.gfac.monitor.UserMonitorData;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-public class CommonUtils {
-    private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(CommonUtils.class);
-
-    /**
-     * Builds the channel id for a monitor entry: its user name and the host name of its
-     * compute resource, joined by a hyphen.
-     *
-     * @param monitorID monitor entry supplying the user name and compute resource description
-     * @return {@code <userName>-<hostName>}
-     */
-    public static String getChannelID(MonitorID monitorID) {
-        StringBuilder channelId = new StringBuilder();
-        channelId.append(monitorID.getUserName());
-        channelId.append("-");
-        channelId.append(monitorID.getComputeResourceDescription().getHostName());
-        return channelId.toString();
-    }
-
-    /**
-     * Builds the routing key for a monitor entry from its user name and the first IP address
-     * listed on its compute resource description.
-     *
-     * @param monitorID monitor entry supplying the user name and compute resource description
-     * @return {@code *.<userName>.<firstIpAddress>}
-     */
-    public static String getRoutingKey(MonitorID monitorID) {
-        StringBuilder routingKey = new StringBuilder("*.");
-        routingKey.append(monitorID.getUserName());
-        routingKey.append(".");
-        // Only the first listed address is used.
-        routingKey.append(monitorID.getComputeResourceDescription().getIpAddresses().get(0));
-        return routingKey.toString();
-    }
-
-    /**
-     * Builds a channel id from an explicit user name and host address.
-     *
-     * @param userName    user name placed before the hyphen
-     * @param hostAddress host address placed after the hyphen
-     * @return {@code <userName>-<hostAddress>}
-     */
-    public static String getChannelID(String userName,String hostAddress) {
-        return String.format("%s-%s", userName, hostAddress);
-    }
-
-    /**
-     * Builds a routing key from an explicit user name and host address.
-     *
-     * @param userName    user name used as the middle segment
-     * @param hostAddress host address used as the trailing segment
-     * @return {@code *.<userName>.<hostAddress>}
-     */
-    public static String getRoutingKey(String userName,String hostAddress) {
-        StringBuilder routingKey = new StringBuilder("*.");
-        routingKey.append(userName).append(".").append(hostAddress);
-        return routingKey.toString();
-    }
-
-    /**
-     * Files a monitor entry under the matching user and host in the monitoring queue,
-     * creating the host entry and/or the user entry when they do not exist yet.
-     *
-     * <p>The whole lookup-and-insert runs while holding the queue's monitor.</p>
-     *
-     * @param queue               monitoring queue holding one entry per user
-     * @param monitorID           monitor entry to add
-     * @param jobExecutionContext context used to construct a new {@code HostMonitorData}
-     *                            when the host is not tracked yet
-     * @throws AiravataMonitorException if the thread is interrupted while putting a new user
-     *                                  entry on the queue; the interrupt flag is restored first
-     */
-    public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID, JobExecutionContext jobExecutionContext) throws AiravataMonitorException {
-        synchronized (queue) {
-            for (UserMonitorData userData : queue) {
-                if (!userData.getUserName().equals(monitorID.getUserName())) {
-                    continue;
-                }
-                // The user is already tracked: look for the matching host entry first.
-                for (HostMonitorData host : userData.getHostMonitorData()) {
-                    if (isEqual(host.getComputeResourceDescription(), monitorID.getComputeResourceDescription())) {
-                        host.addMonitorIDForHost(monitorID);
-                        logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," +
-                                " task {}", monitorID.getExperimentID(), monitorID.getTaskID());
-                        return;
-                    }
-                }
-                // Known user, unknown host: attach a new host entry to the existing user entry.
-                HostMonitorData newHostData = new HostMonitorData(jobExecutionContext);
-                newHostData.addMonitorIDForHost(monitorID);
-                userData.addHostMonitorData(newHostData);
-                logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," +
-                        " task {}", monitorID.getExperimentID(), monitorID.getTaskID());
-                return;
-            }
-            // Unknown user: build the user entry together with its first host entry.
-            HostMonitorData hostMonitorData = new HostMonitorData(jobExecutionContext);
-            hostMonitorData.addMonitorIDForHost(monitorID);
-
-            UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
-            userMonitorData.addHostMonitorData(hostMonitorData);
-            try {
-                queue.put(userMonitorData);
-                logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," +
-                        " task {}", monitorID.getExperimentID(), monitorID.getTaskID());
-            } catch (InterruptedException e) {
-                // Catching InterruptedException clears the flag; restore it so code further up
-                // the stack can still observe the interruption after the wrapped exception.
-                Thread.currentThread().interrupt();
-                throw new AiravataMonitorException(e);
-            }
-        }
-    }
-
-    /**
-     * Compares two compute resource descriptions by compute resource id and host name.
-     *
-     * @param comRes_1 first description
-     * @param comRes_2 second description
-     * @return {@code true} only when both the ids and the host names are equal
-     */
-    private static boolean isEqual(ComputeResourceDescription comRes_1, ComputeResourceDescription comRes_2) {
-        if (!comRes_1.getComputeResourceId().equals(comRes_2.getComputeResourceId())) {
-            return false;
-        }
-        return comRes_1.getHostName().equals(comRes_2.getHostName());
-    }
-
-    /**
-     * Reports whether the queue holds no entry for the same user and compute resource as the
-     * given monitor entry.
-     *
-     * @param queue     queue of monitor entries to scan
-     * @param monitorID entry whose user name and compute resource are searched for
-     * @return {@code false} as soon as an entry with the same user name and an equal compute
-     *         resource description is found, {@code true} otherwise
-     */
-    public static boolean isTheLastJobInQueue(BlockingQueue<MonitorID> queue,MonitorID monitorID){
-        for (MonitorID queued : queue) {
-            boolean sameUser = monitorID.getUserName().equals(queued.getUserName());
-            if (sameUser
-                    && CommonUtils.isEqual(monitorID.getComputeResourceDescription(), queued.getComputeResourceDescription())) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Removes the first monitor entry matching the given one from a user's monitoring data.
-     * An entry matches when it sits under an equal compute resource and has either the same
-     * job id or the same job name. When nothing matches, two informational messages are
-     * logged and nothing is changed.
-     *
-     * <p>This method does no locking of its own; the original author notes that the caller
-     * (HPCPullMonitor) already synchronizes.</p>
-     *
-     * @param userMonitorData monitoring data of one user to search
-     * @param monitorID       entry identifying the job to remove
-     * @throws AiravataMonitorException declared for callers; not thrown by the visible code
-     */
-    public static void removeMonitorFromQueue(UserMonitorData userMonitorData, MonitorID monitorID) throws AiravataMonitorException {
-        if (userMonitorData.getUserName().equals(monitorID.getUserName())) {
-            for (HostMonitorData hostData : userMonitorData.getHostMonitorData()) {
-                if (!isEqual(hostData.getComputeResourceDescription(), monitorID.getComputeResourceDescription())) {
-                    continue;
-                }
-                Iterator<MonitorID> candidates = hostData.getMonitorIDs().iterator();
-                while (candidates.hasNext()) {
-                    MonitorID candidate = candidates.next();
-                    // Matching is by id or name rather than object equality, because the two
-                    // objects may be in different states.
-                    boolean sameJob = candidate.getJobID().equals(monitorID.getJobID())
-                            || candidate.getJobName().equals(monitorID.getJobName());
-                    if (sameJob) {
-                        candidates.remove();
-                        logger.infoId(monitorID.getJobID(), "Removed the jobId: {} JobName: {} from monitoring last " +
-                                "status:{}", monitorID.getJobID(),monitorID.getJobName(), monitorID.getStatus().toString());
-                        return;
-                    }
-                }
-            }
-        }
-        logger.info("Cannot find the given MonitorID in the queue with userName " +
-                monitorID.getUserName() + "  and jobID " + monitorID.getJobID());
-        logger.info("This might not be an error because someone else removed this job from the queue");
-    }
-
-
-    /**
-     * Instantiates every out-handler configured on the job's GFac configuration and invokes
-     * each one in turn with the job execution context.
-     *
-     * @param jobExecutionContext context supplying the handler configuration and passed to
-     *                            each handler
-     * @throws GFacException if a handler class cannot be loaded or instantiated, or if a
-     *                       handler's invocation throws
-     */
-    public static void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
-        List<GFacHandlerConfig> outHandlerConfigs = jobExecutionContext.getGFacConfiguration().getOutHandlers();
-
-        for (GFacHandlerConfig handlerConfig : outHandlerConfigs) {
-            GFacHandler outHandler;
-            try {
-                String className = handlerConfig.getClassName().trim();
-                outHandler = Class.forName(className).asSubclass(GFacHandler.class).newInstance();
-                outHandler.initProperties(handlerConfig.getProperties());
-            } catch (ClassNotFoundException e) {
-                logger.error(e.getMessage());
-                throw new GFacException("Cannot load handler class " + handlerConfig, e);
-            } catch (InstantiationException e) {
-                logger.error(e.getMessage());
-                throw new GFacException("Cannot instantiate handler class " + handlerConfig, e);
-            } catch (IllegalAccessException e) {
-                logger.error(e.getMessage());
-                throw new GFacException("Cannot instantiate handler class " + handlerConfig, e);
-            }
-
-            // Invocation failures of any kind are wrapped so callers see a single exception type.
-            try {
-                outHandler.invoke(jobExecutionContext);
-            } catch (Exception e) {
-                // TODO: Better error reporting.
-                throw new GFacException("Error Executing a OutFlow Handler", e);
-            }
-        }
-    }
-
-        /**
-         * Applies a set of job-count changes to ZooKeeper, one znode per map entry, and then
-         * writes the colon-separated list of processed paths to the stat node.
-         * Any exception is logged and swallowed; processing stops at the first failure.
-         *
-         * @param curatorClient  CuratorFramework instance used for all reads and writes
-         * @param changeCountMap job-count delta per znode path
-         * @param isAdd          {@code true} to add each delta to the stored count (creating
-         *                       the znode if needed), {@code false} to subtract it
-         */
-    public static void updateZkWithJobCount(CuratorFramework curatorClient, final Map<String, Integer> changeCountMap, boolean isAdd) {
-        StringBuilder touchedPaths = new StringBuilder();
-        try {
-            for (Map.Entry<String, Integer> change : changeCountMap.entrySet()) {
-                String path = change.getKey();
-                if (isAdd) {
-                    CommonUtils.checkAndCreateZNode(curatorClient, path);
-                }
-                byte[] stored = curatorClient.getData().forPath(path);
-                if (stored == null && isAdd) {
-                    // No count stored yet: the delta becomes the initial count.
-                    curatorClient.setData().withVersion(-1).forPath(path, String.valueOf(change.getValue()).getBytes());
-                } else if (stored == null) {
-                    // This is not possible, but we handle in case there any data zookeeper communication failure
-                    logger.warn("Couldn't reduce job count in " + path + " as it returns null data. Hence reset the job count to 0");
-                    curatorClient.setData().withVersion(-1).forPath(path, "0".getBytes());
-                } else if (isAdd) {
-                    String nodeData = new String(stored);
-                    curatorClient.setData().withVersion(-1).forPath(path,
-                            String.valueOf(change.getValue() + Integer.parseInt(nodeData)).getBytes());
-                } else {
-                    int previousCount = Integer.parseInt(new String(stored));
-                    int removeCount = change.getValue();
-                    if (previousCount < removeCount) {
-                        // This is not possible, do we need to reset the job count to 0 ?
-                        logger.error("Requested remove job count is " + removeCount +
-                                " which is higher than the existing job count " + previousCount
-                                + " in  " + path + " path.");
-                    } else {
-                        curatorClient.setData().withVersion(-1).forPath(path,
-                                String.valueOf(previousCount - removeCount).getBytes());
-                    }
-                }
-                touchedPaths.append(path).append(":");
-            }
-
-            // update stat node to trigger orchestrator watchers
-            if (changeCountMap.size() > 0) {
-                // Drop the trailing ':' separator before publishing the path list.
-                touchedPaths.deleteCharAt(touchedPaths.length() - 1);
-                curatorClient.setData().withVersion(-1).forPath("/" + Constants.STAT, touchedPaths.toString().getBytes());
-            }
-        } catch (Exception e) {
-            logger.error("Error while writing job count to zookeeper", e);
-        }
-
-    }
-
-    /**
-     * Adds one to the ZooKeeper job count stored under the path derived from the given
-     * monitor entry.
-     *
-     * @param monitorID monitor entry supplying both the count path and, through its job
-     *                  execution context, the Curator client
-     */
-    public static void increaseZkJobCount(MonitorID monitorID) {
-        final Map<String, Integer> singleIncrement = new HashMap<String, Integer>();
-        final String countPath = CommonUtils.getJobCountUpdatePath(monitorID);
-        singleIncrement.put(countPath, 1);
-        CuratorFramework curator = monitorID.getJobExecutionContext().getCuratorClient();
-        updateZkWithJobCount(curator, singleIncrement, true);
-    }
-
-    /**
-     * Builds the znode path holding the job count for a monitor entry, in the form
-     * {@code /<Constants.STAT>/<userName>/<hostName>/<Constants.JOB>}.
-     *
-     * @param monitorID monitor entry supplying the user name and compute resource host name
-     * @return the job-count path for that user and host
-     */
-    public static String getJobCountUpdatePath(MonitorID monitorID){
-        return "/" + Constants.STAT
-                + "/" + monitorID.getUserName()
-                + "/" + monitorID.getComputeResourceDescription().getHostName()
-                + "/" + Constants.JOB;
-    }
-
-    /**
-     * Makes sure a persistent znode exists at the given path, creating missing ancestors
-     * first (parent before child).
-     *
-     * <p>The existence check and the create are separate calls, so another client may create
-     * the node in between. That case is treated as success rather than as an error.</p>
-     *
-     * @param curatorClient Curator client used for the existence check and the create
-     * @param path          absolute znode path to ensure
-     * @throws Exception any failure reported by Curator other than the node already existing
-     */
-    private static void checkAndCreateZNode(CuratorFramework curatorClient , String path) throws Exception {
-        if (curatorClient.checkExists().forPath(path) != null) {
-            return; // znode already present, nothing to do
-        }
-        int lastSlash = path.lastIndexOf("/");
-        if (lastSlash > 1) {  // recursively traverse to parent znode and check parent exist
-            checkAndCreateZNode(curatorClient, path.substring(0, lastSlash));
-        }
-        try {
-            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
-        } catch (KeeperException.NodeExistsException ignored) {
-            // Lost the race against a concurrent creator; the node exists, which is all
-            // this method has to guarantee.
-        }
-    }
-}