You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/03/10 19:55:59 UTC
git commit: fixing issues with job monitor - AIRAVATA-1027
Repository: airavata
Updated Branches:
refs/heads/master 4ee0852d0 -> 24babee47
fixing issues with job monitor - AIRAVATA-1027
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/24babee4
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/24babee4
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/24babee4
Branch: refs/heads/master
Commit: 24babee475123141141d21b98e71da7cdc4bc66b
Parents: 4ee0852
Author: lahiru <la...@apache.org>
Authored: Mon Mar 10 14:55:48 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon Mar 10 14:55:48 2014 -0400
----------------------------------------------------------------------
.../client/samples/CreateLaunchExperiment.java | 23 +++---
.../airavata/client/tools/DocumentCreator.java | 1 +
.../job/monitor/AiravataJobStatusUpdator.java | 2 +-
.../apache/airavata/job/monitor/MonitorID.java | 25 ++++++
.../airavata/job/monitor/MonitorManager.java | 85 ++++++++++++-------
.../airavata/job/monitor/core/Monitor.java | 3 +-
.../job/monitor/event/MonitorPublisher.java | 1 +
.../monitor/impl/pull/qstat/QstatMonitor.java | 24 ++----
.../job/monitor/impl/push/amqp/AMQPMonitor.java | 50 ++++++++---
.../airavata/job/monitor/state/JobStatus.java | 17 +---
.../airavata/job/monitor/AMQPMonitorTest.java | 3 +-
.../airavata/job/monitor/QstatMonitorTest.java | 2 +-
.../src/test/resources/monitor.properties | 1 +
.../main/resources/schemas/HostDescription.xsd | 1 +
.../apache/airavata/common/utils/Constants.java | 2 +
.../resources/conf/airavata-server.properties | 5 +-
.../server/OrchestratorServerHandler.java | 87 +++++++++-----------
17 files changed, 192 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 4f473f1..6c11091 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -61,9 +61,9 @@ public class CreateLaunchExperiment {
final Airavata.Client airavata = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
System.out.println("API version is " + airavata.GetAPIVersion());
addDescriptors();
- final String expId = createExperimentForTrestles(airavata);
+// final String expId = createExperimentForTrestles(airavata);
// final String expId = createUS3ExperimentForTrestles(airavata);
-// final String expId = createExperimentForStampede(airavata);
+ final String expId = createExperimentForStampede(airavata);
// final String expId = createUS3ExperimentForStampede(airavata);
System.out.println("Experiment ID : " + expId);
launchExperiment(airavata, expId);
@@ -109,16 +109,17 @@ public class CreateLaunchExperiment {
// Experiment experiment = airavata.getExperiment(expId);
// System.out.println("retrieved exp id : " + experiment.getExperimentID());
- } catch (TException e) {
+ } catch (Exception e) {
logger.error("Error while connecting with server", e.getMessage());
e.printStackTrace();
- } catch (ApplicationSettingsException e) {
- logger.error("Error while creating airavata API object", e.getMessage());
- e.printStackTrace();
- } catch (AiravataAPIInvocationException e) {
- logger.error("Error while creating airavata API object", e.getMessage());
- e.printStackTrace();
}
+// } catch (ApplicationSettingsException e) {
+// logger.error("Error while creating airavata API object", e.getMessage());
+// e.printStackTrace();
+// } catch (AiravataAPIInvocationException e) {
+// logger.error("Error while creating airavata API object", e.getMessage());
+// e.printStackTrace();
+// }
}
public static void addDescriptors() throws AiravataAPIInvocationException,ApplicationSettingsException {
@@ -129,8 +130,8 @@ public class CreateLaunchExperiment {
// documentCreator.createPBSDocs();
// documentCreator.createPBSDocsForOGCE();
// documentCreator.createMPIPBSDocsTrestles();
-// documentCreator.createSlurmDocs();
- documentCreator.createMPIPBSDocsStampede();
+ documentCreator.createSlurmDocs();
+// documentCreator.createMPIPBSDocsStampede();
} catch (AiravataAPIInvocationException e) {
logger.error("Unable to create airavata API", e.getMessage());
throw new AiravataAPIInvocationException(e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
index f6b953e..eb3a0ca 100644
--- a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
+++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
@@ -638,6 +638,7 @@ public class DocumentCreator {
((GsisshHostType) host.getType()).setJobManager("slurm");
((GsisshHostType) host.getType()).setInstalledPath("/usr/bin/");
((GsisshHostType) host.getType()).setPort(2222);
+// ((GsisshHostType) host.getType()).setMo(2222);
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index 7407a10..6dc007c 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -81,7 +81,7 @@ public class AiravataJobStatusUpdator{
break;
case UNKNOWN:
logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is UNKNOWN");
- logger.info("Unknown job status came, if the old job status is RUNNING or something active, we have to make it complete");
+ jobsToMonitor.remove(jobStatus.getMonitorID());
//todo implement this logic
break;
case QUEUED:
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
index 100c141..945362b 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
@@ -23,6 +23,9 @@ package org.apache.airavata.job.monitor;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.omg.PortableInterceptor.ACTIVE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +62,7 @@ public class MonitorID {
private int failedCount = 0;
+ private JobState state;
public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName) {
this.host = host;
@@ -171,4 +175,25 @@ public class MonitorID {
public void setFailedCount(int failedCount) {
this.failedCount = failedCount;
}
+
+ public JobState getStatus() {
+ return state;
+ }
+
+ public void setStatus(JobState status) {
+ if (this.state != null && status.equals(JobState.UNKNOWN)) {
+ switch (this.state) {
+ case ACTIVE:
+ this.state = JobState.COMPLETE;
+ break;
+ case QUEUED:
+ this.state = JobState.COMPLETE;
+ break;
+
+ }
+ }else{
+ // normal scenario
+ this.state = status;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index aad7d41..a0927a5 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -21,6 +21,7 @@
package org.apache.airavata.job.monitor;
import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.job.monitor.core.PullMonitor;
import org.apache.airavata.job.monitor.core.PushMonitor;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
@@ -29,9 +30,11 @@ import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.validation.constraints.Null;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -50,7 +53,11 @@ public class MonitorManager {
private List<PushMonitor> pushMonitors; //todo we need to support multiple monitors dynamically
- private BlockingQueue<MonitorID> runningQueue;
+ private BlockingQueue<MonitorID> pullQueue;
+
+ private BlockingQueue<MonitorID> pushQueue;
+
+ private BlockingQueue<MonitorID> localJobQueue;
private BlockingQueue<MonitorID> finishQueue;
@@ -62,7 +69,8 @@ public class MonitorManager {
public MonitorManager() {
pullMonitors = new ArrayList<PullMonitor>();
pushMonitors = new ArrayList<PushMonitor>();
- runningQueue = new LinkedBlockingDeque<MonitorID>();
+ pullQueue = new LinkedBlockingDeque<MonitorID>();
+ pushQueue = new LinkedBlockingDeque<MonitorID>();
finishQueue = new LinkedBlockingDeque<MonitorID>();
monitorPublisher = new MonitorPublisher(new EventBus());
registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), finishQueue));
@@ -77,7 +85,7 @@ public class MonitorManager {
public void addAMQPMonitor(AMQPMonitor monitor) {
monitor.setPublisher(this.getMonitorPublisher());
monitor.setFinishQueue(this.getFinishQueue());
- monitor.setRunningQueue(this.getRunningQueue());
+ monitor.setRunningQueue(this.getPushQueue());
addPushMonitor(monitor);
}
@@ -89,7 +97,7 @@ public class MonitorManager {
*/
public void addQstatMonitor(QstatMonitor qstatMonitor) {
qstatMonitor.setPublisher(this.getMonitorPublisher());
- qstatMonitor.setQueue(this.getRunningQueue());
+ qstatMonitor.setQueue(this.getPullQueue());
addPullMonitor(qstatMonitor);
}
@@ -127,13 +135,19 @@ public class MonitorManager {
* This is going to be useful during the startup of the launching process
*
* @param monitorID
+ * @throws AiravataMonitorException kept in the signature for callers; the queueing logic below does not itself throw it
*/
public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException {
- try {
- runningQueue.put(monitorID);
- } catch (InterruptedException e) {
- String error = "Error while putting the job: " + monitorID.getJobID() + " the monitor queue";
- throw new AiravataMonitorException(error, e);
+ if (monitorID.getHost().getType() instanceof GsisshHostType) {
+ GsisshHostType host = (GsisshHostType) monitorID.getHost().getType();
+ if ("".equals(host.getMonitorMode()) || host.getMonitorMode() == null
+ || Constants.PULL.equals(host.getMonitorMode())) {
+ pullQueue.add(monitorID);
+ } else if (Constants.PUSH.equals(host.getMonitorMode())) {
+ pushQueue.add(monitorID);
+ }
+ } else {
+ logger.error("We only support Gsissh host types currently");
}
}
@@ -148,26 +162,19 @@ public class MonitorManager {
* @throws AiravataMonitorException
*/
public void launchMonitor() throws AiravataMonitorException {
- if (pushMonitors.isEmpty()) {
- if (pullMonitors.isEmpty()) {
- logger.error("Before launching MonitorManager should have atleast one Monitor");
- return;
- } else {
- //no push monitor is configured so we launch pull monitor
- QstatMonitor pullMonitor = (QstatMonitor)pullMonitors.get(0);
- (new Thread(pullMonitor)).start();
- }
- } else {
- // there is a push monitor configured, so we schedule the push monitor
- // We currently support dealing with one type of monitor
- AMQPMonitor pushMonitor = (AMQPMonitor) pushMonitors.get(0);
- (new Thread(pushMonitor)).start();
-
- UnRegisterThread unRegisterThread = new
- UnRegisterThread(pushMonitor.getFinishQueue(), pushMonitor.getAvailableChannels());
- unRegisterThread.start();
+ // start every registered pull monitor, each on its own thread
+ for (PullMonitor monitor : pullMonitors) {
+ (new Thread(monitor)).start();
}
+ for (PushMonitor monitor : pushMonitors) {
+ (new Thread(monitor)).start();
+ if (monitor instanceof AMQPMonitor) {
+ UnRegisterThread unRegisterThread = new
+ UnRegisterThread(((AMQPMonitor) monitor).getFinishQueue(), ((AMQPMonitor) monitor).getAvailableChannels());
+ unRegisterThread.start();
+ }
+ }
}
/* getter setters for the private variables */
@@ -188,12 +195,12 @@ public class MonitorManager {
this.pushMonitors = pushMonitors;
}
- public BlockingQueue<MonitorID> getRunningQueue() {
- return runningQueue;
+ public BlockingQueue<MonitorID> getPullQueue() {
+ return pullQueue;
}
- public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
- this.runningQueue = runningQueue;
+ public void setPullQueue(BlockingQueue<MonitorID> pullQueue) {
+ this.pullQueue = pullQueue;
}
public MonitorPublisher getMonitorPublisher() {
@@ -211,4 +218,20 @@ public class MonitorManager {
public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
this.finishQueue = finishQueue;
}
+
+ public BlockingQueue<MonitorID> getPushQueue() {
+ return pushQueue;
+ }
+
+ public void setPushQueue(BlockingQueue<MonitorID> pushQueue) {
+ this.pushQueue = pushQueue;
+ }
+
+ public BlockingQueue<MonitorID> getLocalJobQueue() {
+ return localJobQueue;
+ }
+
+ public void setLocalJobQueue(BlockingQueue<MonitorID> localJobQueue) {
+ this.localJobQueue = localJobQueue;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java
index ce8cf22..9627bbc 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java
@@ -20,12 +20,11 @@
*/
package org.apache.airavata.job.monitor.core;
-import org.apache.airavata.job.monitor.event.MonitorPublisher;
/**
* This is the primary interface for Monitors,
* This can be used to implement different methods of monitoring
*/
-public interface Monitor {
+public interface Monitor extends Runnable {
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
index 12c27fa..95b64ab 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
@@ -21,6 +21,7 @@
package org.apache.airavata.job.monitor.event;
import com.google.common.eventbus.EventBus;
+import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.job.monitor.state.JobStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
index a2c85ed..1978ad8 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
@@ -20,7 +20,7 @@
*/
package org.apache.airavata.job.monitor.impl.pull.qstat;
-import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.job.monitor.core.PullMonitor;
@@ -29,7 +29,6 @@ import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
import org.apache.airavata.job.monitor.state.JobStatus;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.airavata.schemas.gfac.HostDescriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +43,7 @@ import java.util.concurrent.BlockingQueue;
* This monitor is based on qstat command which can be run
* in grid resources and retrieve the job status.
*/
-public class QstatMonitor extends PullMonitor implements Runnable {
+public class QstatMonitor extends PullMonitor {
private final static Logger logger = LoggerFactory.getLogger(QstatMonitor.class);
// I think this should use DelayedBlocking Queue to do the monitoring*/
@@ -95,16 +94,9 @@ public class QstatMonitor extends PullMonitor implements Runnable {
// at the tail of the queue
MonitorID take = null;
JobStatus jobStatus = new JobStatus();
- while (!this.queue.isEmpty()) {
- try {
- Iterator<MonitorID> iterator = this.queue.iterator();
- // no need to check iterator.hasNext because its already checked
- MonitorID next = iterator.next();
- // we check whether the job is type of gsissh otherwise we return the job back to the queue
- // Here we use iterator because it not fair to take the object from the queue unless its
- // the correct host type,so if its not the right type it will remain in the queue
- if(next.getHost().getType() instanceof GsisshHostType){
- take = this.queue.take();
+ try {
+ take = this.queue.take();
+ if((take.getHost().getType() instanceof GsisshHostType)){
long monitorDiff = 0;
long startedDiff = 0;
if (take.getLastMonitored() != null) {
@@ -129,8 +121,10 @@ public class QstatMonitor extends PullMonitor implements Runnable {
connection = new ResourceConnection(take, gsisshHostType.getInstalledPath());
connections.put(hostName, connection);
}
+ take.setStatus(connection.getJobStatus(take));
jobStatus.setMonitorID(take);
- jobStatus.setState(connection.getJobStatus(take));
+ jobStatus.setState(take.getStatus());
+ // we have this JobStatus class to handle amqp monitoring
publisher.publish(jobStatus);
// if the job is completed we do not have to put the job to the queue again
if (!jobStatus.getState().equals(JobState.COMPLETE)) {
@@ -186,7 +180,7 @@ public class QstatMonitor extends PullMonitor implements Runnable {
}
throw new AiravataMonitorException("Error retrieving the job status", e);
}
- }
+
return true;
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 6bf5f01..b5b6e8f 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -22,17 +22,21 @@ package org.apache.airavata.job.monitor.impl.push.amqp;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
+import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.job.monitor.core.PushMonitor;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
import org.apache.airavata.job.monitor.util.AMQPConnectionUtil;
import org.apache.airavata.job.monitor.util.CommonUtils;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -41,7 +45,7 @@ import java.util.concurrent.BlockingQueue;
* rabbitmq client to recieve AMQP based monitoring data from
* mostly excede resources.
*/
-public class AMQPMonitor extends PushMonitor implements Runnable {
+public class AMQPMonitor extends PushMonitor {
private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
@@ -57,24 +61,36 @@ public class AMQPMonitor extends PushMonitor implements Runnable {
private BlockingQueue<MonitorID> finishQueue;
+ private String connectionName;
+
+ private String proxyPath;
+
+ private List<String> amqpHosts;
+
public AMQPMonitor(){
}
- public AMQPMonitor(MonitorPublisher publisher, BlockingQueue runningQueue, BlockingQueue finishQueue) {
+ public AMQPMonitor(MonitorPublisher publisher, BlockingQueue runningQueue, BlockingQueue finishQueue,
+ String proxyPath,String connectionName,List<String> hosts) {
this.publisher = publisher;
- this.runningQueue = runningQueue;
- this.finishQueue = finishQueue;
+ this.runningQueue = runningQueue; // these will be initialized by the MonitorManager
+ this.finishQueue = finishQueue; // these will be initialized by the MonitorManager
availableChannels = new HashMap<String, Channel>();
-// UnRegisterThread unRegisterThread = new UnRegisterThread(finishQueue,availableChannels);
-// unRegisterThread.run();
- System.out.println("Testing");
+ this.connectionName = connectionName;
+ this.proxyPath = proxyPath;
+ this.amqpHosts = hosts;
+ }
+
+ public void initialize(String proxyPath,String connectionName,List<String> hosts){
+ this.connectionName = connectionName;
+ this.proxyPath = proxyPath;
+ this.amqpHosts = hosts;
}
public void run() {
try {
// before going to the while true mode we start unregister thread
while (true) {
- // we got a new job to do the monitoring
MonitorID take = runningQueue.take();
this.registerListener(take);
}
@@ -99,7 +115,7 @@ public class AMQPMonitor extends PushMonitor implements Runnable {
// if we already have a channel we do not create one
if (availableChannels.get(channelID) == null) {
//todo need to fix this rather getting it from a file
- Connection connection = AMQPConnectionUtil.connect("xsede_private", "/Users/lahirugunathilake/Downloads/x509up_u503876");
+ Connection connection = AMQPConnectionUtil.connect(connectionName, proxyPath);
Channel channel = null;
try {
channel = connection.createChannel();
@@ -185,9 +201,19 @@ public class AMQPMonitor extends PushMonitor implements Runnable {
this.finishQueue = finishQueue;
}
- /**
- * implementing a logic to handle the finished job and unsubscribe
- */
+ public String getProxyPath() {
+ return proxyPath;
+ }
+
+ public void setProxyPath(String proxyPath) {
+ this.proxyPath = proxyPath;
+ }
+ public List<String> getAmqpHosts() {
+ return amqpHosts;
+ }
+ public void setAmqpHosts(List<String> amqpHosts) {
+ this.amqpHosts = amqpHosts;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
index 9ee6ce8..fe623fb 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
@@ -43,22 +43,7 @@ public class JobStatus {
}
public void setState(JobState state) {
- // this is to handle quick change of status and getStatus returns unknown values
- // because job is already finished and information is removed in the resource
- if (this.state != null && state.equals(JobState.UNKNOWN)) {
- switch (this.state) {
- case ACTIVE:
- this.state = JobState.COMPLETE;
- break;
- case QUEUED:
- this.state = JobState.COMPLETE;
- break;
-
- }
- }else{
- // normal scenario
- this.state = state;
- }
+ this.state = state;
}
public MonitorID getMonitorID() {
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
index a6ef7ea..6327050 100644
--- a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
+++ b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
@@ -31,7 +31,6 @@ import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.axiom.om.util.CommonUtils;
import org.junit.Before;
import org.junit.Test;
@@ -69,7 +68,7 @@ public class AMQPMonitorTest {
monitorManager = new MonitorManager();
AMQPMonitor amqpMonitor = new
AMQPMonitor(monitorManager.getMonitorPublisher(),
- monitorManager.getRunningQueue(), monitorManager.getFinishQueue());
+ monitorManager.getPullQueue(), monitorManager.getFinishQueue());
try {
monitorManager.addPushMonitor(amqpMonitor);
monitorManager.launchMonitor();
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
index aabc0d7..126b8ae 100644
--- a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
+++ b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
@@ -68,7 +68,7 @@ public class QstatMonitorTest {
monitorManager = new MonitorManager();
QstatMonitor qstatMonitor = new
- QstatMonitor(monitorManager.getRunningQueue(), monitorManager.getMonitorPublisher());
+ QstatMonitor(monitorManager.getPullQueue(), monitorManager.getMonitorPublisher());
try {
monitorManager.addPullMonitor(qstatMonitor);
monitorManager.launchMonitor();
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/test/resources/monitor.properties
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/test/resources/monitor.properties b/modules/airavata-job-monitor/src/test/resources/monitor.properties
index 0b0b5f4..a4d68cf 100644
--- a/modules/airavata-job-monitor/src/test/resources/monitor.properties
+++ b/modules/airavata-job-monitor/src/test/resources/monitor.properties
@@ -1,2 +1,3 @@
amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
+proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
connection.name=xsede_private
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd
----------------------------------------------------------------------
diff --git a/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd b/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd
index 4ec3a4a..c4052d9 100644
--- a/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd
+++ b/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd
@@ -112,6 +112,7 @@
<element name="postJobCommands" type="xsd:string" minOccurs="0" maxOccurs="unbounded"/>
<element name="installedPath" type="xsd:string" minOccurs="0" maxOccurs="1" default="/opt/torque/bin"/>
<element name="jobManager" type="xsd:string" minOccurs="0" maxOccurs="1"/>
+ <element name="monitorMode" type="xsd:string" minOccurs="0" maxOccurs="1"/>
</sequence>
</extension>
</complexContent>
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
index effc4bb..bac5913 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -29,4 +29,6 @@ public final class Constants {
public static final String USER_IN_SESSION = "userName";
public static final String GATEWAY_NAME = "gateway_id";
public static final String GFAC_CONFIG_XML = "gfac-config.xml";
+ public static final String PUSH = "push";
+ public static final String PULL = "pull";
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties b/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
index e0c4b4b..d0c94c9 100644
--- a/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
+++ b/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
@@ -256,11 +256,10 @@ TwoPhase=true
###---------------------------Monitoring module Configurations---------------------------###
#This is the primary monitoring tool that runs in Airavata; in the future there will be multiple monitoring
#mechanisms, and one will be able to start a monitor
-primaryMonitor=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor
-#We do not support a secondaray monitoring at this point or host specific monitoring
-secondaryMonitor=org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor
+monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor
#This is the AMQP-related configuration; it lists the RabbitMQ hosts. This is an XSEDE-specific configuration
amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
+proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
connection.name=xsede_private
http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index defbdcf..0ac4756 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -44,13 +44,12 @@ import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.DataType;
import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URL;
+import java.lang.String;
+import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@@ -63,14 +62,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
private Registry registry;
- private boolean pushMode = true;
-
GSIAuthenticationInfo authenticationInfo = null;
/**
* Query orchestrator server to fetch the CPI version
*/
- @Override
public String getOrchestratorCPIVersion() throws TException {
return orchestrator_cpi_serviceConstants.ORCHESTRATOR_CPI_VERSION;
@@ -95,34 +91,33 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
7512, 17280000, certPath);
// loading Monitor configuration
- String primaryMonitor = properties.getProperty("primaryMonitor");
- String secondaryMonitor = properties.getProperty("secondaryMonitor");
-
+ String monitors = properties.getProperty("monitors");
+ List<String> monitorList = Arrays.asList(monitors.split(","));
+ List<String> list = Arrays.asList(properties.getProperty("amqp.hosts").split(","));
+ String proxyPath = properties.getProperty("proxy.file.path");
+ String connectionName = properties.getProperty("connection.name");
- if (primaryMonitor == null) {
+ if (monitors == null) {
log.error("Error loading primaryMonitor and there has to be a primary monitor");
} else {
- Class<? extends Monitor> aClass = Class.forName(primaryMonitor).asSubclass(Monitor.class);
- Monitor monitor = aClass.newInstance();
- if (monitor instanceof PullMonitor) {
- if(monitor instanceof QstatMonitor){
- monitorManager.addQstatMonitor((QstatMonitor)monitor);
+ for (String monitorClass : monitorList) {
+ Class<? extends Monitor> aClass = Class.forName(monitorClass).asSubclass(Monitor.class);
+ Monitor monitor = aClass.newInstance();
+ if (monitor instanceof PullMonitor) {
+ if (monitor instanceof QstatMonitor) {
+ monitorManager.addQstatMonitor((QstatMonitor) monitor);
+ }
+ } else if (monitor instanceof PushMonitor) {
+ if (monitor instanceof AMQPMonitor) {
+ ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list);
+ monitorManager.addAMQPMonitor((AMQPMonitor) monitor);
+ }
+ } else {
+ log.error("Wrong class is given to primary Monitor");
}
- pushMode = false;
- } else if (monitor instanceof PushMonitor) {
- if(monitor instanceof AMQPMonitor){
- monitorManager.addAMQPMonitor((AMQPMonitor)monitor);
- }
- } else {
- log.error("Wrong class is given to primary Monitor");
}
- }
- if (secondaryMonitor == null) {
- log.info("No secondary Monitor has configured !!!!");
- } else {
- // todo we do not support a secondary Monitor at this point
- }
+ }
monitorManager.registerListener(orchestrator);
// Now Monitor Manager is properly configured, now we have to start the monitoring system.
// This will initialize all the required threads and required queues
@@ -152,20 +147,19 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
*
* @param experimentId
*/
- @Override
public boolean launchExperiment(String experimentId) throws TException {
//TODO: Write the Orchestrator implementation
try {
List<TaskDetails> tasks = orchestrator.createTasks(experimentId);
MonitorID monitorID = null;
- if(tasks.size() > 1){
+ if (tasks.size() > 1) {
log.info("There are multiple tasks for this experiment, So Orchestrator will launch multiple Jobs");
}
- for(TaskDetails taskID:tasks) {
+ for (TaskDetails taskID : tasks) {
//iterate through all the generated tasks and perform the job submission+monitoring
Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentId);
- if(experiment == null){
+ if (experiment == null) {
log.error("Error retrieving the Experiment by the given experimentID: " + experimentId);
return false;
}
@@ -174,27 +168,28 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
HostDescription hostDescription = OrchestratorUtils.getHostDescription(orchestrator, taskID);
// creating monitorID to register with monitoring queue
+ // this is a special case because amqp has to be in place before submitting the job
+ if ((hostDescription instanceof GsisshHostType) &&
+ Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) {
- if(pushMode){
- // during the pull we need the monitorID in the queue inadvance
- // For this we have enough data at this point
- monitorID = new MonitorID(hostDescription, null,taskID.getTaskID(),experimentId, userName);
+ monitorID = new MonitorID(hostDescription, null, taskID.getTaskID(), experimentId, userName);
monitorManager.addAJobToMonitor(monitorID);
- }
- // Launching job for each task
- String jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
- log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
- // if the monitoring is pull mode then we add the monitorID for each task after submitting
- // the job with the jobID, otherwise we don't need the jobID
- if(!pushMode) {
- monitorID = new MonitorID(hostDescription, jobID,taskID.getTaskID(),experimentId, userName, authenticationInfo);
+ String jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
+ log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
+ } else {
+ // Launching job for each task
+ // if the monitoring is pull mode then we add the monitorID for each task after submitting
+ // the job with the jobID, otherwise we don't need the jobID
+ String jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
+ log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
+ monitorID = new MonitorID(hostDescription, jobID, taskID.getTaskID(), experimentId, userName, authenticationInfo);
monitorManager.addAJobToMonitor(monitorID);
}
}
} catch (Exception e) {
throw new TException(e);
}
- return false;
+ return true;
}
@Override