You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:31 UTC

[04/15] airavata git commit: wrapping up working version of queue-based communication between orchestrator and gfac

wrapping up working version of queue-based communication between orchestrator and gfac


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b6bf782d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b6bf782d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b6bf782d

Branch: refs/heads/master
Commit: b6bf782db30b0e7555f78852da3efcfdbfd530e4
Parents: 0149c1a
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Thu Feb 12 11:36:02 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Thu Feb 12 11:36:02 2015 -0500

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java         |  8 ++++----
 .../apache/airavata/gfac/server/GfacServerHandler.java | 13 ++++---------
 .../airavata/messaging/core/impl/RabbitMQProducer.java | 11 ++++++++++-
 .../core/impl/RabbitMQTaskLaunchPublisher.java         |  4 ++--
 .../core/impl/GFACPassiveJobSubmitter.java             |  4 +++-
 5 files changed, 23 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index b90e0ff..8483da7 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -60,7 +60,7 @@ public class CreateLaunchExperiment {
     private static final String DEFAULT_GATEWAY = "default.registry.gateway";
     private static Airavata.Client airavataClient;
 
-    private static String echoAppId = "Echo_2e539083-665d-40fd-aaa2-4a751028326b";
+    private static String echoAppId = "Echo_78e34255-39f3-4c07-add6-a1a672c80104";
     private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9";
     private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
     private static String amberAppId = "Amber_eda074ea-223d-49d7-a942-6c8742249f36";
@@ -85,7 +85,7 @@ public class CreateLaunchExperiment {
     public static void main(String[] args) throws Exception {
                 airavataClient = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
                 System.out.println("API version is " + airavataClient.getAPIVersion());
-//                registerApplications(); // run this only the first time
+//               registerApplications(); // run this only the first time
                 createAndLaunchExp();
     }
     
@@ -101,13 +101,13 @@ public class CreateLaunchExperiment {
 //                final String expId = createMPIExperimentForFSD(airavataClient);
 //               final String expId = createEchoExperimentForStampede(airavataClient);
 //                final String expId = createEchoExperimentForTrestles(airavataClient);
-//                final String expId = createExperimentEchoForLocalHost(airavataClient);
+                final String expId = createExperimentEchoForLocalHost(airavataClient);
 //                final String expId = createExperimentWRFTrestles(airavataClient);
 //                final String expId = createExperimentForBR2(airavataClient);
 //                final String expId = createExperimentForBR2Amber(airavataClient);
 //                final String expId = createExperimentWRFStampede(airavataClient);
 //                final String expId = createExperimentForStampedeAmber(airavataClient);
-                final String expId = createExperimentForTrestlesAmber(airavataClient);
+//                final String expId = createExperimentForTrestlesAmber(airavataClient);
 //                final String expId = createExperimentGROMACSStampede(airavataClient);
 //                final String expId = createExperimentESPRESSOStampede(airavataClient);
 //                final String expId = createExperimentLAMMPSStampede(airavataClient);

http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index d428d9c..c8f1100 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -113,6 +113,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
 
             if(ServerSettings.isGFacPassiveMode()) {
                 rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+                rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
             }
 
 
@@ -291,19 +292,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
         }
     }
 
-    private class NotificationMessageHandler implements MessageHandler {
+    private class TaskLaunchMessageHandler implements MessageHandler {
         private String experimentId;
 
-        private NotificationMessageHandler(String experimentId) {
-            this.experimentId = experimentId;
-        }
-
+      
         public Map<String, Object> getProperties() {
             Map<String, Object> props = new HashMap<String, Object>();
-            List<String> routingKeys = new ArrayList<String>();
-            routingKeys.add(experimentId);
-            routingKeys.add(experimentId + ".*");
-            props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
+            props.put(MessagingConstants.RABBIT_ROUTING_KEY, UUID.randomUUID().toString());
             return props;
         }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
index b4a6d46..570b17f 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
@@ -48,6 +48,15 @@ public class RabbitMQProducer {
 
     private String url;
 
+    private String getExchangeType = "topic";
+
+
+    public RabbitMQProducer(String url, String exchangeName,String getExchangeType) {
+        this.exchangeName = exchangeName;
+        this.url = url;
+        this.getExchangeType = getExchangeType;
+    }
+
     public RabbitMQProducer(String url, String exchangeName) {
         this.exchangeName = exchangeName;
         this.url = url;
@@ -104,7 +113,7 @@ public class RabbitMQProducer {
                 log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName);
                 channel.basicQos(prefetchCount);
             }
-            channel.exchangeDeclare(exchangeName, "topic", false);
+            channel.exchangeDeclare(exchangeName, getExchangeType, false);
         } catch (Exception e) {
             reset();
             String msg = "could not open channel for exchange " + exchangeName;

http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
index fe58042..23b2379 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
@@ -44,13 +44,13 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
         String exchangeName;
         try {
             brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
+            exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
         } catch (ApplicationSettingsException e) {
             String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
             log.error(message, e);
             throw new AiravataException(message, e);
         }
-        rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
+        rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName,"fanout");
         rabbitMQProducer.open();
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 58ac982..bfe2b16 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -22,6 +22,7 @@ package org.apache.airavata.orchestrator.core.impl;
 
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
@@ -141,7 +142,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
                             gatewayId = ServerSettings.getDefaultUserGateway();
                         }
                         TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId);
-                        MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK,"LAUNCH.TASK-"+ UUID.randomUUID().toString(),gatewayId);
+                        MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
+                        messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
                         publisher.publish(messageContext);
                     }
                 }