You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/03/10 19:55:59 UTC

git commit: fixing issues with job monitor - AIRAVATA-1027

Repository: airavata
Updated Branches:
  refs/heads/master 4ee0852d0 -> 24babee47


fixing issues with job monitor - AIRAVATA-1027


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/24babee4
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/24babee4
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/24babee4

Branch: refs/heads/master
Commit: 24babee475123141141d21b98e71da7cdc4bc66b
Parents: 4ee0852
Author: lahiru <la...@apache.org>
Authored: Mon Mar 10 14:55:48 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Mon Mar 10 14:55:48 2014 -0400

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  | 23 +++---
 .../airavata/client/tools/DocumentCreator.java  |  1 +
 .../job/monitor/AiravataJobStatusUpdator.java   |  2 +-
 .../apache/airavata/job/monitor/MonitorID.java  | 25 ++++++
 .../airavata/job/monitor/MonitorManager.java    | 85 ++++++++++++-------
 .../airavata/job/monitor/core/Monitor.java      |  3 +-
 .../job/monitor/event/MonitorPublisher.java     |  1 +
 .../monitor/impl/pull/qstat/QstatMonitor.java   | 24 ++----
 .../job/monitor/impl/push/amqp/AMQPMonitor.java | 50 ++++++++---
 .../airavata/job/monitor/state/JobStatus.java   | 17 +---
 .../airavata/job/monitor/AMQPMonitorTest.java   |  3 +-
 .../airavata/job/monitor/QstatMonitorTest.java  |  2 +-
 .../src/test/resources/monitor.properties       |  1 +
 .../main/resources/schemas/HostDescription.xsd  |  1 +
 .../apache/airavata/common/utils/Constants.java |  2 +
 .../resources/conf/airavata-server.properties   |  5 +-
 .../server/OrchestratorServerHandler.java       | 87 +++++++++-----------
 17 files changed, 192 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 4f473f1..6c11091 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -61,9 +61,9 @@ public class CreateLaunchExperiment {
             final Airavata.Client airavata = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
             System.out.println("API version is " + airavata.GetAPIVersion());
             addDescriptors();
-            final String expId = createExperimentForTrestles(airavata);
+//            final String expId = createExperimentForTrestles(airavata);
 //            final String expId = createUS3ExperimentForTrestles(airavata);
-//            final String expId = createExperimentForStampede(airavata);
+            final String expId = createExperimentForStampede(airavata);
 //            final String expId = createUS3ExperimentForStampede(airavata);
             System.out.println("Experiment ID : " + expId);
             launchExperiment(airavata, expId);
@@ -109,16 +109,17 @@ public class CreateLaunchExperiment {
 
 //            Experiment experiment = airavata.getExperiment(expId);
 //            System.out.println("retrieved exp id : " + experiment.getExperimentID());
-        } catch (TException e) {
+        } catch (Exception e) {
             logger.error("Error while connecting with server", e.getMessage());
             e.printStackTrace();
-        } catch (ApplicationSettingsException e) {
-            logger.error("Error while creating airavata API object", e.getMessage());
-            e.printStackTrace();
-        } catch (AiravataAPIInvocationException e) {
-            logger.error("Error while creating airavata API object", e.getMessage());
-            e.printStackTrace();
         }
+//        } catch (ApplicationSettingsException e) {
+//            logger.error("Error while creating airavata API object", e.getMessage());
+//            e.printStackTrace();
+//        } catch (AiravataAPIInvocationException e) {
+//            logger.error("Error while creating airavata API object", e.getMessage());
+//            e.printStackTrace();
+//        }
     }
 
     public static void addDescriptors() throws AiravataAPIInvocationException,ApplicationSettingsException  {
@@ -129,8 +130,8 @@ public class CreateLaunchExperiment {
 //            documentCreator.createPBSDocs();
 //            documentCreator.createPBSDocsForOGCE();
 //            documentCreator.createMPIPBSDocsTrestles();
-//            documentCreator.createSlurmDocs();
-              documentCreator.createMPIPBSDocsStampede();
+            documentCreator.createSlurmDocs();
+//              documentCreator.createMPIPBSDocsStampede();
         } catch (AiravataAPIInvocationException e) {
             logger.error("Unable to create airavata API", e.getMessage());
             throw new AiravataAPIInvocationException(e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
index f6b953e..eb3a0ca 100644
--- a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
+++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
@@ -638,6 +638,7 @@ public class DocumentCreator {
         ((GsisshHostType) host.getType()).setJobManager("slurm");
         ((GsisshHostType) host.getType()).setInstalledPath("/usr/bin/");
         ((GsisshHostType) host.getType()).setPort(2222);
+//        ((GsisshHostType) host.getType()).setMo(2222);
 
 
         try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index 7407a10..6dc007c 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -81,7 +81,7 @@ public class AiravataJobStatusUpdator{
                     break;
                 case UNKNOWN:
                     logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is UNKNOWN");
-                    logger.info("Unknown job status came, if the old job status is RUNNING or something active, we have to make it complete");
+                    jobsToMonitor.remove(jobStatus.getMonitorID());
                     //todo implement this logic
                     break;
                 case QUEUED:

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
index 100c141..945362b 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
@@ -23,6 +23,9 @@ package org.apache.airavata.job.monitor;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.omg.PortableInterceptor.ACTIVE;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +62,7 @@ public class MonitorID {
 
     private int failedCount = 0;
 
+    private JobState state;
 
     public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName) {
         this.host = host;
@@ -171,4 +175,25 @@ public class MonitorID {
     public void setFailedCount(int failedCount) {
         this.failedCount = failedCount;
     }
+
+    public JobState getStatus() {
+        return state;
+    }
+
+    public void setStatus(JobState status) {
+        if (this.state != null && status.equals(JobState.UNKNOWN)) {
+            switch (this.state) {
+                case ACTIVE:
+                    this.state = JobState.COMPLETE;
+                    break;
+                case QUEUED:
+                    this.state = JobState.COMPLETE;
+                    break;
+
+            }
+        }else{
+            // normal scenario
+            this.state = status;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index aad7d41..a0927a5 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.job.monitor;
 
 import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.job.monitor.core.PullMonitor;
 import org.apache.airavata.job.monitor.core.PushMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
@@ -29,9 +30,11 @@ import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
 import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
 import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.validation.constraints.Null;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -50,7 +53,11 @@ public class MonitorManager {
 
     private List<PushMonitor> pushMonitors;   //todo we need to support multiple monitors dynamically
 
-    private BlockingQueue<MonitorID> runningQueue;
+    private BlockingQueue<MonitorID> pullQueue;
+
+    private BlockingQueue<MonitorID> pushQueue;
+
+    private BlockingQueue<MonitorID> localJobQueue;
 
     private BlockingQueue<MonitorID> finishQueue;
 
@@ -62,7 +69,8 @@ public class MonitorManager {
     public MonitorManager() {
         pullMonitors = new ArrayList<PullMonitor>();
         pushMonitors = new ArrayList<PushMonitor>();
-        runningQueue = new LinkedBlockingDeque<MonitorID>();
+        pullQueue = new LinkedBlockingDeque<MonitorID>();
+        pushQueue = new LinkedBlockingDeque<MonitorID>();
         finishQueue = new LinkedBlockingDeque<MonitorID>();
         monitorPublisher = new MonitorPublisher(new EventBus());
         registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), finishQueue));
@@ -77,7 +85,7 @@ public class MonitorManager {
     public void addAMQPMonitor(AMQPMonitor monitor) {
         monitor.setPublisher(this.getMonitorPublisher());
         monitor.setFinishQueue(this.getFinishQueue());
-        monitor.setRunningQueue(this.getRunningQueue());
+        monitor.setRunningQueue(this.getPushQueue());
         addPushMonitor(monitor);
     }
 
@@ -89,7 +97,7 @@ public class MonitorManager {
      */
     public void addQstatMonitor(QstatMonitor qstatMonitor) {
         qstatMonitor.setPublisher(this.getMonitorPublisher());
-        qstatMonitor.setQueue(this.getRunningQueue());
+        qstatMonitor.setQueue(this.getPullQueue());
         addPullMonitor(qstatMonitor);
 
     }
@@ -127,13 +135,19 @@ public class MonitorManager {
      * This is going to be useful during the startup of the launching process
      *
      * @param monitorID
+     * @throws AiravataMonitorException
      */
     public void addAJobToMonitor(MonitorID monitorID) throws AiravataMonitorException {
-        try {
-            runningQueue.put(monitorID);
-        } catch (InterruptedException e) {
-            String error = "Error while putting the job: " + monitorID.getJobID() + " the monitor queue";
-            throw new AiravataMonitorException(error, e);
+        if (monitorID.getHost().getType() instanceof GsisshHostType) {
+            GsisshHostType host = (GsisshHostType) monitorID.getHost().getType();
+            if ("".equals(host.getMonitorMode()) || host.getMonitorMode() == null
+                    || Constants.PULL.equals(host.getMonitorMode())) {
+                pullQueue.add(monitorID);
+            } else if (Constants.PUSH.equals(host.getMonitorMode())) {
+                pushQueue.add(monitorID);
+            }
+        } else {
+            logger.error("We only support Gsissh host types currently");
         }
     }
 
@@ -148,26 +162,19 @@ public class MonitorManager {
      * @throws AiravataMonitorException
      */
     public void launchMonitor() throws AiravataMonitorException {
-        if (pushMonitors.isEmpty()) {
-            if (pullMonitors.isEmpty()) {
-                logger.error("Before launching MonitorManager should have atleast one Monitor");
-                return;
-            } else {
-                //no push monitor is configured so we launch pull monitor
-                QstatMonitor pullMonitor = (QstatMonitor)pullMonitors.get(0);
-                (new Thread(pullMonitor)).start();
-            }
-        } else {
-            // there is a push monitor configured, so we schedule the push monitor
-            // We currently support dealing with one type of monitor
-            AMQPMonitor pushMonitor = (AMQPMonitor) pushMonitors.get(0);
-            (new Thread(pushMonitor)).start();
-
-            UnRegisterThread unRegisterThread = new
-                    UnRegisterThread(pushMonitor.getFinishQueue(), pushMonitor.getAvailableChannels());
-            unRegisterThread.start();
+        //no push monitor is configured so we launch pull monitor
+        for (PullMonitor monitor : pullMonitors) {
+            (new Thread(monitor)).start();
         }
 
+        for (PushMonitor monitor : pushMonitors) {
+            (new Thread(monitor)).start();
+            if (monitor instanceof AMQPMonitor) {
+                UnRegisterThread unRegisterThread = new
+                        UnRegisterThread(((AMQPMonitor) monitor).getFinishQueue(), ((AMQPMonitor) monitor).getAvailableChannels());
+                unRegisterThread.start();
+            }
+        }
     }
 
     /* getter setters for the private variables */
@@ -188,12 +195,12 @@ public class MonitorManager {
         this.pushMonitors = pushMonitors;
     }
 
-    public BlockingQueue<MonitorID> getRunningQueue() {
-        return runningQueue;
+    public BlockingQueue<MonitorID> getPullQueue() {
+        return pullQueue;
     }
 
-    public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
-        this.runningQueue = runningQueue;
+    public void setPullQueue(BlockingQueue<MonitorID> pullQueue) {
+        this.pullQueue = pullQueue;
     }
 
     public MonitorPublisher getMonitorPublisher() {
@@ -211,4 +218,20 @@ public class MonitorManager {
     public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
         this.finishQueue = finishQueue;
     }
+
+    public BlockingQueue<MonitorID> getPushQueue() {
+        return pushQueue;
+    }
+
+    public void setPushQueue(BlockingQueue<MonitorID> pushQueue) {
+        this.pushQueue = pushQueue;
+    }
+
+    public BlockingQueue<MonitorID> getLocalJobQueue() {
+        return localJobQueue;
+    }
+
+    public void setLocalJobQueue(BlockingQueue<MonitorID> localJobQueue) {
+        this.localJobQueue = localJobQueue;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java
index ce8cf22..9627bbc 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/Monitor.java
@@ -20,12 +20,11 @@
 */
 package org.apache.airavata.job.monitor.core;
 
-import org.apache.airavata.job.monitor.event.MonitorPublisher;
 
 /**
  * This is the primary interface for Monitors,
  * This can be used to implement different methods of monitoring
  */
-public interface Monitor {
+public interface Monitor extends Runnable {
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
index 12c27fa..95b64ab 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/event/MonitorPublisher.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.job.monitor.event;
 
 import com.google.common.eventbus.EventBus;
+import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.state.JobStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
index a2c85ed..1978ad8 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/pull/qstat/QstatMonitor.java
@@ -20,7 +20,7 @@
 */
 package org.apache.airavata.job.monitor.impl.pull.qstat;
 
-import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.core.PullMonitor;
@@ -29,7 +29,6 @@ import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.state.JobStatus;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.airavata.schemas.gfac.HostDescriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +43,7 @@ import java.util.concurrent.BlockingQueue;
  * This monitor is based on qstat command which can be run
  * in grid resources and retrieve the job status.
  */
-public class QstatMonitor extends PullMonitor implements Runnable {
+public class QstatMonitor extends PullMonitor {
     private final static Logger logger = LoggerFactory.getLogger(QstatMonitor.class);
 
     // I think this should use DelayedBlocking Queue to do the monitoring*/
@@ -95,16 +94,9 @@ public class QstatMonitor extends PullMonitor implements Runnable {
         // at the tail of the queue
         MonitorID take = null;
         JobStatus jobStatus = new JobStatus();
-        while (!this.queue.isEmpty()) {
-            try {
-                Iterator<MonitorID> iterator = this.queue.iterator();
-                // no need to check iterator.hasNext because its already checked
-                MonitorID next = iterator.next();
-                // we check whether the job is type of gsissh otherwise we return the job back to the queue
-                // Here we use iterator because it not fair to take the object from the queue unless its
-                // the correct host type,so if its not the right type it will remain in the queue
-                if(next.getHost().getType() instanceof GsisshHostType){
-                    take = this.queue.take();
+        try {
+                take = this.queue.take();
+                if((take.getHost().getType() instanceof GsisshHostType)){
                     long monitorDiff = 0;
                     long startedDiff = 0;
                     if (take.getLastMonitored() != null) {
@@ -129,8 +121,10 @@ public class QstatMonitor extends PullMonitor implements Runnable {
                             connection = new ResourceConnection(take, gsisshHostType.getInstalledPath());
                             connections.put(hostName, connection);
                         }
+                        take.setStatus(connection.getJobStatus(take));
                         jobStatus.setMonitorID(take);
-                        jobStatus.setState(connection.getJobStatus(take));
+                        jobStatus.setState(take.getStatus());
+                        // we have this JobStatus class to handle amqp monitoring
                         publisher.publish(jobStatus);
                         // if the job is completed we do not have to put the job to the queue again
                         if (!jobStatus.getState().equals(JobState.COMPLETE)) {
@@ -186,7 +180,7 @@ public class QstatMonitor extends PullMonitor implements Runnable {
                 }
                 throw new AiravataMonitorException("Error retrieving the job status", e);
             }
-        }
+
 
 
         return true;

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 6bf5f01..b5b6e8f 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -22,17 +22,21 @@ package org.apache.airavata.job.monitor.impl.push.amqp;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.job.monitor.MonitorID;
 import org.apache.airavata.job.monitor.core.PushMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.util.AMQPConnectionUtil;
 import org.apache.airavata.job.monitor.util.CommonUtils;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
@@ -41,7 +45,7 @@ import java.util.concurrent.BlockingQueue;
  * rabbitmq client to recieve AMQP based monitoring data from
  * mostly excede resources.
  */
-public class AMQPMonitor extends PushMonitor implements Runnable {
+public class AMQPMonitor extends PushMonitor {
     private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
 
 
@@ -57,24 +61,36 @@ public class AMQPMonitor extends PushMonitor implements Runnable {
 
     private BlockingQueue<MonitorID> finishQueue;
 
+    private String connectionName;
+
+    private String proxyPath;
+
+    private List<String> amqpHosts;
+
     public AMQPMonitor(){
 
     }
-    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue runningQueue, BlockingQueue finishQueue) {
+    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue runningQueue, BlockingQueue finishQueue,
+                       String proxyPath,String connectionName,List<String> hosts) {
         this.publisher = publisher;
-        this.runningQueue = runningQueue;
-        this.finishQueue = finishQueue;
+        this.runningQueue = runningQueue;        // these will be initialized by the MonitorManager
+        this.finishQueue = finishQueue;          // these will be initialized by the MonitorManager
         availableChannels = new HashMap<String, Channel>();
-//        UnRegisterThread unRegisterThread = new UnRegisterThread(finishQueue,availableChannels);
-//        unRegisterThread.run();
-        System.out.println("Testing");
+        this.connectionName = connectionName;
+        this.proxyPath = proxyPath;
+        this.amqpHosts = hosts;
+    }
+
+    public void initialize(String proxyPath,String connectionName,List<String> hosts){
+        this.connectionName = connectionName;
+        this.proxyPath = proxyPath;
+        this.amqpHosts = hosts;
     }
 
     public void run() {
         try {
             // before going to the while true mode we start unregister thread
             while (true) {
-                // we got a new job to do the monitoring
                 MonitorID take = runningQueue.take();
                 this.registerListener(take);
             }
@@ -99,7 +115,7 @@ public class AMQPMonitor extends PushMonitor implements Runnable {
         // if we already have a channel we do not create one
         if (availableChannels.get(channelID) == null) {
             //todo need to fix this rather getting it from a file
-            Connection connection = AMQPConnectionUtil.connect("xsede_private", "/Users/lahirugunathilake/Downloads/x509up_u503876");
+            Connection connection = AMQPConnectionUtil.connect(connectionName, proxyPath);
             Channel channel = null;
             try {
                 channel = connection.createChannel();
@@ -185,9 +201,19 @@ public class AMQPMonitor extends PushMonitor implements Runnable {
         this.finishQueue = finishQueue;
     }
 
-    /**
-     * implementing a logic to handle the finished job and unsubscribe
-     */
+    public String getProxyPath() {
+        return proxyPath;
+    }
+
+    public void setProxyPath(String proxyPath) {
+        this.proxyPath = proxyPath;
+    }
 
+    public List<String> getAmqpHosts() {
+        return amqpHosts;
+    }
 
+    public void setAmqpHosts(List<String> amqpHosts) {
+        this.amqpHosts = amqpHosts;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
index 9ee6ce8..fe623fb 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
@@ -43,22 +43,7 @@ public class JobStatus {
     }
 
     public void setState(JobState state) {
-        // this is to handle quick change of status and getStatus returns unknown values
-        // because job is already finished and information is removed in the resource
-        if (this.state != null && state.equals(JobState.UNKNOWN)) {
-            switch (this.state) {
-                case ACTIVE:
-                    this.state = JobState.COMPLETE;
-                    break;
-                case QUEUED:
-                    this.state = JobState.COMPLETE;
-                    break;
-
-            }
-        }else{
-            // normal scenario
-            this.state = state;
-        }
+       this.state = state;
     }
 
     public MonitorID getMonitorID() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
index a6ef7ea..6327050 100644
--- a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
+++ b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
@@ -31,7 +31,6 @@ import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
-import org.apache.axiom.om.util.CommonUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -69,7 +68,7 @@ public class AMQPMonitorTest {
         monitorManager = new MonitorManager();
         AMQPMonitor amqpMonitor = new
                 AMQPMonitor(monitorManager.getMonitorPublisher(),
-                monitorManager.getRunningQueue(), monitorManager.getFinishQueue());
+                monitorManager.getPullQueue(), monitorManager.getFinishQueue());
         try {
             monitorManager.addPushMonitor(amqpMonitor);
             monitorManager.launchMonitor();

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
index aabc0d7..126b8ae 100644
--- a/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
+++ b/modules/airavata-job-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
@@ -68,7 +68,7 @@ public class QstatMonitorTest {
 
         monitorManager = new MonitorManager();
         QstatMonitor qstatMonitor = new
-                QstatMonitor(monitorManager.getRunningQueue(), monitorManager.getMonitorPublisher());
+                QstatMonitor(monitorManager.getPullQueue(), monitorManager.getMonitorPublisher());
         try {
             monitorManager.addPullMonitor(qstatMonitor);
             monitorManager.launchMonitor();

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/airavata-job-monitor/src/test/resources/monitor.properties
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/test/resources/monitor.properties b/modules/airavata-job-monitor/src/test/resources/monitor.properties
index 0b0b5f4..a4d68cf 100644
--- a/modules/airavata-job-monitor/src/test/resources/monitor.properties
+++ b/modules/airavata-job-monitor/src/test/resources/monitor.properties
@@ -1,2 +1,3 @@
 amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
+proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
 connection.name=xsede_private
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd
----------------------------------------------------------------------
diff --git a/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd b/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd
index 4ec3a4a..c4052d9 100644
--- a/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd
+++ b/modules/commons/gfac-schema/src/main/resources/schemas/HostDescription.xsd
@@ -112,6 +112,7 @@
                     <element name="postJobCommands" type="xsd:string" minOccurs="0" maxOccurs="unbounded"/>
                     <element name="installedPath" type="xsd:string" minOccurs="0" maxOccurs="1" default="/opt/torque/bin"/>
                     <element name="jobManager" type="xsd:string" minOccurs="0" maxOccurs="1"/>
+                    <element name="monitorMode" type="xsd:string" minOccurs="0" maxOccurs="1"/>
                 </sequence>
             </extension>
         </complexContent>

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
index effc4bb..bac5913 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -29,4 +29,6 @@ public final class Constants {
     public static final String USER_IN_SESSION = "userName";
     public static final String GATEWAY_NAME = "gateway_id";
     public static final String GFAC_CONFIG_XML = "gfac-config.xml";
+    public static final String PUSH = "push";
+    public static final String PULL = "pull";
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties b/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
index e0c4b4b..d0c94c9 100644
--- a/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
+++ b/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
@@ -256,11 +256,10 @@ TwoPhase=true
 ###---------------------------Monitoring module Configurations---------------------------###
 #This will be the primary monitoring tool which runs in airavata, in future there will be multiple monitoring
 #mechanisms and one would be able to start a monitor
-primaryMonitor=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor
-#We do not support a secondaray monitoring at this point or host specific monitoring
-secondaryMonitor=org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor
+monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor
 #This is the amqp related configuration and this lists down the Rabbitmq host, this is an xsede specific configuration
 amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
+proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
 connection.name=xsede_private
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/24babee4/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index defbdcf..0ac4756 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -44,13 +44,12 @@ import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.DataType;
 import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URL;
+import java.lang.String;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
@@ -63,14 +62,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 
     private Registry registry;
 
-    private boolean pushMode = true;
-
     GSIAuthenticationInfo authenticationInfo = null;
 
     /**
      * Query orchestrator server to fetch the CPI version
      */
-    @Override
     public String getOrchestratorCPIVersion() throws TException {
 
         return orchestrator_cpi_serviceConstants.ORCHESTRATOR_CPI_VERSION;
@@ -95,34 +91,33 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
                     7512, 17280000, certPath);
 
             // loading Monitor configuration
-            String primaryMonitor = properties.getProperty("primaryMonitor");
-            String secondaryMonitor = properties.getProperty("secondaryMonitor");
-
+            String monitors = properties.getProperty("monitors");
+            List<String> monitorList = Arrays.asList(monitors.split(","));
+            List<String> list = Arrays.asList(properties.getProperty("amqp.hosts").split(","));
+            String proxyPath = properties.getProperty("proxy.file.path");
+            String connectionName = properties.getProperty("connection.name");
 
-            if (primaryMonitor == null) {
+            if (monitors == null) {
                 log.error("Error loading primaryMonitor and there has to be a primary monitor");
             } else {
-                Class<? extends Monitor> aClass = Class.forName(primaryMonitor).asSubclass(Monitor.class);
-                Monitor monitor = aClass.newInstance();
-                if (monitor instanceof PullMonitor) {
-                    if(monitor instanceof QstatMonitor){
-                        monitorManager.addQstatMonitor((QstatMonitor)monitor);
+                for (String monitorClass : monitorList) {
+                    Class<? extends Monitor> aClass = Class.forName(monitorClass).asSubclass(Monitor.class);
+                    Monitor monitor = aClass.newInstance();
+                    if (monitor instanceof PullMonitor) {
+                        if (monitor instanceof QstatMonitor) {
+                            monitorManager.addQstatMonitor((QstatMonitor) monitor);
+                        }
+                    } else if (monitor instanceof PushMonitor) {
+                        if (monitor instanceof AMQPMonitor) {
+                            ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list);
+                            monitorManager.addAMQPMonitor((AMQPMonitor) monitor);
+                        }
+                    } else {
+                        log.error("Wrong class is given to primary Monitor");
                     }
-                    pushMode = false;
-                } else if (monitor instanceof PushMonitor) {
-                    if(monitor instanceof AMQPMonitor){
-                        monitorManager.addAMQPMonitor((AMQPMonitor)monitor);
-                    }
-                } else {
-                    log.error("Wrong class is given to primary Monitor");
                 }
-            }
-            if (secondaryMonitor == null) {
-                log.info("No secondary Monitor has configured !!!!");
-            } else {
-                // todo we do not support a secondary Monitor at this point
-            }
 
+            }
             monitorManager.registerListener(orchestrator);
             // Now Monitor Manager is properly configured, now we have to start the monitoring system.
             // This will initialize all the required threads and required queues
@@ -152,20 +147,19 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
      *
      * @param experimentId
      */
-    @Override
     public boolean launchExperiment(String experimentId) throws TException {
         //TODO: Write the Orchestrator implementation
         try {
             List<TaskDetails> tasks = orchestrator.createTasks(experimentId);
             MonitorID monitorID = null;
-            if(tasks.size() > 1){
+            if (tasks.size() > 1) {
                 log.info("There are multiple tasks for this experiment, So Orchestrator will launch multiple Jobs");
             }
-            for(TaskDetails taskID:tasks) {
+            for (TaskDetails taskID : tasks) {
                 //iterate through all the generated tasks and performs the job submission+monitoring
 
                 Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentId);
-                if(experiment == null){
+                if (experiment == null) {
                     log.error("Error retrieving the Experiment by the given experimentID: " + experimentId);
                     return false;
                 }
@@ -174,27 +168,28 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
                 HostDescription hostDescription = OrchestratorUtils.getHostDescription(orchestrator, taskID);
 
                 // creating monitorID to register with monitoring queue
+                // this is a special case because amqp has to be in place before submitting the job
+                if ((hostDescription instanceof GsisshHostType) &&
+                        Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) {
 
-                if(pushMode){
-                    // during the pull we need the monitorID in the queue inadvance
-                    // For this we have enough data at this point
-                    monitorID = new MonitorID(hostDescription, null,taskID.getTaskID(),experimentId, userName);
+                    monitorID = new MonitorID(hostDescription, null, taskID.getTaskID(), experimentId, userName);
                     monitorManager.addAJobToMonitor(monitorID);
-                }
-                // Launching job for each task
-                String jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
-                log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
-                // if the monitoring is pull mode then we add the monitorID for each task after submitting
-                // the job with the jobID, otherwise we don't need the jobID
-                if(!pushMode) {
-                    monitorID = new MonitorID(hostDescription, jobID,taskID.getTaskID(),experimentId, userName, authenticationInfo);
+                    String jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
+                    log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
+                } else {
+                    // Launching job for each task
+                    // if the monitoring is pull mode then we add the monitorID for each task after submitting
+                    // the job with the jobID, otherwise we don't need the jobID
+                    String jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
+                    log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
+                    monitorID = new MonitorID(hostDescription, jobID, taskID.getTaskID(), experimentId, userName, authenticationInfo);
                     monitorManager.addAJobToMonitor(monitorID);
                 }
             }
         } catch (Exception e) {
             throw new TException(e);
         }
-        return false;
+        return true;
     }
 
     @Override