You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ra...@apache.org on 2014/03/26 16:36:24 UTC
[2/3] git commit: merged the changes
merged the changes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7e845c10
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7e845c10
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7e845c10
Branch: refs/heads/master
Commit: 7e845c10441ea7067167f12bfd2e088adb39c060
Parents: 04c502b c1f3429
Author: raminder <ra...@apache.org>
Authored: Wed Mar 26 11:35:53 2014 -0400
Committer: raminder <ra...@apache.org>
Committed: Wed Mar 26 11:35:53 2014 -0400
----------------------------------------------------------------------
modules/distribution/airavata-client/pom.xml | 2 +-
.../src/main/resources/conf/log4j.properties | 1 +
modules/gfac/gfac-monitor/.pom.xml.swp | Bin 16384 -> 0 bytes
modules/gfac/gfac-monitor/pom.xml | 2 +-
.../airavata/job/monitor/MonitorManager.java | 20 ++-
.../airavata/job/monitor/AMQPMonitorTest.java | 36 +++---
.../airavata/job/monitor/QstatMonitorTest.java | 31 ++---
.../orchestrator-client-sdks/pom.xml | 2 +-
modules/orchestrator/orchestrator-core/pom.xml | 2 +-
.../job/monitor/core/MessageParser.java | 4 +-
.../airavata/job/monitor/core/PushMonitor.java | 2 +-
.../job/monitor/event/MonitorPublisher.java | 4 +
.../job/monitor/impl/push/amqp/AMQPMonitor.java | 125 +++++++++++--------
.../monitor/impl/push/amqp/BasicConsumer.java | 34 ++---
.../impl/push/amqp/JSONMessageParser.java | 4 +-
.../impl/push/amqp/UnRegisterThread.java | 76 -----------
.../impl/push/amqp/UnRegisterWorker.java | 68 ++++++++++
.../airavata/job/monitor/util/CommonUtils.java | 1 +
18 files changed, 203 insertions(+), 211 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/7e845c10/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
index 4a1126a,107f0dc..1085dd0
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
@@@ -20,11 -20,6 +20,13 @@@
*/
package org.apache.airavata.job.monitor;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
++import java.util.concurrent.BlockingQueue;
++import java.util.concurrent.LinkedBlockingQueue;
+
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
@@@ -33,22 -28,27 +35,24 @@@ import org.apache.airavata.gsi.ssh.api.
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
++import org.apache.airavata.job.monitor.event.MonitorPublisher;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.junit.Before;
import org.junit.Test;
- public class AMQPMonitorTest {
- private MonitorManager monitorManager;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
++import com.google.common.eventbus.EventBus;
+ public class AMQPMonitorTest {
private String myProxyUserName;
private String myProxyPassword;
private String certificateLocation;
-- private String pbsFilePath;
private String workingDirectory;
private HostDescription hostDescription;
--
++ private BlockingQueue<MonitorID> amqpQueue;
++ private BlockingQueue<MonitorID> finishQueue;
++ private AMQPMonitor amqpMonitor;
@Before
public void setUp() throws Exception {
System.setProperty("myproxy.user", "ogce");
@@@ -65,19 -65,19 +69,12 @@@
"E.g :- mvn clean install -Dmyproxy.user=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<");
throw new Exception("Need my proxy user name password to run tests.");
}
--
-- monitorManager = new MonitorManager();
-- AMQPMonitor amqpMonitor = new
-- AMQPMonitor(monitorManager.getMonitorPublisher(),
- monitorManager.getPullQueue(), monitorManager.getFinishQueue(),"/Users/lahirugunathilake/Downloads/x509up_u503876","xsede_private",
- monitorManager.getPushQueue(), monitorManager.getFinishQueue(),"/Users/lahirugunathilake/Downloads/x509up_u503876","xsede_private",
++ amqpQueue = new LinkedBlockingQueue<MonitorID>();
++ finishQueue = new LinkedBlockingQueue<MonitorID>();
++ amqpMonitor = new
++ AMQPMonitor(new MonitorPublisher(new EventBus()),
++ amqpQueue,finishQueue,"/Users/lahirugunathilake/Downloads/x509up_u503876","xsede_private",
Arrays.asList("info1.dyn.teragrid.org,info2.dyn.teragrid.org".split(",")));
-- try {
-- monitorManager.addPushMonitor(amqpMonitor);
-- monitorManager.launchMonitor();
-- } catch (AiravataMonitorException e) {
-- e.printStackTrace();
-- }
--
hostDescription = new HostDescription(GsisshHostType.type);
hostDescription.getType().setHostAddress("gordon.sdsc.xsede.org");
hostDescription.getType().setHostName("gsissh-gordon");
@@@ -125,12 -125,11 +122,9 @@@
String jobID = pbsCluster.submitBatchJob(jobDescriptor);
System.out.println(jobID);
try {
-- monitorManager.addAJobToMonitor(new MonitorID(hostDescription, jobID,null,null, "ogce"));
- }catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- catch (AiravataMonitorException e) {
++ amqpMonitor.registerListener(new MonitorID(hostDescription,jobID,null,null,myProxyUserName));
+ } catch (AiravataMonitorException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
try {
Thread.sleep(10000);
http://git-wip-us.apache.org/repos/asf/airavata/blob/7e845c10/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
index fdfe841,68460d4..bc6c4c5
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTest.java
@@@ -20,6 -20,6 +20,7 @@@
*/
package org.apache.airavata.job.monitor;
++import com.google.common.eventbus.EventBus;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
@@@ -28,9 -28,9 +29,10 @@@ import org.apache.airavata.gsi.ssh.api.
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
--import org.apache.airavata.gsi.ssh.util.CommonUtils;
++import org.apache.airavata.job.monitor.event.MonitorPublisher;
import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
++import org.apache.airavata.job.monitor.util.CommonUtils;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.junit.Before;
import org.junit.Test;
@@@ -38,17 -38,16 +40,17 @@@
import java.io.File;
import java.util.ArrayList;
import java.util.List;
++import java.util.concurrent.BlockingQueue;
public class QstatMonitorTest {
-- private MonitorManager monitorManager;
-
private String myProxyUserName;
private String myProxyPassword;
private String certificateLocation;
private String pbsFilePath;
private String workingDirectory;
private HostDescription hostDescription;
--
++ private BlockingQueue<UserMonitorData> q;
++ private QstatMonitor qstatMonitor;
@Before
public void setUp() throws Exception {
System.setProperty("myproxy.user", "ogce");
@@@ -66,19 -65,19 +68,13 @@@
throw new Exception("Need my proxy user name password to run tests.");
}
-- monitorManager = new MonitorManager();
-- QstatMonitor qstatMonitor = new
-- QstatMonitor(monitorManager.getPullQueue(), monitorManager.getMonitorPublisher());
-- try {
-- monitorManager.addPullMonitor(qstatMonitor);
-- monitorManager.launchMonitor();
-- } catch (AiravataMonitorException e) {
-- e.printStackTrace();
-- }
++ qstatMonitor = new
++ QstatMonitor(q, new MonitorPublisher(new EventBus()));
hostDescription = new HostDescription(GsisshHostType.type);
hostDescription.getType().setHostAddress("trestles.sdsc.edu");
hostDescription.getType().setHostName("gsissh-gordon");
++ qstatMonitor.startPulling();
}
@Test
@@@ -93,7 -92,7 +89,7 @@@
ServerInfo serverInfo = new ServerInfo("ogce", hostDescription.getType().getHostAddress());
-- Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/bin/"));
++ Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, org.apache.airavata.gsi.ssh.util.CommonUtils.getPBSJobManager("/opt/torque/bin/"));
// Execute command
@@@ -124,12 -123,11 +120,9 @@@
MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null, "ogce");
monitorID.setAuthenticationInfo(authenticationInfo);
try {
-- monitorManager.addAJobToMonitor(monitorID);
- }catch (InterruptedException e) {
- e.printStackTrace();
- }
- catch (AiravataMonitorException e) {
++ CommonUtils.addMonitortoQueue(q,monitorID);
+ } catch (AiravataMonitorException e) {
e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
try {