You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ra...@apache.org on 2014/03/26 16:36:24 UTC

[2/3] git commit: merged the changes

merged the changes


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7e845c10
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7e845c10
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7e845c10

Branch: refs/heads/master
Commit: 7e845c10441ea7067167f12bfd2e088adb39c060
Parents: 04c502b c1f3429
Author: raminder <ra...@apache.org>
Authored: Wed Mar 26 11:35:53 2014 -0400
Committer: raminder <ra...@apache.org>
Committed: Wed Mar 26 11:35:53 2014 -0400

----------------------------------------------------------------------
 modules/distribution/airavata-client/pom.xml    |   2 +-
 .../src/main/resources/conf/log4j.properties    |   1 +
 modules/gfac/gfac-monitor/.pom.xml.swp          | Bin 16384 -> 0 bytes
 modules/gfac/gfac-monitor/pom.xml               |   2 +-
 .../airavata/job/monitor/MonitorManager.java    |  20 ++-
 .../airavata/job/monitor/AMQPMonitorTest.java   |  36 +++---
 .../airavata/job/monitor/QstatMonitorTest.java  |  31 ++---
 .../orchestrator-client-sdks/pom.xml            |   2 +-
 modules/orchestrator/orchestrator-core/pom.xml  |   2 +-
 .../job/monitor/core/MessageParser.java         |   4 +-
 .../airavata/job/monitor/core/PushMonitor.java  |   2 +-
 .../job/monitor/event/MonitorPublisher.java     |   4 +
 .../job/monitor/impl/push/amqp/AMQPMonitor.java | 125 +++++++++++--------
 .../monitor/impl/push/amqp/BasicConsumer.java   |  34 ++---
 .../impl/push/amqp/JSONMessageParser.java       |   4 +-
 .../impl/push/amqp/UnRegisterThread.java        |  76 -----------
 .../impl/push/amqp/UnRegisterWorker.java        |  68 ++++++++++
 .../airavata/job/monitor/util/CommonUtils.java  |   1 +
 18 files changed, 203 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/7e845c10/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
index 4a1126a,107f0dc..1085dd0
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
@@@ -20,11 -20,6 +20,13 @@@
  */
  package org.apache.airavata.job.monitor;
  
 +import java.io.File;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.List;
++import java.util.concurrent.BlockingQueue;
++import java.util.concurrent.LinkedBlockingQueue;
 +
  import org.apache.airavata.commons.gfac.type.HostDescription;
  import org.apache.airavata.gsi.ssh.api.Cluster;
  import org.apache.airavata.gsi.ssh.api.SSHApiException;
@@@ -33,22 -28,27 +35,24 @@@ import org.apache.airavata.gsi.ssh.api.
  import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
  import org.apache.airavata.gsi.ssh.impl.PBSCluster;
  import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
++import org.apache.airavata.job.monitor.event.MonitorPublisher;
  import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
  import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
  import org.apache.airavata.schemas.gfac.GsisshHostType;
  import org.junit.Before;
  import org.junit.Test;
  
- public class AMQPMonitorTest {
-     private MonitorManager monitorManager;
 -import java.io.File;
 -import java.util.ArrayList;
 -import java.util.Arrays;
 -import java.util.List;
++import com.google.common.eventbus.EventBus;
  
+ public class AMQPMonitorTest {
      private String myProxyUserName;
      private String myProxyPassword;
      private String certificateLocation;
--    private String pbsFilePath;
      private String workingDirectory;
      private HostDescription hostDescription;
--
++    private BlockingQueue<MonitorID> amqpQueue;
++    private BlockingQueue<MonitorID> finishQueue;
++    private AMQPMonitor amqpMonitor;
      @Before
      public void setUp() throws Exception {
          System.setProperty("myproxy.user", "ogce");
@@@ -65,19 -65,19 +69,12 @@@
                      "E.g :- mvn clean install -Dmyproxy.user=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
              throw new Exception("Need my proxy user name password to run tests.");
          }
--
--        monitorManager = new MonitorManager();
--        AMQPMonitor amqpMonitor = new
--                AMQPMonitor(monitorManager.getMonitorPublisher(),
-                 monitorManager.getPullQueue(), monitorManager.getFinishQueue(),"/Users/lahirugunathilake/Downloads/x509up_u503876","xsede_private",
 -                monitorManager.getPushQueue(), monitorManager.getFinishQueue(),"/Users/lahirugunathilake/Downloads/x509up_u503876","xsede_private",
++        amqpQueue = new LinkedBlockingQueue<MonitorID>();
++        finishQueue = new LinkedBlockingQueue<MonitorID>();
++        amqpMonitor = new
++                AMQPMonitor(new MonitorPublisher(new EventBus()),
++                amqpQueue,finishQueue,"/Users/lahirugunathilake/Downloads/x509up_u503876","xsede_private",
                  Arrays.asList("info1.dyn.teragrid.org,info2.dyn.teragrid.org".split(",")));
--        try {
--            monitorManager.addPushMonitor(amqpMonitor);
--            monitorManager.launchMonitor();
--        } catch (AiravataMonitorException e) {
--            e.printStackTrace();
--        }
--
          hostDescription = new HostDescription(GsisshHostType.type);
          hostDescription.getType().setHostAddress("gordon.sdsc.xsede.org");
          hostDescription.getType().setHostName("gsissh-gordon");
@@@ -125,12 -125,11 +122,9 @@@
          String jobID = pbsCluster.submitBatchJob(jobDescriptor);
          System.out.println(jobID);
          try {
--            monitorManager.addAJobToMonitor(new MonitorID(hostDescription, jobID,null,null, "ogce"));
-         }catch (InterruptedException e) {
-             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-         } 
-         catch (AiravataMonitorException e) {
++             amqpMonitor.registerListener(new MonitorID(hostDescription,jobID,null,null,myProxyUserName));
+         } catch (AiravataMonitorException e) {
              e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
 -        } catch (InterruptedException e) {
 -            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
          }
          try {
              Thread.sleep(10000);

http://git-wip-us.apache.org/repos/asf/airavata/blob/7e845c10/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
index fdfe841,68460d4..bc6c4c5
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
@@@ -20,6 -20,6 +20,7 @@@
  */
  package org.apache.airavata.job.monitor;
  
++import com.google.common.eventbus.EventBus;
  import org.apache.airavata.commons.gfac.type.HostDescription;
  import org.apache.airavata.gsi.ssh.api.Cluster;
  import org.apache.airavata.gsi.ssh.api.SSHApiException;
@@@ -28,9 -28,9 +29,10 @@@ import org.apache.airavata.gsi.ssh.api.
  import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
  import org.apache.airavata.gsi.ssh.impl.PBSCluster;
  import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
--import org.apache.airavata.gsi.ssh.util.CommonUtils;
++import org.apache.airavata.job.monitor.event.MonitorPublisher;
  import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
  import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
++import org.apache.airavata.job.monitor.util.CommonUtils;
  import org.apache.airavata.schemas.gfac.GsisshHostType;
  import org.junit.Before;
  import org.junit.Test;
@@@ -38,17 -38,16 +40,17 @@@
  import java.io.File;
  import java.util.ArrayList;
  import java.util.List;
++import java.util.concurrent.BlockingQueue;
  
  public class QstatMonitorTest {
--    private MonitorManager monitorManager;
- 
      private String myProxyUserName;
      private String myProxyPassword;
      private String certificateLocation;
      private String pbsFilePath;
      private String workingDirectory;
      private HostDescription hostDescription;
--
++    private BlockingQueue<UserMonitorData> q;
++    private QstatMonitor qstatMonitor;
      @Before
      public void setUp() throws Exception {
          System.setProperty("myproxy.user", "ogce");
@@@ -66,19 -65,19 +68,13 @@@
              throw new Exception("Need my proxy user name password to run tests.");
          }
  
--        monitorManager = new MonitorManager();
--        QstatMonitor qstatMonitor = new
--                QstatMonitor(monitorManager.getPullQueue(), monitorManager.getMonitorPublisher());
--        try {
--            monitorManager.addPullMonitor(qstatMonitor);
--            monitorManager.launchMonitor();
--        } catch (AiravataMonitorException e) {
--            e.printStackTrace();
--        }
++        qstatMonitor = new
++                QstatMonitor(q, new MonitorPublisher(new EventBus()));
  
          hostDescription = new HostDescription(GsisshHostType.type);
          hostDescription.getType().setHostAddress("trestles.sdsc.edu");
          hostDescription.getType().setHostName("gsissh-gordon");
++        qstatMonitor.startPulling();
      }
  
      @Test
@@@ -93,7 -92,7 +89,7 @@@
          ServerInfo serverInfo = new ServerInfo("ogce", hostDescription.getType().getHostAddress());
  
  
--        Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/bin/"));
++        Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, org.apache.airavata.gsi.ssh.util.CommonUtils.getPBSJobManager("/opt/torque/bin/"));
  
  
          // Execute command
@@@ -124,12 -123,11 +120,9 @@@
              MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null, "ogce");
              monitorID.setAuthenticationInfo(authenticationInfo);
              try {
--                monitorManager.addAJobToMonitor(monitorID);
-             }catch (InterruptedException e) {
-                 e.printStackTrace();  
-             }
-             catch (AiravataMonitorException e) {
++                CommonUtils.addMonitortoQueue(q,monitorID);
+             } catch (AiravataMonitorException e) {
                  e.printStackTrace();
 -            } catch (InterruptedException e) {
 -                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
              }
          }
          try {