You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:34 UTC

[07/15] airavata git commit: implementing queue submission without curator

implementing queue submission without curator


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/840e627b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/840e627b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/840e627b

Branch: refs/heads/master
Commit: 840e627b4e24baef8dbf62df8da1042380cb8af1
Parents: 60788ef
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Tue Feb 17 23:58:04 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Tue Feb 17 23:58:04 2015 -0500

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  |  17 ++-
 .../airavata/common/utils/AiravataZKUtils.java  |  22 +++
 .../airavata/gfac/server/GfacServerHandler.java | 133 +++++--------------
 .../messaging/core/impl/RabbitMQProducer.java   |  18 ++-
 .../core/impl/RabbitMQTaskLaunchConsumer.java   |  22 +--
 .../core/impl/RabbitMQTaskLaunchPublisher.java  |   5 +-
 6 files changed, 102 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 6937c25..1e9d983 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -94,14 +94,16 @@ public class CreateLaunchExperiment {
 
     public static void createAndLaunchExp() throws TException {
 //        final String expId = createEchoExperimentForFSD(airavataClient);
+        List<String> experimentIds = new ArrayList<String>();
         try {
-            for (int i = 0; i < 10; i++) {
+            for (int i = 0; i < 100; i++) {
 //                final String expId = createExperimentForSSHHost(airavata);
 //                final String expId = createEchoExperimentForFSD(airavataClient);
 //                final String expId = createMPIExperimentForFSD(airavataClient);
 //               final String expId = createEchoExperimentForStampede(airavataClient);
 //                final String expId = createEchoExperimentForTrestles(airavataClient);
                 final String expId = createExperimentEchoForLocalHost(airavataClient);
+                experimentIds.add(expId);
 //                final String expId = createExperimentWRFTrestles(airavataClient);
 //                final String expId = createExperimentForBR2(airavataClient);
 //                final String expId = createExperimentForBR2Amber(airavataClient);
@@ -115,11 +117,20 @@ public class CreateLaunchExperiment {
 //                final String expId = createExperimentTRINITYStampede(airavataClient);
 //                final String expId = createExperimentAUTODOCKStampede(airavataClient); // this is not working , we need to register AutoDock app on stampede
 //            	  final String expId = "Ultrascan_ln_eb029947-391a-4ccf-8ace-9bafebe07cc0";
-            	System.out.println("Experiment ID : " + expId);
+                System.out.println("Experiment ID : " + expId);
 //                updateExperiment(airavata, expId);
-                
+
                 launchExperiment(airavataClient, expId);
             }
+
+            Thread.sleep(10000);
+
+            for(String exId:experimentIds) {
+                Experiment experiment = airavataClient.getExperiment(exId);
+                System.out.println(experiment.getExperimentStatus().toString());
+            }
+
+
         } catch (Exception e) {
             logger.error("Error while connecting with server", e.getMessage());
             e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index f91fc3c..46f06f1 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 public class AiravataZKUtils {
@@ -172,4 +173,25 @@ public class AiravataZKUtils {
             logger.info("Skipping Zookeeper embedded startup ...");
         }
     }
+
+    public static void storeDeliveryTag(ZooKeeper zk,String newExpNode,Double deliveryTag) throws KeeperException, InterruptedException {
+        String s = zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        Stat expParent = zk.exists(newExpNode, false);
+        if (expParent != null) {
+            zk.setData(newExpNode, toByteArray(deliveryTag),
+                    expParent.getVersion());
+        }
+    }
+
+    public static byte[] toByteArray(double value) {
+        byte[] bytes = new byte[8];
+        ByteBuffer.wrap(bytes).putDouble(value);
+        return bytes;
+    }
+
+    public static double toDouble(byte[] bytes) {
+        return ByteBuffer.wrap(bytes).getDouble();
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 679a5ee..1c0f095 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -45,30 +45,23 @@ import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
 
-import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 
-public class GfacServerHandler implements GfacService.Iface, Watcher{
+public class GfacServerHandler implements GfacService.Iface, Watcher {
     private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class);
 
+    private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+
     private Registry registry;
     private AppCatalog appCatalog;
 
@@ -80,8 +73,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
 
     private ZooKeeper zk;
 
-    private boolean connected = false;
-
     private static Integer mutex = -1;
 
     private MonitorPublisher publisher;
@@ -94,16 +85,10 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
 
     private List<Future> inHandlerFutures;
 
-    private String nodeName = null;
-
-    private CuratorFramework curatorFramework = null;
 
     private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
 
-    private BlockingQueue<TaskTerminateEvent> taskTerminateEvents;
-
-    private CuratorClient curatorClient;
-    public GfacServerHandler() throws Exception{
+    public GfacServerHandler() throws Exception {
         // registering with zk
         try {
             String zkhostPort = AiravataZKUtils.getZKhostPort();
@@ -112,7 +97,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
             zk = new ZooKeeper(zkhostPort, 6000, this);   // no watcher is required, this will only use to store some data
             gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
             gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
-            nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
             synchronized (mutex) {
                 mutex.wait();  // waiting for the syncConnected event
             }
@@ -128,17 +112,11 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
             inHandlerFutures = new ArrayList<Future>();
 
             if (ServerSettings.isGFacPassiveMode()) {
-                taskSubmitEvents = new LinkedBlockingDeque<TaskSubmitEvent>();
-                taskTerminateEvents = new LinkedBlockingDeque<TaskTerminateEvent>();
-                curatorFramework = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3));
-                curatorClient = new CuratorClient(curatorFramework, nodeName);
+                rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+                rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
 
-                curatorFramework.start();
-                curatorClient.start();
             }
-
-
-            } catch (ApplicationSettingsException e) {
+        } catch (ApplicationSettingsException e) {
             logger.error("Error initialising GFAC", e);
             throw new Exception("Error initialising GFAC", e);
         } catch (InterruptedException e) {
@@ -184,7 +162,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
             zk.create(gfacExperiments + File.separator + instanceId,
                     airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT);
-        }else{
+        } else {
             logger.error(" Zookeeper is inconsistent state  !!!!!");
         }
     }
@@ -195,9 +173,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
             logger.info(state.name());
             if (state == Event.KeeperState.SyncConnected) {
                 mutex.notify();
-                connected = true;
-            } else if(state == Event.KeeperState.Expired ||
-                    state == Event.KeeperState.Disconnected){
+            } else if (state == Event.KeeperState.Expired ||
+                    state == Event.KeeperState.Disconnected) {
                 try {
                     mutex = -1;
                     zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this);
@@ -292,24 +269,29 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
     public void setAiravataUserName(String airavataUserName) {
         this.airavataUserName = airavataUserName;
     }
+
     protected void setGatewayProperties() throws ApplicationSettingsException {
-         setAiravataUserName(ServerSettings.getDefaultUser());
-         setGatewayName(ServerSettings.getDefaultUserGateway());
-     }
+        setAiravataUserName(ServerSettings.getDefaultUser());
+        setGatewayName(ServerSettings.getDefaultUserGateway());
+    }
 
-    private GFac getGfac()throws TException{
+    private GFac getGfac() throws TException {
         try {
-            return new BetterGfacImpl(registry, appCatalog, zk,publisher);
+            return new BetterGfacImpl(registry, appCatalog, zk, publisher);
         } catch (Exception e) {
-            throw new TException("Error initializing gfac instance",e);
+            throw new TException("Error initializing gfac instance", e);
         }
     }
 
     private class TaskLaunchMessageHandler implements MessageHandler {
         public static final String LAUNCH_TASK = "launch.task";
         public static final String TERMINATE_TASK = "teminate.task";
-        public TaskLaunchMessageHandler(){
+        private String experimentNode;
+        private String nodeName;
 
+        public TaskLaunchMessageHandler() {
+            experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+            nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME,"gfac-node0");
         }
 
         public Map<String, Object> getProperties() {
@@ -318,6 +300,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
             keys.add(LAUNCH_TASK);
             keys.add(TERMINATE_TASK);
             props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
+            props.put(MessagingConstants.RABBIT_QUEUE, LAUNCH_TASK);
             return props;
         }
 
@@ -328,9 +311,16 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
                     TBase messageEvent = message.getEvent();
                     byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-                    taskSubmitEvents.add(event);
-
-
+                    experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+
+                    try {
+                        GFacUtils.createExperimentEntryForRPC(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId());
+                        submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
+                    } catch (KeeperException e) {
+                        logger.error(nodeName + " was interrupted.");
+                    } catch (InterruptedException e) {
+                        logger.error(e.getMessage(), e);
+                    }
                     System.out.println(" Message Received with message id '" + message.getMessageId()
                             + "' and with message type '" + message.getType());
                 } catch (TException e) {
@@ -351,61 +341,4 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
             }
         }
     }
-
-    public class CuratorClient extends LeaderSelectorListenerAdapter implements Closeable {
-        private final String name;
-        private final LeaderSelector leaderSelector;
-        private final AtomicInteger leaderCount = new AtomicInteger();
-        private final String path;
-        private String experimentNode;
-
-        public CuratorClient(CuratorFramework client, String name) {
-            this.name = name;
-            // create a leader selector using the given path for management
-            // all participants in a given leader selection must use the same path
-            // ExampleClient here is also a LeaderSelectorListener but this isn't required
-            experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
-            path = experimentNode + File.separator + "leader";
-            leaderSelector = new LeaderSelector(client, path, this);
-            // for most cases you will want your instance to requeue when it relinquishes leadership
-            leaderSelector.autoRequeue();
-        }
-
-        public void start() throws IOException {
-            // the selection for this instance doesn't start until the leader selector is started
-            // leader selection is done in the background so this call to leaderSelector.start() returns immediately
-            leaderSelector.start();
-        }
-
-        @Override
-        public void close() throws IOException {
-            leaderSelector.close();
-        }
-
-        @Override
-        public void takeLeadership(CuratorFramework client) throws Exception {
-            // we are now the leader. This method should not return until we want to relinquish leadership
-            final int waitSeconds = (int) (5 * Math.random()) + 1;
-
-            logger.info(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
-            logger.info(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
-            RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
-            String listenId = rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
-
-            TaskSubmitEvent event = taskSubmitEvents.take();
-            try {
-                GFacUtils.createExperimentEntryForRPC(event.getExperimentId(),event.getTaskId(),client.getZookeeperClient().getZooKeeper(),experimentNode,name,event.getTokenId());
-                submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
-                Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
-            } catch (InterruptedException e) {
-                logger.error(name + " was interrupted.");
-                Thread.currentThread().interrupt();
-            } finally {
-                Thread.sleep(5);
-                logger.info(name + " relinquishing leadership.: "+ new Date().toString());
-                rabbitMQTaskLaunchConsumer.stopListen(listenId);
-            }
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
index 570b17f..fffeece 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
@@ -113,8 +113,10 @@ public class RabbitMQProducer {
                 log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName);
                 channel.basicQos(prefetchCount);
             }
-            channel.exchangeDeclare(exchangeName, getExchangeType, false);
-        } catch (Exception e) {
+            if(exchangeName!=null) {
+                channel.exchangeDeclare(exchangeName, getExchangeType, false);
+            }
+            } catch (Exception e) {
             reset();
             String msg = "could not open channel for exchange " + exchangeName;
             log.error(msg);
@@ -132,6 +134,18 @@ public class RabbitMQProducer {
         }
     }
 
+    public void sendToWorkerQueue(byte []message, String routingKey) throws Exception {
+        try {
+            channel.basicPublish( "", routingKey,
+                    MessageProperties.PERSISTENT_TEXT_PLAIN,
+                    message);
+        } catch (IOException e) {
+            String msg = "Failed to publish message to exchange: " + exchangeName;
+            log.error(msg, e);
+            throw new Exception(msg, e);
+        }
+    }
+
     private Connection createConnection() throws IOException {
         try {
             ConnectionFactory connectionFactory = new ConnectionFactory();

http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index 4bc7468..1c7b0e8 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -24,6 +24,7 @@ import com.rabbitmq.client.*;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.messaging.core.MessageContext;
@@ -82,7 +83,7 @@ public class RabbitMQTaskLaunchConsumer {
             log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
 
             channel = connection.createChannel();
-            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+//            channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
 
         } catch (Exception e) {
             String msg = "could not open channel for exchange " + taskLaunchExchangeName;
@@ -98,7 +99,6 @@ public class RabbitMQTaskLaunchConsumer {
             if (routing == null) {
                 throw new IllegalArgumentException("The routing key must be present");
             }
-
             List<String> keys = new ArrayList<String>();
             if (routing instanceof List) {
                 for (Object o : (List)routing) {
@@ -113,7 +113,7 @@ public class RabbitMQTaskLaunchConsumer {
             if (queueName == null) {
                 if (!channel.isOpen()) {
                     channel = connection.createChannel();
-                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+//                    channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
                 }
                 queueName = channel.queueDeclare().getQueue();
             } else {
@@ -131,11 +131,11 @@ public class RabbitMQTaskLaunchConsumer {
             }
 
             // bind all the routing keys
-            for (String routingKey : keys) {
-                channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
-            }
-
-            channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
+//            for (String routingKey : keys) {
+//                channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
+//            }
+            // autoAck=false, we will ack after task is done
+            channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) {
                 @Override
                 public void handleDelivery(String consumerTag,
                                            Envelope envelope,
@@ -147,6 +147,7 @@ public class RabbitMQTaskLaunchConsumer {
                         ThriftUtils.createThriftFromBytes(body, message);
                         TBase event = null;
                         String gatewayId = null;
+                        long deliveryTag = envelope.getDeliveryTag(); //todo store this in zookeeper, once job is done we can ack
                         if(message.getMessageType().equals(MessageType.LAUNCHTASK)) {
                             TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
                             ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
@@ -167,6 +168,11 @@ public class RabbitMQTaskLaunchConsumer {
                         MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
                         messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
                         handler.onMessage(messageContext);
+                        try {
+                            channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done
+                        } catch (IOException e) {
+                            logger.error(e.getMessage(), e);
+                        }
                     } catch (TException e) {
                         String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
                         log.warn(msg, e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/840e627b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
index 23b2379..0f95fbf 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.messaging.core.impl;
 
+import com.rabbitmq.client.MessageProperties;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
@@ -50,7 +51,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
             log.error(message, e);
             throw new AiravataException(message, e);
         }
-        rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName,"fanout");
+        rabbitMQProducer = new RabbitMQProducer(brokerUrl, null,null);
         rabbitMQProducer.open();
     }
 
@@ -70,7 +71,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
                 routingKey = TERMINATE_TASK;
             }
             byte[] messageBody = ThriftUtils.serializeThriftObject(message);
-            rabbitMQProducer.send(messageBody, routingKey);
+            rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey);
         } catch (TException e) {
             String msg = "Error while deserializing the object";
             log.error(msg, e);