You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/08/09 22:19:40 UTC
[2/4] airavata git commit: Refactored messaging module to remove
duplicate code and support multiple subscribers
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
deleted file mode 100644
index 561cde2..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-
-import com.rabbitmq.client.*;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.Consumer;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class RabbitMQStatusConsumer implements Consumer {
- public static final String EXCHANGE_TYPE = "topic";
- private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class);
-
- private String exchangeName;
- private String url;
- private Connection connection;
- private Channel channel;
- private int prefetchCount;
- private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
-
- public RabbitMQStatusConsumer() throws AiravataException {
- try {
- url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
- prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
- createConnection();
- } catch (ApplicationSettingsException e) {
- String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
- log.error(message, e);
- throw new AiravataException(message, e);
- }
- }
-
- public RabbitMQStatusConsumer(String brokerUrl, String exchangeName) throws AiravataException {
- this.exchangeName = exchangeName;
- this.url = brokerUrl;
-
- createConnection();
- }
-
- private void createConnection() throws AiravataException {
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setUri(url);
- connectionFactory.setAutomaticRecoveryEnabled(true);
- connection = connectionFactory.newConnection();
- connection.addShutdownListener(new ShutdownListener() {
- public void shutdownCompleted(ShutdownSignalException cause) {
- }
- });
- log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
-
- channel = connection.createChannel();
- channel.basicQos(prefetchCount);
- channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE, false);
-
- } catch (Exception e) {
- String msg = "could not open channel for exchange " + exchangeName;
- log.error(msg);
- throw new AiravataException(msg, e);
- }
- }
-
- public String listen(final MessageHandler handler) throws AiravataException {
- try {
- Map<String, Object> props = handler.getProperties();
- final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
- if (routing == null) {
- throw new IllegalArgumentException("The routing key must be present");
- }
-
- List<String> keys = new ArrayList<String>();
- if (routing instanceof List) {
- for (Object o : (List)routing) {
- keys.add(o.toString());
- }
- } else if (routing instanceof String) {
- keys.add((String) routing);
- }
-
- String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
- String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
- if (queueName == null) {
- if (!channel.isOpen()) {
- channel = connection.createChannel();
- channel.exchangeDeclare(exchangeName, "topic", false);
- }
- queueName = channel.queueDeclare().getQueue();
- } else {
- channel.queueDeclare(queueName, true, false, false, null);
- }
-
- final String id = getId(keys, queueName);
- if (queueDetailsMap.containsKey(id)) {
- throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
- "cannot define the same subscriber twice");
- }
-
- if (consumerTag == null) {
- consumerTag = "default";
- }
-
- // bind all the routing keys
- for (String routingKey : keys) {
- channel.queueBind(queueName, exchangeName, routingKey);
- }
-
- channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) {
- Message message = new Message();
-
- try {
- ThriftUtils.createThriftFromBytes(body, message);
- TBase event = null;
- String gatewayId = null;
-
- if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
- ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' with status " +
- experimentStatusChangeEvent.getState());
- event = experimentStatusChangeEvent;
- gatewayId = experimentStatusChangeEvent.getGatewayId();
- } else if (message.getMessageType().equals(MessageType.PROCESS)) {
- ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), processStatusChangeEvent);
- log.debug("Message Recieved with message id :" + message.getMessageId() + " and with " +
- "message type " + message.getMessageType() + " with status " +
- processStatusChangeEvent.getState());
- event = processStatusChangeEvent;
- gatewayId = processStatusChangeEvent.getProcessIdentity().getGatewayId();
- } else if (message.getMessageType().equals(MessageType.TASK)) {
- TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' with status " +
- taskStatusChangeEvent.getState());
- event = taskStatusChangeEvent;
- gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
- }else if (message.getMessageType() == MessageType.PROCESSOUTPUT) {
- TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent);
- log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType());
- event = taskOutputChangeEvent;
- gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId();
- } else if (message.getMessageType().equals(MessageType.JOB)) {
- JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' with status " +
- jobStatusChangeEvent.getState());
- event = jobStatusChangeEvent;
- gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
- } else if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
- TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' for experimentId: " +
- taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
- event = taskSubmitEvent;
- gatewayId = taskSubmitEvent.getGatewayId();
- } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) {
- TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' for experimentId: " +
- taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId());
- event = taskTerminateEvent;
- gatewayId = null;
- }
- MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
- messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
- messageContext.setIsRedeliver(envelope.isRedeliver());
- handler.onMessage(messageContext);
- } catch (TException e) {
- String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
- log.warn(msg, e);
- }
- }
- });
- // save the name for deleting the queue
- queueDetailsMap.put(id, new QueueDetails(queueName, keys));
- return id;
- } catch (Exception e) {
- String msg = "could not open channel for exchange " + exchangeName;
- log.error(msg);
- throw new AiravataException(msg, e);
- }
- }
-
- public void stopListen(final String id) throws AiravataException {
- QueueDetails details = queueDetailsMap.get(id);
- if (details != null) {
- try {
- for (String key : details.getRoutingKeys()) {
- channel.queueUnbind(details.getQueueName(), exchangeName, key);
- }
- channel.queueDelete(details.getQueueName(), true, true);
- } catch (IOException e) {
- String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName;
- log.debug(msg);
- }
- }
- }
-
- /**
- * Private class for holding some information about the consumers registered
- */
- private class QueueDetails {
- String queueName;
-
- List<String> routingKeys;
-
- private QueueDetails(String queueName, List<String> routingKeys) {
- this.queueName = queueName;
- this.routingKeys = routingKeys;
- }
-
- public String getQueueName() {
- return queueName;
- }
-
- public List<String> getRoutingKeys() {
- return routingKeys;
- }
- }
-
- private String getId(List<String> routingKeys, String queueName) {
- String id = "";
- for (String key : routingKeys) {
- id = id + "_" + key;
- }
- return id + "_" + queueName;
- }
-
- public void close() {
- if (connection != null) {
- try {
- connection.close();
- } catch (IOException ignore) {
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 60cb7a0..f5c4d2a 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -29,8 +29,12 @@ import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.common.utils.ZkConstants;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.scheduler.HostScheduler;
-import org.apache.airavata.messaging.core.*;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.PublisherFactory;
+import org.apache.airavata.messaging.core.Subscriber;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
@@ -59,7 +63,14 @@ import org.apache.airavata.orchestrator.util.OrchestratorUtils;
import org.apache.airavata.registry.core.app.catalog.resources.AppCatAbstractResource;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.core.experiment.catalog.resources.AbstractExpCatResource;
-import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.AppCatalog;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ComputeResource;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.ReplicaCatalog;
+import org.apache.airavata.registry.cpi.ReplicaCatalogException;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
@@ -72,7 +83,12 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
public class OrchestratorServerHandler implements OrchestratorService.Iface {
private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
@@ -83,7 +99,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
private String airavataUserName;
private String gatewayName;
private Publisher publisher;
- private RabbitMQStatusConsumer statusConsumer;
+ private Subscriber statusSubscribe;
private CuratorFramework curatorClient;
/**
@@ -110,10 +126,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
appCatalog = RegistryFactory.getAppCatalog();
orchestrator.initialize();
orchestrator.getOrchestratorContext().setPublisher(this.publisher);
- String brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- String exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
- statusConsumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
- statusConsumer.listen(new ProcessStatusHandler());
+ List<String> routingKeys = new ArrayList<>();
+// routingKeys.add("*"); // listen for gateway level messages
+// routingKeys.add("*.*"); // listen for gateway/experiment level messages
+ routingKeys.add("*.*.*"); // listen for gateway/experiment/process level messages
+ statusSubscribe = MessagingFactory.getSubscriber(new ProcessStatusHandler(),routingKeys, Subscriber.Type.STATUS);
startCurator();
} catch (OrchestratorException | RegistryException | AppCatalogException | AiravataException e) {
log.error(e.getMessage(), e);
@@ -481,18 +498,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
}
private class ProcessStatusHandler implements MessageHandler {
-
- @Override
- public Map<String, Object> getProperties() {
- Map<String, Object> props = new HashMap<>();
- List<String> routingKeys = new ArrayList<>();
-// routingKeys.add("*"); // listen for gateway level messages
-// routingKeys.add("*.*"); // listen for gateway/experiment level messages
- routingKeys.add("*.*.*"); // listern for gateway/experiment/process level messages
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
- return props;
- }
-
/**
* This method only handle MessageType.PROCESS type messages.
* @param message
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
----------------------------------------------------------------------
diff --git a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
index 49af1ce..fa4c3de 100644
--- a/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
+++ b/modules/test-suite/multi-tenanted-airavata/src/main/java/org/apache/airavata/testsuite/multitenantedairavata/ExperimentExecution.java
@@ -24,9 +24,8 @@ package org.apache.airavata.testsuite.multitenantedairavata;
import org.apache.airavata.api.Airavata;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
@@ -56,7 +55,11 @@ import java.io.File;
import java.io.PrintWriter;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public class ExperimentExecution {
private Airavata.Client airavata;
@@ -92,7 +95,7 @@ public class ExperimentExecution {
String resultFileName = resultFileLocation + getResultFileName();
File resultFolder = new File(resultFileLocation);
- if (!resultFolder.exists()){
+ if (!resultFolder.exists()) {
resultFolder.mkdir();
}
File resultFile = new File(resultFileName);
@@ -109,11 +112,11 @@ public class ExperimentExecution {
this.resultWriter = resultWriter;
}
- protected Map<String, Map<String, String>> getApplicationMap (Map<String, String> tokenMap) throws Exception{
+ protected Map<String, Map<String, String>> getApplicationMap(Map<String, String> tokenMap) throws Exception {
appInterfaceMap = new HashMap<String, Map<String, String>>();
try {
- if (tokenMap != null && !tokenMap.isEmpty()){
- for (String gatewayId : tokenMap.keySet()){
+ if (tokenMap != null && !tokenMap.isEmpty()) {
+ for (String gatewayId : tokenMap.keySet()) {
Map<String, String> allApplicationInterfaceNames = airavata.getAllApplicationInterfaceNames(authzToken, gatewayId);
appInterfaceMap.put(gatewayId, allApplicationInterfaceNames);
}
@@ -134,19 +137,19 @@ public class ExperimentExecution {
return appInterfaceMap;
}
- protected Map<String, List<Project>> getProjects (Map<String, String> tokenMap) throws Exception{
+ protected Map<String, List<Project>> getProjects(Map<String, String> tokenMap) throws Exception {
projectsMap = new HashMap<String, List<Project>>();
try {
- if (tokenMap != null && !tokenMap.isEmpty()){
- for (String gatewayId : tokenMap.keySet()){
+ if (tokenMap != null && !tokenMap.isEmpty()) {
+ for (String gatewayId : tokenMap.keySet()) {
boolean isgatewayValid = true;
- for (String ovoidGateway : gatewaysToAvoid){
- if (gatewayId.equals(ovoidGateway)){
+ for (String ovoidGateway : gatewaysToAvoid) {
+ if (gatewayId.equals(ovoidGateway)) {
isgatewayValid = false;
break;
}
}
- if (isgatewayValid){
+ if (isgatewayValid) {
List<Project> allUserProjects = airavata.getUserProjects(authzToken, gatewayId, testUser, 5, 0);
projectsMap.put(gatewayId, allUserProjects);
}
@@ -168,127 +171,119 @@ public class ExperimentExecution {
return projectsMap;
}
- public void launchExperiments () throws Exception {
+ public void launchExperiments() throws Exception {
try {
- for (String expId : experimentsWithTokens.keySet()){
+ for (String expId : experimentsWithTokens.keySet()) {
airavata.launchExperiment(authzToken, expId, experimentsWithTokens.get(expId));
}
- }catch (Exception e){
+ } catch (Exception e) {
logger.error("Error while launching experiment", e);
throw new Exception("Error while launching experiment", e);
}
}
- public void monitorExperiments () throws Exception {
+ public void monitorExperiments() throws Exception {
String brokerUrl = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_BROKER_URL, PropertyFileType.AIRAVATA_CLIENT);
System.out.println("broker url " + brokerUrl);
final String exchangeName = propertyReader.readProperty(TestFrameworkConstants.AiravataClientConstants.RABBIT_EXCHANGE_NAME, PropertyFileType.AIRAVATA_CLIENT);
- RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
-
- consumer.listen(new MessageHandler() {
- @Override
- public Map<String, Object> getProperties() {
- Map<String, Object> props = new HashMap<String, Object>();
- List<String> routingKeys = new ArrayList<String>();
- for (String expId : experimentsWithGateway.keySet()) {
- String gatewayId = experimentsWithGateway.get(expId);
- System.out.println("experiment Id : " + expId + " gateway Id : " + gatewayId);
-
- routingKeys.add(gatewayId);
- routingKeys.add(gatewayId + "." + expId);
- routingKeys.add(gatewayId + "." + expId + ".*");
- routingKeys.add(gatewayId + "." + expId + ".*.*");
- routingKeys.add(gatewayId + "." + expId + ".*.*.*");
- }
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
- return props;
- }
+ Subscriber statusSubscriber = MessagingFactory.getSubscriber(this::processMessage, null, Subscriber.Type.STATUS);
+ }
- @Override
- public void onMessage(MessageContext message) {
-
- if (message.getType().equals(MessageType.EXPERIMENT)) {
- try {
- ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
- TBase messageEvent = message.getEvent();
- byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
- ThriftUtils.createThriftFromBytes(bytes, event);
- ExperimentState expState = event.getState();
- String expId = event.getExperimentId();
- String gatewayId = event.getGatewayId();
-
- if (expState.equals(ExperimentState.COMPLETED)) {
- resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId);
- resultWriter.println("=====================================================================");
- resultWriter.println("Status : " + ExperimentState.COMPLETED.toString());
- // check file transfers
- List<OutputDataObjectType> experimentOutputs = airavata.getExperimentOutputs(authzToken, expId);
- int i = 1;
- for (OutputDataObjectType output : experimentOutputs) {
- System.out.println("################ Experiment : " + expId + " COMPLETES ###################");
- System.out.println("Output " + i + " : " + output.getValue());
- resultWriter.println("Output " + i + " : " + output.getValue());
- i++;
- }
- resultWriter.println("End of Results for Experiment : " + expId );
- resultWriter.println("=====================================================================");
- } else if (expState.equals(ExperimentState.FAILED)) {
- resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId);
- resultWriter.println("=====================================================================");
- int j = 1;
- resultWriter.println("Status : " + ExperimentState.FAILED.toString());
- System.out.println("################ Experiment : " + expId + " FAILED ###################");
- ExperimentModel experiment = airavata.getExperiment(authzToken, expId);
- List<ErrorModel> errors = experiment.getErrors();
- if (errors != null && !errors.isEmpty()){
- for (ErrorModel errorDetails : errors) {
- System.out.println(errorDetails.getActualErrorMessage());
- resultWriter.println("Actual Error : " + j + " : " + errorDetails.getActualErrorMessage());
- resultWriter.println("User Friendly Message : " + j + " : " + errorDetails.getUserFriendlyMessage());
- }
- }
+ private List<String> getRoutingKeys() {
+ List<String> routingKeys = new ArrayList<String>();
+ for (String expId : experimentsWithGateway.keySet()) {
+ String gatewayId = experimentsWithGateway.get(expId);
+ System.out.println("experiment Id : " + expId + " gateway Id : " + gatewayId);
+ routingKeys.add(gatewayId);
+ routingKeys.add(gatewayId + "." + expId);
+ routingKeys.add(gatewayId + "." + expId + ".*");
+ routingKeys.add(gatewayId + "." + expId + ".*.*");
+ routingKeys.add(gatewayId + "." + expId + ".*.*.*");
+ }
+ return routingKeys;
+ }
- resultWriter.println("End of Results for Experiment : " + expId );
- resultWriter.println("=====================================================================");
+ private void processMessage(MessageContext message) {
+ if (message.getType().equals(MessageType.EXPERIMENT)) {
+ try {
+ ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ ExperimentState expState = event.getState();
+ String expId = event.getExperimentId();
+ String gatewayId = event.getGatewayId();
+
+ if (expState.equals(ExperimentState.COMPLETED)) {
+ resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId);
+ resultWriter.println("=====================================================================");
+ resultWriter.println("Status : " + ExperimentState.COMPLETED.toString());
+ // check file transfers
+ List<OutputDataObjectType> experimentOutputs = airavata.getExperimentOutputs(authzToken, expId);
+ int i = 1;
+ for (OutputDataObjectType output : experimentOutputs) {
+ System.out.println("################ Experiment : " + expId + " COMPLETES ###################");
+ System.out.println("Output " + i + " : " + output.getValue());
+ resultWriter.println("Output " + i + " : " + output.getValue());
+ i++;
+ }
+ resultWriter.println("End of Results for Experiment : " + expId);
+ resultWriter.println("=====================================================================");
+ } else if (expState.equals(ExperimentState.FAILED)) {
+ resultWriter.println("Results for experiment : " + expId + " of gateway Id : " + gatewayId);
+ resultWriter.println("=====================================================================");
+ int j = 1;
+ resultWriter.println("Status : " + ExperimentState.FAILED.toString());
+ System.out.println("################ Experiment : " + expId + " FAILED ###################");
+ ExperimentModel experiment = airavata.getExperiment(authzToken, expId);
+ List<ErrorModel> errors = experiment.getErrors();
+ if (errors != null && !errors.isEmpty()) {
+ for (ErrorModel errorDetails : errors) {
+ System.out.println(errorDetails.getActualErrorMessage());
+ resultWriter.println("Actual Error : " + j + " : " + errorDetails.getActualErrorMessage());
+ resultWriter.println("User Friendly Message : " + j + " : " + errorDetails.getUserFriendlyMessage());
}
+ }
+
+ resultWriter.println("End of Results for Experiment : " + expId);
+ resultWriter.println("=====================================================================");
+ }
// System.out.println(" Experiment Id : '" + expId
// + "' with state : '" + event.getState().toString() +
// " for Gateway " + event.getGatewayId());
- } catch (TException e) {
- logger.error(e.getMessage(), e);
- }
- } else if (message.getType().equals(MessageType.JOB)) {
- try {
- JobStatusChangeEvent event = new JobStatusChangeEvent();
- TBase messageEvent = message.getEvent();
- byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
- ThriftUtils.createThriftFromBytes(bytes, event);
+ } catch (TException e) {
+ logger.error(e.getMessage(), e);
+ }
+ } else if (message.getType().equals(MessageType.JOB)) {
+ try {
+ JobStatusChangeEvent event = new JobStatusChangeEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
// System.out.println(" Job ID : '" + event.getJobIdentity().getJobId()
// + "' with state : '" + event.getState().toString() +
// " for Gateway " + event.getJobIdentity().getGatewayId());
// resultWriter.println("Job Status : " + event.getState().toString());
- } catch (TException e) {
- logger.error(e.getMessage(), e);
- }
- }
- resultWriter.flush();
+ } catch (TException e) {
+ logger.error(e.getMessage(), e);
}
- });
+ }
+ resultWriter.flush();
}
- private String getResultFileName (){
+ private String getResultFileName() {
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd_HHmmss");
Calendar cal = Calendar.getInstance();
return dateFormat.format(cal.getTime());
}
- public void createAmberWithErrorInputs (String gatewayId,
- String token,
- String projectId,
- String hostId,
- String appId) throws Exception {
+ public void createAmberWithErrorInputs(String gatewayId,
+ String token,
+ String projectId,
+ String hostId,
+ String appId) throws Exception {
try {
List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
@@ -352,11 +347,11 @@ public class ExperimentExecution {
}
}
- public void createAmberWithErrorUserConfig (String gatewayId,
- String token,
- String projectId,
- String hostId,
- String appId) throws Exception {
+ public void createAmberWithErrorUserConfig(String gatewayId,
+ String token,
+ String projectId,
+ String hostId,
+ String appId) throws Exception {
try {
TestFrameworkProps.Error[] errors = properties.getErrors();
@@ -422,25 +417,25 @@ public class ExperimentExecution {
}
}
- public void createAmberExperiment () throws Exception{
+ public void createAmberExperiment() throws Exception {
try {
TestFrameworkProps.Application[] applications = properties.getApplications();
Map<String, String> userGivenAmberInputs = new HashMap<>();
- for (TestFrameworkProps.Application application : applications){
- if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)){
+ for (TestFrameworkProps.Application application : applications) {
+ if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)) {
userGivenAmberInputs = application.getInputs();
}
}
- for (String gatewayId : csTokens.keySet()){
+ for (String gatewayId : csTokens.keySet()) {
String token = csTokens.get(gatewayId);
Map<String, String> appsWithNames = appInterfaceMap.get(gatewayId);
- for (String appId : appsWithNames.keySet()){
+ for (String appId : appsWithNames.keySet()) {
String appName = appsWithNames.get(appId);
- if (appName.equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)){
+ if (appName.equals(TestFrameworkConstants.AppcatalogConstants.AMBER_APP_NAME)) {
List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
- for (String inputName : userGivenAmberInputs.keySet()){
+ for (String inputName : userGivenAmberInputs.keySet()) {
for (InputDataObjectType inputDataObjectType : applicationInputs) {
if (inputDataObjectType.getName().equalsIgnoreCase(inputName)) {
inputDataObjectType.setValue(userGivenAmberInputs.get(inputName));
@@ -449,7 +444,7 @@ public class ExperimentExecution {
}
List<Project> projectsPerGateway = projectsMap.get(gatewayId);
String projectID = null;
- if (projectsPerGateway != null && !projectsPerGateway.isEmpty()){
+ if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) {
projectID = projectsPerGateway.get(0).getProjectID();
}
ExperimentModel simpleExperiment =
@@ -470,7 +465,7 @@ public class ExperimentExecution {
experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
experimentsWithTokens.put(experimentId, token);
experimentsWithGateway.put(experimentId, gatewayId);
- }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) {
+ } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) {
ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
userConfigurationData.setAiravataAutoSchedule(false);
@@ -498,33 +493,33 @@ public class ExperimentExecution {
}
}
}
- }catch (Exception e){
+ } catch (Exception e) {
logger.error("Error while creating AMBEr experiment", e);
throw new Exception("Error while creating AMBER experiment", e);
}
}
- public void createUltrascanExperiment () throws Exception{
+ public void createUltrascanExperiment() throws Exception {
try {
TestFrameworkProps.Application[] applications = properties.getApplications();
int numberOfIterations = properties.getNumberOfIterations();
Map<String, String> userGivenAmberInputs = new HashMap<>();
- for (TestFrameworkProps.Application application : applications){
- if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)){
+ for (TestFrameworkProps.Application application : applications) {
+ if (application.getName().equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)) {
userGivenAmberInputs = application.getInputs();
}
}
- for (int i=0; i < numberOfIterations; i++){
- for (String gatewayId : csTokens.keySet()){
+ for (int i = 0; i < numberOfIterations; i++) {
+ for (String gatewayId : csTokens.keySet()) {
String token = csTokens.get(gatewayId);
Map<String, String> appsWithNames = appInterfaceMap.get(gatewayId);
- for (String appId : appsWithNames.keySet()){
+ for (String appId : appsWithNames.keySet()) {
String appName = appsWithNames.get(appId);
- if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)){
+ if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ULTRASCAN)) {
List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
- for (String inputName : userGivenAmberInputs.keySet()){
+ for (String inputName : userGivenAmberInputs.keySet()) {
for (InputDataObjectType inputDataObjectType : applicationInputs) {
if (inputDataObjectType.getName().equalsIgnoreCase(inputName)) {
inputDataObjectType.setValue(userGivenAmberInputs.get(inputName));
@@ -533,7 +528,7 @@ public class ExperimentExecution {
}
List<Project> projectsPerGateway = projectsMap.get(gatewayId);
String projectID = null;
- if (projectsPerGateway != null && !projectsPerGateway.isEmpty()){
+ if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) {
projectID = projectsPerGateway.get(0).getProjectID();
}
ExperimentModel simpleExperiment =
@@ -554,7 +549,7 @@ public class ExperimentExecution {
experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
experimentsWithTokens.put(experimentId, token);
experimentsWithGateway.put(experimentId, gatewayId);
- }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.ALAMO_RESOURCE_NAME)) {
+ } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.ALAMO_RESOURCE_NAME)) {
ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "batch", 30, 0);
UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
userConfigurationData.setAiravataAutoSchedule(false);
@@ -564,7 +559,7 @@ public class ExperimentExecution {
experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
experimentsWithTokens.put(experimentId, token);
experimentsWithGateway.put(experimentId, gatewayId);
- }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.GORDEN_RESOURCE_NAME)) {
+ } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.GORDEN_RESOURCE_NAME)) {
ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 30, 0);
UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
userConfigurationData.setAiravataAutoSchedule(false);
@@ -574,17 +569,17 @@ public class ExperimentExecution {
experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
experimentsWithTokens.put(experimentId, token);
experimentsWithGateway.put(experimentId, gatewayId);
- }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.COMET_RESOURCE_NAME)) {
+ } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.COMET_RESOURCE_NAME)) {
ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "compute", 30, 0);
UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
userConfigurationData.setAiravataAutoSchedule(false);
userConfigurationData.setOverrideManualScheduledParams(false);
userConfigurationData.setComputationalResourceScheduling(scheduling);
simpleExperiment.setUserConfigurationData(userConfigurationData);
- experimentId = airavata.createExperiment(authzToken,gatewayId, simpleExperiment);
+ experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
experimentsWithTokens.put(experimentId, token);
experimentsWithGateway.put(experimentId, gatewayId);
- }else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.LONESTAR_RESOURCE_NAME)) {
+ } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.LONESTAR_RESOURCE_NAME)) {
ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 30, 0);
UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
userConfigurationData.setAiravataAutoSchedule(false);
@@ -602,89 +597,89 @@ public class ExperimentExecution {
}
}
- }catch (Exception e){
+ } catch (Exception e) {
logger.error("Error while creating Ultrascan experiment", e);
throw new Exception("Error while creating Ultrascan experiment", e);
}
}
- public void createEchoExperiment () throws Exception{
+ public void createEchoExperiment() throws Exception {
try {
for (String gatewayId : csTokens.keySet()) {
- boolean isgatewayValid = true;
- for (String ovoidGateway : gatewaysToAvoid){
- if (gatewayId.equals(ovoidGateway)){
- isgatewayValid = false;
- break;
- }
+ boolean isgatewayValid = true;
+ for (String ovoidGateway : gatewaysToAvoid) {
+ if (gatewayId.equals(ovoidGateway)) {
+ isgatewayValid = false;
+ break;
}
- if (isgatewayValid) {
- String token = csTokens.get(gatewayId);
- Map<String, String> appsWithNames = appInterfaceMap.get(gatewayId);
- for (String appId : appsWithNames.keySet()) {
- String appName = appsWithNames.get(appId);
- if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ECHO_NAME)) {
- List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
- List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
- for (InputDataObjectType inputDataObjectType : applicationInputs) {
- if (inputDataObjectType.getName().equalsIgnoreCase("input_to_Echo")) {
- inputDataObjectType.setValue("Hello World !!!");
- }
+ }
+ if (isgatewayValid) {
+ String token = csTokens.get(gatewayId);
+ Map<String, String> appsWithNames = appInterfaceMap.get(gatewayId);
+ for (String appId : appsWithNames.keySet()) {
+ String appName = appsWithNames.get(appId);
+ if (appName.equals(TestFrameworkConstants.AppcatalogConstants.ECHO_NAME)) {
+ List<InputDataObjectType> applicationInputs = airavata.getApplicationInputs(authzToken, appId);
+ List<OutputDataObjectType> appOutputs = airavata.getApplicationOutputs(authzToken, appId);
+ for (InputDataObjectType inputDataObjectType : applicationInputs) {
+ if (inputDataObjectType.getName().equalsIgnoreCase("input_to_Echo")) {
+ inputDataObjectType.setValue("Hello World !!!");
}
+ }
- List<Project> projectsPerGateway = projectsMap.get(gatewayId);
- String projectID = null;
- if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) {
- projectID = projectsPerGateway.get(0).getProjectID();
- }
- ExperimentModel simpleExperiment =
- ExperimentModelUtil.createSimpleExperiment(gatewayId, projectID, "admin", "Echo Experiment", "Echo Experiment run", appId, applicationInputs);
- simpleExperiment.setExperimentOutputs(appOutputs);
- String experimentId;
- Map<String, String> computeResources = airavata.getAvailableAppInterfaceComputeResources(authzToken, appId);
- if (computeResources != null && computeResources.size() != 0) {
- for (String id : computeResources.keySet()) {
- String resourceName = computeResources.get(id);
- if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.TRESTLES_RESOURCE_NAME)) {
- ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
- UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
- userConfigurationData.setAiravataAutoSchedule(false);
- userConfigurationData.setOverrideManualScheduledParams(false);
- userConfigurationData.setComputationalResourceScheduling(scheduling);
- simpleExperiment.setUserConfigurationData(userConfigurationData);
- experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
- experimentsWithTokens.put(experimentId, token);
- experimentsWithGateway.put(experimentId, gatewayId);
- } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) {
- ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
- UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
- userConfigurationData.setAiravataAutoSchedule(false);
- userConfigurationData.setOverrideManualScheduledParams(false);
- userConfigurationData.setComputationalResourceScheduling(scheduling);
- simpleExperiment.setUserConfigurationData(userConfigurationData);
- experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
- experimentsWithTokens.put(experimentId, token);
- experimentsWithGateway.put(experimentId, gatewayId);
- } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.BR2_RESOURCE_NAME)) {
- ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "cpu", 20, 0);
- UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
- userConfigurationData.setAiravataAutoSchedule(false);
- userConfigurationData.setOverrideManualScheduledParams(false);
- userConfigurationData.setComputationalResourceScheduling(scheduling);
- simpleExperiment.setUserConfigurationData(userConfigurationData);
- experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
- experimentsWithTokens.put(experimentId, token);
- experimentsWithGateway.put(experimentId, gatewayId);
- }
+ List<Project> projectsPerGateway = projectsMap.get(gatewayId);
+ String projectID = null;
+ if (projectsPerGateway != null && !projectsPerGateway.isEmpty()) {
+ projectID = projectsPerGateway.get(0).getProjectID();
+ }
+ ExperimentModel simpleExperiment =
+ ExperimentModelUtil.createSimpleExperiment(gatewayId, projectID, "admin", "Echo Experiment", "Echo Experiment run", appId, applicationInputs);
+ simpleExperiment.setExperimentOutputs(appOutputs);
+ String experimentId;
+ Map<String, String> computeResources = airavata.getAvailableAppInterfaceComputeResources(authzToken, appId);
+ if (computeResources != null && computeResources.size() != 0) {
+ for (String id : computeResources.keySet()) {
+ String resourceName = computeResources.get(id);
+ if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.TRESTLES_RESOURCE_NAME)) {
+ ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
+ UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
+ userConfigurationData.setAiravataAutoSchedule(false);
+ userConfigurationData.setOverrideManualScheduledParams(false);
+ userConfigurationData.setComputationalResourceScheduling(scheduling);
+ simpleExperiment.setUserConfigurationData(userConfigurationData);
+ experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
+ experimentsWithTokens.put(experimentId, token);
+ experimentsWithGateway.put(experimentId, gatewayId);
+ } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.STAMPEDE_RESOURCE_NAME)) {
+ ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "normal", 20, 0);
+ UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
+ userConfigurationData.setAiravataAutoSchedule(false);
+ userConfigurationData.setOverrideManualScheduledParams(false);
+ userConfigurationData.setComputationalResourceScheduling(scheduling);
+ simpleExperiment.setUserConfigurationData(userConfigurationData);
+ experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
+ experimentsWithTokens.put(experimentId, token);
+ experimentsWithGateway.put(experimentId, gatewayId);
+ } else if (resourceName.equals(TestFrameworkConstants.AppcatalogConstants.BR2_RESOURCE_NAME)) {
+ ComputationalResourceSchedulingModel scheduling = ExperimentModelUtil.createComputationResourceScheduling(id, 4, 1, 1, "cpu", 20, 0);
+ UserConfigurationDataModel userConfigurationData = new UserConfigurationDataModel();
+ userConfigurationData.setAiravataAutoSchedule(false);
+ userConfigurationData.setOverrideManualScheduledParams(false);
+ userConfigurationData.setComputationalResourceScheduling(scheduling);
+ simpleExperiment.setUserConfigurationData(userConfigurationData);
+ experimentId = airavata.createExperiment(authzToken, gatewayId, simpleExperiment);
+ experimentsWithTokens.put(experimentId, token);
+ experimentsWithGateway.put(experimentId, gatewayId);
}
}
}
}
}
+ }
}
- }catch (Exception e){
+ } catch (Exception e) {
logger.error("Error while creating Echo experiment", e);
throw new Exception("Error while creating Echo experiment", e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
index 8339aea..a492ef2 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowEnactmentService.java
@@ -24,16 +24,18 @@ package org.apache.airavata.workflow.core;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,16 +45,17 @@ import java.util.concurrent.Executors;
public class WorkflowEnactmentService {
private static WorkflowEnactmentService workflowEnactmentService;
- private final RabbitMQStatusConsumer statusConsumer;
+ private final Subscriber statusSubscriber;
private String consumerId;
private ExecutorService executor;
private Map<String,WorkflowInterpreter> workflowMap;
private WorkflowEnactmentService () throws AiravataException {
executor = Executors.newFixedThreadPool(getThreadPoolSize());
- workflowMap = new ConcurrentHashMap<String, WorkflowInterpreter>();
- statusConsumer = new RabbitMQStatusConsumer();
- consumerId = statusConsumer.listen(new TaskMessageHandler());
+ workflowMap = new ConcurrentHashMap<>();
+ statusSubscriber = MessagingFactory.getSubscriber((message -> executor.execute(new StatusHandler(message))),
+ getRoutingKeys(),
+ Subscriber.Type.STATUS);
// register the shutdown hook to un-bind status consumer.
Runtime.getRuntime().addShutdownHook(new EnactmentShutDownHook());
}
@@ -80,33 +83,20 @@ public class WorkflowEnactmentService {
}
- private int getThreadPoolSize() {
- return ServerSettings.getEnactmentThreadPoolSize();
- }
-
- private class TaskMessageHandler implements MessageHandler {
-
- @Override
- public Map<String, Object> getProperties() {
- Map<String, Object> props = new HashMap<String, Object>();
- String gatewayId = "*";
- String experimentId = "*";
- List<String> routingKeys = new ArrayList<String>();
- routingKeys.add(gatewayId);
- routingKeys.add(gatewayId + "." + experimentId);
- routingKeys.add(gatewayId + "." + experimentId+ ".*");
- routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
- return props;
- }
-
- @Override
- public void onMessage(MessageContext msgCtx) {
- StatusHandler statusHandler = new StatusHandler(msgCtx);
- executor.execute(statusHandler);
- }
+ public List<String> getRoutingKeys() {
+ String gatewayId = "*";
+ String experimentId = "*";
+ List<String> routingKeys = new ArrayList<String>();
+ routingKeys.add(gatewayId);
+ routingKeys.add(gatewayId + "." + experimentId);
+ routingKeys.add(gatewayId + "." + experimentId+ ".*");
+ routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
+ return routingKeys;
+ }
+ private int getThreadPoolSize() {
+ return ServerSettings.getEnactmentThreadPoolSize();
}
private class StatusHandler implements Runnable{
@@ -169,7 +159,7 @@ public class WorkflowEnactmentService {
public void run() {
super.run();
try {
- statusConsumer.stopListen(consumerId);
+ statusSubscriber.stopListen(consumerId);
log.info("Successfully un-binded task status consumer");
} catch (AiravataException e) {
log.error("Error while un-bind enactment status consumer", e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
index b42e7ac..ecfdeea 100644
--- a/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
+++ b/modules/workflow/workflow-core/src/main/java/org/apache/airavata/workflow/core/WorkflowInterpreter.java
@@ -23,7 +23,6 @@ package org.apache.airavata.workflow.core;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
import org.apache.airavata.model.ComponentState;
import org.apache.airavata.model.ComponentStatus;
import org.apache.airavata.model.application.io.OutputDataObjectType;
@@ -32,14 +31,27 @@ import org.apache.airavata.model.messaging.event.ProcessIdentifier;
import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.*;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.WorkflowCatalog;
+import org.apache.airavata.registry.cpi.WorkflowCatalogException;
import org.apache.airavata.workflow.core.dag.edge.Edge;
-import org.apache.airavata.workflow.core.dag.nodes.*;
+import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
+import org.apache.airavata.workflow.core.dag.nodes.InputNode;
+import org.apache.airavata.workflow.core.dag.nodes.OutputNode;
+import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
import org.apache.airavata.workflow.core.parser.WorkflowParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -64,7 +76,6 @@ class WorkflowInterpreter {
private Registry registry;
private List<OutputNode> completeWorkflowOutputs = new ArrayList<>();
private RabbitMQProcessLaunchPublisher publisher;
- private RabbitMQStatusConsumer statusConsumer;
private String consumerId;
private boolean continueWorkflow = true;