You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/08/11 21:32:43 UTC

[3/4] update airavata to use new events

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
index 2cde735..a701326 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
@@ -23,9 +23,9 @@ package org.apache.airavata.gfac.monitor.impl.push.amqp;
 import com.google.common.eventbus.Subscribe;
 import com.rabbitmq.client.Channel;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,8 +42,7 @@ public class UnRegisterWorker{
     }
 
     @Subscribe
-    private boolean unRegisterListener(JobStatusChangeRequest jobStatus) throws AiravataMonitorException {
-        MonitorID monitorID = jobStatus.getMonitorID();
+    private boolean unRegisterListener(JobStatusChangeEvent jobStatus, MonitorID monitorID) throws AiravataMonitorException {
         String channelID = CommonUtils.getChannelID(monitorID);
         if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){
             Channel channel = availableChannels.get(channelID);

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
index 1c52bae..94528b9 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
 import org.apache.airavata.gsi.ssh.api.Cluster;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
@@ -39,6 +38,7 @@ import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
 import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.junit.Assert;
 import org.junit.Before;
@@ -162,7 +162,7 @@ public class AMQPMonitorTest {
         }
         class InnerClassAMQP{
             @Subscribe
-            private void getStatus(JobStatusChangeRequest status){
+            private void getStatus(JobStatusChangeEvent status){
                 Assert.assertNotNull(status);
                 pushThread.interrupt();
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
index 5488652..537d8bb 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
@@ -29,7 +29,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.commons.gfac.type.HostDescription;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.gfac.monitor.HPCMonitorID;
 import org.apache.airavata.gfac.monitor.UserMonitorData;
 import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
@@ -41,6 +40,7 @@ import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.gsi.ssh.impl.PBSCluster;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
 import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.junit.Assert;
 import org.testng.annotations.Test;
@@ -80,7 +80,7 @@ public class QstatMonitorTestWithMyProxyAuth {
         class InnerClassQstat {
 
             @Subscribe
-            private void getStatus(JobStatusChangeRequest status) {
+            private void getStatus(JobStatusChangeEvent status) {
                 Assert.assertNotNull(status);
                 System.out.println(status.getState().toString());
                 monitorThread.interrupt();
@@ -165,7 +165,7 @@ public class QstatMonitorTestWithMyProxyAuth {
     }
 
     @Subscribe
-    public void testCaseShutDown(JobStatusChangeRequest status) {
+    public void testCaseShutDown(JobStatusChangeEvent status) {
         Assert.assertNotNull(status.getState());
         monitorThread.stop();
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
index 606b233..dd9c917 100644
--- a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
+++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
@@ -44,8 +44,8 @@ import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
 import org.apache.airavata.common.utils.StringUtil;
 import org.apache.airavata.common.utils.XMLUtil;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.gfac.core.monitor.state.TaskOutputDataChangedEvent;
-import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
 import org.apache.airavata.model.util.ExperimentModelUtil;
 import org.apache.airavata.model.workspace.experiment.DataObjectType;
 import org.apache.airavata.model.workspace.experiment.ExecutionUnit;
@@ -1398,8 +1398,8 @@ public class WorkflowInterpreter implements AbstractActivityListener{
 	}
 	
 	@Subscribe
-    public void taskOutputChanged(TaskOutputDataChangedEvent taskOutputEvent){
-		String taskId = taskOutputEvent.getIdentity().getTaskId();
+    public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){
+		String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
 		if (isTaskAwaiting(taskId)){
         	WorkflowNodeState state=WorkflowNodeState.COMPLETED;
 			Node node = getAwaitingNodeForTask(taskId);
@@ -1425,8 +1425,8 @@ public class WorkflowInterpreter implements AbstractActivityListener{
 	}
 	
     @Subscribe
-    public void taskStatusChanged(TaskStatusChangedEvent taskStatus){
-    	String taskId = taskStatus.getIdentity().getTaskId();
+    public void taskStatusChanged(TaskStatusChangeEvent taskStatus){
+    	String taskId = taskStatus.getTaskIdentity().getTaskId();
 		if (isTaskAwaiting(taskId)){
         	WorkflowNodeState state=WorkflowNodeState.UNKNOWN;
 			Node node = getAwaitingNodeForTask(taskId);