You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/08/11 21:32:43 UTC
[3/4] update airavata to use new events
http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
index 2cde735..a701326 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java
@@ -23,9 +23,9 @@ package org.apache.airavata.gfac.monitor.impl.push.amqp;
import com.google.common.eventbus.Subscribe;
import com.rabbitmq.client.Channel;
import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,8 +42,7 @@ public class UnRegisterWorker{
}
@Subscribe
- private boolean unRegisterListener(JobStatusChangeRequest jobStatus) throws AiravataMonitorException {
- MonitorID monitorID = jobStatus.getMonitorID();
+ private boolean unRegisterListener(JobStatusChangeEvent jobStatus, MonitorID monitorID) throws AiravataMonitorException {
String channelID = CommonUtils.getChannelID(monitorID);
if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){
Channel channel = availableChannels.get(channelID);
http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
index 1c52bae..94528b9 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
@@ -39,6 +38,7 @@ import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.junit.Assert;
import org.junit.Before;
@@ -162,7 +162,7 @@ public class AMQPMonitorTest {
}
class InnerClassAMQP{
@Subscribe
- private void getStatus(JobStatusChangeRequest status){
+ private void getStatus(JobStatusChangeEvent status){
Assert.assertNotNull(status);
pushThread.interrupt();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
index 5488652..537d8bb 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
@@ -29,7 +29,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.monitor.HPCMonitorID;
import org.apache.airavata.gfac.monitor.UserMonitorData;
import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
@@ -41,6 +40,7 @@ import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.junit.Assert;
import org.testng.annotations.Test;
@@ -80,7 +80,7 @@ public class QstatMonitorTestWithMyProxyAuth {
class InnerClassQstat {
@Subscribe
- private void getStatus(JobStatusChangeRequest status) {
+ private void getStatus(JobStatusChangeEvent status) {
Assert.assertNotNull(status);
System.out.println(status.getState().toString());
monitorThread.interrupt();
@@ -165,7 +165,7 @@ public class QstatMonitorTestWithMyProxyAuth {
}
@Subscribe
- public void testCaseShutDown(JobStatusChangeRequest status) {
+ public void testCaseShutDown(JobStatusChangeEvent status) {
Assert.assertNotNull(status.getState());
monitorThread.stop();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
index 606b233..dd9c917 100644
--- a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
+++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
@@ -44,8 +44,8 @@ import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
import org.apache.airavata.common.utils.StringUtil;
import org.apache.airavata.common.utils.XMLUtil;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.gfac.core.monitor.state.TaskOutputDataChangedEvent;
-import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
import org.apache.airavata.model.util.ExperimentModelUtil;
import org.apache.airavata.model.workspace.experiment.DataObjectType;
import org.apache.airavata.model.workspace.experiment.ExecutionUnit;
@@ -1398,8 +1398,8 @@ public class WorkflowInterpreter implements AbstractActivityListener{
}
@Subscribe
- public void taskOutputChanged(TaskOutputDataChangedEvent taskOutputEvent){
- String taskId = taskOutputEvent.getIdentity().getTaskId();
+ public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){
+ String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
if (isTaskAwaiting(taskId)){
WorkflowNodeState state=WorkflowNodeState.COMPLETED;
Node node = getAwaitingNodeForTask(taskId);
@@ -1425,8 +1425,8 @@ public class WorkflowInterpreter implements AbstractActivityListener{
}
@Subscribe
- public void taskStatusChanged(TaskStatusChangedEvent taskStatus){
- String taskId = taskStatus.getIdentity().getTaskId();
+ public void taskStatusChanged(TaskStatusChangeEvent taskStatus){
+ String taskId = taskStatus.getTaskIdentity().getTaskId();
if (isTaskAwaiting(taskId)){
WorkflowNodeState state=WorkflowNodeState.UNKNOWN;
Node node = getAwaitingNodeForTask(taskId);