You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/09/16 20:23:22 UTC

git commit: publish messages to rabbitmq

Repository: airavata
Updated Branches:
  refs/heads/messaging_framework 8a8a02b88 -> e257af721


publish messages to rabbitmq


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e257af72
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e257af72
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e257af72

Branch: refs/heads/messaging_framework
Commit: e257af7219415ba31b840b5b9c80620463b77c0c
Parents: 8a8a02b
Author: Chathuri Wimalasena <ka...@gmail.com>
Authored: Tue Sep 16 14:21:41 2014 -0400
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Tue Sep 16 14:21:41 2014 -0400

----------------------------------------------------------------------
 .../server/listener/AiravataExperimentStatusUpdator.java |  9 +++++++--
 .../org/apache/airavata/common/utils/ServerSettings.java |  5 +++++
 modules/distribution/server/pom.xml                      |  5 +++++
 .../server/src/main/assembly/bin-assembly.xml            |  1 +
 modules/gfac/gfac-core/pom.xml                           |  5 +++++
 .../apache/airavata/gfac/core/cpi/BetterGfacImpl.java    |  6 +++++-
 .../gfac/core/monitor/AiravataJobStatusUpdator.java      |  7 ++++++-
 .../gfac/core/monitor/AiravataTaskStatusUpdator.java     | 11 +++++++----
 .../core/monitor/AiravataWorkflowNodeStatusUpdator.java  | 11 ++++++++---
 9 files changed, 49 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index c78390a..aff0e07 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -25,6 +25,7 @@ import java.util.Calendar;
 import org.apache.airavata.api.server.util.DataModelUtils;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
 import org.apache.airavata.model.util.ExecutionType;
@@ -42,6 +43,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 
     private Registry airavataRegistry;
     private MonitorPublisher monitorPublisher;
+    private Publisher publisher;
 
     public Registry getAiravataRegistry() {
         return airavataRegistry;
@@ -85,6 +87,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 			updateExperimentStatus(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), state);
 			logger.debug("Publishing experiment status for "+nodeStatus.getWorkflowNodeIdentity().getExperimentId()+":"+state.toString());
 			monitorPublisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
+			publisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
 		} catch (Exception e) {
             logger.error("Error persisting data" + e.getLocalizedMessage(), e);
 		}
@@ -111,7 +114,9 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 				this.airavataRegistry=(Registry)configuration;
 			} else if (configuration instanceof MonitorPublisher){
 				this.monitorPublisher=(MonitorPublisher) configuration;
-			} 
-		}
+			} else if (configuration instanceof Publisher){
+                this.publisher=(Publisher) configuration;
+            }
+        }
 	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 93f3bc3..6594ecc 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -66,6 +66,7 @@ public class ServerSettings extends ApplicationSettings{
     private static final String MY_PROXY_USER = "myproxy.user";
     private static final String MY_PROXY_PASSWORD = "myproxy.password";
     private static final String MY_PROXY_LIFETIME = "myproxy.life";
+    private static final String ACTIVITY_PUBLISHER = "activity.publisher";
     private static final String ACTIVITY_LISTENERS = "activity.listeners";
 
     private static boolean stopAllThreads = false;
@@ -219,6 +220,10 @@ public class ServerSettings extends ApplicationSettings{
     public static String[] getActivityListeners() throws ApplicationSettingsException {
         return getSetting(ACTIVITY_LISTENERS).split(",");
     }
+
+    public static String getActivityPublisher() throws ApplicationSettingsException{
+        return getSetting(ACTIVITY_PUBLISHER);
+    }
     public static boolean isEmbeddedZK() {
         return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true"));
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 8138baa..4c1570f 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -254,6 +254,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-messaging-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
             <artifactId>app-catalog-data</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/distribution/server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml
index e84d7b9..90fd9a3 100644
--- a/modules/distribution/server/src/main/assembly/bin-assembly.xml
+++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml
@@ -220,6 +220,7 @@
                 <include>org.apache.airavata:airavata-message-monitor:jar</include>
                 <include>org.apache.airavata:airavata-workflow-model-core:jar</include>
                 <include>org.apache.airavata:airavata-messenger-commons:jar</include>
+                <include>org.apache.airavata:airavata-messaging-core:jar</include>
                 <include>org.apache.airavata:airavata-messenger-client:jar</include>
                 <include>org.apache.airavata:airavata-workflow-tracking:jar</include>
                 <include>org.apache.airavata:airavata-workflow-engine:jar</include>

http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index f0a73f6..bb7836e 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -77,6 +77,11 @@
             <artifactId>airavata-credential-store</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-messaging-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
 
         <!-- Test -->

http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index e843e4d..75f5694 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -50,6 +50,7 @@ import org.apache.airavata.gfac.core.provider.GFacRecoverableProvider;
 import org.apache.airavata.gfac.core.states.GfacExperimentState;
 import org.apache.airavata.gfac.core.states.GfacPluginState;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
@@ -126,11 +127,14 @@ public class BetterGfacImpl implements GFac {
     public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) {
         try {
             String[] listenerClassList = ServerSettings.getActivityListeners();
+            String activityPublisher = ServerSettings.getActivityPublisher();
+            Class<? extends Publisher> aPublisher = Class.forName(activityPublisher).asSubclass(Publisher.class);
+            Publisher rabbitMQPublisher = aPublisher.newInstance();
             for (String listenerClass : listenerClassList) {
                 Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
                 AbstractActivityListener abstractActivityListener = aClass.newInstance();
                 activityListeners.add(abstractActivityListener);
-                abstractActivityListener.setup(publisher, registry, zk);
+                abstractActivityListener.setup(publisher, registry, zk, rabbitMQPublisher);
                 log.info("Registering listener: " + listenerClass);
                 publisher.registerListener(abstractActivityListener);
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
index 473debd..d142ceb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
@@ -24,6 +24,7 @@ import java.util.Calendar;
 
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
@@ -41,6 +42,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
     private Registry airavataRegistry;
 
     private MonitorPublisher monitorPublisher;
+    private Publisher publisher;
 
 
     public Registry getAiravataRegistry() {
@@ -65,6 +67,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
                 updateJobStatus(taskID, jobID, state);
     			logger.debug("Publishing job status for "+jobStatus.getJobIdentity().getJobId()+":"+state.toString());
             	monitorPublisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()));
+                publisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()));
             } catch (Exception e) {
                 logger.error("Error persisting data" + e.getLocalizedMessage(), e);
             }
@@ -93,7 +96,9 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
 				this.airavataRegistry=(Registry)configuration;
 			} else if (configuration instanceof MonitorPublisher){
 				this.monitorPublisher=(MonitorPublisher) configuration;
-			} 
+			} else if (configuration instanceof Publisher){
+                this.publisher=(Publisher) configuration;
+            }
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index e6ab5ef..f4e6241 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@ -24,6 +24,7 @@ import java.util.Calendar;
 
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.messaging.event.TaskIdentity;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
@@ -37,10 +38,9 @@ import com.google.common.eventbus.Subscribe;
 
 public class AiravataTaskStatusUpdator implements AbstractActivityListener {
     private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
-
     private Registry airavataRegistry;
-
     private MonitorPublisher monitorPublisher;
+    private Publisher publisher;
     
     public Registry getAiravataRegistry() {
         return airavataRegistry;
@@ -93,6 +93,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
                                                          jobStatus.getJobIdentity().getWorkflowNodeId(),
                                                          jobStatus.getJobIdentity().getExperimentId());
             monitorPublisher.publish(new TaskStatusChangeEvent(state, taskIdentity));
+            publisher.publish(new TaskStatusChangeEvent(state, taskIdentity));
 
         } catch (Exception e) {
             logger.error("Error persisting data" + e.getLocalizedMessage(), e);
@@ -119,7 +120,9 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
 				this.airavataRegistry=(Registry)configuration;
 			} else if (configuration instanceof MonitorPublisher){
 				this.monitorPublisher=(MonitorPublisher) configuration;
-			} 
-		}
+			} else if (configuration instanceof Publisher){
+                this.publisher=(Publisher) configuration;
+            }
+        }
 	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e257af72/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
index e0b8f9e..fe24bd0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -24,6 +24,7 @@ import java.util.Calendar;
 
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
+import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.WorkflowIdentity;
 import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent;
@@ -41,8 +42,9 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
     private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
 
     private Registry airavataRegistry;
-
     private MonitorPublisher monitorPublisher;
+    private Publisher publisher;
+
 
     public Registry getAiravataRegistry() {
         return airavataRegistry;
@@ -78,6 +80,7 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
 			logger.debug("Publishing workflow node status for "+taskStatus.getTaskIdentity().getWorkflowNodeId()+":"+state.toString());
             WorkflowIdentity workflowIdentity = new WorkflowIdentity(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId());
             monitorPublisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity));
+            publisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity));
 		} catch (Exception e) {
             logger.error("Error persisting data" + e.getLocalizedMessage(), e);
 		}
@@ -103,7 +106,9 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
 				this.airavataRegistry=(Registry)configuration;
 			} else if (configuration instanceof MonitorPublisher){
 				this.monitorPublisher=(MonitorPublisher) configuration;
-			} 
-		}
+			}  else if (configuration instanceof Publisher){
+                this.publisher=(Publisher) configuration;
+            }
+        }
 	}
 }