You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:33 UTC

[06/15] airavata git commit: initial version of passive job submission

initial version of passive job submission


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/60788efe
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/60788efe
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/60788efe

Branch: refs/heads/master
Commit: 60788efec3a808274a19e9687a669cabad84e89b
Parents: a486b67
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Tue Feb 17 22:09:15 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Tue Feb 17 22:09:15 2015 -0500

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  |  2 +-
 .../airavata/gfac/leader/CuratorClient.java     | 79 ------------------
 .../gfac/leader/LeaderSelectorExample.java      | 80 ------------------
 .../airavata/gfac/server/GfacServerHandler.java | 85 ++++++++++----------
 .../core/impl/RabbitMQTaskLaunchConsumer.java   |  1 -
 .../core/impl/GFACPassiveJobSubmitter.java      |  5 --
 6 files changed, 43 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index b7121b9..6937c25 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -95,7 +95,7 @@ public class CreateLaunchExperiment {
     public static void createAndLaunchExp() throws TException {
 //        final String expId = createEchoExperimentForFSD(airavataClient);
         try {
-            for (int i = 0; i < 1; i++) {
+            for (int i = 0; i < 10; i++) {
 //                final String expId = createExperimentForSSHHost(airavata);
 //                final String expId = createEchoExperimentForFSD(airavataClient);
 //                final String expId = createMPIExperimentForFSD(airavataClient);

http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java
deleted file mode 100644
index 2db9a6f..0000000
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.gfac.leader;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * An example leader selector client. Note that {@link LeaderSelectorListenerAdapter} which
- * has the recommended handling for connection state issues
- */
-public class CuratorClient extends LeaderSelectorListenerAdapter implements Closeable {
-    private final String name;
-    private final LeaderSelector leaderSelector;
-    private final AtomicInteger leaderCount = new AtomicInteger();
-
-    public CuratorClient(CuratorFramework client, String path, String name) {
-        this.name = name;
-
-        // create a leader selector using the given path for management
-        // all participants in a given leader selection must use the same path
-        // ExampleClient here is also a LeaderSelectorListener but this isn't required
-        leaderSelector = new LeaderSelector(client, path, this);
-
-        // for most cases you will want your instance to requeue when it relinquishes leadership
-        leaderSelector.autoRequeue();
-    }
-
-    public void start() throws IOException {
-        // the selection for this instance doesn't start until the leader selector is started
-        // leader selection is done in the background so this call to leaderSelector.start() returns immediately
-        leaderSelector.start();
-    }
-
-    @Override
-    public void close() throws IOException {
-        leaderSelector.close();
-    }
-
-    @Override
-    public void takeLeadership(CuratorFramework client) throws Exception {
-        // we are now the leader. This method should not return until we want to relinquish leadership
-
-        final int waitSeconds = (int) (5 * Math.random()) + 1;
-
-        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
-        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
-        try {
-            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
-        } catch (InterruptedException e) {
-            System.err.println(name + " was interrupted.");
-            Thread.currentThread().interrupt();
-        } finally {
-            System.out.println(name + " relinquishing leadership.\n");
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java
deleted file mode 100644
index ad02641..0000000
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.leader;
-
-import com.google.common.collect.Lists;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.utils.CloseableUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.util.List;
-
-public class LeaderSelectorExample {
-    private final static Logger logger = LoggerFactory.getLogger(LeaderSelectorExample.class);
-    private static final int CLIENT_QTY = 10;
-
-    private static final String PATH = "/examples/leader";
-
-    public static void main(String[] args) throws Exception
-    {
-        // all of the useful sample code is in ExampleClient.java
-
-        System.out.println("Create " + CLIENT_QTY + " clients, have each negotiate for leadership and then wait a random number of seconds before letting another leader election occur.");
-        System.out.println("Notice that leader election is fair: all clients will become leader and will do so the same number of times.");
-
-        try
-        {
-            for ( int i = 0; i < CLIENT_QTY; ++i )
-            {
-                CuratorFramework    client = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3));
-
-                CuratorClient       example = new CuratorClient(client, PATH, "Client #" + i);
-
-                client.start();
-                example.start();
-            }
-
-            System.out.println("Press enter/return to quit\n");
-            new BufferedReader(new InputStreamReader(System.in)).readLine();
-        }
-        finally
-        {
-            System.out.println("Shutting down...");
-
-            /*for ( CuratorClient exampleClient : examples )
-            {
-                CloseableUtils.closeQuietly(exampleClient);
-            }
-            for ( CuratorFramework client : clients )
-            {
-                CloseableUtils.closeQuietly(client);
-            }*/
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index c838703..679a5ee 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -35,7 +35,6 @@ import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.core.utils.InputHandlerWorker;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
-import org.apache.airavata.gfac.leader.CuratorClient;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingConstants;
@@ -60,7 +59,9 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -93,11 +94,15 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
 
     private List<Future> inHandlerFutures;
 
-    private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+    private String nodeName = null;
 
-    CuratorFramework curatorFramework = null;
+    private CuratorFramework curatorFramework = null;
 
+    private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
 
+    private BlockingQueue<TaskTerminateEvent> taskTerminateEvents;
+
+    private CuratorClient curatorClient;
     public GfacServerHandler() throws Exception{
         // registering with zk
         try {
@@ -107,6 +112,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
             zk = new ZooKeeper(zkhostPort, 6000, this);   // no watcher is required, this will only use to store some data
             gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
             gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+            nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
             synchronized (mutex) {
                 mutex.wait();  // waiting for the syncConnected event
             }
@@ -121,10 +127,14 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
             BetterGfacImpl.startStatusUpdators(registry, zk, publisher);
             inHandlerFutures = new ArrayList<Future>();
 
-            if(ServerSettings.isGFacPassiveMode()) {
-                rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
-                rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
+            if (ServerSettings.isGFacPassiveMode()) {
+                taskSubmitEvents = new LinkedBlockingDeque<TaskSubmitEvent>();
+                taskTerminateEvents = new LinkedBlockingDeque<TaskTerminateEvent>();
                 curatorFramework = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3));
+                curatorClient = new CuratorClient(curatorFramework, nodeName);
+
+                curatorFramework.start();
+                curatorClient.start();
             }
 
 
@@ -296,51 +306,37 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
     }
 
     private class TaskLaunchMessageHandler implements MessageHandler {
-        private String experimentId;
-
-        private String nodeName;
-
+        public static final String LAUNCH_TASK = "launch.task";
+        public static final String TERMINATE_TASK = "teminate.task";
         public TaskLaunchMessageHandler(){
-            try {
-                nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
-            } catch (ApplicationSettingsException e) {
-                logger.error(e.getMessage(), e);
-            }
+
         }
 
         public Map<String, Object> getProperties() {
             Map<String, Object> props = new HashMap<String, Object>();
-            try {
-                props.put(MessagingConstants.RABBIT_ROUTING_KEY, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
-            } catch (ApplicationSettingsException e) {
-                // if we cannot find gfac node name configured we set a random id
-                logger.error("airavata-server.properties should configure: " + Constants.ZOOKEEPER_GFAC_SERVER_NAME + " value.");
-                logger.error("listening to a random generated routing key");
-                props.put(MessagingConstants.RABBIT_ROUTING_KEY, UUID.randomUUID().toString());
-            }
+            ArrayList<String> keys = new ArrayList<String>();
+            keys.add(LAUNCH_TASK);
+            keys.add(TERMINATE_TASK);
+            props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
             return props;
         }
 
         public void onMessage(MessageContext message) {
-            if (message.getType().equals(MessageType.LAUNCHTASK)){
+            if (message.getType().equals(MessageType.LAUNCHTASK)) {
                 try {
                     TaskSubmitEvent event = new TaskSubmitEvent();
                     TBase messageEvent = message.getEvent();
                     byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-                    CuratorClient curatorClient = new CuratorClient(curatorFramework, event, nodeName);
-                    try {
-                        curatorClient.start();
-                    } catch (IOException e) {
-                        logger.error(e.getMessage(), e);
-                    }
+                    taskSubmitEvents.add(event);
 
-                        System.out.println(" Message Received with message id '" + message.getMessageId()
-                                + "' and with message type '" + message.getType());
-                    } catch (TException e) {
+
+                    System.out.println(" Message Received with message id '" + message.getMessageId()
+                            + "' and with message type '" + message.getType());
+                } catch (TException e) {
                     logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
                 }
-            }else if(message.getType().equals(MessageType.TERMINATETASK)){
+            } else if (message.getType().equals(MessageType.TERMINATETASK)) {
                 try {
                     TaskTerminateEvent event = new TaskTerminateEvent();
                     TBase messageEvent = message.getEvent();
@@ -361,18 +357,16 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
         private final LeaderSelector leaderSelector;
         private final AtomicInteger leaderCount = new AtomicInteger();
         private final String path;
-        private TaskSubmitEvent event;
         private String experimentNode;
 
-        public CuratorClient(CuratorFramework client, TaskSubmitEvent taskSubmitEvent, String name) {
+        public CuratorClient(CuratorFramework client, String name) {
             this.name = name;
-            this.event = taskSubmitEvent;
-            this.path = File.separator + event.getExperimentId() + "-" + event.getTaskId() + "-" + event.getGatewayId();
             // create a leader selector using the given path for management
             // all participants in a given leader selection must use the same path
             // ExampleClient here is also a LeaderSelectorListener but this isn't required
-            leaderSelector = new LeaderSelector(client, path, this);
             experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+            path = experimentNode + File.separator + "leader";
+            leaderSelector = new LeaderSelector(client, path, this);
             // for most cases you will want your instance to requeue when it relinquishes leadership
             leaderSelector.autoRequeue();
         }
@@ -393,18 +387,23 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
             // we are now the leader. This method should not return until we want to relinquish leadership
             final int waitSeconds = (int) (5 * Math.random()) + 1;
 
-            System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
-            System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
+            logger.info(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
+            logger.info(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
+            RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+            String listenId = rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
 
+            TaskSubmitEvent event = taskSubmitEvents.take();
             try {
                 GFacUtils.createExperimentEntryForRPC(event.getExperimentId(),event.getTaskId(),client.getZookeeperClient().getZooKeeper(),experimentNode,name,event.getTokenId());
                 submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
                 Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
             } catch (InterruptedException e) {
-                System.err.println(name + " was interrupted.");
+                logger.error(name + " was interrupted.");
                 Thread.currentThread().interrupt();
             } finally {
-                System.out.println(name + " relinquishing leadership.\n");
+                Thread.sleep(5);
+                logger.info(name + " relinquishing leadership.: "+ new Date().toString());
+                rabbitMQTaskLaunchConsumer.stopListen(listenId);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index 056dcac..4bc7468 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -190,7 +190,6 @@ public class RabbitMQTaskLaunchConsumer {
                 for (String key : details.getRoutingKeys()) {
                     channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
                 }
-                channel.queueDelete(details.getQueueName(), true, true);
             } catch (IOException e) {
                 String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
                 log.debug(msg);

http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 78cc6b7..b5e25b1 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -102,7 +102,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
     public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException {
 
         ZooKeeper zk = orchestratorContext.getZk();
-        GfacService.Client gfacClient = null;
         try {
             if (zk == null || !zk.getState().isConnected()) {
                 String zkhostPort = AiravataZKUtils.getZKhostPort();
@@ -151,8 +150,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             throw new OrchestratorException(e);
-        }finally {
-            gfacClient.getOutputProtocol().getTransport().close();
         }
         return true;
 
@@ -167,7 +164,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
      */
     public boolean terminate(String experimentID, String taskID) throws OrchestratorException {
         ZooKeeper zk = orchestratorContext.getZk();
-        GfacService.Client localhost = null;
         try {
             if (zk == null || !zk.getState().isConnected()) {
                 String zkhostPort = AiravataZKUtils.getZKhostPort();
@@ -189,7 +185,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
                 String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null));
                 logger.info("GFAC instance node data: " + gfacNodeData);
                 String[] split = gfacNodeData.split(":");
-                localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
                 if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
                     // before submitting the job we check again the state of the node
                     if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, null)) {