You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/03/25 23:29:00 UTC
git commit: more improvements to amqp monitor - AIRAVATA-1022
Repository: airavata
Updated Branches:
refs/heads/master cb826d37d -> 6435ad35b
more improvements to amqp monitor - AIRAVATA-1022
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6435ad35
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6435ad35
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6435ad35
Branch: refs/heads/master
Commit: 6435ad35bcf53d619a0d87a55841609c75e11d26
Parents: cb826d3
Author: lahiru <la...@apache.org>
Authored: Tue Mar 25 18:28:53 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Mar 25 18:28:53 2014 -0400
----------------------------------------------------------------------
.../src/main/resources/conf/log4j.properties | 1 +
modules/gfac/gfac-monitor/pom.xml | 2 +-
.../airavata/job/monitor/MonitorManager.java | 9 +--
.../server/OrchestratorServerHandler.java | 1 +
.../job/monitor/impl/push/amqp/AMQPMonitor.java | 28 ++++----
.../monitor/impl/push/amqp/BasicConsumer.java | 4 +-
.../impl/push/amqp/UnRegisterThread.java | 76 --------------------
.../impl/push/amqp/UnRegisterWorker.java | 68 ++++++++++++++++++
.../airavata/job/monitor/util/CommonUtils.java | 2 +-
9 files changed, 93 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/modules/distribution/airavata-server/src/main/resources/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/modules/distribution/airavata-server/src/main/resources/conf/log4j.properties b/modules/distribution/airavata-server/src/main/resources/conf/log4j.properties
index cf9c45d..f10bf8f 100644
--- a/modules/distribution/airavata-server/src/main/resources/conf/log4j.properties
+++ b/modules/distribution/airavata-server/src/main/resources/conf/log4j.properties
@@ -23,6 +23,7 @@ log4j.rootLogger=INFO, CONSOLE, LOGFILE
# Set the enterprise logger priority to FATAL
log4j.logger.org.apache.axis2.enterprise=FATAL
+log4j.logger.org.apache.airavata=DEBUG
log4j.logger.de.hunsicker.jalopy.io=FATAL
log4j.logger.httpclient.wire.header=FATAL
log4j.logger.org.apache.commons.httpclient=FATAL
http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/modules/gfac/gfac-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/pom.xml b/modules/gfac/gfac-monitor/pom.xml
index 0582e52..f09be35 100644
--- a/modules/gfac/gfac-monitor/pom.xml
+++ b/modules/gfac/gfac-monitor/pom.xml
@@ -32,7 +32,7 @@
<!-- monitoring tool from tools/job-monitor -->
<dependency>
<groupId>org.apache.airavata</groupId>
- <artifactId>airavata-job-monitor</artifactId>
+ <artifactId>job-monitor-tool</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index 3db297d..2334048 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -30,7 +30,7 @@ import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
import org.apache.airavata.job.monitor.impl.LocalJobMonitor;
import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
-import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread;
+import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterWorker;
import org.apache.airavata.job.monitor.util.CommonUtils;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
import org.apache.airavata.schemas.gfac.GlobusHostType;
@@ -69,6 +69,7 @@ public class MonitorManager {
private Monitor localJobMonitor;
+
/**
* This will initialize the major monitoring system.
*/
@@ -94,6 +95,7 @@ public class MonitorManager {
monitor.setFinishQueue(this.getFinishQueue());
monitor.setRunningQueue(this.getPushQueue());
addPushMonitor(monitor);
+ registerListener(new UnRegisterWorker(monitor.getAvailableChannels()));
}
@@ -202,11 +204,6 @@ public class MonitorManager {
//todo fix this
for (PushMonitor monitor : pushMonitors) {
(new Thread(monitor)).start();
- if (monitor instanceof AMQPMonitor) {
- UnRegisterThread unRegisterThread = new
- UnRegisterThread(((AMQPMonitor) monitor).getFinishQueue(), ((AMQPMonitor) monitor).getAvailableChannels());
- unRegisterThread.start();
- }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index e2f25fc..763549d 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -115,6 +115,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
} else if (monitor instanceof PushMonitor) {
if (monitor instanceof AMQPMonitor) {
((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list);
+ monitorManager.registerListener(monitor);
monitorManager.addAMQPMonitor((AMQPMonitor) monitor);
}
} else if(monitor instanceof LocalJobMonitor){
http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 44d4653..8025b15 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.job.monitor.impl.push.amqp;
+import com.google.common.eventbus.Subscribe;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.apache.airavata.common.utils.ServerSettings;
@@ -97,13 +98,16 @@ public class AMQPMonitor extends PushMonitor {
// with amqp monitor we do not use individual monitorID list but
// we subscribe to read user-host based subscription
String hostAddress = host.getHost().getType().getHostAddress();
- String channelID = CommonUtils.getChannelID(userName, hostAddress);
- if (availableChannels.get(channelID) == null) {
- //todo need to fix this rather getting it from a file
- Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
- Channel channel = null;
- try {
+ // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
+ // will be picked by the Monitor, so always new usermonitorData object will get create
+ MonitorID monitorID = host.getMonitorIDs().get(0);
+ String channelID = CommonUtils.getChannelID(monitorID);
+ try {
+ //todo need to fix this rather getting it from a file
+ Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
+ Channel channel = null;
channel = connection.createChannel();
+ availableChannels.put(channelID, channel);
String queueName = channel.queueDeclare().getQueue();
BasicConsumer consumer = new BasicConsumer(new JSONMessageParser(), publisher, host);
@@ -112,9 +116,8 @@ public class AMQPMonitor extends PushMonitor {
// here we queuebind to a particular user in a particular machine
channel.queueBind(queueName, "glue2.computing_activity", filterString);
logger.info("Using filtering string to monitor: " + filterString);
- } catch (IOException e) {
- logger.error("Error creating the connection to finishQueue the job:" + userMonitorData.getUserName());
- }
+ } catch (IOException e) {
+ logger.error("Error creating the connection to finishQueue the job:" + userMonitorData.getUserName());
}
}
@@ -152,7 +155,7 @@ public class AMQPMonitor extends PushMonitor {
- @Override
+ @Subscribe
public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
String channelID = CommonUtils.getChannelID(monitorID);
Channel channel = availableChannels.get(channelID);
@@ -162,13 +165,12 @@ public class AMQPMonitor extends PushMonitor {
} else {
try {
channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
+ channel.close();
+ channel.getConnection().close();
} catch (IOException e) {
logger.error("Error unregistering the listener");
throw new AiravataMonitorException("Error unregistering the listener");
}
-
-
-
}
return true;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
index dfd5af8..9c08399 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
@@ -66,12 +66,14 @@ public class BasicConsumer implements Consumer {
byte[] body) {
logger.debug(" job update for: " + envelope.getRoutingKey());
-
String message = new String(body);
message = message.replaceAll("(?m)^", " ");
// Here we parse the message and get the job status and push it
// to the Event bus, this will be picked by
// AiravataJobStatusUpdator and store in to registry
+ logger.debug("************************************************************");
+ logger.debug("AMQP Message recieved \n" + message);
+ logger.debug("************************************************************");
try {
String jobID = envelope.getRoutingKey().split("\\.")[0];
List<MonitorID> monitorIDs = hostMonitorData.getMonitorIDs();
http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterThread.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterThread.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterThread.java
deleted file mode 100644
index 528b9f6..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterThread.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.push.amqp;
-
-import com.rabbitmq.client.Channel;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.util.CommonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-public class UnRegisterThread extends Thread {
- private final static Logger logger = LoggerFactory.getLogger(UnRegisterThread.class);
- private BlockingQueue<MonitorID> finishQueue;
- private Map<String, Channel> availableChannels;
-
- public UnRegisterThread(BlockingQueue<MonitorID> monitor, Map<String, Channel> channels) {
- this.finishQueue = monitor;
- this.availableChannels = channels;
- }
-
- public void run() {
- while (!ServerSettings.isStopAllThreads()) {
- try {
- MonitorID monitorID = this.finishQueue.take();
- unRegisterListener(monitorID);
- //
- } catch (AiravataMonitorException e) {
- logger.error(e.getLocalizedMessage());
- } catch (InterruptedException e) {
- logger.error(e.getLocalizedMessage());
- }
- }
- }
-
- private boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
- String channelID = CommonUtils.getChannelID(monitorID);
- Channel channel = availableChannels.get(channelID);
- if (channel == null) {
- logger.error("Already Unregistered the listener");
- throw new AiravataMonitorException("Already Unregistered the listener");
- } else {
- try {
- channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
- } catch (IOException e) {
- logger.error("Error unregistering the listener");
- throw new AiravataMonitorException("Error unregistering the listener");
- }
- }
- return true;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
new file mode 100644
index 0000000..a893372
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.impl.push.amqp;
+
+import com.google.common.eventbus.Subscribe;
+import com.rabbitmq.client.Channel;
+import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.util.CommonUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class UnRegisterWorker{
+ private final static Logger logger = LoggerFactory.getLogger(UnRegisterWorker.class);
+ private Map<String, Channel> availableChannels;
+
+ public UnRegisterWorker(Map<String, Channel> channels) {
+ this.availableChannels = channels;
+ }
+
+ @Subscribe
+ private boolean unRegisterListener(JobStatus jobStatus) throws AiravataMonitorException {
+ MonitorID monitorID = jobStatus.getMonitorID();
+ String channelID = CommonUtils.getChannelID(monitorID);
+ if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){
+ Channel channel = availableChannels.get(channelID);
+ if (channel == null) {
+ logger.error("Already Unregistered the listener");
+ throw new AiravataMonitorException("Already Unregistered the listener");
+ } else {
+ try {
+ channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
+ channel.close();
+ channel.getConnection().close();
+ availableChannels.remove(channelID);
+ } catch (IOException e) {
+ logger.error("Error unregistering the listener");
+ throw new AiravataMonitorException("Error unregistering the listener");
+ }
+ }
+ }
+ return true;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
index db85625..8a7b160 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
@@ -50,7 +50,7 @@ public class CommonUtils {
}
}
public static String getChannelID(MonitorID monitorID) {
- return monitorID.getUserName() + "-" + monitorID.getHost().getType().getHostName();
+ return monitorID.getUserName() + "-" + monitorID.getHost().getType().getHostName() + "-" + monitorID.getJobID();
}
public static String getRoutingKey(MonitorID monitorID) {