You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@airavata.apache.org by la...@apache.org on 2014/03/24 16:53:20 UTC
git commit: Supporting user based monitoring with pull - AIRAVATA-1023
Repository: airavata
Updated Branches:
refs/heads/master 6b90e6427 -> a052f5be8
Supporting user based monitoring with pull - AIRAVATA-1023
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a052f5be
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a052f5be
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a052f5be
Branch: refs/heads/master
Commit: a052f5be8642a91c3252293887299a728dfb6129
Parents: 6b90e64
Author: lahiru <la...@apache.org>
Authored: Mon Mar 24 11:53:07 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon Mar 24 11:53:07 2014 -0400
----------------------------------------------------------------------
.../apache/airavata/gsi/ssh/api/Cluster.java | 13 +-
.../ssh/api/job/JobManagerConfiguration.java | 2 +
.../airavata/gsi/ssh/api/job/OutputParser.java | 2 +-
.../gsi/ssh/api/job/PBSJobConfiguration.java | 15 +-
.../gsi/ssh/api/job/PBSOutputParser.java | 33 +++-
.../gsi/ssh/api/job/SlurmJobConfiguration.java | 4 +
.../gsi/ssh/api/job/SlurmOutputParser.java | 2 +-
.../gsi/ssh/impl/GSISSHAbstractCluster.java | 9 +
.../airavata/job/monitor/HostMonitorData.java | 69 ++++++++
.../airavata/job/monitor/MonitorManager.java | 42 ++---
.../airavata/job/monitor/UserMonitorData.java | 76 +++++++++
.../job/monitor/core/MessageParser.java | 5 +-
.../airavata/job/monitor/core/PushMonitor.java | 3 +-
.../job/monitor/impl/LocalJobMonitor.java | 2 +-
.../monitor/impl/pull/qstat/QstatMonitor.java | 170 ++++++++++---------
.../impl/pull/qstat/ResourceConnection.java | 42 +++++
.../job/monitor/impl/push/amqp/AMQPMonitor.java | 81 ++++-----
.../monitor/impl/push/amqp/BasicConsumer.java | 10 +-
.../impl/push/amqp/JSONMessageParser.java | 3 +-
.../airavata/job/monitor/util/CommonUtils.java | 83 +++++++++
20 files changed, 501 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
index 1f49bc7..4454624 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/Cluster.java
@@ -21,6 +21,7 @@
package org.apache.airavata.gsi.ssh.api;
import java.util.List;
+import java.util.Map;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.JobStatus;
@@ -104,7 +105,17 @@ public interface Cluster {
* @throws SSHApiException throws exception during error
*/
public JobStatus getJobStatus(String jobID) throws SSHApiException;
-
+
+ /**
+ * Polls the statuses of the given user's jobs on the resource in a single
+ * user-based query. The jobID map must be supplied: a user-based query also
+ * returns jobs submitted under the same user name by other middleware outside
+ * Apache Airavata, and only the jobs listed in the map are of interest.
+ * @param userName user name whose jobs should be queried
+ * @param jobIDs the precise set of job IDs to track (map keys); the GSISSH
+ * implementation hands this map to the configured OutputParser, which writes
+ * the polled status into it in place
+ * @throws SSHApiException if querying the resource or parsing its output fails
+ */
+ public void getJobStatuses(String userName,Map<String,JobStatus> jobIDs)throws SSHApiException;
/**
* This will list directories in computing resources
* @param directoryPath the full qualified path for the directory user wants to create
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java
index c5d1585..f31b6f5 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/JobManagerConfiguration.java
@@ -30,6 +30,8 @@ public interface JobManagerConfiguration {
public RawCommandInfo getMonitorCommand(String jobID);
+ /**
+ * Returns the command that lists all jobs belonging to the given user on the
+ * resource (the PBS and Slurm configurations build "qstat -u" and "squeue -u").
+ * @param userName user whose jobs should be listed
+ * @return the raw command to execute on the resource
+ */
+ public RawCommandInfo getUserBasedMonitorCommand(String userName);
+
public String getScriptExtension();
public RawCommandInfo getSubmitCommand(String workingDirectory,String pbsFilePath);
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java
index 7c97426..0c1b2ad 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/OutputParser.java
@@ -55,5 +55,5 @@ public interface OutputParser {
* @param statusMap list of status map will return and key will be the job ID
* @param rawOutput
*/
- public void parse(Map<String,JobStatus> statusMap, String rawOutput)throws SSHApiException;
+ public void parse(String userName,Map<String,JobStatus> statusMap, String rawOutput)throws SSHApiException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java
index 9e07df5..18ea772 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSJobConfiguration.java
@@ -25,7 +25,7 @@ import org.apache.commons.io.FilenameUtils;
import java.io.File;
-public class PBSJobConfiguration implements JobManagerConfiguration{
+public class PBSJobConfiguration implements JobManagerConfiguration {
private String jobDescriptionTemplateName;
@@ -35,11 +35,12 @@ public class PBSJobConfiguration implements JobManagerConfiguration{
private OutputParser parser;
- public PBSJobConfiguration(){
+ public PBSJobConfiguration() {
// this can be used to construct and use setter methods to set all the params in order
}
+
public PBSJobConfiguration(String jobDescriptionTemplateName,
- String scriptExtension,String installedPath,OutputParser parser) {
+ String scriptExtension, String installedPath, OutputParser parser) {
this.jobDescriptionTemplateName = jobDescriptionTemplateName;
this.scriptExtension = scriptExtension;
this.parser = parser;
@@ -70,8 +71,8 @@ public class PBSJobConfiguration implements JobManagerConfiguration{
return scriptExtension;
}
- public RawCommandInfo getSubmitCommand(String workingDirectory,String pbsFilePath) {
- return new RawCommandInfo(this.installedPath + "qsub " +
+ public RawCommandInfo getSubmitCommand(String workingDirectory, String pbsFilePath) {
+ return new RawCommandInfo(this.installedPath + "qsub " +
workingDirectory + File.separator + FilenameUtils.getName(pbsFilePath));
}
@@ -90,4 +91,8 @@ public class PBSJobConfiguration implements JobManagerConfiguration{
public void setParser(OutputParser parser) {
this.parser = parser;
}
+
+ /**
+ * Builds the PBS "qstat -u" command listing all jobs of the given user,
+ * prefixed with the configured installation path.
+ * NOTE(review): userName is concatenated into the command string unquoted;
+ * if the command is run through a shell, confirm userName can never contain
+ * shell metacharacters.
+ * @param userName user whose jobs should be listed
+ * @return the raw qstat command to execute on the resource
+ */
+ public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+ return new RawCommandInfo(this.installedPath + "qstat -u " + userName);
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java
index 1a4fb21..d765ba5 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/PBSOutputParser.java
@@ -20,11 +20,15 @@
*/
package org.apache.airavata.gsi.ssh.api.job;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.impl.JobStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.print.attribute.standard.JobState;
import javax.validation.constraints.Null;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
public class PBSOutputParser implements OutputParser {
@@ -152,7 +156,32 @@ public class PBSOutputParser implements OutputParser {
return null;
}
- public void parse(Map<String, JobStatus> statusMap, String rawOutput) {
- //To change body of implemented methods use File | Settings | File Templates.
+ /**
+ * Parses the raw output of a user-based qstat query and stores a status for
+ * each job ID in statusMap whose line is found. Jobs with no matching line keep
+ * their existing map value. userName is not used by this body.
+ * NOTE(review): the scan for each job resumes at lastStop (just past the previous
+ * match), so a job is only found if its line comes after the previous job's line.
+ * The key iteration order of statusMap must therefore follow the row order of the
+ * qstat output; a HashMap gives no such guarantee. Confirm with callers.
+ * @param userName user the output belongs to (unused here)
+ * @param statusMap job IDs to look up (keys); values are overwritten in place
+ * @param rawOutput raw stdout of the user-based monitor command
+ */
+ public void parse(String userName, Map<String, JobStatus> statusMap, String rawOutput) {
+ log.debug(rawOutput);
+ String[] info = rawOutput.split("\n");
+
+ // index in info where the next job's search starts; moved past each matched line
+ int lastStop = 0;
+ for (String jobID : statusMap.keySet()) {
+ for(int i=lastStop;i<info.length;i++){
+ // a line matches when its first space-separated token is non-empty and is a
+ // substring of jobID (lines starting with a space have an empty first token and
+ // are skipped). NOTE(review): substring matching is presumably meant to tolerate
+ // job IDs that qstat prints shortened — TODO confirm.
+ if(jobID.contains(info[i].split(" ")[0]) && !"".equals(info[i].split(" ")[0])){
+ // now starts processing this line
+ log.info(info[i]);
+ String correctLine = info[i];
+ String[] columns = correctLine.split(" ");
+ // collapse runs of spaces: keep only the non-empty tokens as columns
+ List<String> columnList = new ArrayList<String>();
+ for (String s : columns) {
+ if (!"".equals(s)) {
+ columnList.add(s);
+ }
+ }
+ lastStop = i+1;
+ // NOTE(review): assumes the job state is the 10th column of a "qstat -u" row
+ // (TODO confirm for the PBS version in use). A shorter row throws
+ // IndexOutOfBoundsException; an unrecognized state string makes
+ // JobStatus.valueOf throw if JobStatus is an enum.
+ statusMap.put(jobID, JobStatus.valueOf(columnList.get(9)));
+ break;
+ }
+ }
+
+ }
}
+
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java
index 4bf4b00..798827b 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmJobConfiguration.java
@@ -90,4 +90,8 @@ public class SlurmJobConfiguration implements JobManagerConfiguration{
public void setParser(OutputParser parser) {
this.parser = parser;
}
+
+ /**
+ * Builds the Slurm "squeue -u" command listing all jobs of the given user,
+ * prefixed with the configured installation path.
+ * NOTE(review): userName is concatenated into the command string unquoted;
+ * if the command is run through a shell, confirm userName can never contain
+ * shell metacharacters.
+ * @param userName user whose jobs should be listed
+ * @return the raw squeue command to execute on the resource
+ */
+ public RawCommandInfo getUserBasedMonitorCommand(String userName) {
+ return new RawCommandInfo(this.installedPath + "squeue -u " + userName);
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
index d118ee5..37d6d71 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/api/job/SlurmOutputParser.java
@@ -145,7 +145,7 @@ public class SlurmOutputParser implements OutputParser {
return JobStatus.valueOf("U");
}
- public void parse(Map<String, JobStatus> statusMap, String rawOutput)throws SSHApiException {
+ /**
+ * User-based status parsing for Slurm output.
+ * NOTE(review): not implemented — the body is empty, so statusMap is left
+ * unchanged.
+ */
+ public void parse(String userName, Map<String, JobStatus> statusMap, String rawOutput) throws SSHApiException {
// TODO implement parsing of the user-based squeue output into statusMap
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
index 0a26589..0ddea0f 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
@@ -48,6 +48,7 @@ import java.io.StringWriter;
import java.net.URL;
import java.security.SecureRandom;
import java.util.List;
+import java.util.Map;
public class GSISSHAbstractCluster implements Cluster {
static {
@@ -406,6 +407,14 @@ public class GSISSHAbstractCluster implements Cluster {
}
}
+ /**
+ * Runs the job manager's user-based monitor command over this cluster's
+ * session, then lets the configured OutputParser write the statuses of the
+ * jobs listed in jobIDs into that map in place.
+ * @param userName user whose jobs the monitor command lists
+ * @param jobIDs job ID to status map, updated in place by the parser
+ * @throws SSHApiException if running the command, reading its output or
+ * parsing it fails
+ */
+ public void getJobStatuses(String userName, Map<String,JobStatus> jobIDs)throws SSHApiException {
+ RawCommandInfo rawCommandInfo = jobManagerConfiguration.getUserBasedMonitorCommand(userName);
+ StandardOutReader stdOutReader = new StandardOutReader();
+ CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader);
+ String result = getOutputifAvailable(stdOutReader, "Error getting job information from the resource !");
+ jobManagerConfiguration.getParser().parse(userName,jobIDs, result);
+ }
+
public ServerInfo getServerInfo() {
return serverInfo;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/HostMonitorData.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/HostMonitorData.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/HostMonitorData.java
new file mode 100644
index 0000000..6e5fde9
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/HostMonitorData.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holds the jobs (MonitorIDs) that are being monitored on a single host.
+ */
+public class HostMonitorData {
+ private HostDescription host;
+
+ private List<MonitorID> monitorIDs;
+
+ /** Creates an entry for the given host with an empty, mutable job list. */
+ public HostMonitorData(HostDescription host) {
+ this.host = host;
+ monitorIDs = new ArrayList<MonitorID>();
+ }
+
+ /**
+ * Creates an entry for the given host. The given list is stored as-is (not
+ * copied), so later additions go into the caller's list.
+ */
+ public HostMonitorData(HostDescription host, List<MonitorID> monitorIDs) {
+ this.host = host;
+ this.monitorIDs = monitorIDs;
+ }
+
+ public HostDescription getHost() {
+ return host;
+ }
+
+ public void setHost(HostDescription host) {
+ this.host = host;
+ }
+
+ public List<MonitorID> getMonitorIDs() {
+ return monitorIDs;
+ }
+
+ public void setMonitorIDs(List<MonitorID> monitorIDs) {
+ this.monitorIDs = monitorIDs;
+ }
+
+ /**
+ * Appends monitorID to this host's job list. No check is made here that
+ * monitorID belongs to this.host. The caller (per the original design,
+ * CommonUtils) is expected to pick the matching HostMonitorData before adding.
+ * @param monitorID job to start monitoring on this host
+ * @throws AiravataMonitorException declared for callers; not thrown by this body
+ */
+ public void addMonitorIDForHost(MonitorID monitorID)throws AiravataMonitorException{
+ monitorIDs.add(monitorID);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index b819e2b..e1c2cb8 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -31,10 +31,10 @@ import org.apache.airavata.job.monitor.impl.LocalJobMonitor;
import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread;
+import org.apache.airavata.job.monitor.util.CommonUtils;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
import org.apache.airavata.schemas.gfac.GlobusHostType;
import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.airavata.schemas.gfac.HostDescriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,9 +56,9 @@ public class MonitorManager {
private List<PushMonitor> pushMonitors; //todo we need to support multiple monitors dynamically
- private BlockingQueue<MonitorID> pullQueue;
+ private BlockingQueue<UserMonitorData> pullQueue;
- private BlockingQueue<MonitorID> pushQueue;
+ private BlockingQueue<UserMonitorData> pushQueue;
private BlockingQueue<MonitorID> localJobQueue;
@@ -74,8 +74,8 @@ public class MonitorManager {
public MonitorManager() {
pullMonitors = new ArrayList<PullMonitor>();
pushMonitors = new ArrayList<PushMonitor>();
- pullQueue = new LinkedBlockingQueue<MonitorID>();
- pushQueue = new LinkedBlockingQueue<MonitorID>();
+ pullQueue = new LinkedBlockingQueue<UserMonitorData>();
+ pushQueue = new LinkedBlockingQueue<UserMonitorData>();
finishQueue = new LinkedBlockingQueue<MonitorID>();
localJobQueue = new LinkedBlockingQueue<MonitorID>();
monitorPublisher = new MonitorPublisher(new EventBus());
@@ -156,14 +156,15 @@ public class MonitorManager {
* @param monitorID
* @throws AiravataMonitorException
*/
- public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException {
+ public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException, InterruptedException {
+
if (monitorID.getHost().getType() instanceof GsisshHostType) {
GsisshHostType host = (GsisshHostType) monitorID.getHost().getType();
if ("".equals(host.getMonitorMode()) || host.getMonitorMode() == null
|| Constants.PULL.equals(host.getMonitorMode())) {
- pullQueue.add(monitorID);
+ CommonUtils.addMonitortoQueue(pullQueue, monitorID);
} else if (Constants.PUSH.equals(host.getMonitorMode())) {
- pushQueue.add(monitorID);
+ CommonUtils.addMonitortoQueue(pushQueue, monitorID);
}
} else if(monitorID.getHost().getType() instanceof GlobusHostType){
logger.error("Monitoring does not support GlubusHostType resources");
@@ -194,14 +195,15 @@ public class MonitorManager {
(new Thread(monitor)).start();
}
- for (PushMonitor monitor : pushMonitors) {
- (new Thread(monitor)).start();
- if (monitor instanceof AMQPMonitor) {
- UnRegisterThread unRegisterThread = new
- UnRegisterThread(((AMQPMonitor) monitor).getFinishQueue(), ((AMQPMonitor) monitor).getAvailableChannels());
- unRegisterThread.start();
- }
- }
+ //todo fix this
+// for (PushMonitor monitor : pushMonitors) {
+// (new Thread(monitor)).start();
+// if (monitor instanceof AMQPMonitor) {
+// UnRegisterThread unRegisterThread = new
+// UnRegisterThread(((AMQPMonitor) monitor).getFinishQueue(), ((AMQPMonitor) monitor).getAvailableChannels());
+// unRegisterThread.start();
+// }
+// }
}
/* getter setters for the private variables */
@@ -222,11 +224,11 @@ public class MonitorManager {
this.pushMonitors = pushMonitors;
}
- public BlockingQueue<MonitorID> getPullQueue() {
+ public BlockingQueue<UserMonitorData> getPullQueue() {
return pullQueue;
}
- public void setPullQueue(BlockingQueue<MonitorID> pullQueue) {
+ public void setPullQueue(BlockingQueue<UserMonitorData> pullQueue) {
this.pullQueue = pullQueue;
}
@@ -246,11 +248,11 @@ public class MonitorManager {
this.finishQueue = finishQueue;
}
- public BlockingQueue<MonitorID> getPushQueue() {
+ public BlockingQueue<UserMonitorData> getPushQueue() {
return pushQueue;
}
- public void setPushQueue(BlockingQueue<MonitorID> pushQueue) {
+ public void setPushQueue(BlockingQueue<UserMonitorData> pushQueue) {
this.pushQueue = pushQueue;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/UserMonitorData.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/UserMonitorData.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/UserMonitorData.java
new file mode 100644
index 0000000..13c177a
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/UserMonitorData.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor;
+
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * User-centric job data: rather than tracking individual jobs, jobs are kept
+ * per user, grouped into HostMonitorData entries (one list of jobs per host).
+ */
+public class UserMonitorData {
+ // NOTE(review): not used anywhere in this class at present
+ private final static Logger logger = LoggerFactory.getLogger(UserMonitorData.class);
+
+ private String userName;
+
+ private List<HostMonitorData> hostMonitorData;
+
+
+ /** Creates data for the given user with an empty, mutable host list. */
+ public UserMonitorData(String userName) {
+ this.userName = userName;
+ hostMonitorData = new ArrayList<HostMonitorData>();
+ }
+
+ /**
+ * Creates data for the given user. The given host list is stored as-is (not
+ * copied).
+ */
+ public UserMonitorData(String userName, List<HostMonitorData> hostMonitorDataList) {
+ this.hostMonitorData = hostMonitorDataList;
+ this.userName = userName;
+ }
+
+ public List<HostMonitorData> getHostMonitorData() {
+ return hostMonitorData;
+ }
+
+ public void setHostMonitorData(List<HostMonitorData> hostMonitorData) {
+ this.hostMonitorData = hostMonitorData;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ /**
+ * Appends hostMonitorData to this user's host list. The entry is always
+ * appended and no duplicate-host check is performed, so callers must not add
+ * the same host twice.
+ * @param hostMonitorData host entry to add
+ * @throws AiravataMonitorException declared for callers; not thrown by this body
+ */
+ public void addHostMonitorData(HostMonitorData hostMonitorData) throws AiravataMonitorException {
+ this.hostMonitorData.add(hostMonitorData);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
index c70e372..d35fde1 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
@@ -21,6 +21,7 @@
package org.apache.airavata.job.monitor.core;
import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
import org.apache.airavata.job.monitor.state.JobStatus;
@@ -38,8 +39,8 @@ public interface MessageParser {
* we have to makesure the correct message is given to the messageparser
* parse method, it will not do any filtering
* @param message content of the message
- * @param monitorID monitorID object
+ * @param userMonitorData monitorID object
* @return
*/
- JobStatus parseMessage(String message,MonitorID monitorID)throws AiravataMonitorException;
+ JobStatus parseMessage(String message,UserMonitorData userMonitorData)throws AiravataMonitorException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
index e3ecccd..77ae1e7 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/core/PushMonitor.java
@@ -21,6 +21,7 @@
package org.apache.airavata.job.monitor.core;
import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
/**
@@ -38,7 +39,7 @@ public abstract class PushMonitor extends AiravataAbstractMonitor {
* @param monitorID
* @return
*/
- public abstract boolean registerListener(MonitorID monitorID)throws AiravataMonitorException;
+ public abstract boolean registerListener(UserMonitorData monitorID)throws AiravataMonitorException;
/**
* This method can be invoked to unregister a listener with the
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
index c20eef2..8b9ebfd 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
@@ -43,7 +43,7 @@ public class LocalJobMonitor extends AiravataAbstractMonitor {
MonitorID take = jobQueue.take();
getPublisher().publish(new JobStatus(take, JobState.COMPLETE));
} catch (Exception e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
} while (!ServerSettings.isStopAllThreads());
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
index ee1e6fe..8bb33bd 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
@@ -20,24 +20,23 @@
*/
package org.apache.airavata.job.monitor.impl.pull.qstat;
-import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.job.monitor.HostMonitorData;
import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
import org.apache.airavata.job.monitor.core.PullMonitor;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.util.CommonUtils;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Timestamp;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
/**
@@ -48,7 +47,7 @@ public class QstatMonitor extends PullMonitor {
private final static Logger logger = LoggerFactory.getLogger(QstatMonitor.class);
// I think this should use DelayedBlocking Queue to do the monitoring*/
- private BlockingQueue<MonitorID> queue;
+ private BlockingQueue<UserMonitorData> queue;
private boolean startPulling = false;
@@ -59,7 +58,7 @@ public class QstatMonitor extends PullMonitor {
public QstatMonitor(){
connections = new HashMap<String, ResourceConnection>();
}
- public QstatMonitor(BlockingQueue<MonitorID> queue, MonitorPublisher publisher) {
+ public QstatMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
this.queue = queue;
this.publisher = publisher;
connections = new HashMap<String, ResourceConnection>();
@@ -105,98 +104,111 @@ public class QstatMonitor extends PullMonitor {
public boolean startPulling() throws AiravataMonitorException {
// take the top element in the queue and pull the data and put that element
// at the tail of the queue
- MonitorID take = null;
+ //todo this polling will not work with multiple usernames but with single user
+ // and multiple hosts, currently monitoring will work
+ UserMonitorData take = null;
JobStatus jobStatus = new JobStatus();
+ MonitorID currentMonitorID = null;
try {
- take = this.queue.take();
- if((take.getHost().getType() instanceof GsisshHostType)){
- long monitorDiff = 0;
- long startedDiff = 0;
- if (take.getLastMonitored() != null) {
- monitorDiff = (new Timestamp((new Date()).getTime())).getTime() - take.getLastMonitored().getTime();
- startedDiff = (new Timestamp((new Date()).getTime())).getTime() - take.getJobStartedTime().getTime();
- //todo implement an algorithm to delay the monitor based no start time, we have to delay monitoring
- //todo for long running jobs
-// System.out.println(monitorDiff + "-" + startedDiff);
- if ((monitorDiff / 1000) < 5) {
- // its too early to monitor this job, so we put it at the tail of the queue
- this.queue.put(take);
- }
+ take = this.queue.take();
+ List<MonitorID> completedJobs = new ArrayList<MonitorID>();
+ List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
+ for (HostMonitorData iHostMonitorData : hostMonitorData) {
+ if (iHostMonitorData.getHost().getType() instanceof GsisshHostType) {
+ GsisshHostType gsisshHostType = (GsisshHostType) iHostMonitorData.getHost().getType();
+ String hostName = gsisshHostType.getHostAddress();
+ ResourceConnection connection = null;
+ if (connections.containsKey(hostName)) {
+ logger.debug("We already have this connection so not going to create one");
+ connection = connections.get(hostName);
+ } else {
+ connection = new ResourceConnection(take.getUserName(), iHostMonitorData, gsisshHostType.getInstalledPath());
+ connections.put(hostName, connection);
}
- if (take.getLastMonitored() == null || ((monitorDiff / 1000) >= 5)) {
- GsisshHostType gsisshHostType = (GsisshHostType) take.getHost().getType();
- String hostName = gsisshHostType.getHostAddress();
- ResourceConnection connection = null;
- if (connections.containsKey(hostName)) {
- logger.debug("We already have this connection so not going to create one");
- connection = connections.get(hostName);
- } else {
- connection = new ResourceConnection(take, gsisshHostType.getInstalledPath());
- connections.put(hostName, connection);
- }
- take.setStatus(connection.getJobStatus(take));
- jobStatus.setMonitorID(take);
- jobStatus.setState(take.getStatus());
+ List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
+ Map<String, JobState> jobStatuses = connection.getJobStatuses(take.getUserName(), monitorID);
+ for (MonitorID iMonitorID : monitorID) {
+ currentMonitorID = iMonitorID;
+ iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()));
+ jobStatus.setMonitorID(iMonitorID);
+ jobStatus.setState(iMonitorID.getStatus());
// we have this JobStatus class to handle amqp monitoring
+
publisher.publish(jobStatus);
// if the job is completed we do not have to put the job to the queue again
+ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+
+ // After successful monitoring perform following actions to cleanup the queue, if necessary
if (!jobStatus.getState().equals(JobState.COMPLETE)) {
- take.setLastMonitored(new Timestamp((new Date()).getTime()));
- this.queue.put(take);
+ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+ }else if(iMonitorID.getFailedCount() > 2 && iMonitorID.getStatus().equals(JobState.UNKNOWN)){
+ completedJobs.add(iMonitorID);
+ } else {
+ // if the job is complete we remove it from the Map, if any of these maps
+ // get empty this userMonitorData will get delete from the queue
+ completedJobs.add(iMonitorID);
}
}
} else {
logger.debug("Qstat Monitor doesn't handle non-gsissh hosts");
}
- } catch (InterruptedException e) {
- if(!this.queue.contains(take)){
- try {
- this.queue.put(take);
- } catch (InterruptedException e1) {
- e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- logger.error("Error handling the job with Job ID:" + take.getJobID());
- throw new AiravataMonitorException(e);
- } catch (SSHApiException e) {
- logger.error(e.getMessage());
- if(e.getMessage().contains("Unknown Job Id Error")){
- // in this case job is finished or may be the given job ID is wrong
- jobStatus.setState(JobState.UNKNOWN);
- publisher.publish(jobStatus);
- }else if(e.getMessage().contains("illegally formed job identifier")){
- logger.error("Wrong job ID is given so dropping the job from monitoring system");
- } else if (!this.queue.contains(take)) { // we put the job back to the queue only if its state is not unknown
- if (take.getFailedCount() < 2) {
- try {
- take.setFailedCount(take.getFailedCount() + 1);
- this.queue.put(take);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- } else {
- logger.error(e.getMessage());
- logger.error("Tried to monitor the job 3 times, so dropping of the the Job with ID: " + take.getJobID());
- }
+ }
+ // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back
+ // now the userMonitorData goes back to the tail of the queue
+ queue.put(take);
+ // cleaning up the completed jobs, this method will remove some of the userMonitorData from the queue if
+ // they become empty
+ for(MonitorID completedJob:completedJobs){
+ CommonUtils.removeMonitorFromQueue(queue,completedJob);
+ }
+ } catch (InterruptedException e) {
+ if (!this.queue.contains(take)) {
+ try {
+ this.queue.put(take);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
- throw new AiravataMonitorException("Error retrieving the job status", e);
- } catch (Exception e){
- if (take.getFailedCount() < 3) {
+ }
+ logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID());
+ throw new AiravataMonitorException(e);
+ } catch (SSHApiException e) {
+ logger.error(e.getMessage());
+ if (e.getMessage().contains("Unknown Job Id Error")) {
+ // in this case job is finished or may be the given job ID is wrong
+ jobStatus.setState(JobState.UNKNOWN);
+ publisher.publish(jobStatus);
+ } else if (e.getMessage().contains("illegally formed job identifier")) {
+ logger.error("Wrong job ID is given so dropping the job from monitoring system");
+ } else if (!this.queue.contains(take)) { // we put the job back to the queue only if its state is not unknown
+ if (currentMonitorID.getFailedCount() < 2) {
try {
- take.setFailedCount(take.getFailedCount() + 1);
+ currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1);
this.queue.put(take);
- // if we get a wrong status we wait for a while and request again
- Thread.sleep(10000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
} else {
logger.error(e.getMessage());
- logger.error("Tryied to monitor the job 3 times, so dropping of the the Job with ID: " + take.getJobID());
+ logger.error("Tried to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID());
}
- throw new AiravataMonitorException("Error retrieving the job status", e);
}
-
+ throw new AiravataMonitorException("Error retrieving the job status", e);
+ } catch (Exception e) {
+ if (currentMonitorID.getFailedCount() < 3) {
+ try {
+ currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1);
+ this.queue.put(take);
+ // if we get a wrong status we wait for a while and request again
+ Thread.sleep(10000);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ } else {
+ logger.error(e.getMessage());
+ logger.error("Tryied to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID());
+ }
+ throw new AiravataMonitorException("Error retrieving the job status", e);
+ }
return true;
@@ -221,11 +233,11 @@ public class QstatMonitor extends PullMonitor {
this.publisher = publisher;
}
- public BlockingQueue<MonitorID> getQueue() {
+ public BlockingQueue<UserMonitorData> getQueue() {
return queue;
}
- public void setQueue(BlockingQueue<MonitorID> queue) {
+ public void setQueue(BlockingQueue<UserMonitorData> queue) {
this.queue = queue;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
index ade4420..9dc1866 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/ResourceConnection.java
@@ -24,14 +24,21 @@ import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.ServerInfo;
import org.apache.airavata.gsi.ssh.api.authentication.*;
import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration;
+import org.apache.airavata.gsi.ssh.impl.JobStatus;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.job.monitor.HostMonitorData;
import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
public class ResourceConnection {
private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class);
@@ -59,6 +66,25 @@ public class ResourceConnection {
cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
}
+ public ResourceConnection(String userName, HostMonitorData hostMonitorData, String installedPath) throws SSHApiException {
+ AuthenticationInfo authenticationInfo = hostMonitorData.getMonitorIDs().get(0).getAuthenticationInfo();
+ String hostAddress = hostMonitorData.getHost().getType().getHostAddress();
+ String jobManager = ((GsisshHostType)hostMonitorData.getHost().getType()).getJobManager();
+ JobManagerConfiguration jConfig = null;
+ if (jobManager == null) {
+ log.error("No Job Manager is configured, so we are picking pbs as the default job manager");
+ jConfig = CommonUtils.getPBSJobManager(installedPath);
+ } else {
+ if (org.apache.airavata.job.monitor.util.CommonUtils.isPBSHost(hostMonitorData.getHost())) {
+ jConfig = CommonUtils.getPBSJobManager(installedPath);
+ } else if(org.apache.airavata.job.monitor.util.CommonUtils.isSlurm(hostMonitorData.getHost())) {
+ jConfig = CommonUtils.getSLURMJobManager(installedPath);
+ }
+ //todo support br2 etc
+ }
+ ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)hostMonitorData.getHost().getType()).getPort());
+ cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig);
+ }
public JobState getJobStatus(MonitorID monitorID) throws SSHApiException {
String jobID = monitorID.getJobID();
//todo so currently we execute the qstat for each job but we can use user based monitoring
@@ -66,6 +92,22 @@ public class ResourceConnection {
return getStatusFromString(cluster.getJobStatus(jobID).toString());
}
+ public Map<String,JobState> getJobStatuses(String userName,List<MonitorID> monitorIDs) throws SSHApiException {
+ Map<String,JobStatus> treeMap = new TreeMap<String,JobStatus>();
+ Map<String,JobState> treeMap1 = new TreeMap<String,JobState>();
+ // creating a sorted map with all the jobIds and with the predefined
+ // status as UNKNOWN
+ for (MonitorID monitorID : monitorIDs) {
+ treeMap.put(monitorID.getJobID(), JobStatus.U);
+ }
+ //todo so currently we execute the qstat for each job but we can use user based monitoring
+ //todo or we should concatenate all the commands and execute them in one go and parse the response
+ cluster.getJobStatuses(userName,treeMap);
+ for(String key:treeMap.keySet()){
+ treeMap1.put(key,getStatusFromString(treeMap.get(key).toString()));
+ }
+ return treeMap1;
+ }
private JobState getStatusFromString(String status) {
log.info("parsing the job status returned : " + status);
if(status != null){
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 06d21a1..d269bab 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -24,7 +24,9 @@ import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.job.monitor.HostMonitorData;
import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
import org.apache.airavata.job.monitor.core.PushMonitor;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
@@ -55,7 +57,7 @@ public class AMQPMonitor extends PushMonitor {
private MonitorPublisher publisher;
- private BlockingQueue<MonitorID> runningQueue;
+ private BlockingQueue<UserMonitorData> runningQueue;
private BlockingQueue<MonitorID> finishQueue;
@@ -70,7 +72,8 @@ public class AMQPMonitor extends PushMonitor {
public AMQPMonitor(){
}
- public AMQPMonitor(MonitorPublisher publisher, BlockingQueue runningQueue, BlockingQueue finishQueue,
+ public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<UserMonitorData> runningQueue,
+ BlockingQueue<MonitorID> finishQueue,
String proxyPath,String connectionName,List<String> hosts) {
this.publisher = publisher;
this.runningQueue = runningQueue; // these will be initialized by the MonitorManager
@@ -89,33 +92,33 @@ public class AMQPMonitor extends PushMonitor {
}
@Override
- public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
- // do initial check before creating a channel, otherwise resources will be waste
- // and channel id will be malformed
- // this check is not implemented in MonitorManager because it depends on
- // the Monitoring implementation (what data is required)
- checkMonitorID(monitorID);
- String channelID = CommonUtils.getChannelID(monitorID);
- System.out.println("Going to start monitoring job with ID: " + monitorID.getJobID());
- logger.info("Going to start monitoring job with ID: " + monitorID.getJobID());
- // if we already have a channel we do not create one
- if (availableChannels.get(channelID) == null) {
- //todo need to fix this rather getting it from a file
- Connection connection = AMQPConnectionUtil.connect(amqpHosts,connectionName, proxyPath);
- Channel channel = null;
- try {
- channel = connection.createChannel();
- String queueName = channel.queueDeclare().getQueue();
-
- BasicConsumer consumer = new BasicConsumer(new JSONMessageParser(), publisher, monitorID);
- channel.basicConsume(queueName, true, consumer);
- String filterString = CommonUtils.getRoutingKey(monitorID);
- // here we queuebind to a particular user in a particular machine
- channel.queueBind(queueName, "glue2.computing_activity", filterString);
- logger.info("Using filtering string to monitor: " + filterString);
- } catch (IOException e) {
- logger.error("Error creating the connection to finishQueue the job:" + monitorID.getJobID());
+ public boolean registerListener(UserMonitorData userMonitorData) throws AiravataMonitorException {
+ List<HostMonitorData> hostNames = userMonitorData.getHostMonitorData();
+ String userName = userMonitorData.getUserName();
+ for (HostMonitorData host : hostNames) {
+ // with amqp monitor we do not use individual monitorID list but
+ // we subscribe to read user-host based subscription
+ String hostAddress = host.getHost().getType().getHostAddress();
+ String channelID = CommonUtils.getChannelID(userName, hostAddress);
+ if (availableChannels.get(channelID) == null) {
+ //todo need to fix this rather getting it from a file
+ Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
+ Channel channel = null;
+ try {
+ channel = connection.createChannel();
+ String queueName = channel.queueDeclare().getQueue();
+
+ BasicConsumer consumer = new BasicConsumer(new JSONMessageParser(), publisher, userMonitorData);
+ channel.basicConsume(queueName, true, consumer);
+ String filterString = CommonUtils.getRoutingKey(userName, hostAddress);
+ // here we queuebind to a particular user in a particular machine
+ channel.queueBind(queueName, "glue2.computing_activity", filterString);
+ logger.info("Using filtering string to monitor: " + filterString);
+ } catch (IOException e) {
+ logger.error("Error creating the connection to finishQueue the job:" + userMonitorData.getUserName());
+ }
}
+
}
return true;
}
@@ -125,7 +128,7 @@ public class AMQPMonitor extends PushMonitor {
startRegister = true; // this will be unset by someone else
while (startRegister || !ServerSettings.isStopAllThreads()) {
try {
- MonitorID take = runningQueue.take();
+ UserMonitorData take = runningQueue.take();
this.registerListener(take);
} catch (AiravataMonitorException e) { // catch any exceptino inside the loop
e.printStackTrace();
@@ -148,21 +151,7 @@ public class AMQPMonitor extends PushMonitor {
- private void checkMonitorID(MonitorID monitorID) throws AiravataMonitorException {
- if (monitorID.getUserName() == null) {
- String error = "Username has to be given for monitoring";
- logger.error(error);
- throw new AiravataMonitorException(error);
- } else if (monitorID.getHost() == null) {
- String error = "Host has to be given for monitoring";
- logger.error(error);
- throw new AiravataMonitorException(error);
- } else if (monitorID.getJobID() == null) {
- String error = "JobID has to be given for monitoring";
- logger.error(error);
- throw new AiravataMonitorException(error);
- }
- }
+
@Override
@@ -207,11 +196,11 @@ public class AMQPMonitor extends PushMonitor {
this.publisher = publisher;
}
- public BlockingQueue<MonitorID> getRunningQueue() {
+ public BlockingQueue<UserMonitorData> getRunningQueue() {
return runningQueue;
}
- public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
+ public void setRunningQueue(BlockingQueue<UserMonitorData> runningQueue) {
this.runningQueue = runningQueue;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
index ad25b95..8104444 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
@@ -24,7 +24,7 @@ import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
-import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
import org.apache.airavata.job.monitor.core.MessageParser;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
@@ -38,12 +38,12 @@ public class BasicConsumer implements Consumer {
MonitorPublisher publisher;
- MonitorID monitorID;
+ UserMonitorData userMonitorData;
- public BasicConsumer(MessageParser parser, MonitorPublisher publisher, MonitorID monitorID) {
+ public BasicConsumer(MessageParser parser, MonitorPublisher publisher, UserMonitorData userMonitorData) {
this.parser = parser;
this.publisher = publisher;
- this.monitorID = monitorID;
+ this.userMonitorData = userMonitorData;
}
public void handleCancel(java.lang.String consumerTag) {
@@ -68,7 +68,7 @@ public class BasicConsumer implements Consumer {
// to the Event bus, this will be picked by
// AiravataJobStatusUpdator and store in to registry
try {
- publisher.publish(parser.parseMessage(message,monitorID));
+ publisher.publish(parser.parseMessage(message, userMonitorData));
} catch (AiravataMonitorException e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
index dd9d2e4..8544bff 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.airavata.ComputingActivity;
import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
import org.apache.airavata.job.monitor.core.MessageParser;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
import org.apache.airavata.job.monitor.state.JobStatus;
@@ -38,7 +39,7 @@ import java.util.List;
public class JSONMessageParser implements MessageParser {
private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class);
- public JobStatus parseMessage(String message, MonitorID monitorID)throws AiravataMonitorException{
+ public JobStatus parseMessage(String message, UserMonitorData userMonitorData)throws AiravataMonitorException{
/*todo write a json message parser here*/
logger.info("Mesage parse invoked");
System.out.println(message);
http://git-wip-us.apache.org/repos/asf/airavata/blob/a052f5be/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
index a6cd465..db85625 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
@@ -21,9 +21,16 @@
package org.apache.airavata.job.monitor.util;
import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.job.monitor.HostMonitorData;
import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.UserMonitorData;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
import org.apache.airavata.schemas.gfac.GsisshHostType;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
public class CommonUtils {
public static boolean isPBSHost(HostDescription host){
if("pbs".equals(((GsisshHostType)host.getType()).getJobManager()) ||
@@ -49,4 +56,80 @@ public class CommonUtils {
public static String getRoutingKey(MonitorID monitorID) {
return "*." + monitorID.getUserName() + "." + monitorID.getHost().getType().getHostAddress();
}
+
+ public static String getChannelID(String userName,String hostAddress) {
+ return userName + "-" + hostAddress;
+ }
+
+ public static String getRoutingKey(String userName,String hostAddress) {
+ return "*." + userName + "." + hostAddress;
+ }
+
+ public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID) throws AiravataMonitorException {
+ Iterator<UserMonitorData> iterator = queue.iterator();
+ while (iterator.hasNext()) {
+ UserMonitorData next = iterator.next();
+ if (next.getUserName().equals(monitorID.getUserName())) {
+ // then this is the right place to update
+ List<HostMonitorData> monitorIDs = next.getHostMonitorData();
+ for (HostMonitorData host : monitorIDs) {
+ if (host.getHost().equals(monitorID.getHost())) {
+ // ok we found right place to add this monitorID
+ host.addMonitorIDForHost(monitorID);
+ return;
+ }
+ }
+ // there is a userMonitor object for this user name but no Hosts for this host
+ // so we have to create new Hosts
+ HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
+ hostMonitorData.addMonitorIDForHost(monitorID);
+ next.addHostMonitorData(hostMonitorData);
+ return;
+ }
+ }
+ HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost());
+ hostMonitorData.addMonitorIDForHost(monitorID);
+
+ UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName());
+ userMonitorData.addHostMonitorData(hostMonitorData);
+ try {
+ queue.put(userMonitorData);
+ } catch (InterruptedException e) {
+ throw new AiravataMonitorException(e);
+ }
+ }
+
+ public static void removeMonitorFromQueue(BlockingQueue<UserMonitorData> queue,MonitorID monitorID) throws AiravataMonitorException {
+ Iterator<UserMonitorData> iterator = queue.iterator();
+ while(iterator.hasNext()){
+ UserMonitorData next = iterator.next();
+ if(next.getUserName().equals(monitorID.getUserName())){
+ // then this is the right place to update
+ List<HostMonitorData> hostMonitorData = next.getHostMonitorData();
+ for(HostMonitorData iHostMonitorID:hostMonitorData){
+ if(iHostMonitorID.getHost().equals(monitorID.getHost())) {
+ List<MonitorID> monitorIDs = iHostMonitorID.getMonitorIDs();
+ for(MonitorID iMonitorID:monitorIDs){
+ if(iMonitorID.getJobID().equals(monitorID.getJobID())) {
+ // OK we found the object, we cannot do list.remove(object) states of two objects
+ // could be different, thats why we check the jobID
+ monitorIDs.remove(iMonitorID);
+ if(monitorIDs.size()==0) {
+ hostMonitorData.remove(iHostMonitorID);
+ if (hostMonitorData.size() == 0) {
+ // no useful data so we have to remove the element from the queue
+ queue.remove(next);
+ }
+ }
+ return;
+ }
+ }
+ }
+ }
+ }
+ }
+ throw new AiravataMonitorException("Cannot find the given MonitorID in the queue with userName " +
+ monitorID.getUserName() + " and jobID " + monitorID.getJobID());
+
+ }
}