You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/09/16 20:23:22 UTC
git commit: Publish messages to RabbitMQ
Repository: airavata
Updated Branches:
refs/heads/messaging_framework 8a8a02b88 -> e257af721
Publish messages to RabbitMQ
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e257af72
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e257af72
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e257af72
Branch: refs/heads/messaging_framework
Commit: e257af7219415ba31b840b5b9c80620463b77c0c
Parents: 8a8a02b
Author: Chathuri Wimalasena <ka...@gmail.com>
Authored: Tue Sep 16 14:21:41 2014 -0400
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Tue Sep 16 14:21:41 2014 -0400
----------------------------------------------------------------------
.../server/listener/AiravataExperimentStatusUpdator.java | 9 +++++++--
.../org/apache/airavata/common/utils/ServerSettings.java | 5 +++++
modules/distribution/server/pom.xml | 5 +++++
.../server/src/main/assembly/bin-assembly.xml | 1 +
modules/gfac/gfac-core/pom.xml | 5 +++++
.../apache/airavata/gfac/core/cpi/BetterGfacImpl.java | 6 +++++-
.../gfac/core/monitor/AiravataJobStatusUpdator.java | 7 ++++++-
.../gfac/core/monitor/AiravataTaskStatusUpdator.java | 11 +++++++----
.../core/monitor/AiravataWorkflowNodeStatusUpdator.java | 11 ++++++++---
9 files changed, 49 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index c78390a..aff0e07 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -25,6 +25,7 @@ import java.util.Calendar;
import org.apache.airavata.api.server.util.DataModelUtils;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
import org.apache.airavata.model.util.ExecutionType;
@@ -42,6 +43,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
private Registry airavataRegistry;
private MonitorPublisher monitorPublisher;
+ private Publisher publisher;
public Registry getAiravataRegistry() {
return airavataRegistry;
@@ -85,6 +87,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
updateExperimentStatus(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), state);
logger.debug("Publishing experiment status for "+nodeStatus.getWorkflowNodeIdentity().getExperimentId()+":"+state.toString());
monitorPublisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
+ publisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
}
@@ -111,7 +114,9 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
this.airavataRegistry=(Registry)configuration;
} else if (configuration instanceof MonitorPublisher){
this.monitorPublisher=(MonitorPublisher) configuration;
- }
- }
+ } else if (configuration instanceof Publisher){
+ this.publisher=(Publisher) configuration;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 93f3bc3..6594ecc 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -66,6 +66,7 @@ public class ServerSettings extends ApplicationSettings{
private static final String MY_PROXY_USER = "myproxy.user";
private static final String MY_PROXY_PASSWORD = "myproxy.password";
private static final String MY_PROXY_LIFETIME = "myproxy.life";
+ private static final String ACTIVITY_PUBLISHER = "activity.publisher";
private static final String ACTIVITY_LISTENERS = "activity.listeners";
private static boolean stopAllThreads = false;
@@ -219,6 +220,10 @@ public class ServerSettings extends ApplicationSettings{
public static String[] getActivityListeners() throws ApplicationSettingsException {
return getSetting(ACTIVITY_LISTENERS).split(",");
}
+
+ public static String getActivityPublisher() throws ApplicationSettingsException{
+ return getSetting(ACTIVITY_PUBLISHER);
+ }
public static boolean isEmbeddedZK() {
return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true"));
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 8138baa..4c1570f 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -254,6 +254,11 @@
</dependency>
<dependency>
<groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-messaging-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
<artifactId>app-catalog-data</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/distribution/server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml
index e84d7b9..90fd9a3 100644
--- a/modules/distribution/server/src/main/assembly/bin-assembly.xml
+++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml
@@ -220,6 +220,7 @@
<include>org.apache.airavata:airavata-message-monitor:jar</include>
<include>org.apache.airavata:airavata-workflow-model-core:jar</include>
<include>org.apache.airavata:airavata-messenger-commons:jar</include>
+ <include>org.apache.airavata:airavata-messaging-core:jar</include>
<include>org.apache.airavata:airavata-messenger-client:jar</include>
<include>org.apache.airavata:airavata-workflow-tracking:jar</include>
<include>org.apache.airavata:airavata-workflow-engine:jar</include>
http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index f0a73f6..bb7836e 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -77,6 +77,11 @@
<artifactId>airavata-credential-store</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-messaging-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Test -->
http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index e843e4d..75f5694 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -50,6 +50,7 @@ import org.apache.airavata.gfac.core.provider.GFacRecoverableProvider;
import org.apache.airavata.gfac.core.states.GfacExperimentState;
import org.apache.airavata.gfac.core.states.GfacPluginState;
import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
@@ -126,11 +127,14 @@ public class BetterGfacImpl implements GFac {
public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) {
try {
String[] listenerClassList = ServerSettings.getActivityListeners();
+ String activityPublisher = ServerSettings.getActivityPublisher();
+ Class<? extends Publisher> aPublisher = Class.forName(activityPublisher).asSubclass(Publisher.class);
+ Publisher rabbitMQPublisher = aPublisher.newInstance();
for (String listenerClass : listenerClassList) {
Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
AbstractActivityListener abstractActivityListener = aClass.newInstance();
activityListeners.add(abstractActivityListener);
- abstractActivityListener.setup(publisher, registry, zk);
+ abstractActivityListener.setup(publisher, registry, zk, rabbitMQPublisher);
log.info("Registering listener: " + listenerClass);
publisher.registerListener(abstractActivityListener);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
index 473debd..d142ceb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
@@ -24,6 +24,7 @@ import java.util.Calendar;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
@@ -41,6 +42,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
private Registry airavataRegistry;
private MonitorPublisher monitorPublisher;
+ private Publisher publisher;
public Registry getAiravataRegistry() {
@@ -65,6 +67,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
updateJobStatus(taskID, jobID, state);
logger.debug("Publishing job status for "+jobStatus.getJobIdentity().getJobId()+":"+state.toString());
monitorPublisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()));
+ publisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()));
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
}
@@ -93,7 +96,9 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
this.airavataRegistry=(Registry)configuration;
} else if (configuration instanceof MonitorPublisher){
this.monitorPublisher=(MonitorPublisher) configuration;
- }
+ } else if (configuration instanceof Publisher){
+ this.publisher=(Publisher) configuration;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index e6ab5ef..f4e6241 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@ -24,6 +24,7 @@ import java.util.Calendar;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.messaging.event.TaskIdentity;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
@@ -37,10 +38,9 @@ import com.google.common.eventbus.Subscribe;
public class AiravataTaskStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
-
private Registry airavataRegistry;
-
private MonitorPublisher monitorPublisher;
+ private Publisher publisher;
public Registry getAiravataRegistry() {
return airavataRegistry;
@@ -93,6 +93,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
jobStatus.getJobIdentity().getWorkflowNodeId(),
jobStatus.getJobIdentity().getExperimentId());
monitorPublisher.publish(new TaskStatusChangeEvent(state, taskIdentity));
+ publisher.publish(new TaskStatusChangeEvent(state, taskIdentity));
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
@@ -119,7 +120,9 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
this.airavataRegistry=(Registry)configuration;
} else if (configuration instanceof MonitorPublisher){
this.monitorPublisher=(MonitorPublisher) configuration;
- }
- }
+ } else if (configuration instanceof Publisher){
+ this.publisher=(Publisher) configuration;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
index e0b8f9e..fe24bd0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -24,6 +24,7 @@ import java.util.Calendar;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
import org.apache.airavata.model.messaging.event.WorkflowIdentity;
import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
@@ -41,8 +42,9 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
private Registry airavataRegistry;
-
private MonitorPublisher monitorPublisher;
+ private Publisher publisher;
+
public Registry getAiravataRegistry() {
return airavataRegistry;
@@ -78,6 +80,7 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
logger.debug("Publishing workflow node status for "+taskStatus.getTaskIdentity().getWorkflowNodeId()+":"+state.toString());
WorkflowIdentity workflowIdentity = new WorkflowIdentity(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId());
monitorPublisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity));
+ publisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity));
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
}
@@ -103,7 +106,9 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
this.airavataRegistry=(Registry)configuration;
} else if (configuration instanceof MonitorPublisher){
this.monitorPublisher=(MonitorPublisher) configuration;
- }
- }
+ } else if (configuration instanceof Publisher){
+ this.publisher=(Publisher) configuration;
+ }
+ }
}
}