You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/03/24 16:53:20 UTC

git commit: Supporting user based monitoring with pull - AIRAVATA-1023

Repository: airavata
Updated Branches:
  refs/heads/master 6b90e6427 -> a052f5be8


Supporting user based monitoring with pull - AIRAVATA-1023


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a052f5be
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a052f5be
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a052f5be

Branch: refs/heads/master
Commit: a052f5be8642a91c3252293887299a728dfb6129
Parents: 6b90e64
Author: lahiru <la...@apache.org>
Authored: Mon Mar 24 11:53:07 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon Mar 24 11:53:07 2014 -0400

----------------------------------------------------------------------
 .../apache/airavata/gsi/ssh/api/Cluster.java    |  13 +-
 .../ssh/api/job/JobManagerConfiguration.java    |   2 +
 .../airavata/gsi/ssh/api/job/OutputParser.java  |   2 +-
 .../gsi/ssh/api/job/PBSJobConfiguration.java    |  15 +-
 .../gsi/ssh/api/job/PBSOutputParser.java        |  33 +++-
 .../gsi/ssh/api/job/SlurmJobConfiguration.java  |   4 +
 .../gsi/ssh/api/job/SlurmOutputParser.java      |   2 +-
 .../gsi/ssh/impl/GSISSHAbstractCluster.java     |   9 +
 .../airavata/job/monitor/HostMonitorData.java   |  69 ++++++++
 .../airavata/job/monitor/MonitorManager.java    |  42 ++---
 .../airavata/job/monitor/UserMonitorData.java   |  76 +++++++++
 .../job/monitor/core/MessageParser.java         |   5 +-
 .../airavata/job/monitor/core/PushMonitor.java  |   3 +-
 .../job/monitor/impl/LocalJobMonitor.java       |   2 +-
 .../monitor/impl/pull/qstat/QstatMonitor.java   | 170 ++++++++++---------
 .../impl/pull/qstat/ResourceConnection.java     |  42 +++++
 .../job/monitor/impl/push/amqp/AMQPMonitor.java |  81 ++++-----
 .../monitor/impl/push/amqp/BasicConsumer.java   |  10 +-
 .../impl/push/amqp/JSONMessageParser.java       |   3 +-
 .../airavata/job/monitor/util/CommonUtils.java  |  83 +++++++++
 20 files changed, 501 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
index 1f49bc7..4454624 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.gsi.ssh.api;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.gsi.ssh.impl.JobStatus;
@@ -104,7 +105,17 @@ public interface Cluster {
      * @throws SSHApiException throws exception during error
      */
     public JobStatus getJobStatus(String jobID) throws SSHApiException;
-    
+
+    /**
+     * This method can be used to poll the jobstatuses based on the given
+     * user but we should pass the jobID list otherwise we will get unwanted
+     * job statuses which submitted by different middleware outside apache
+     * airavata with the same uername which we are not considering
+     * @param userName userName of the jobs which required to get the status
+     * @param jobIDs precises set of jobIDs
+     * @return
+     */
+    public void getJobStatuses(String userName,Map<String,JobStatus> jobIDs)throws SSHApiException;
     /**
      * This will list directories in computing resources
      * @param directoryPath the full qualified path for the directory user wants to create

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java
index c5d1585..f31b6f5 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java
@@ -30,6 +30,8 @@ public interface JobManagerConfiguration {
 
     public RawCommandInfo getMonitorCommand(String jobID);
 
+    public RawCommandInfo getUserBasedMonitorCommand(String userName);
+
     public String getScriptExtension();
 
      public RawCommandInfo getSubmitCommand(String workingDirectory,String pbsFilePath);

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java
index 7c97426..0c1b2ad 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java
@@ -55,5 +55,5 @@ public interface OutputParser {
      * @param statusMap list of status map will return and key will be the job ID
      * @param rawOutput
      */
-    public void parse(Map<String,JobStatus> statusMap, String rawOutput)throws SSHApiException;
+    public void parse(String userName,Map<String,JobStatus> statusMap, String rawOutput)throws SSHApiException;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java
index 9e07df5..18ea772 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java
@@ -25,7 +25,7 @@ import org.apache.commons.io.FilenameUtils;
 
 import java.io.File;
 
-public class PBSJobConfiguration implements JobManagerConfiguration{
+public class PBSJobConfiguration implements JobManagerConfiguration {
 
     private String jobDescriptionTemplateName;
 
@@ -35,11 +35,12 @@ public class PBSJobConfiguration implements JobManagerConfiguration{
 
     private OutputParser parser;
 
-    public PBSJobConfiguration(){
+    public PBSJobConfiguration() {
         // this can be used to construct and use setter methods to set all the params in order
     }
+
     public PBSJobConfiguration(String jobDescriptionTemplateName,
-                                   String scriptExtension,String installedPath,OutputParser parser) {
+                               String scriptExtension, String installedPath, OutputParser parser) {
         this.jobDescriptionTemplateName = jobDescriptionTemplateName;
         this.scriptExtension = scriptExtension;
         this.parser = parser;
@@ -70,8 +71,8 @@ public class PBSJobConfiguration implements JobManagerConfiguration{
         return scriptExtension;
     }
 
-    public RawCommandInfo getSubmitCommand(String workingDirectory,String pbsFilePath) {
-          return new RawCommandInfo(this.installedPath + "qsub " +
+    public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath) {
+        return new RawCommandInfo(this.installedPath + "qsub " +
                 workingDirectory + File.separator + FilenameUtils.getName(pbsFilePath));
     }
 
@@ -90,4 +91,8 @@ public class PBSJobConfiguration implements JobManagerConfiguration{
     public void setParser(OutputParser parser) {
         this.parser = parser;
     }
+
+    public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+        return new RawCommandInfo(this.installedPath + "qstat -u " + userName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java
index 1a4fb21..d765ba5 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java
@@ -20,11 +20,15 @@
 */
 package org.apache.airavata.gsi.ssh.api.job;
 
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.impl.JobStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.print.attribute.standard.JobState;
 import javax.validation.constraints.Null;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 public class PBSOutputParser implements OutputParser {
@@ -152,7 +156,32 @@ public class PBSOutputParser implements OutputParser {
         return null;
     }
 
-    public void parse(Map<String, JobStatus> statusMap, String rawOutput) {
-        //To change body of implemented methods use File | Settings | File Templates.
+    public void parse(String userName, Map<String, JobStatus> statusMap, String rawOutput) {
+        log.debug(rawOutput);
+        String[] info = rawOutput.split("\n");
+
+        int lastStop = 0;
+        for (String jobID : statusMap.keySet()) {
+            for(int i=lastStop;i<info.length;i++){
+               if(jobID.contains(info[i].split(" ")[0]) && !"".equals(info[i].split(" ")[0])){
+                   // now starts processing this line
+                   log.info(info[i]);
+                   String correctLine = info[i];
+                   String[] columns = correctLine.split(" ");
+                   List<String> columnList = new ArrayList<String>();
+                   for (String s : columns) {
+                       if (!"".equals(s)) {
+                           columnList.add(s);
+                       }
+                   }
+                   lastStop = i+1;
+                   statusMap.put(jobID, JobStatus.valueOf(columnList.get(9)));
+                   break;
+               }
+            }
+
+        }
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java
index 4bf4b00..798827b 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java
@@ -90,4 +90,8 @@ public class SlurmJobConfiguration implements JobManagerConfiguration{
     public void setParser(OutputParser parser) {
         this.parser = parser;
     }
+
+    public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+        return new RawCommandInfo(this.installedPath + "squeue -u " + userName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
index d118ee5..37d6d71 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
@@ -145,7 +145,7 @@ public class SlurmOutputParser implements OutputParser {
         return JobStatus.valueOf("U");
     }
 
-    public void parse(Map<String, JobStatus> statusMap, String rawOutput)throws SSHApiException {
+    public void parse(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws SSHApiException {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
index 0a26589..0ddea0f 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
@@ -48,6 +48,7 @@ import java.io.StringWriter;
 import java.net.URL;
 import java.security.SecureRandom;
 import java.util.List;
+import java.util.Map;
 
 public class GSISSHAbstractCluster implements Cluster {
     static {
@@ -406,6 +407,14 @@ public class GSISSHAbstractCluster implements Cluster {
         }
     }
 
+    public void getJobStatuses(String userName, Map<String,JobStatus> jobIDs)throws SSHApiException {
+        RawCommandInfo rawCommandInfo = jobManagerConfiguration.getUserBasedMonitorCommand(userName);
+        StandardOutReader stdOutReader = new StandardOutReader();
+        CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
+        String result = getOutputifAvailable(stdOutReader, "Error getting job information from the resource !");
+        jobManagerConfiguration.getParser().parse(userName,jobIDs, result);
+    }
+
     public ServerInfo getServerInfo() {
         return serverInfo;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/HostMonitorData.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/HostMonitorData.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/HostMonitorData.java
new file mode 100644
index 0000000..6e5fde9
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/HostMonitorData.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HostMonitorData {
+    private HostDescription host;
+
+    private List<MonitorID> monitorIDs;
+
+    public HostMonitorData(HostDescription host) {
+        this.host = host;
+        monitorIDs = new ArrayList<MonitorID>();
+    }
+
+    public HostMonitorData(HostDescription host, List<MonitorID> monitorIDs) {
+        this.host = host;
+        this.monitorIDs = monitorIDs;
+    }
+
+    public HostDescription getHost() {
+        return host;
+    }
+
+    public void setHost(HostDescription host) {
+        this.host = host;
+    }
+
+    public List<MonitorID> getMonitorIDs() {
+        return monitorIDs;
+    }
+
+    public void setMonitorIDs(List<MonitorID> monitorIDs) {
+        this.monitorIDs = monitorIDs;
+    }
+
+    /**
+     * this method get called by CommonUtils and it will check the right place before adding
+     * so there will not be a mismatch between this.host and monitorID.host
+     * @param monitorID
+     * @throws AiravataMonitorException
+     */
+    public void addMonitorIDForHost(MonitorID monitorID)throws AiravataMonitorException{
+        monitorIDs.add(monitorID);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index b819e2b..e1c2cb8 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -31,10 +31,10 @@ import org.apache.airavata.job.monitor.impl.LocalJobMonitor;
 import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
 import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
 import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread;
+import org.apache.airavata.job.monitor.util.CommonUtils;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
 import org.apache.airavata.schemas.gfac.GlobusHostType;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.airavata.schemas.gfac.HostDescriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,9 +56,9 @@ public class MonitorManager {
 
     private List<PushMonitor> pushMonitors;   //todo we need to support multiple monitors dynamically
 
-    private BlockingQueue<MonitorID> pullQueue;
+    private BlockingQueue<UserMonitorData> pullQueue;
 
-    private BlockingQueue<MonitorID> pushQueue;
+    private BlockingQueue<UserMonitorData> pushQueue;
 
     private BlockingQueue<MonitorID> localJobQueue;
 
@@ -74,8 +74,8 @@ public class MonitorManager {
     public MonitorManager() {
         pullMonitors = new ArrayList<PullMonitor>();
         pushMonitors = new ArrayList<PushMonitor>();
-        pullQueue = new LinkedBlockingQueue<MonitorID>();
-        pushQueue = new LinkedBlockingQueue<MonitorID>();
+        pullQueue = new LinkedBlockingQueue<UserMonitorData>();
+        pushQueue = new LinkedBlockingQueue<UserMonitorData>();
         finishQueue = new LinkedBlockingQueue<MonitorID>();
         localJobQueue = new LinkedBlockingQueue<MonitorID>();
         monitorPublisher = new MonitorPublisher(new EventBus());
@@ -156,14 +156,15 @@ public class MonitorManager {
      * @param monitorID
      * @throws AiravataMonitorException
      */
-    public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException {
+    public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException, InterruptedException {
+
         if (monitorID.getHost().getType() instanceof GsisshHostType) {
             GsisshHostType host = (GsisshHostType) monitorID.getHost().getType();
             if ("".equals(host.getMonitorMode()) || host.getMonitorMode() == null
                     || Constants.PULL.equals(host.getMonitorMode())) {
-                pullQueue.add(monitorID);
+                CommonUtils.addMonitortoQueue(pullQueue, monitorID);
             } else if (Constants.PUSH.equals(host.getMonitorMode())) {
-                pushQueue.add(monitorID);
+                CommonUtils.addMonitortoQueue(pushQueue, monitorID);
             }
         } else if(monitorID.getHost().getType() instanceof GlobusHostType){
             logger.error("Monitoring does not support GlubusHostType resources");
@@ -194,14 +195,15 @@ public class MonitorManager {
             (new Thread(monitor)).start();
         }
 
-        for (PushMonitor monitor : pushMonitors) {
-            (new Thread(monitor)).start();
-            if (monitor instanceof AMQPMonitor) {
-                UnRegisterThread unRegisterThread = new
-                        UnRegisterThread(((AMQPMonitor) monitor).getFinishQueue(), ((AMQPMonitor) monitor).getAvailableChannels());
-                unRegisterThread.start();
-            }
-        }
+        //todo fix this
+//        for (PushMonitor monitor : pushMonitors) {
+//            (new Thread(monitor)).start();
+//            if (monitor instanceof AMQPMonitor) {
+//                UnRegisterThread unRegisterThread = new
+//                        UnRegisterThread(((AMQPMonitor) monitor).getFinishQueue(), ((AMQPMonitor) monitor).getAvailableChannels());
+//                unRegisterThread.start();
+//            }
+//        }
     }
 
     /* getter setters for the private variables */
@@ -222,11 +224,11 @@ public class MonitorManager {
         this.pushMonitors = pushMonitors;
     }
 
-    public BlockingQueue<MonitorID> getPullQueue() {
+    public BlockingQueue<UserMonitorData> getPullQueue() {
         return pullQueue;
     }
 
-    public void setPullQueue(BlockingQueue<MonitorID> pullQueue) {
+    public void setPullQueue(BlockingQueue<UserMonitorData> pullQueue) {
         this.pullQueue = pullQueue;
     }
 
@@ -246,11 +248,11 @@ public class MonitorManager {
         this.finishQueue = finishQueue;
     }
 
-    public BlockingQueue<MonitorID> getPushQueue() {
+    public BlockingQueue<UserMonitorData> getPushQueue() {
         return pushQueue;
     }
 
-    public void setPushQueue(BlockingQueue<MonitorID> pushQueue) {
+    public void setPushQueue(BlockingQueue<UserMonitorData> pushQueue) {
         this.pushQueue = pushQueue;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/UserMonitorData.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/UserMonitorData.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/UserMonitorData.java
new file mode 100644
index 0000000..13c177a
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/UserMonitorData.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is the datastructure to keep the user centric job data, rather keeping
+ * the individual jobs we keep the jobs based on the each user
+ */
+public class UserMonitorData {
+    private final static Logger logger = LoggerFactory.getLogger(UserMonitorData.class);
+
+    private String  userName;
+
+    private List<HostMonitorData> hostMonitorData;
+
+
+    public UserMonitorData(String userName) {
+        this.userName = userName;
+        hostMonitorData = new ArrayList<HostMonitorData>();
+    }
+
+    public UserMonitorData(String userName, List<HostMonitorData> hostMonitorDataList) {
+        this.hostMonitorData = hostMonitorDataList;
+        this.userName = userName;
+    }
+
+    public List<HostMonitorData> getHostMonitorData() {
+        return hostMonitorData;
+    }
+
+    public void setHostMonitorData(List<HostMonitorData> hostMonitorData) {
+        this.hostMonitorData = hostMonitorData;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    /*
+    This method will add element to the MonitorID list, user should not
+    duplicate it, we do not check it because its going to be used by airavata
+    so we have to use carefully and this method will add a host if its a new host
+     */
+    public void addHostMonitorData(HostMonitorData hostMonitorData) throws AiravataMonitorException {
+        this.hostMonitorData.add(hostMonitorData);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
index c70e372..d35fde1 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.job.monitor.core;
 
 import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.state.JobStatus;
 
@@ -38,8 +39,8 @@ public interface MessageParser {
      * we have to makesure the correct message is given to the messageparser
      * parse method, it will not do any filtering
      * @param message content of the message
-     * @param monitorID monitorID object
+     * @param userMonitorData monitorID object
      * @return
      */
-    JobStatus parseMessage(String message,MonitorID monitorID)throws AiravataMonitorException;
+    JobStatus parseMessage(String message,UserMonitorData userMonitorData)throws AiravataMonitorException;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
index e3ecccd..77ae1e7 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.job.monitor.core;
 
 import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 
 /**
@@ -38,7 +39,7 @@ public abstract class PushMonitor extends AiravataAbstractMonitor {
      * @param monitorID
      * @return
      */
-    public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException;
+    public abstract boolean registerListener(UserMonitorData monitorID)throws AiravataMonitorException;
 
     /**
      * This method can be invoked to unregister a listener with the

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
index c20eef2..8b9ebfd 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
@@ -43,7 +43,7 @@ public class LocalJobMonitor extends AiravataAbstractMonitor {
                 MonitorID take = jobQueue.take();
                 getPublisher().publish(new JobStatus(take, JobState.COMPLETE));
             } catch (Exception e) {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                e.printStackTrace();
             }
         } while (!ServerSettings.isStopAllThreads());
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
index ee1e6fe..8bb33bd 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
@@ -20,24 +20,23 @@
 */
 package org.apache.airavata.job.monitor.impl.pull.qstat;
 
-import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.job.monitor.HostMonitorData;
 import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.core.PullMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.util.CommonUtils;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Timestamp;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 
 /**
@@ -48,7 +47,7 @@ public class QstatMonitor extends PullMonitor {
     private final static Logger logger = LoggerFactory.getLogger(QstatMonitor.class);
 
     // I think this should use DelayedBlocking Queue to do the monitoring*/
-    private BlockingQueue<MonitorID> queue;
+    private BlockingQueue<UserMonitorData> queue;
 
     private boolean startPulling = false;
 
@@ -59,7 +58,7 @@ public class QstatMonitor extends PullMonitor {
     public QstatMonitor(){
         connections = new HashMap<String, ResourceConnection>();
     }
-    public QstatMonitor(BlockingQueue<MonitorID> queue, MonitorPublisher publisher) {
+    public QstatMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
         this.queue = queue;
         this.publisher = publisher;
         connections = new HashMap<String, ResourceConnection>();
@@ -105,98 +104,111 @@ public class QstatMonitor extends PullMonitor {
     public boolean startPulling() throws AiravataMonitorException {
         // take the top element in the queue and pull the data and put that element
         // at the tail of the queue
-        MonitorID take = null;
+        //todo this polling will not work with multiple usernames but with single user
+        // and multiple hosts, currently monitoring will work
+        UserMonitorData take = null;
         JobStatus jobStatus = new JobStatus();
+        MonitorID currentMonitorID = null;
         try {
-                take = this.queue.take();
-                if((take.getHost().getType() instanceof GsisshHostType)){
-                    long monitorDiff = 0;
-                    long startedDiff = 0;
-                    if (take.getLastMonitored() != null) {
-                        monitorDiff = (new Timestamp((new Date()).getTime())).getTime() - take.getLastMonitored().getTime();
-                        startedDiff = (new Timestamp((new Date()).getTime())).getTime() - take.getJobStartedTime().getTime();
-                        //todo implement an algorithm to delay the monitor based no start time, we have to delay monitoring
-                        //todo  for long running jobs
-//                    System.out.println(monitorDiff + "-" + startedDiff);
-                        if ((monitorDiff / 1000) < 5) {
-                            // its too early to monitor this job, so we put it at the tail of the queue
-                            this.queue.put(take);
-                        }
+            take = this.queue.take();
+            List<MonitorID> completedJobs = new ArrayList<MonitorID>();
+            List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
+            for (HostMonitorData iHostMonitorData : hostMonitorData) {
+                if (iHostMonitorData.getHost().getType() instanceof GsisshHostType) {
+                    GsisshHostType gsisshHostType = (GsisshHostType) iHostMonitorData.getHost().getType();
+                    String hostName = gsisshHostType.getHostAddress();
+                    ResourceConnection connection = null;
+                    if (connections.containsKey(hostName)) {
+                        logger.debug("We already have this connection so not going to create one");
+                        connection = connections.get(hostName);
+                    } else {
+                        connection = new ResourceConnection(take.getUserName(), iHostMonitorData, gsisshHostType.getInstalledPath());
+                        connections.put(hostName, connection);
                     }
-                    if (take.getLastMonitored() == null || ((monitorDiff / 1000) >= 5)) {
-                        GsisshHostType gsisshHostType = (GsisshHostType) take.getHost().getType();
-                        String hostName = gsisshHostType.getHostAddress();
-                        ResourceConnection connection = null;
-                        if (connections.containsKey(hostName)) {
-                            logger.debug("We already have this connection so not going to create one");
-                            connection = connections.get(hostName);
-                        } else {
-                            connection = new ResourceConnection(take, gsisshHostType.getInstalledPath());
-                            connections.put(hostName, connection);
-                        }
-                        take.setStatus(connection.getJobStatus(take));
-                        jobStatus.setMonitorID(take);
-                        jobStatus.setState(take.getStatus());
+                    List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
+                    Map<String, JobState> jobStatuses = connection.getJobStatuses(take.getUserName(), monitorID);
+                    for (MonitorID iMonitorID : monitorID) {
+                        currentMonitorID = iMonitorID;
+                        iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()));
+                        jobStatus.setMonitorID(iMonitorID);
+                        jobStatus.setState(iMonitorID.getStatus());
                         // we have this JobStatus class to handle amqp monitoring
+
                         publisher.publish(jobStatus);
                         // if the job is completed we do not have to put the job to the queue again
+                        iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+
+                        // After successful monitoring perform following actions to cleanup the queue, if necessary
                         if (!jobStatus.getState().equals(JobState.COMPLETE)) {
-                            take.setLastMonitored(new Timestamp((new Date()).getTime()));
-                            this.queue.put(take);
+                            iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+                        }else if(iMonitorID.getFailedCount() > 2 && iMonitorID.getStatus().equals(JobState.UNKNOWN)){
+                            completedJobs.add(iMonitorID);
+                        } else {
+                            // if the job is complete we remove it from the Map, if any of these maps
+                            // get empty this userMonitorData will get delete from the queue
+                            completedJobs.add(iMonitorID);
                         }
                     }
                 } else {
                     logger.debug("Qstat Monitor doesn't handle non-gsissh hosts");
                 }
-            } catch (InterruptedException e) {
-                if(!this.queue.contains(take)){
-                    try {
-                        this.queue.put(take);
-                    } catch (InterruptedException e1) {
-                        e1.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    }
-                }
-                logger.error("Error handling the job with Job ID:" + take.getJobID());
-                throw new AiravataMonitorException(e);
-            } catch (SSHApiException e) {
-                logger.error(e.getMessage());
-                if(e.getMessage().contains("Unknown Job Id Error")){
-                    // in this case job is finished or may be the given job ID is wrong
-                    jobStatus.setState(JobState.UNKNOWN);
-                    publisher.publish(jobStatus);
-                }else if(e.getMessage().contains("illegally formed job identifier")){
-                   logger.error("Wrong job ID is given so dropping the job from monitoring system");
-                } else if (!this.queue.contains(take)) {   // we put the job back to the queue only if its state is not unknown
-                    if (take.getFailedCount() < 2) {
-                        try {
-                            take.setFailedCount(take.getFailedCount() + 1);
-                            this.queue.put(take);
-                        } catch (InterruptedException e1) {
-                            e1.printStackTrace();
-                        }
-                    } else {
-                        logger.error(e.getMessage());
-                        logger.error("Tried to monitor the job 3 times, so dropping of the the Job with ID: " + take.getJobID());
-                    }
+            }
+            // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back
+            // now the userMonitorData goes back to the tail of the queue
+            queue.put(take);
+            // cleaning up the completed jobs, this method will remove some of the userMonitorData from the queue if
+            // they become empty
+            for(MonitorID completedJob:completedJobs){
+                CommonUtils.removeMonitorFromQueue(queue,completedJob);
+            }
+        } catch (InterruptedException e) {
+            if (!this.queue.contains(take)) {
+                try {
+                    this.queue.put(take);
+                } catch (InterruptedException e1) {
+                    e1.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                 }
-                throw new AiravataMonitorException("Error retrieving the job status", e);
-            } catch (Exception e){
-                if (take.getFailedCount() < 3) {
+            }
+            logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID());
+            throw new AiravataMonitorException(e);
+        } catch (SSHApiException e) {
+            logger.error(e.getMessage());
+            if (e.getMessage().contains("Unknown Job Id Error")) {
+                // in this case job is finished or may be the given job ID is wrong
+                jobStatus.setState(JobState.UNKNOWN);
+                publisher.publish(jobStatus);
+            } else if (e.getMessage().contains("illegally formed job identifier")) {
+                logger.error("Wrong job ID is given so dropping the job from monitoring system");
+            } else if (!this.queue.contains(take)) {   // we put the job back to the queue only if its state is not unknown
+                if (currentMonitorID.getFailedCount() < 2) {
                     try {
-                        take.setFailedCount(take.getFailedCount() + 1);
+                        currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1);
                         this.queue.put(take);
-                        // if we get a wrong status we wait for a while and request again
-                            Thread.sleep(10000);
                     } catch (InterruptedException e1) {
                         e1.printStackTrace();
                     }
                 } else {
                     logger.error(e.getMessage());
-                    logger.error("Tryied to monitor the job 3 times, so dropping of the the Job with ID: " + take.getJobID());
+                    logger.error("Tried to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID());
                 }
-                throw new AiravataMonitorException("Error retrieving the job status", e);
             }
-
+            throw new AiravataMonitorException("Error retrieving the job status", e);
+        } catch (Exception e) {
+            if (currentMonitorID.getFailedCount() < 3) {
+                try {
+                    currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1);
+                    this.queue.put(take);
+                    // if we get a wrong status we wait for a while and request again
+                    Thread.sleep(10000);
+                } catch (InterruptedException e1) {
+                    e1.printStackTrace();
+                }
+            } else {
+                logger.error(e.getMessage());
+                logger.error("Tryied to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID());
+            }
+            throw new AiravataMonitorException("Error retrieving the job status", e);
+        }
 
 
         return true;
@@ -221,11 +233,11 @@ public class QstatMonitor extends PullMonitor {
         this.publisher = publisher;
     }
 
-    public BlockingQueue<MonitorID> getQueue() {
+    public BlockingQueue<UserMonitorData> getQueue() {
         return queue;
     }
 
-    public void setQueue(BlockingQueue<MonitorID> queue) {
+    public void setQueue(BlockingQueue<UserMonitorData> queue) {
         this.queue = queue;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
index ade4420..9dc1866 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
@@ -24,14 +24,21 @@ import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.ServerInfo;
 import org.apache.airavata.gsi.ssh.api.authentication.*;
 import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
 import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.job.monitor.HostMonitorData;
 import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
 
 public class ResourceConnection {
     private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class);
@@ -59,6 +66,25 @@ public class ResourceConnection {
         cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
     }
 
+    public ResourceConnection(String userName, HostMonitorData hostMonitorData, String installedPath) throws SSHApiException {
+        AuthenticationInfo authenticationInfo = hostMonitorData.getMonitorIDs().get(0).getAuthenticationInfo();
+        String hostAddress = hostMonitorData.getHost().getType().getHostAddress();
+        String jobManager = ((GsisshHostType)hostMonitorData.getHost().getType()).getJobManager();
+        JobManagerConfiguration jConfig = null;
+        if (jobManager == null) {
+            log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+            jConfig = CommonUtils.getPBSJobManager(installedPath);
+        } else {
+            if (org.apache.airavata.job.monitor.util.CommonUtils.isPBSHost(hostMonitorData.getHost())) {
+                jConfig = CommonUtils.getPBSJobManager(installedPath);
+            } else if(org.apache.airavata.job.monitor.util.CommonUtils.isSlurm(hostMonitorData.getHost())) {
+                jConfig = CommonUtils.getSLURMJobManager(installedPath);
+            }
+            //todo support br2 etc
+        }
+        ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)hostMonitorData.getHost().getType()).getPort());
+        cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
+    }
     public JobState getJobStatus(MonitorID monitorID) throws SSHApiException {
         String jobID = monitorID.getJobID();
         //todo so currently we execute the qstat for each job but we can use user based monitoring
@@ -66,6 +92,22 @@ public class ResourceConnection {
         return getStatusFromString(cluster.getJobStatus(jobID).toString());
     }
 
+    public Map<String,JobState> getJobStatuses(String userName,List<MonitorID> monitorIDs) throws SSHApiException {
+        Map<String,JobStatus> treeMap = new TreeMap<String,JobStatus>();
+        Map<String,JobState> treeMap1 = new TreeMap<String,JobState>();
+        // creating a sorted map with all the jobIds and with the predefined
+        // status as UNKNOWN
+        for (MonitorID monitorID : monitorIDs) {
+            treeMap.put(monitorID.getJobID(), JobStatus.U);
+        }
+        //todo so currently we execute the qstat for each job but we can use user based monitoring
+        //todo or we should concatenate all the commands and execute them in one go and parse the response
+        cluster.getJobStatuses(userName,treeMap);
+        for(String key:treeMap.keySet()){
+            treeMap1.put(key,getStatusFromString(treeMap.get(key).toString()));
+        }
+        return treeMap1;
+    }
     private JobState getStatusFromString(String status) {
         log.info("parsing the job status returned : " + status);
         if(status != null){

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 06d21a1..d269bab 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -24,7 +24,9 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.job.monitor.HostMonitorData;
 import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.core.PushMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
@@ -55,7 +57,7 @@ public class AMQPMonitor extends PushMonitor {
 
     private MonitorPublisher publisher;
 
-    private BlockingQueue<MonitorID> runningQueue;
+    private BlockingQueue<UserMonitorData> runningQueue;
 
     private BlockingQueue<MonitorID> finishQueue;
 
@@ -70,7 +72,8 @@ public class AMQPMonitor extends PushMonitor {
     public AMQPMonitor(){
 
     }
-    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue runningQueue, BlockingQueue finishQueue,
+    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<UserMonitorData> runningQueue,
+                       BlockingQueue<MonitorID> finishQueue,
                        String proxyPath,String connectionName,List<String> hosts) {
         this.publisher = publisher;
         this.runningQueue = runningQueue;        // these will be initialized by the MonitorManager
@@ -89,33 +92,33 @@ public class AMQPMonitor extends PushMonitor {
     }
 
     @Override
-    public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
-         // do initial check before creating a channel, otherwise resources will be waste
-        // and channel id will be malformed
-        // this check is not implemented in MonitorManager because it depends on
-        // the Monitoring implementation (what data is required)
-        checkMonitorID(monitorID);
-        String channelID = CommonUtils.getChannelID(monitorID);
-        System.out.println("Going to start monitoring job with ID: " + monitorID.getJobID());
-        logger.info("Going to start monitoring job with ID: " + monitorID.getJobID());
-        // if we already have a channel we do not create one
-        if (availableChannels.get(channelID) == null) {
-            //todo need to fix this rather getting it from a file
-            Connection connection = AMQPConnectionUtil.connect(amqpHosts,connectionName, proxyPath);
-            Channel channel = null;
-            try {
-                channel = connection.createChannel();
-                String queueName = channel.queueDeclare().getQueue();
-
-                BasicConsumer consumer = new BasicConsumer(new JSONMessageParser(), publisher, monitorID);
-                channel.basicConsume(queueName, true, consumer);
-                String filterString = CommonUtils.getRoutingKey(monitorID);
-                // here we queuebind to a particular user in a particular machine
-                channel.queueBind(queueName, "glue2.computing_activity", filterString);
-                logger.info("Using filtering string to monitor: " + filterString);
-            } catch (IOException e) {
-                logger.error("Error creating the connection to finishQueue the job:" + monitorID.getJobID());
+    public boolean registerListener(UserMonitorData userMonitorData) throws AiravataMonitorException {
+        List<HostMonitorData> hostNames = userMonitorData.getHostMonitorData();
+        String userName = userMonitorData.getUserName();
+        for (HostMonitorData host : hostNames) {
+            // with amqp monitor we do not use individual monitorID list but
+            // we subscribe to read user-host based subscription
+            String hostAddress = host.getHost().getType().getHostAddress();
+            String channelID = CommonUtils.getChannelID(userName, hostAddress);
+            if (availableChannels.get(channelID) == null) {
+                //todo need to fix this rather getting it from a file
+                Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
+                Channel channel = null;
+                try {
+                    channel = connection.createChannel();
+                    String queueName = channel.queueDeclare().getQueue();
+
+                    BasicConsumer consumer = new BasicConsumer(new JSONMessageParser(), publisher, userMonitorData);
+                    channel.basicConsume(queueName, true, consumer);
+                    String filterString = CommonUtils.getRoutingKey(userName, hostAddress);
+                    // here we queuebind to a particular user in a particular machine
+                    channel.queueBind(queueName, "glue2.computing_activity", filterString);
+                    logger.info("Using filtering string to monitor: " + filterString);
+                } catch (IOException e) {
+                    logger.error("Error creating the connection to finishQueue the job:" + userMonitorData.getUserName());
+                }
             }
+
         }
         return true;
     }
@@ -125,7 +128,7 @@ public class AMQPMonitor extends PushMonitor {
         startRegister = true; // this will be unset by someone else
         while (startRegister || !ServerSettings.isStopAllThreads()) {
             try {
-                MonitorID take = runningQueue.take();
+                UserMonitorData take = runningQueue.take();
                 this.registerListener(take);
             } catch (AiravataMonitorException e) { // catch any exceptino inside the loop
                 e.printStackTrace();
@@ -148,21 +151,7 @@ public class AMQPMonitor extends PushMonitor {
 
 
 
-    private void checkMonitorID(MonitorID monitorID) throws AiravataMonitorException {
-        if (monitorID.getUserName() == null) {
-            String error = "Username has to be given for monitoring";
-            logger.error(error);
-            throw new AiravataMonitorException(error);
-        } else if (monitorID.getHost() == null) {
-            String error = "Host has to be given for monitoring";
-            logger.error(error);
-            throw new AiravataMonitorException(error);
-        } else if (monitorID.getJobID() == null) {
-            String error = "JobID has to be given for monitoring";
-            logger.error(error);
-            throw new AiravataMonitorException(error);
-        }
-    }
+
 
 
     @Override
@@ -207,11 +196,11 @@ public class AMQPMonitor extends PushMonitor {
         this.publisher = publisher;
     }
 
-    public BlockingQueue<MonitorID> getRunningQueue() {
+    public BlockingQueue<UserMonitorData> getRunningQueue() {
         return runningQueue;
     }
 
-    public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
+    public void setRunningQueue(BlockingQueue<UserMonitorData> runningQueue) {
         this.runningQueue = runningQueue;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
index ad25b95..8104444 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
@@ -24,7 +24,7 @@ import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Consumer;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.ShutdownSignalException;
-import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.core.MessageParser;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
@@ -38,12 +38,12 @@ public class BasicConsumer implements Consumer {
 
     MonitorPublisher publisher;
 
-    MonitorID monitorID;
+    UserMonitorData userMonitorData;
 
-    public BasicConsumer(MessageParser parser, MonitorPublisher publisher, MonitorID monitorID) {
+    public BasicConsumer(MessageParser parser, MonitorPublisher publisher, UserMonitorData userMonitorData) {
         this.parser = parser;
         this.publisher = publisher;
-        this.monitorID = monitorID;
+        this.userMonitorData = userMonitorData;
     }
 
     public void handleCancel(java.lang.String consumerTag) {
@@ -68,7 +68,7 @@ public class BasicConsumer implements Consumer {
         // to the Event bus, this will be picked by
         // AiravataJobStatusUpdator and store in to registry
         try {
-            publisher.publish(parser.parseMessage(message,monitorID));
+            publisher.publish(parser.parseMessage(message, userMonitorData));
         } catch (AiravataMonitorException e) {
             e.printStackTrace();
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
index dd9d2e4..8544bff 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationConfig;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import org.apache.airavata.ComputingActivity;
 import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
 import org.apache.airavata.job.monitor.core.MessageParser;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.state.JobStatus;
@@ -38,7 +39,7 @@ import java.util.List;
 public class JSONMessageParser implements MessageParser {
     private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class);
 
-    public JobStatus parseMessage(String message, MonitorID monitorID)throws AiravataMonitorException{
+    public JobStatus parseMessage(String message, UserMonitorData userMonitorData)throws AiravataMonitorException{
         /*todo write a json message parser here*/
         logger.info("Mesage parse invoked");
         System.out.println(message);

http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
index a6cd465..db85625 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
@@ -21,9 +21,16 @@
 package org.apache.airavata.job.monitor.util;
 
 import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.job.monitor.HostMonitorData;
 import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
 public class CommonUtils {
     public static boolean isPBSHost(HostDescription host){
         if("pbs".equals(((GsisshHostType)host.getType()).getJobManager()) ||
@@ -49,4 +56,80 @@ public class CommonUtils {
     public static String getRoutingKey(MonitorID monitorID) {
         return "*." + monitorID.getUserName() + "." + monitorID.getHost().getType().getHostAddress();
     }
+
+    public static String getChannelID(String userName,String hostAddress) {
+        return userName + "-" + hostAddress;
+    }
+
+    public static String getRoutingKey(String userName,String hostAddress) {
+        return "*." + userName + "." + hostAddress;
+    }
+
+    public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID) throws AiravataMonitorException {
+        Iterator<UserMonitorData> iterator = queue.iterator();
+        while (iterator.hasNext()) {
+            UserMonitorData next = iterator.next();
+            if (next.getUserName().equals(monitorID.getUserName())) {
+                // then this is the right place to update
+                List<HostMonitorData> monitorIDs = next.getHostMonitorData();
+                for (HostMonitorData host : monitorIDs) {
+                    if (host.getHost().equals(monitorID.getHost())) {
+                        // ok we found right place to add this monitorID
+                        host.addMonitorIDForHost(monitorID);
+                        return;
+                    }
+                }
+                // there is a userMonitor object for this user name but no Hosts for this host
+                // so we have to create new Hosts
+                HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
+                hostMonitorData.addMonitorIDForHost(monitorID);
+                next.addHostMonitorData(hostMonitorData);
+                return;
+            }
+        }
+        HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
+        hostMonitorData.addMonitorIDForHost(monitorID);
+
+        UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
+        userMonitorData.addHostMonitorData(hostMonitorData);
+        try {
+            queue.put(userMonitorData);
+        } catch (InterruptedException e) {
+            throw new AiravataMonitorException(e);
+        }
+    }
+
+    public static void removeMonitorFromQueue(BlockingQueue<UserMonitorData> queue,MonitorID monitorID) throws AiravataMonitorException {
+        Iterator<UserMonitorData> iterator = queue.iterator();
+        while(iterator.hasNext()){
+            UserMonitorData next = iterator.next();
+            if(next.getUserName().equals(monitorID.getUserName())){
+                // then this is the right place to update
+                List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
+                for(HostMonitorData iHostMonitorID:hostMonitorData){
+                    if(iHostMonitorID.getHost().equals(monitorID.getHost())) {
+                        List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
+                        for(MonitorID iMonitorID:monitorIDs){
+                            if(iMonitorID.getJobID().equals(monitorID.getJobID())) {
+                                // OK we found the object, we cannot do list.remove(object) states of two objects
+                                // could be different, thats why we check the jobID
+                                monitorIDs.remove(iMonitorID);
+                                if(monitorIDs.size()==0) {
+                                    hostMonitorData.remove(iHostMonitorID);
+                                    if (hostMonitorData.size() == 0) {
+                                        // no useful data so we have to remove the element from the queue
+                                        queue.remove(next);
+                                    }
+                                }
+                                return;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        throw new AiravataMonitorException("Cannot find the given MonitorID in the queue with userName " +
+                monitorID.getUserName() + "  and jobID " + monitorID.getJobID());
+
+    }
 }