You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/03/25 23:29:00 UTC

git commit: more improvements to amqp monitor - AIRAVATA-1022

Repository: airavata
Updated Branches:
  refs/heads/master cb826d37d -> 6435ad35b


more improvements to amqp monitor - AIRAVATA-1022


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6435ad35
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6435ad35
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6435ad35

Branch: refs/heads/master
Commit: 6435ad35bcf53d619a0d87a55841609c75e11d26
Parents: cb826d3
Author: lahiru <la...@apache.org>
Authored: Tue Mar 25 18:28:53 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Mar 25 18:28:53 2014 -0400

----------------------------------------------------------------------
 .../src/main/resources/conf/log4j.properties    |  1 +
 modules/gfac/gfac-monitor/pom.xml               |  2 +-
 .../airavata/job/monitor/MonitorManager.java    |  9 +--
 .../server/OrchestratorServerHandler.java       |  1 +
 .../job/monitor/impl/push/amqp/AMQPMonitor.java | 28 ++++----
 .../monitor/impl/push/amqp/BasicConsumer.java   |  4 +-
 .../impl/push/amqp/UnRegisterThread.java        | 76 --------------------
 .../impl/push/amqp/UnRegisterWorker.java        | 68 ++++++++++++++++++
 .../airavata/job/monitor/util/CommonUtils.java  |  2 +-
 9 files changed, 93 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/modules/distribution/airavata-server/src/main/resources/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/modules/distribution/airavata-server/src/main/resources/conf/log4j.properties b/modules/distribution/airavata-server/src/main/resources/conf/log4j.properties
index cf9c45d..f10bf8f 100644
--- a/modules/distribution/airavata-server/src/main/resources/conf/log4j.properties
+++ b/modules/distribution/airavata-server/src/main/resources/conf/log4j.properties
@@ -23,6 +23,7 @@ log4j.rootLogger=INFO, CONSOLE, LOGFILE
 
 # Set the enterprise logger priority to FATAL
 log4j.logger.org.apache.axis2.enterprise=FATAL
+log4j.logger.org.apache.airavata=DEBUG
 log4j.logger.de.hunsicker.jalopy.io=FATAL
 log4j.logger.httpclient.wire.header=FATAL
 log4j.logger.org.apache.commons.httpclient=FATAL

http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/modules/gfac/gfac-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/pom.xml b/modules/gfac/gfac-monitor/pom.xml
index 0582e52..f09be35 100644
--- a/modules/gfac/gfac-monitor/pom.xml
+++ b/modules/gfac/gfac-monitor/pom.xml
@@ -32,7 +32,7 @@
 	<!-- monitoring tool from tools/job-monitor -->
 	 <dependency>
             <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-job-monitor</artifactId>
+            <artifactId>job-monitor-tool</artifactId>
             <version>${project.version}</version>
         </dependency>
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index 3db297d..2334048 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -30,7 +30,7 @@ import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.impl.LocalJobMonitor;
 import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
 import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
-import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread;
+import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterWorker;
 import org.apache.airavata.job.monitor.util.CommonUtils;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
 import org.apache.airavata.schemas.gfac.GlobusHostType;
@@ -69,6 +69,7 @@ public class MonitorManager {
 
     private Monitor localJobMonitor;
 
+
     /**
      * This will initialize the major monitoring system.
      */
@@ -94,6 +95,7 @@ public class MonitorManager {
         monitor.setFinishQueue(this.getFinishQueue());
         monitor.setRunningQueue(this.getPushQueue());
         addPushMonitor(monitor);
+        registerListener(new UnRegisterWorker(monitor.getAvailableChannels()));
     }
 
 
@@ -202,11 +204,6 @@ public class MonitorManager {
         //todo fix this
         for (PushMonitor monitor : pushMonitors) {
             (new Thread(monitor)).start();
-            if (monitor instanceof AMQPMonitor) {
-                UnRegisterThread unRegisterThread = new
-                        UnRegisterThread(((AMQPMonitor) monitor).getFinishQueue(), ((AMQPMonitor) monitor).getAvailableChannels());
-                unRegisterThread.start();
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index e2f25fc..763549d 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -115,6 +115,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
                     } else if (monitor instanceof PushMonitor) {
                         if (monitor instanceof AMQPMonitor) {
                             ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list);
+                            monitorManager.registerListener(monitor);
                             monitorManager.addAMQPMonitor((AMQPMonitor) monitor);
                         }
                     } else if(monitor instanceof LocalJobMonitor){

http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 44d4653..8025b15 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.job.monitor.impl.push.amqp;
 
+import com.google.common.eventbus.Subscribe;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import org.apache.airavata.common.utils.ServerSettings;
@@ -97,13 +98,16 @@ public class AMQPMonitor extends PushMonitor {
             // with amqp monitor we do not use individual monitorID list but
             // we subscribe to read user-host based subscription
             String hostAddress = host.getHost().getType().getHostAddress();
-            String channelID = CommonUtils.getChannelID(userName, hostAddress);
-            if (availableChannels.get(channelID) == null) {
-                //todo need to fix this rather getting it from a file
-                Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
-                Channel channel = null;
-                try {
+            // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it
+            // will be picked by the Monitor, so always new usermonitorData object will get create
+            MonitorID monitorID = host.getMonitorIDs().get(0);
+            String channelID = CommonUtils.getChannelID(monitorID);
+            try {
+                    //todo need to fix this rather getting it from a file
+                    Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath);
+                    Channel channel = null;
                     channel = connection.createChannel();
+                    availableChannels.put(channelID, channel);
                     String queueName = channel.queueDeclare().getQueue();
 
                     BasicConsumer consumer = new BasicConsumer(new JSONMessageParser(), publisher, host);
@@ -112,9 +116,8 @@ public class AMQPMonitor extends PushMonitor {
                     // here we queuebind to a particular user in a particular machine
                     channel.queueBind(queueName, "glue2.computing_activity", filterString);
                     logger.info("Using filtering string to monitor: " + filterString);
-                } catch (IOException e) {
-                    logger.error("Error creating the connection to finishQueue the job:" + userMonitorData.getUserName());
-                }
+            } catch (IOException e) {
+                logger.error("Error creating the connection to finishQueue the job:" + userMonitorData.getUserName());
             }
 
         }
@@ -152,7 +155,7 @@ public class AMQPMonitor extends PushMonitor {
 
 
 
-    @Override
+    @Subscribe
     public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
         String channelID = CommonUtils.getChannelID(monitorID);
         Channel channel = availableChannels.get(channelID);
@@ -162,13 +165,12 @@ public class AMQPMonitor extends PushMonitor {
         } else {
             try {
                 channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
+                channel.close();
+                channel.getConnection().close();
             } catch (IOException e) {
                 logger.error("Error unregistering the listener");
                 throw new AiravataMonitorException("Error unregistering the listener");
             }
-
-
-
         }
         return true;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
index dfd5af8..9c08399 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
@@ -66,12 +66,14 @@ public class BasicConsumer implements Consumer {
                                byte[] body) {
 
         logger.debug("  job update for: " + envelope.getRoutingKey());
-
         String message = new String(body);
         message = message.replaceAll("(?m)^", "    ");
         // Here we parse the message and get the job status and push it
         // to the Event bus, this will be picked by
 //        AiravataJobStatusUpdator and store in to registry
+        logger.debug("************************************************************");
+        logger.debug("AMQP Message recieved \n" + message);
+        logger.debug("************************************************************");
         try {
             String jobID = envelope.getRoutingKey().split("\\.")[0];
             List<MonitorID> monitorIDs = hostMonitorData.getMonitorIDs();

http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterThread.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterThread.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterThread.java
deleted file mode 100644
index 528b9f6..0000000
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterThread.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.job.monitor.impl.push.amqp;
-
-import com.rabbitmq.client.Channel;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.job.monitor.MonitorID;
-import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
-import org.apache.airavata.job.monitor.util.CommonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-public class UnRegisterThread extends Thread {
-    private final static Logger logger = LoggerFactory.getLogger(UnRegisterThread.class);
-    private BlockingQueue<MonitorID> finishQueue;
-    private Map<String, Channel> availableChannels;
-
-    public UnRegisterThread(BlockingQueue<MonitorID> monitor, Map<String, Channel> channels) {
-        this.finishQueue = monitor;
-        this.availableChannels = channels;
-    }
-
-    public void run() {
-        while (!ServerSettings.isStopAllThreads()) {
-            try {
-                MonitorID monitorID = this.finishQueue.take();
-                unRegisterListener(monitorID);
-            //
-            } catch (AiravataMonitorException e) {
-                logger.error(e.getLocalizedMessage());
-            } catch (InterruptedException e) {
-                logger.error(e.getLocalizedMessage());
-            }
-        }
-    }
-
-    private boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
-        String channelID = CommonUtils.getChannelID(monitorID);
-        Channel channel = availableChannels.get(channelID);
-        if (channel == null) {
-            logger.error("Already Unregistered the listener");
-            throw new AiravataMonitorException("Already Unregistered the listener");
-        } else {
-            try {
-                channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
-            } catch (IOException e) {
-                logger.error("Error unregistering the listener");
-                throw new AiravataMonitorException("Error unregistering the listener");
-            }
-        }
-        return true;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
new file mode 100644
index 0000000..a893372
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterWorker.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.impl.push.amqp;
+
+import com.google.common.eventbus.Subscribe;
+import com.rabbitmq.client.Channel;
+import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.job.monitor.util.CommonUtils;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class UnRegisterWorker{
+    private final static Logger logger = LoggerFactory.getLogger(UnRegisterWorker.class);
+    private Map<String, Channel> availableChannels;
+
+    public UnRegisterWorker(Map<String, Channel> channels) {
+        this.availableChannels = channels;
+    }
+
+    @Subscribe
+    private boolean unRegisterListener(JobStatus jobStatus) throws AiravataMonitorException {
+        MonitorID monitorID = jobStatus.getMonitorID();
+        String channelID = CommonUtils.getChannelID(monitorID);
+        if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){
+            Channel channel = availableChannels.get(channelID);
+            if (channel == null) {
+                logger.error("Already Unregistered the listener");
+                throw new AiravataMonitorException("Already Unregistered the listener");
+            } else {
+                try {
+                    channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
+                    channel.close();
+                    channel.getConnection().close();
+                    availableChannels.remove(channelID);
+                } catch (IOException e) {
+                    logger.error("Error unregistering the listener");
+                    throw new AiravataMonitorException("Error unregistering the listener");
+                }
+            }
+        }
+        return true;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/6435ad35/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
index db85625..8a7b160 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
@@ -50,7 +50,7 @@ public class CommonUtils {
         }
     }
     public static String getChannelID(MonitorID monitorID) {
-        return monitorID.getUserName() + "-" + monitorID.getHost().getType().getHostName();
+        return monitorID.getUserName() + "-" + monitorID.getHost().getType().getHostName() + "-" + monitorID.getJobID();
     }
 
     public static String getRoutingKey(MonitorID monitorID) {