You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:34 UTC
[07/15] airavata git commit: implementing queue submission without curator
implementing queue submission without curator
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/840e627b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/840e627b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/840e627b
Branch: refs/heads/master
Commit: 840e627b4e24baef8dbf62df8da1042380cb8af1
Parents: 60788ef
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Tue Feb 17 23:58:04 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Tue Feb 17 23:58:04 2015 -0500
----------------------------------------------------------------------
.../client/samples/CreateLaunchExperiment.java | 17 ++-
.../airavata/common/utils/AiravataZKUtils.java | 22 +++
.../airavata/gfac/server/GfacServerHandler.java | 133 +++++--------------
.../messaging/core/impl/RabbitMQProducer.java | 18 ++-
.../core/impl/RabbitMQTaskLaunchConsumer.java | 22 +--
.../core/impl/RabbitMQTaskLaunchPublisher.java | 5 +-
6 files changed, 102 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 6937c25..1e9d983 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -94,14 +94,16 @@ public class CreateLaunchExperiment {
public static void createAndLaunchExp() throws TException {
// final String expId = createEchoExperimentForFSD(airavataClient);
+ List<String> experimentIds = new ArrayList<String>();
try {
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 100; i++) {
// final String expId = createExperimentForSSHHost(airavata);
// final String expId = createEchoExperimentForFSD(airavataClient);
// final String expId = createMPIExperimentForFSD(airavataClient);
// final String expId = createEchoExperimentForStampede(airavataClient);
// final String expId = createEchoExperimentForTrestles(airavataClient);
final String expId = createExperimentEchoForLocalHost(airavataClient);
+ experimentIds.add(expId);
// final String expId = createExperimentWRFTrestles(airavataClient);
// final String expId = createExperimentForBR2(airavataClient);
// final String expId = createExperimentForBR2Amber(airavataClient);
@@ -115,11 +117,20 @@ public class CreateLaunchExperiment {
// final String expId = createExperimentTRINITYStampede(airavataClient);
// final String expId = createExperimentAUTODOCKStampede(airavataClient); // this is not working , we need to register AutoDock app on stampede
// final String expId = "Ultrascan_ln_eb029947-391a-4ccf-8ace-9bafebe07cc0";
- System.out.println("Experiment ID : " + expId);
+ System.out.println("Experiment ID : " + expId);
// updateExperiment(airavata, expId);
-
+
launchExperiment(airavataClient, expId);
}
+
+ Thread.sleep(10000);
+
+ for(String exId:experimentIds) {
+ Experiment experiment = airavataClient.getExperiment(exId);
+ System.out.println(experiment.getExperimentStatus().toString());
+ }
+
+
} catch (Exception e) {
logger.error("Error while connecting with server", e.getMessage());
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index f91fc3c..46f06f1 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.nio.ByteBuffer;
import java.util.List;
public class AiravataZKUtils {
@@ -172,4 +173,25 @@ public class AiravataZKUtils {
logger.info("Skipping Zookeeper embedded startup ...");
}
}
+
+ public static void storeDeliveryTag(ZooKeeper zk,String newExpNode,Double deliveryTag) throws KeeperException, InterruptedException {
+ String s = zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ Stat expParent = zk.exists(newExpNode, false);
+ if (expParent != null) {
+ zk.setData(newExpNode, toByteArray(deliveryTag),
+ expParent.getVersion());
+ }
+ }
+
+ public static byte[] toByteArray(double value) {
+ byte[] bytes = new byte[8];
+ ByteBuffer.wrap(bytes).putDouble(value);
+ return bytes;
+ }
+
+ public static double toDouble(byte[] bytes) {
+ return ByteBuffer.wrap(bytes).getDouble();
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 679a5ee..1c0f095 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -45,30 +45,23 @@ import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
-import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-public class GfacServerHandler implements GfacService.Iface, Watcher{
+public class GfacServerHandler implements GfacService.Iface, Watcher {
private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class);
+ private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+
private Registry registry;
private AppCatalog appCatalog;
@@ -80,8 +73,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
private ZooKeeper zk;
- private boolean connected = false;
-
private static Integer mutex = -1;
private MonitorPublisher publisher;
@@ -94,16 +85,10 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
private List<Future> inHandlerFutures;
- private String nodeName = null;
-
- private CuratorFramework curatorFramework = null;
private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
- private BlockingQueue<TaskTerminateEvent> taskTerminateEvents;
-
- private CuratorClient curatorClient;
- public GfacServerHandler() throws Exception{
+ public GfacServerHandler() throws Exception {
// registering with zk
try {
String zkhostPort = AiravataZKUtils.getZKhostPort();
@@ -112,7 +97,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is required, this will only use to store some data
gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
synchronized (mutex) {
mutex.wait(); // waiting for the syncConnected event
}
@@ -128,17 +112,11 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
inHandlerFutures = new ArrayList<Future>();
if (ServerSettings.isGFacPassiveMode()) {
- taskSubmitEvents = new LinkedBlockingDeque<TaskSubmitEvent>();
- taskTerminateEvents = new LinkedBlockingDeque<TaskTerminateEvent>();
- curatorFramework = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3));
- curatorClient = new CuratorClient(curatorFramework, nodeName);
+ rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+ rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
- curatorFramework.start();
- curatorClient.start();
}
-
-
- } catch (ApplicationSettingsException e) {
+ } catch (ApplicationSettingsException e) {
logger.error("Error initialising GFAC", e);
throw new Exception("Error initialising GFAC", e);
} catch (InterruptedException e) {
@@ -184,7 +162,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
zk.create(gfacExperiments + File.separator + instanceId,
airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- }else{
+ } else {
logger.error(" Zookeeper is inconsistent state !!!!!");
}
}
@@ -195,9 +173,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
logger.info(state.name());
if (state == Event.KeeperState.SyncConnected) {
mutex.notify();
- connected = true;
- } else if(state == Event.KeeperState.Expired ||
- state == Event.KeeperState.Disconnected){
+ } else if (state == Event.KeeperState.Expired ||
+ state == Event.KeeperState.Disconnected) {
try {
mutex = -1;
zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this);
@@ -292,24 +269,29 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
public void setAiravataUserName(String airavataUserName) {
this.airavataUserName = airavataUserName;
}
+
protected void setGatewayProperties() throws ApplicationSettingsException {
- setAiravataUserName(ServerSettings.getDefaultUser());
- setGatewayName(ServerSettings.getDefaultUserGateway());
- }
+ setAiravataUserName(ServerSettings.getDefaultUser());
+ setGatewayName(ServerSettings.getDefaultUserGateway());
+ }
- private GFac getGfac()throws TException{
+ private GFac getGfac() throws TException {
try {
- return new BetterGfacImpl(registry, appCatalog, zk,publisher);
+ return new BetterGfacImpl(registry, appCatalog, zk, publisher);
} catch (Exception e) {
- throw new TException("Error initializing gfac instance",e);
+ throw new TException("Error initializing gfac instance", e);
}
}
private class TaskLaunchMessageHandler implements MessageHandler {
public static final String LAUNCH_TASK = "launch.task";
public static final String TERMINATE_TASK = "teminate.task";
- public TaskLaunchMessageHandler(){
+ private String experimentNode;
+ private String nodeName;
+ public TaskLaunchMessageHandler() {
+ experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME,"gfac-node0");
}
public Map<String, Object> getProperties() {
@@ -318,6 +300,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
keys.add(LAUNCH_TASK);
keys.add(TERMINATE_TASK);
props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
+ props.put(MessagingConstants.RABBIT_QUEUE, LAUNCH_TASK);
return props;
}
@@ -328,9 +311,16 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
TBase messageEvent = message.getEvent();
byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
ThriftUtils.createThriftFromBytes(bytes, event);
- taskSubmitEvents.add(event);
-
-
+ experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+
+ try {
+ GFacUtils.createExperimentEntryForRPC(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId());
+ submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
+ } catch (KeeperException e) {
+ logger.error(nodeName + " was interrupted.");
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ }
System.out.println(" Message Received with message id '" + message.getMessageId()
+ "' and with message type '" + message.getType());
} catch (TException e) {
@@ -351,61 +341,4 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
}
}
}
-
- public class CuratorClient extends LeaderSelectorListenerAdapter implements Closeable {
- private final String name;
- private final LeaderSelector leaderSelector;
- private final AtomicInteger leaderCount = new AtomicInteger();
- private final String path;
- private String experimentNode;
-
- public CuratorClient(CuratorFramework client, String name) {
- this.name = name;
- // create a leader selector using the given path for management
- // all participants in a given leader selection must use the same path
- // ExampleClient here is also a LeaderSelectorListener but this isn't required
- experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- path = experimentNode + File.separator + "leader";
- leaderSelector = new LeaderSelector(client, path, this);
- // for most cases you will want your instance to requeue when it relinquishes leadership
- leaderSelector.autoRequeue();
- }
-
- public void start() throws IOException {
- // the selection for this instance doesn't start until the leader selector is started
- // leader selection is done in the background so this call to leaderSelector.start() returns immediately
- leaderSelector.start();
- }
-
- @Override
- public void close() throws IOException {
- leaderSelector.close();
- }
-
- @Override
- public void takeLeadership(CuratorFramework client) throws Exception {
- // we are now the leader. This method should not return until we want to relinquish leadership
- final int waitSeconds = (int) (5 * Math.random()) + 1;
-
- logger.info(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
- logger.info(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
- RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
- String listenId = rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
-
- TaskSubmitEvent event = taskSubmitEvents.take();
- try {
- GFacUtils.createExperimentEntryForRPC(event.getExperimentId(),event.getTaskId(),client.getZookeeperClient().getZooKeeper(),experimentNode,name,event.getTokenId());
- submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
- Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
- } catch (InterruptedException e) {
- logger.error(name + " was interrupted.");
- Thread.currentThread().interrupt();
- } finally {
- Thread.sleep(5);
- logger.info(name + " relinquishing leadership.: "+ new Date().toString());
- rabbitMQTaskLaunchConsumer.stopListen(listenId);
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
index 570b17f..fffeece 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
@@ -113,8 +113,10 @@ public class RabbitMQProducer {
log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName);
channel.basicQos(prefetchCount);
}
- channel.exchangeDeclare(exchangeName, getExchangeType, false);
- } catch (Exception e) {
+ if(exchangeName!=null) {
+ channel.exchangeDeclare(exchangeName, getExchangeType, false);
+ }
+ } catch (Exception e) {
reset();
String msg = "could not open channel for exchange " + exchangeName;
log.error(msg);
@@ -132,6 +134,18 @@ public class RabbitMQProducer {
}
}
+ public void sendToWorkerQueue(byte []message, String routingKey) throws Exception {
+ try {
+ channel.basicPublish( "", routingKey,
+ MessageProperties.PERSISTENT_TEXT_PLAIN,
+ message);
+ } catch (IOException e) {
+ String msg = "Failed to publish message to exchange: " + exchangeName;
+ log.error(msg, e);
+ throw new Exception(msg, e);
+ }
+ }
+
private Connection createConnection() throws IOException {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index 4bc7468..1c7b0e8 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -24,6 +24,7 @@ import com.rabbitmq.client.*;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.messaging.core.MessageContext;
@@ -82,7 +83,7 @@ public class RabbitMQTaskLaunchConsumer {
log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
channel = connection.createChannel();
- channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
} catch (Exception e) {
String msg = "could not open channel for exchange " + taskLaunchExchangeName;
@@ -98,7 +99,6 @@ public class RabbitMQTaskLaunchConsumer {
if (routing == null) {
throw new IllegalArgumentException("The routing key must be present");
}
-
List<String> keys = new ArrayList<String>();
if (routing instanceof List) {
for (Object o : (List)routing) {
@@ -113,7 +113,7 @@ public class RabbitMQTaskLaunchConsumer {
if (queueName == null) {
if (!channel.isOpen()) {
channel = connection.createChannel();
- channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
}
queueName = channel.queueDeclare().getQueue();
} else {
@@ -131,11 +131,11 @@ public class RabbitMQTaskLaunchConsumer {
}
// bind all the routing keys
- for (String routingKey : keys) {
- channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
- }
-
- channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
+// for (String routingKey : keys) {
+// channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
+// }
+ // autoAck=false, we will ack after task is done
+ channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
@@ -147,6 +147,7 @@ public class RabbitMQTaskLaunchConsumer {
ThriftUtils.createThriftFromBytes(body, message);
TBase event = null;
String gatewayId = null;
+ long deliveryTag = envelope.getDeliveryTag(); //todo store this in zookeeper, once job is done we can ack
if(message.getMessageType().equals(MessageType.LAUNCHTASK)) {
TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
@@ -167,6 +168,11 @@ public class RabbitMQTaskLaunchConsumer {
MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
handler.onMessage(messageContext);
+ try {
+ channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
} catch (TException e) {
String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
log.warn(msg, e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
index 23b2379..0f95fbf 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.messaging.core.impl;
+import com.rabbitmq.client.MessageProperties;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
@@ -50,7 +51,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
log.error(message, e);
throw new AiravataException(message, e);
}
- rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName,"fanout");
+ rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null);
rabbitMQProducer.open();
}
@@ -70,7 +71,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
routingKey = TERMINATE_TASK;
}
byte[] messageBody = ThriftUtils.serializeThriftObject(message);
- rabbitMQProducer.send(messageBody, routingKey);
+ rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey);
} catch (TException e) {
String msg = "Error while deserializing the object";
log.error(msg, e);