You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:31 UTC
[04/15] airavata git commit: wrapping up working version of queue based communication between orchestrator and gfac
wrapping up working version of queue based communication between orchestrator and gfac
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b6bf782d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b6bf782d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b6bf782d
Branch: refs/heads/master
Commit: b6bf782db30b0e7555f78852da3efcfdbfd530e4
Parents: 0149c1a
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Thu Feb 12 11:36:02 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Thu Feb 12 11:36:02 2015 -0500
----------------------------------------------------------------------
.../client/samples/CreateLaunchExperiment.java | 8 ++++----
.../apache/airavata/gfac/server/GfacServerHandler.java | 13 ++++---------
.../airavata/messaging/core/impl/RabbitMQProducer.java | 11 ++++++++++-
.../core/impl/RabbitMQTaskLaunchPublisher.java | 4 ++--
.../core/impl/GFACPassiveJobSubmitter.java | 4 +++-
5 files changed, 23 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index b90e0ff..8483da7 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -60,7 +60,7 @@ public class CreateLaunchExperiment {
private static final String DEFAULT_GATEWAY = "default.registry.gateway";
private static Airavata.Client airavataClient;
- private static String echoAppId = "Echo_2e539083-665d-40fd-aaa2-4a751028326b";
+ private static String echoAppId = "Echo_78e34255-39f3-4c07-add6-a1a672c80104";
private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9";
private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
private static String amberAppId = "Amber_eda074ea-223d-49d7-a942-6c8742249f36";
@@ -85,7 +85,7 @@ public class CreateLaunchExperiment {
public static void main(String[] args) throws Exception {
airavataClient = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
System.out.println("API version is " + airavataClient.getAPIVersion());
-// registerApplications(); // run this only the first time
+// registerApplications(); // run this only the first time
createAndLaunchExp();
}
@@ -101,13 +101,13 @@ public class CreateLaunchExperiment {
// final String expId = createMPIExperimentForFSD(airavataClient);
// final String expId = createEchoExperimentForStampede(airavataClient);
// final String expId = createEchoExperimentForTrestles(airavataClient);
-// final String expId = createExperimentEchoForLocalHost(airavataClient);
+ final String expId = createExperimentEchoForLocalHost(airavataClient);
// final String expId = createExperimentWRFTrestles(airavataClient);
// final String expId = createExperimentForBR2(airavataClient);
// final String expId = createExperimentForBR2Amber(airavataClient);
// final String expId = createExperimentWRFStampede(airavataClient);
// final String expId = createExperimentForStampedeAmber(airavataClient);
- final String expId = createExperimentForTrestlesAmber(airavataClient);
+// final String expId = createExperimentForTrestlesAmber(airavataClient);
// final String expId = createExperimentGROMACSStampede(airavataClient);
// final String expId = createExperimentESPRESSOStampede(airavataClient);
// final String expId = createExperimentLAMMPSStampede(airavataClient);
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index d428d9c..c8f1100 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -113,6 +113,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
if(ServerSettings.isGFacPassiveMode()) {
rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+ rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
}
@@ -291,19 +292,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
}
}
- private class NotificationMessageHandler implements MessageHandler {
+ private class TaskLaunchMessageHandler implements MessageHandler {
private String experimentId;
- private NotificationMessageHandler(String experimentId) {
- this.experimentId = experimentId;
- }
-
+
public Map<String, Object> getProperties() {
Map<String, Object> props = new HashMap<String, Object>();
- List<String> routingKeys = new ArrayList<String>();
- routingKeys.add(experimentId);
- routingKeys.add(experimentId + ".*");
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
+ props.put(MessagingConstants.RABBIT_ROUTING_KEY, UUID.randomUUID().toString());
return props;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
index b4a6d46..570b17f 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
@@ -48,6 +48,15 @@ public class RabbitMQProducer {
private String url;
+ private String getExchangeType = "topic";
+
+
+ public RabbitMQProducer(String url, String exchangeName,String getExchangeType) {
+ this.exchangeName = exchangeName;
+ this.url = url;
+ this.getExchangeType = getExchangeType;
+ }
+
public RabbitMQProducer(String url, String exchangeName) {
this.exchangeName = exchangeName;
this.url = url;
@@ -104,7 +113,7 @@ public class RabbitMQProducer {
log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName);
channel.basicQos(prefetchCount);
}
- channel.exchangeDeclare(exchangeName, "topic", false);
+ channel.exchangeDeclare(exchangeName, getExchangeType, false);
} catch (Exception e) {
reset();
String msg = "could not open channel for exchange " + exchangeName;
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
index fe58042..23b2379 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
@@ -44,13 +44,13 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
String exchangeName;
try {
brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
+ exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
} catch (ApplicationSettingsException e) {
String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
log.error(message, e);
throw new AiravataException(message, e);
}
- rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
+ rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName,"fanout");
rabbitMQProducer.open();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 58ac982..bfe2b16 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -22,6 +22,7 @@ package org.apache.airavata.orchestrator.core.impl;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
@@ -141,7 +142,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
gatewayId = ServerSettings.getDefaultUserGateway();
}
TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId);
- MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK,"LAUNCH.TASK-"+ UUID.randomUUID().toString(),gatewayId);
+ MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
+ messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
publisher.publish(messageContext);
}
}