You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/06/25 19:32:04 UTC
[2/5] airavata git commit: orchestrator cpi and gfac cpi changes to
adapt with process
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 8583a67..a00734a 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -23,7 +23,6 @@ package org.apache.airavata.gfac.server;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.AiravataStartupException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.LocalEventPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
@@ -38,14 +37,10 @@ import org.apache.airavata.gfac.impl.GFacWorker;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.PublisherFactory;
-import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
-import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
-import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
-import org.apache.airavata.model.status.ExperimentState;
-import org.apache.airavata.model.status.ExperimentStatus;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.registry.cpi.AppCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalog;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
@@ -70,7 +65,7 @@ import java.util.concurrent.Executors;
public class GfacServerHandler implements GfacService.Iface {
private final static Logger log = LoggerFactory.getLogger(GfacServerHandler.class);
- private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+ private RabbitMQProcessLaunchConsumer rabbitMQProcessLaunchConsumer;
private static int requestCount=0;
private ExperimentCatalog experimentCatalog;
private AppCatalog appCatalog;
@@ -88,15 +83,15 @@ public class GfacServerHandler implements GfacService.Iface {
initZkDataStructure();
initAMQPClient();
executorService = Executors.newFixedThreadPool(ServerSettings.getGFacThreadPoolSize());
- startStatusUpdators(experimentCatalog, curatorClient, localEventPublisher, rabbitMQTaskLaunchConsumer);
+ startStatusUpdators(experimentCatalog, curatorClient, localEventPublisher, rabbitMQProcessLaunchConsumer);
} catch (Exception e) {
throw new AiravataStartupException("Gfac Server Initialization error ", e);
}
}
private void initAMQPClient() throws AiravataException {
- rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
- rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
+ rabbitMQProcessLaunchConsumer = new RabbitMQProcessLaunchConsumer();
+ rabbitMQProcessLaunchConsumer.listen(new TaskLaunchMessageHandler());
}
private void startCuratorClient() throws ApplicationSettingsException {
@@ -144,19 +139,18 @@ public class GfacServerHandler implements GfacService.Iface {
* *
* *
*
- * @param experimentId - ExperimentModel id in registry
* @param processId - processModel id in registry
* @param gatewayId - gateway Identification
*/
- public boolean submitJob(String experimentId, String processId, String gatewayId, String tokenId) throws
+ public boolean submitProcess(String processId, String gatewayId, String tokenId) throws
TException {
requestCount++;
log.info("-----------------------------------" + requestCount + "-----------------------------------------");
- log.info(experimentId, "GFac Received submit job request for the Experiment: {} process: {}", experimentId,
+ log.info(processId, "GFac Received submit job request for the Process: {} process: {}", processId,
processId);
try {
- executorService.execute(new GFacWorker(experimentId, processId, gatewayId, tokenId));
+ executorService.execute(new GFacWorker(processId, gatewayId, tokenId));
} catch (GFacException e) {
log.error("Failed to submit process", e);
return false;
@@ -166,29 +160,14 @@ public class GfacServerHandler implements GfacService.Iface {
return true;
}
- public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException {
- /* log.info(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId);
- try {
- if (BetterGfacImpl.getInstance().cancel(experimentId, taskId, gatewayId, tokenId)) {
- log.debug(experimentId, "Successfully cancelled job, experiment {} , task {}", experimentId, taskId);
- return true;
- } else {
- log.error(experimentId, "Job cancellation failed, experiment {} , task {}", experimentId, taskId);
- return false;
- }
- } catch (Exception e) {
- log.error(experimentId, "Error cancelling the experiment {}.", experimentId);
- throw new TException("Error cancelling the experiment : " + e.getMessage(), e);
- }*/
- return false;
+ @Override
+ public boolean cancelProcess(String processId, String gatewayId, String tokenId) throws TException {
+ return false;
}
-
-
-
public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorClient, LocalEventPublisher publisher,
- RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
+ RabbitMQProcessLaunchConsumer rabbitMQProcessLaunchConsumer) {
/* try {
String[] listenerClassList = ServerSettings.getActivityListeners();
Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
@@ -227,51 +206,51 @@ public class GfacServerHandler implements GfacService.Iface {
public void onMessage(MessageContext message) {
System.out.println(" Message Received with message id '" + message.getMessageId()
+ "' and with message type '" + message.getType());
- if (message.getType().equals(MessageType.LAUNCHTASK)) {
+ if (message.getType().equals(MessageType.LAUNCHPROCESS)) {
try {
- TaskSubmitEvent event = new TaskSubmitEvent();
+ ProcessSubmitEvent event = new ProcessSubmitEvent();
TBase messageEvent = message.getEvent();
byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
ThriftUtils.createThriftFromBytes(bytes, event);
- // update experiment status to executing
- ExperimentStatus status = new ExperimentStatus();
- status.setState(ExperimentState.EXECUTING);
+ // update process status to executing
+ ProcessStatus status = new ProcessStatus();
+ status.setState(ProcessState.EXECUTING);
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
- Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, event.getExperimentId());
+ Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
try {
- GFacUtils.createExperimentNode(curatorClient, gfacServerName, event.getExperimentId(), message.getDeliveryTag(),
+ GFacUtils.createExperimentNode(curatorClient, gfacServerName, event.getProcessId(), message.getDeliveryTag(),
event.getTokenId());
- submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
+ submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
} catch (Exception e) {
log.error(e.getMessage(), e);
- rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+ rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
}
} catch (TException e) {
log.error(e.getMessage(), e); //nobody is listening so nothing to throw
} catch (RegistryException e) {
log.error("Error while updating experiment status", e);
}
- } else if (message.getType().equals(MessageType.TERMINATETASK)) {
- TaskTerminateEvent event = new TaskTerminateEvent();
+ } else if (message.getType().equals(MessageType.TERMINATEPROCESS)) {
+ ProcessTerminateEvent event = new ProcessTerminateEvent();
TBase messageEvent = message.getEvent();
try {
byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
ThriftUtils.createThriftFromBytes(bytes, event);
- boolean success = GFacUtils.setExperimentCancelRequest(event.getExperimentId(), curatorClient,
+ boolean success = GFacUtils.setExperimentCancelRequest(event.getProcessId(), curatorClient,
message.getDeliveryTag());
if (success) {
- log.info("expId:{} - Experiment cancel request save successfully", event.getExperimentId());
+ log.info("processId:{} - Process cancel request save successfully", event.getProcessId());
}
} catch (Exception e) {
- log.error("expId:" + event.getExperimentId() + " - Experiment cancel reqeust failed", e);
+ log.error("processId:" + event.getProcessId() + " - Process cancel reqeust failed", e);
}finally {
try {
- if (!rabbitMQTaskLaunchConsumer.isOpen()) {
- rabbitMQTaskLaunchConsumer.reconnect();
+ if (!rabbitMQProcessLaunchConsumer.isOpen()) {
+ rabbitMQProcessLaunchConsumer.reconnect();
}
- rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+ rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
} catch (AiravataException e) {
- log.error("expId: " + event.getExperimentId() + " - Failed to send acknowledgement back to cancel request.", e);
+ log.error("processId: " + event.getProcessId() + " - Failed to send acknowledgement back to cancel request.", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
deleted file mode 100644
index 39b3df9..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessConsumer.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.QueueingConsumer;
-import com.rabbitmq.client.ShutdownListener;
-import com.rabbitmq.client.ShutdownSignalException;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.model.messaging.event.Message;
-import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class RabbitMQProcessConsumer {
-
- private static final Logger log = LoggerFactory.getLogger(RabbitMQProcessConsumer.class);
-
- private String url;
- private Connection connection;
- private Channel channel;
- private int prefetchCount;
-
- public RabbitMQProcessConsumer() throws AiravataException {
- try {
- url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
- createConnection();
- } catch (ApplicationSettingsException e) {
- String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
- log.error(message, e);
- throw new AiravataException(message, e);
- }
- }
-
- private void createConnection() throws AiravataException {
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setUri(url);
- connectionFactory.setAutomaticRecoveryEnabled(true);
- connection = connectionFactory.newConnection();
- connection.addShutdownListener(new ShutdownListener() {
- public void shutdownCompleted(ShutdownSignalException cause) {
- }
- });
- log.info("connected to rabbitmq: " + connection + " for default");
-
- channel = connection.createChannel();
- channel.basicQos(prefetchCount);
-// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
-
- } catch (Exception e) {
- String msg = "could not open channel for exchange default";
- log.error(msg);
- throw new AiravataException(msg, e);
- }
- }
-
-
- public String listen(final MessageHandler handler) throws AiravataException {
- try {
- Map<String, Object> props = handler.getProperties();
- final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
- if (routing == null) {
- throw new IllegalArgumentException("The routing key must be present");
- }
- String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
- String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
- if (queueName == null) {
- if (!channel.isOpen()) {
- channel = connection.createChannel();
- channel.basicQos(prefetchCount);
-// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
- }
- queueName = channel.queueDeclare().getQueue();
- } else {
- channel.queueDeclare(queueName, true, false, false, null);
- }
-
- if (consumerTag == null) {
- consumerTag = "default";
- }
- // autoAck=false, we will ack after task is done
- final String finalQueueName = queueName;
- channel.basicConsume(queueName, true, new QueueingConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) {
- Message message = new Message();
-
- try {
- ThriftUtils.createThriftFromBytes(body, message);
- TBase event = null;
- String gatewayId = null;
- ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
- log.debug("Message received with message id : " + message.getMessageId()
- + " with task id : " + processSubmitEvent.getTaskId());
- event = processSubmitEvent;
- MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), null);
- messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
- handler.onMessage(messageContext);
- } catch (TException e) {
- String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + finalQueueName;
- log.warn(msg, e);
- }
- }
- });
- return "";
- } catch (Exception e) {
- String msg = "could not open channel for exchange default";
- log.error(msg);
- throw new AiravataException(msg, e);
- }
- }
-
- public void stopListen(final String queueName , final String exchangeName) throws AiravataException {
- try {
- channel.queueUnbind(queueName, exchangeName, null);
- } catch (IOException e) {
- String msg = "could not un-bind queue: " + queueName + " for exchange " + exchangeName;
- log.debug(msg);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
new file mode 100644
index 0000000..850128e
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.*;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RabbitMQProcessLaunchConsumer {
+ private final static Logger logger = LoggerFactory.getLogger(RabbitMQProcessLaunchConsumer.class);
+ private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class);
+
+ private String taskLaunchExchangeName;
+ private String url;
+ private Connection connection;
+ private Channel channel;
+ private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+ private boolean durableQueue;
+ private MessageHandler messageHandler;
+ private int prefetchCount;
+
+
+ public RabbitMQProcessLaunchConsumer() throws AiravataException {
+ try {
+ url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+ durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
+ taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
+ prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
+ createConnection();
+ } catch (ApplicationSettingsException e) {
+ String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+ log.error(message, e);
+ throw new AiravataException(message, e);
+ }
+ }
+
+ public RabbitMQProcessLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException {
+ this.taskLaunchExchangeName = exchangeName;
+ this.url = brokerUrl;
+
+ createConnection();
+ }
+
+ private void createConnection() throws AiravataException {
+ try {
+ ConnectionFactory connectionFactory = new ConnectionFactory();
+ connectionFactory.setUri(url);
+ connectionFactory.setAutomaticRecoveryEnabled(true);
+ connection = connectionFactory.newConnection();
+ connection.addShutdownListener(new ShutdownListener() {
+ public void shutdownCompleted(ShutdownSignalException cause) {
+ }
+ });
+ log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
+
+ channel = connection.createChannel();
+ channel.basicQos(prefetchCount);
+
+// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+
+ } catch (Exception e) {
+ String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+ log.error(msg);
+ throw new AiravataException(msg, e);
+ }
+ }
+
+ public void reconnect() throws AiravataException{
+ if(messageHandler!=null) {
+ try {
+ listen(messageHandler);
+ } catch (AiravataException e) {
+ String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+ log.error(msg);
+ throw new AiravataException(msg, e);
+
+ }
+ }
+ }
+ public String listen(final MessageHandler handler) throws AiravataException {
+ try {
+ messageHandler = handler;
+ Map<String, Object> props = handler.getProperties();
+ final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+ if (routing == null) {
+ throw new IllegalArgumentException("The routing key must be present");
+ }
+ List<String> keys = new ArrayList<String>();
+ if (routing instanceof List) {
+ for (Object o : (List)routing) {
+ keys.add(o.toString());
+ }
+ } else if (routing instanceof String) {
+ keys.add((String) routing);
+ }
+
+ String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+ String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+ if (queueName == null) {
+ if (!channel.isOpen()) {
+ channel = connection.createChannel();
+ channel.basicQos(prefetchCount);
+// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+ }
+ queueName = channel.queueDeclare().getQueue();
+ } else {
+
+ channel.queueDeclare(queueName, durableQueue, false, false, null);
+ }
+
+ final String id = getId(keys, queueName);
+ if (queueDetailsMap.containsKey(id)) {
+ throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
+ "cannot define the same subscriber twice");
+ }
+
+ if (consumerTag == null) {
+ consumerTag = "default";
+ }
+
+ // bind all the routing keys
+// for (String routingKey : keys) {
+// channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
+// }
+ // autoAck=false, we will ack after task is done
+ channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) {
+ @Override
+ public void handleDelivery(String consumerTag,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body) {
+ Message message = new Message();
+
+ try {
+ ThriftUtils.createThriftFromBytes(body, message);
+ TBase event = null;
+ String gatewayId = null;
+ long deliveryTag = envelope.getDeliveryTag();
+ if(message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
+ ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' for experimentId: " +
+ processSubmitEvent.getProcessId());
+ event = processSubmitEvent;
+ gatewayId = processSubmitEvent.getGatewayId();
+ }else if(message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
+ ProcessTerminateEvent processTerminateEvent = new ProcessTerminateEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), processTerminateEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' for processId: " +
+ processTerminateEvent.getProcessId());
+ event = processTerminateEvent;
+ gatewayId = processTerminateEvent.getGatewayId();
+ }
+ MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId,deliveryTag);
+ messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+ handler.onMessage(messageContext);
+ } catch (TException e) {
+ String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+ log.warn(msg, e);
+ }
+ }
+
+ @Override
+ public void handleCancel(String consumerTag) throws IOException {
+ super.handleCancel(consumerTag);
+ log.info("Consumer cancelled : " + consumerTag);
+ }
+ });
+
+ // save the name for deleting the queue
+ queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+ return id;
+ } catch (Exception e) {
+ String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+ log.error(msg);
+ throw new AiravataException(msg, e);
+ }
+ }
+
+ public void stopListen(final String id) throws AiravataException {
+ QueueDetails details = queueDetailsMap.get(id);
+ if (details != null) {
+ try {
+ for (String key : details.getRoutingKeys()) {
+ channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
+ }
+ } catch (IOException e) {
+ String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
+ log.debug(msg);
+ }
+ }
+ }
+
+ /**
+ * Private class for holding some information about the consumers registered
+ */
+ private class QueueDetails {
+ String queueName;
+
+ List<String> routingKeys;
+
+ private QueueDetails(String queueName, List<String> routingKeys) {
+ this.queueName = queueName;
+ this.routingKeys = routingKeys;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public List<String> getRoutingKeys() {
+ return routingKeys;
+ }
+ }
+
+ private String getId(List<String> routingKeys, String queueName) {
+ String id = "";
+ for (String key : routingKeys) {
+ id = id + "_" + key;
+ }
+ return id + "_" + queueName;
+ }
+
+ public void close() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException ignore) {
+ }
+ }
+ }
+ public boolean isOpen(){
+ if(connection!=null){
+ return connection.isOpen();
+ }
+ return false;
+ }
+
+ public void sendAck(long deliveryTag){
+ try {
+ channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
new file mode 100644
index 0000000..e488f26
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.messaging.core.impl;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RabbitMQProcessLaunchPublisher implements Publisher{
+ private final static Logger log = LoggerFactory.getLogger(RabbitMQProcessLaunchPublisher.class);
+ private String launchTask;
+
+ private RabbitMQProducer rabbitMQProducer;
+
+ public RabbitMQProcessLaunchPublisher() throws Exception {
+ String brokerUrl;
+ try {
+ brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+ launchTask = ServerSettings.getLaunchQueueName();
+ } catch (ApplicationSettingsException e) {
+ String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+ log.error(message, e);
+ throw new AiravataException(message, e);
+ }
+ rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null);
+ rabbitMQProducer.open();
+ }
+
+ public void publish(MessageContext msgCtx) throws AiravataException {
+ try {
+ log.info("Publishing to launch queue ...");
+ byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+ Message message = new Message();
+ message.setEvent(body);
+ message.setMessageId(msgCtx.getMessageId());
+ message.setMessageType(msgCtx.getType());
+ message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
+ String routingKey = launchTask;
+ byte[] messageBody = ThriftUtils.serializeThriftObject(message);
+ rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey);
+ log.info("Successfully published to launch queue ...");
+ } catch (TException e) {
+ String msg = "Error while deserializing the object";
+ log.error(msg, e);
+ throw new AiravataException(msg, e);
+ } catch (Exception e) {
+ String msg = "Error while sending to rabbitmq";
+ log.error(msg, e);
+ throw new AiravataException(msg, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java
deleted file mode 100644
index 3684198..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessPublisher.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.Message;
-import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class RabbitMQProcessPublisher implements Publisher {
-
- private static final Logger log = LoggerFactory.getLogger(RabbitMQProcessPublisher.class);
- public static final String PROCESS = "process.queue" ;
-
- private RabbitMQProducer rabbitMQProducer;
-
- public RabbitMQProcessPublisher() throws Exception {
- String brokerUrl;
- String exchangeName;
- try {
- brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
-// exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
- } catch (ApplicationSettingsException e) {
- String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
- log.error(message, e);
- throw new AiravataException(message, e);
- }
- rabbitMQProducer = new RabbitMQProducer(brokerUrl, null, null);
- rabbitMQProducer.open();
- }
-
- @Override
- public void publish(MessageContext msgCtx) throws AiravataException {
- try {
- log.info("Publishing to process queue ...");
- byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
- Message message = new Message();
- message.setEvent(body);
- message.setMessageId(msgCtx.getMessageId());
- message.setMessageType(msgCtx.getType());
- message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
- String queueName = PROCESS;
- message.setMessageType(MessageType.TASK);
- byte[] messageBody = ThriftUtils.serializeThriftObject(message);
- rabbitMQProducer.sendToWorkerQueue(messageBody, queueName);
- } catch (TException e) {
- String msg = "Error while serializing the thrift object";
- log.error(msg, e);
- throw new AiravataException(msg, e);
- } catch (Exception e) {
- String msg = "Error while sending to rabbitmq";
- log.error(msg, e);
- throw new AiravataException(msg, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
index 4510a07..83725e2 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
@@ -177,7 +177,7 @@ public class RabbitMQStatusConsumer implements Consumer {
taskStatusChangeEvent.getState());
event = taskStatusChangeEvent;
gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
- }else if (message.getMessageType() == MessageType.TASKOUTPUT) {
+ }else if (message.getMessageType() == MessageType.PROCESSOUTPUT) {
TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent();
ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent);
log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType());
@@ -191,7 +191,7 @@ public class RabbitMQStatusConsumer implements Consumer {
jobStatusChangeEvent.getState());
event = jobStatusChangeEvent;
gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
- } else if (message.getMessageType().equals(MessageType.LAUNCHTASK)) {
+ } else if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
log.debug(" Message Received with message id '" + message.getMessageId()
@@ -199,7 +199,7 @@ public class RabbitMQStatusConsumer implements Consumer {
taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
event = taskSubmitEvent;
gatewayId = taskSubmitEvent.getGatewayId();
- } else if (message.getMessageType().equals(MessageType.TERMINATETASK)) {
+ } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) {
TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
log.debug(" Message Received with message id '" + message.getMessageId()
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
index cebbed4..fc83199 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
@@ -74,7 +74,7 @@ public class RabbitMQStatusPublisher implements Publisher {
TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
- } else if (msgCtx.getType() == MessageType.TASKOUTPUT) {
+ } else if (msgCtx.getType() == MessageType.PROCESSOUTPUT) {
TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." +
event.getTaskIdentity().getProcessId() + "." + event.getTaskIdentity().getTaskId();
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
deleted file mode 100644
index 89a4a5d..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.messaging.core.impl;
-
-import com.rabbitmq.client.*;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class RabbitMQTaskLaunchConsumer {
- private final static Logger logger = LoggerFactory.getLogger(RabbitMQTaskLaunchConsumer.class);
- private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class);
-
- private String taskLaunchExchangeName;
- private String url;
- private Connection connection;
- private Channel channel;
- private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
- private boolean durableQueue;
- private MessageHandler messageHandler;
- private int prefetchCount;
-
-
- public RabbitMQTaskLaunchConsumer() throws AiravataException {
- try {
- url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
- taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
- prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
- createConnection();
- } catch (ApplicationSettingsException e) {
- String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
- log.error(message, e);
- throw new AiravataException(message, e);
- }
- }
-
- public RabbitMQTaskLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException {
- this.taskLaunchExchangeName = exchangeName;
- this.url = brokerUrl;
-
- createConnection();
- }
-
- private void createConnection() throws AiravataException {
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setUri(url);
- connectionFactory.setAutomaticRecoveryEnabled(true);
- connection = connectionFactory.newConnection();
- connection.addShutdownListener(new ShutdownListener() {
- public void shutdownCompleted(ShutdownSignalException cause) {
- }
- });
- log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
-
- channel = connection.createChannel();
- channel.basicQos(prefetchCount);
-
-// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
-
- } catch (Exception e) {
- String msg = "could not open channel for exchange " + taskLaunchExchangeName;
- log.error(msg);
- throw new AiravataException(msg, e);
- }
- }
-
- public void reconnect() throws AiravataException{
- if(messageHandler!=null) {
- try {
- listen(messageHandler);
- } catch (AiravataException e) {
- String msg = "could not open channel for exchange " + taskLaunchExchangeName;
- log.error(msg);
- throw new AiravataException(msg, e);
-
- }
- }
- }
- public String listen(final MessageHandler handler) throws AiravataException {
- try {
- messageHandler = handler;
- Map<String, Object> props = handler.getProperties();
- final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
- if (routing == null) {
- throw new IllegalArgumentException("The routing key must be present");
- }
- List<String> keys = new ArrayList<String>();
- if (routing instanceof List) {
- for (Object o : (List)routing) {
- keys.add(o.toString());
- }
- } else if (routing instanceof String) {
- keys.add((String) routing);
- }
-
- String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
- String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
- if (queueName == null) {
- if (!channel.isOpen()) {
- channel = connection.createChannel();
- channel.basicQos(prefetchCount);
-// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
- }
- queueName = channel.queueDeclare().getQueue();
- } else {
-
- channel.queueDeclare(queueName, durableQueue, false, false, null);
- }
-
- final String id = getId(keys, queueName);
- if (queueDetailsMap.containsKey(id)) {
- throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
- "cannot define the same subscriber twice");
- }
-
- if (consumerTag == null) {
- consumerTag = "default";
- }
-
- // bind all the routing keys
-// for (String routingKey : keys) {
-// channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
-// }
- // autoAck=false, we will ack after task is done
- channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) {
- Message message = new Message();
-
- try {
- ThriftUtils.createThriftFromBytes(body, message);
- TBase event = null;
- String gatewayId = null;
- long deliveryTag = envelope.getDeliveryTag(); //todo store this in zookeeper, once job is done we can ack
- if(message.getMessageType().equals(MessageType.LAUNCHTASK)) {
- TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' for experimentId: " +
- taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
- event = taskSubmitEvent;
- gatewayId = taskSubmitEvent.getGatewayId();
- }else if(message.getMessageType().equals(MessageType.TERMINATETASK)) {
- TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' for experimentId: " +
- taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId());
- event = taskTerminateEvent;
- gatewayId = taskTerminateEvent.getGatewayId();
- }
- System.out.println("*deliveryTag:"+deliveryTag);
- MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId,deliveryTag);
- messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
- handler.onMessage(messageContext);
- /*try {
- channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- }*/
- } catch (TException e) {
- String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
- log.warn(msg, e);
- }
- }
-
- @Override
- public void handleCancel(String consumerTag) throws IOException {
- super.handleCancel(consumerTag);
- log.info("Consumer cancelled : " + consumerTag);
- }
- });
-
- // save the name for deleting the queue
- queueDetailsMap.put(id, new QueueDetails(queueName, keys));
- return id;
- } catch (Exception e) {
- String msg = "could not open channel for exchange " + taskLaunchExchangeName;
- log.error(msg);
- throw new AiravataException(msg, e);
- }
- }
-
- public void stopListen(final String id) throws AiravataException {
- QueueDetails details = queueDetailsMap.get(id);
- if (details != null) {
- try {
- for (String key : details.getRoutingKeys()) {
- channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
- }
- } catch (IOException e) {
- String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
- log.debug(msg);
- }
- }
- }
-
- /**
- * Private class for holding some information about the consumers registered
- */
- private class QueueDetails {
- String queueName;
-
- List<String> routingKeys;
-
- private QueueDetails(String queueName, List<String> routingKeys) {
- this.queueName = queueName;
- this.routingKeys = routingKeys;
- }
-
- public String getQueueName() {
- return queueName;
- }
-
- public List<String> getRoutingKeys() {
- return routingKeys;
- }
- }
-
- private String getId(List<String> routingKeys, String queueName) {
- String id = "";
- for (String key : routingKeys) {
- id = id + "_" + key;
- }
- return id + "_" + queueName;
- }
-
- public void close() {
- if (connection != null) {
- try {
- connection.close();
- } catch (IOException ignore) {
- }
- }
- }
- public boolean isOpen(){
- if(connection!=null){
- return connection.isOpen();
- }
- return false;
- }
-
- public void sendAck(long deliveryTag){
- try {
- channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
deleted file mode 100644
index 53bf373..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.messaging.core.impl;
-
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RabbitMQTaskLaunchPublisher implements Publisher{
- private final static Logger log = LoggerFactory.getLogger(RabbitMQTaskLaunchPublisher.class);
- private String launchTask;
-
- private RabbitMQProducer rabbitMQProducer;
-
- public RabbitMQTaskLaunchPublisher() throws Exception {
- String brokerUrl;
- try {
- brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- launchTask = ServerSettings.getLaunchQueueName();
- } catch (ApplicationSettingsException e) {
- String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
- log.error(message, e);
- throw new AiravataException(message, e);
- }
- rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null);
- rabbitMQProducer.open();
- }
-
- public void publish(MessageContext msgCtx) throws AiravataException {
- try {
- log.info("Publishing to launch queue ...");
- byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
- Message message = new Message();
- message.setEvent(body);
- message.setMessageId(msgCtx.getMessageId());
- message.setMessageType(msgCtx.getType());
- message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
- String routingKey = launchTask;
- byte[] messageBody = ThriftUtils.serializeThriftObject(message);
- rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey);
- log.info("Successfully published to launch queue ...");
- } catch (TException e) {
- String msg = "Error while deserializing the object";
- log.error(msg, e);
- throw new AiravataException(msg, e);
- } catch (Exception e) {
- String msg = "Error while sending to rabbitmq";
- log.error(msg, e);
- throw new AiravataException(msg, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
deleted file mode 100644
index e506556..0000000
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.orchestrator.core.impl;
-
-
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.client.GFACInstance;
-import org.apache.airavata.gfac.core.GFac;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
-import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is the simplest implementation for JobSubmitter,
- * This is calling gfac invocation methods to invoke the gfac embedded mode,so this does not really implement
- * the selectGFACInstance method
- */
-public class GFACEmbeddedJobSubmitter implements JobSubmitter {
- private final static Logger logger = LoggerFactory.getLogger(GFACEmbeddedJobSubmitter.class);
-
- private OrchestratorContext orchestratorContext;
-
- private GFac gfac;
-
-
- public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
- this.orchestratorContext = orchestratorContext;
- }
-
- public GFACInstance selectGFACInstance() throws OrchestratorException {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- public boolean submit(String experimentID, String taskID,String tokenId) throws OrchestratorException {
- try {
- String gatewayId = null;
- CredentialReader credentialReader = GFacUtils.getCredentialReader();
- if (credentialReader != null) {
- try {
- gatewayId = credentialReader.getGatewayID(tokenId);
- } catch (Exception e) {
- logger.error(e.getLocalizedMessage());
- }
- }
- if(gatewayId == null || gatewayId.isEmpty()){
- gatewayId = ServerSettings.getDefaultUserGateway();
- }
- return gfac.submitJob(experimentID, taskID, gatewayId, tokenId);
- } catch (Exception e) {
- String error = "Error launching the job : " + experimentID;
- logger.error(error);
- throw new OrchestratorException(error);
- }
- }
-
- public boolean terminate(String experimentID, String taskID, String tokenId) throws OrchestratorException {
- return false;
- }
-
- public GFac getGfac() {
- return gfac;
- }
-
- public void setGfac(GFac gfac) {
- this.gfac = gfac;
- }
-
- public OrchestratorContext getOrchestratorContext() {
- return orchestratorContext;
- }
-
- public void setOrchestratorContext(OrchestratorContext orchestratorContext) {
- this.orchestratorContext = orchestratorContext;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 21df198..f600b72 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -29,9 +29,7 @@ import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.PublisherFactory;
-import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
-import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
+import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.job.JobSubmitter;
@@ -47,14 +45,11 @@ import java.util.UUID;
*/
public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
private final static Logger logger = LoggerFactory.getLogger(GFACPassiveJobSubmitter.class);
- public static final String IP = "ip";
- private OrchestratorContext orchestratorContext;
private static Integer mutex = -1;
private Publisher publisher;
public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
- this.orchestratorContext = orchestratorContext;
- if(orchestratorContext.getPublisher()!=null){ // use the same publisher this will be empty if rabbitmq.publish is not enabled in the configuraiton
+ if(orchestratorContext.getPublisher()!=null){
this.publisher = orchestratorContext.getPublisher();
}else {
try {
@@ -72,20 +67,16 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
return null;
}
- public boolean submit(String experimentID, String taskID) throws OrchestratorException {
- return submit(experimentID, taskID, null);
- }
-
/**
* Submit the job to a shared launch.queue accross multiple gfac instances
*
- * @param experimentID
- * @param taskID
+ * @param experimentId
+ * @param processId
* @param tokenId
* @return
* @throws OrchestratorException
*/
- public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException {
+ public boolean submit(String experimentId, String processId, String tokenId) throws OrchestratorException {
try {
String gatewayId = null;
CredentialReader credentialReader = GFacUtils.getCredentialReader();
@@ -99,8 +90,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
if (gatewayId == null || gatewayId.isEmpty()) {
gatewayId = ServerSettings.getDefaultUserGateway();
}
- TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId, tokenId);
- MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
+ ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent(processId, gatewayId, tokenId);
+ MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.LAUNCHPROCESS, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
publisher.publish(messageContext);
} catch (Exception e) {
@@ -113,12 +104,12 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
/**
* Submit the experiment the terminate.queue job queue and remove the experiment from shared launch.queue
- * @param experimentID
- * @param taskID
+ * @param experimentId
+ * @param processId
* @return
* @throws OrchestratorException
*/
- public boolean terminate(String experimentID, String taskID, String tokenId) throws OrchestratorException {
+ public boolean terminate(String experimentId, String processId, String tokenId) throws OrchestratorException {
String gatewayId = null;
try {
CredentialReader credentialReader = GFacUtils.getCredentialReader();
@@ -134,8 +125,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
gatewayId = ServerSettings.getDefaultUserGateway();
}
- TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(experimentID, taskID, gatewayId, tokenId);
- MessageContext messageContext = new MessageContext(taskTerminateEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId);
+ ProcessTerminateEvent processTerminateEvent = new ProcessTerminateEvent(processId, gatewayId, tokenId);
+ MessageContext messageContext = new MessageContext(processTerminateEvent, MessageType.TERMINATEPROCESS, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId);
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
publisher.publish(messageContext);
return true;
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
index b885da5..12c35c8 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
@@ -45,20 +45,20 @@ public interface JobSubmitter {
/**
* This is similar to submit with expId and taskId but this has extra param called token
- * @param experimentID
- * @param taskID
+ * @param experimentId
+ * @param processId
* @param tokenId
* @return
* @throws OrchestratorException
*/
- boolean submit(String experimentID,String taskID,String tokenId) throws OrchestratorException;
+ boolean submit(String experimentId,String processId,String tokenId) throws OrchestratorException;
/**
* This can be used to terminate the experiment
- * @param experimentID
- * @param taskID
+ * @param experimentId
+ * @param processId
* @return
* @throws OrchestratorException
*/
- boolean terminate(String experimentID,String taskID, String tokenId)throws OrchestratorException;
+ boolean terminate(String experimentId,String processId, String tokenId)throws OrchestratorException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 589b707..c0d63d6 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -29,12 +29,8 @@ import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.scheduler.HostScheduler;
import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.PublisherFactory;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessConsumer;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
@@ -43,7 +39,6 @@ import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.experiment.ExperimentType;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
import org.apache.airavata.model.messaging.event.MessageType;
-import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.status.ExperimentState;
import org.apache.airavata.model.status.ExperimentStatus;
@@ -56,12 +51,14 @@ import org.apache.airavata.registry.core.app.catalog.resources.AppCatAbstractRes
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.core.experiment.catalog.resources.AbstractExpCatResource;
import org.apache.airavata.registry.cpi.*;
-import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public class OrchestratorServerHandler implements OrchestratorService.Iface {
private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
@@ -72,8 +69,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
private String airavataUserName;
private String gatewayName;
private Publisher publisher;
- private RabbitMQProcessConsumer rabbitMQProcessConsumer;
- private RabbitMQProcessPublisher rabbitMQProcessPublisher;
/**
* Query orchestrator server to fetch the CPI version
@@ -99,7 +94,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
appCatalog = RegistryFactory.getAppCatalog();
orchestrator.initialize();
orchestrator.getOrchestratorContext().setPublisher(this.publisher);
- startProcessConsumer();
} catch (OrchestratorException e) {
log.error(e.getMessage(), e);
throw new OrchestratorException("Error while initializing orchestrator service", e);
@@ -112,18 +106,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
}
}
- private void startProcessConsumer() throws OrchestratorException {
- try {
- rabbitMQProcessConsumer = new RabbitMQProcessConsumer();
- ProcessConsumer processConsumer = new ProcessConsumer();
- Thread thread = new Thread(processConsumer);
- thread.start();
- } catch (AiravataException e) {
- throw new OrchestratorException("Error while starting process consumer", e);
- }
-
- }
-
/**
* * After creating the experiment Data user have the * experimentID as the
* handler to the experiment, during the launchExperiment * We just have to
@@ -133,10 +115,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
* @param experimentId
*/
public boolean launchExperiment(String experimentId, String token) throws TException {
- ExperimentModel experiment = null; // this will inside the bottom catch statement
+ ExperimentModel experiment = null;
try {
- experiment = (ExperimentModel) experimentCatalog.get(
- ExperimentCatalogModelType.EXPERIMENT, experimentId);
+ experiment = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
if (experiment == null) {
log.error(experimentId, "Error retrieving the Experiment by the given experimentID: {} ", experimentId);
return false;
@@ -430,14 +411,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
// }
}
- public synchronized RabbitMQProcessPublisher getRabbitMQProcessPublisher() throws Exception {
- if (rabbitMQProcessPublisher == null) {
- rabbitMQProcessPublisher = new RabbitMQProcessPublisher();
- }
- return rabbitMQProcessPublisher;
- }
-
-
private class SingleAppExperimentRunner implements Runnable {
String experimentId;
@@ -497,39 +470,4 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
return true;
}
}
-
- private class ProcessConsumer implements Runnable, MessageHandler{
-
-
- @Override
- public void run() {
- try {
- rabbitMQProcessConsumer.listen(this);
- } catch (AiravataException e) {
- log.error("Error while listen to the RabbitMQProcessConsumer");
- }
- }
-
- @Override
- public Map<String, Object> getProperties() {
- Map<String, Object> props = new HashMap<String, Object>();
- props.put(MessagingConstants.RABBIT_QUEUE, RabbitMQProcessPublisher.PROCESS);
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, RabbitMQProcessPublisher.PROCESS);
- return props;
- }
-
- @Override
- public void onMessage(MessageContext msgCtx) {
- TBase event = msgCtx.getEvent();
- if (event instanceof ProcessSubmitEvent) {
- ProcessSubmitEvent processSubmitEvent = (ProcessSubmitEvent) event;
- try {
- launchProcess(processSubmitEvent.getProcessId(), processSubmitEvent.getCredentialToken());
- } catch (TException e) {
- log.error("Error while launching task : " + processSubmitEvent.getProcessId());
- }
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/DataModelUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/DataModelUtils.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/DataModelUtils.java
deleted file mode 100644
index f9f8115..0000000
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/DataModelUtils.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.orchestrator.util;
-
-import java.util.List;
-
-import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.AppCatalogException;
-import org.apache.airavata.registry.cpi.ApplicationInterface;
-import org.apache.airavata.model.util.ExecutionType;
-import org.apache.airavata.model.experiment.ExperimentModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DataModelUtils {
-
- private final static Logger logger = LoggerFactory.getLogger(DataModelUtils.class);
- public static ExecutionType getExecutionType(String gatewayId, ExperimentModel experiment){
- try {
- ApplicationInterface applicationInterface = RegistryFactory.getAppCatalog().getApplicationInterface();
- List<String> allApplicationInterfaceIds = applicationInterface.getAllApplicationInterfaceIds();
- String applicationId = experiment.getExecutionId();
- if (allApplicationInterfaceIds.contains(applicationId)){
- return ExecutionType.SINGLE_APP;
- } else {
- List<String> allWorkflows = RegistryFactory.getAppCatalog().getWorkflowCatalog().getAllWorkflows(gatewayId);
- if (allWorkflows.contains(applicationId)){
- return ExecutionType.WORKFLOW;
- }
- }
- } catch (AppCatalogException e) {
- logger.error(e.getMessage(), e);
- }
- return ExecutionType.UNKNOWN;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/ed5a1720/thrift-interface-descriptions/airavata-api/messaging_events.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/airavata-api/messaging_events.thrift b/thrift-interface-descriptions/airavata-api/messaging_events.thrift
index a863edc..1d2a537 100644
--- a/thrift-interface-descriptions/airavata-api/messaging_events.thrift
+++ b/thrift-interface-descriptions/airavata-api/messaging_events.thrift
@@ -40,9 +40,9 @@ enum MessageType {
TASK,
PROCESS,
JOB,
- LAUNCHTASK,
- TERMINATETASK,
- TASKOUTPUT
+ LAUNCHPROCESS,
+ TERMINATEPROCESS,
+ PROCESSOUTPUT
}
struct ExperimentStatusChangeEvent {
@@ -110,22 +110,15 @@ struct JobIdentifier {
// }
struct ProcessSubmitEvent{
- 1: required string processId;
- 2: required string credentialToken;
-}
-
-struct TaskSubmitEvent{
- 1: required string experimentId,
- 2: required string taskId,
- 3: required string gatewayId,
- 4: required string tokenId
+ 1: required string processId,
+ 2: required string gatewayId,
+ 3: required string tokenId
}
-struct TaskTerminateEvent{
- 1: required string experimentId,
- 2: required string taskId,
- 3: required string gatewayId,
- 4: required string tokenId
+struct ProcessTerminateEvent{
+ 1: required string processId,
+ 2: required string gatewayId,
+ 3: required string tokenId
}
struct JobStatusChangeEvent {