You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:33 UTC
[06/15] airavata git commit: initial version of passive job submission
initial version of passive job submission
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/60788efe
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/60788efe
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/60788efe
Branch: refs/heads/master
Commit: 60788efec3a808274a19e9687a669cabad84e89b
Parents: a486b67
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Tue Feb 17 22:09:15 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Tue Feb 17 22:09:15 2015 -0500
----------------------------------------------------------------------
.../client/samples/CreateLaunchExperiment.java | 2 +-
.../airavata/gfac/leader/CuratorClient.java | 79 ------------------
.../gfac/leader/LeaderSelectorExample.java | 80 ------------------
.../airavata/gfac/server/GfacServerHandler.java | 85 ++++++++++----------
.../core/impl/RabbitMQTaskLaunchConsumer.java | 1 -
.../core/impl/GFACPassiveJobSubmitter.java | 5 --
6 files changed, 43 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index b7121b9..6937c25 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -95,7 +95,7 @@ public class CreateLaunchExperiment {
public static void createAndLaunchExp() throws TException {
// final String expId = createEchoExperimentForFSD(airavataClient);
try {
- for (int i = 0; i < 1; i++) {
+ for (int i = 0; i < 10; i++) {
// final String expId = createExperimentForSSHHost(airavata);
// final String expId = createEchoExperimentForFSD(airavataClient);
// final String expId = createMPIExperimentForFSD(airavataClient);
http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java
deleted file mode 100644
index 2db9a6f..0000000
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.gfac.leader;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * An example leader selector client. Note that {@link LeaderSelectorListenerAdapter} which
- * has the recommended handling for connection state issues
- */
-public class CuratorClient extends LeaderSelectorListenerAdapter implements Closeable {
- private final String name;
- private final LeaderSelector leaderSelector;
- private final AtomicInteger leaderCount = new AtomicInteger();
-
- public CuratorClient(CuratorFramework client, String path, String name) {
- this.name = name;
-
- // create a leader selector using the given path for management
- // all participants in a given leader selection must use the same path
- // ExampleClient here is also a LeaderSelectorListener but this isn't required
- leaderSelector = new LeaderSelector(client, path, this);
-
- // for most cases you will want your instance to requeue when it relinquishes leadership
- leaderSelector.autoRequeue();
- }
-
- public void start() throws IOException {
- // the selection for this instance doesn't start until the leader selector is started
- // leader selection is done in the background so this call to leaderSelector.start() returns immediately
- leaderSelector.start();
- }
-
- @Override
- public void close() throws IOException {
- leaderSelector.close();
- }
-
- @Override
- public void takeLeadership(CuratorFramework client) throws Exception {
- // we are now the leader. This method should not return until we want to relinquish leadership
-
- final int waitSeconds = (int) (5 * Math.random()) + 1;
-
- System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
- System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
- try {
- Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
- } catch (InterruptedException e) {
- System.err.println(name + " was interrupted.");
- Thread.currentThread().interrupt();
- } finally {
- System.out.println(name + " relinquishing leadership.\n");
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java
deleted file mode 100644
index ad02641..0000000
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.leader;
-
-import com.google.common.collect.Lists;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.utils.CloseableUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.util.List;
-
-public class LeaderSelectorExample {
- private final static Logger logger = LoggerFactory.getLogger(LeaderSelectorExample.class);
- private static final int CLIENT_QTY = 10;
-
- private static final String PATH = "/examples/leader";
-
- public static void main(String[] args) throws Exception
- {
- // all of the useful sample code is in ExampleClient.java
-
- System.out.println("Create " + CLIENT_QTY + " clients, have each negotiate for leadership and then wait a random number of seconds before letting another leader election occur.");
- System.out.println("Notice that leader election is fair: all clients will become leader and will do so the same number of times.");
-
- try
- {
- for ( int i = 0; i < CLIENT_QTY; ++i )
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3));
-
- CuratorClient example = new CuratorClient(client, PATH, "Client #" + i);
-
- client.start();
- example.start();
- }
-
- System.out.println("Press enter/return to quit\n");
- new BufferedReader(new InputStreamReader(System.in)).readLine();
- }
- finally
- {
- System.out.println("Shutting down...");
-
- /*for ( CuratorClient exampleClient : examples )
- {
- CloseableUtils.closeQuietly(exampleClient);
- }
- for ( CuratorFramework client : clients )
- {
- CloseableUtils.closeQuietly(client);
- }*/
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index c838703..679a5ee 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -35,7 +35,6 @@ import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.core.utils.InputHandlerWorker;
import org.apache.airavata.gfac.cpi.GfacService;
import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
-import org.apache.airavata.gfac.leader.CuratorClient;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingConstants;
@@ -60,7 +59,9 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -93,11 +94,15 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
private List<Future> inHandlerFutures;
- private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+ private String nodeName = null;
- CuratorFramework curatorFramework = null;
+ private CuratorFramework curatorFramework = null;
+ private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
+ private BlockingQueue<TaskTerminateEvent> taskTerminateEvents;
+
+ private CuratorClient curatorClient;
public GfacServerHandler() throws Exception{
// registering with zk
try {
@@ -107,6 +112,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is required, this will only use to store some data
gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
synchronized (mutex) {
mutex.wait(); // waiting for the syncConnected event
}
@@ -121,10 +127,14 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
BetterGfacImpl.startStatusUpdators(registry, zk, publisher);
inHandlerFutures = new ArrayList<Future>();
- if(ServerSettings.isGFacPassiveMode()) {
- rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
- rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
+ if (ServerSettings.isGFacPassiveMode()) {
+ taskSubmitEvents = new LinkedBlockingDeque<TaskSubmitEvent>();
+ taskTerminateEvents = new LinkedBlockingDeque<TaskTerminateEvent>();
curatorFramework = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3));
+ curatorClient = new CuratorClient(curatorFramework, nodeName);
+
+ curatorFramework.start();
+ curatorClient.start();
}
@@ -296,51 +306,37 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
}
private class TaskLaunchMessageHandler implements MessageHandler {
- private String experimentId;
-
- private String nodeName;
-
+ public static final String LAUNCH_TASK = "launch.task";
+ public static final String TERMINATE_TASK = "teminate.task";
public TaskLaunchMessageHandler(){
- try {
- nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
- } catch (ApplicationSettingsException e) {
- logger.error(e.getMessage(), e);
- }
+
}
public Map<String, Object> getProperties() {
Map<String, Object> props = new HashMap<String, Object>();
- try {
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
- } catch (ApplicationSettingsException e) {
- // if we cannot find gfac node name configured we set a random id
- logger.error("airavata-server.properties should configure: " + Constants.ZOOKEEPER_GFAC_SERVER_NAME + " value.");
- logger.error("listening to a random generated routing key");
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, UUID.randomUUID().toString());
- }
+ ArrayList<String> keys = new ArrayList<String>();
+ keys.add(LAUNCH_TASK);
+ keys.add(TERMINATE_TASK);
+ props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
return props;
}
public void onMessage(MessageContext message) {
- if (message.getType().equals(MessageType.LAUNCHTASK)){
+ if (message.getType().equals(MessageType.LAUNCHTASK)) {
try {
TaskSubmitEvent event = new TaskSubmitEvent();
TBase messageEvent = message.getEvent();
byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
ThriftUtils.createThriftFromBytes(bytes, event);
- CuratorClient curatorClient = new CuratorClient(curatorFramework, event, nodeName);
- try {
- curatorClient.start();
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- }
+ taskSubmitEvents.add(event);
- System.out.println(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getType());
- } catch (TException e) {
+
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType());
+ } catch (TException e) {
logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
}
- }else if(message.getType().equals(MessageType.TERMINATETASK)){
+ } else if (message.getType().equals(MessageType.TERMINATETASK)) {
try {
TaskTerminateEvent event = new TaskTerminateEvent();
TBase messageEvent = message.getEvent();
@@ -361,18 +357,16 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
private final LeaderSelector leaderSelector;
private final AtomicInteger leaderCount = new AtomicInteger();
private final String path;
- private TaskSubmitEvent event;
private String experimentNode;
- public CuratorClient(CuratorFramework client, TaskSubmitEvent taskSubmitEvent, String name) {
+ public CuratorClient(CuratorFramework client, String name) {
this.name = name;
- this.event = taskSubmitEvent;
- this.path = File.separator + event.getExperimentId() + "-" + event.getTaskId() + "-" + event.getGatewayId();
// create a leader selector using the given path for management
// all participants in a given leader selection must use the same path
// ExampleClient here is also a LeaderSelectorListener but this isn't required
- leaderSelector = new LeaderSelector(client, path, this);
experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ path = experimentNode + File.separator + "leader";
+ leaderSelector = new LeaderSelector(client, path, this);
// for most cases you will want your instance to requeue when it relinquishes leadership
leaderSelector.autoRequeue();
}
@@ -393,18 +387,23 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
// we are now the leader. This method should not return until we want to relinquish leadership
final int waitSeconds = (int) (5 * Math.random()) + 1;
- System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
- System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
+ logger.info(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
+ logger.info(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
+ RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+ String listenId = rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
+ TaskSubmitEvent event = taskSubmitEvents.take();
try {
GFacUtils.createExperimentEntryForRPC(event.getExperimentId(),event.getTaskId(),client.getZookeeperClient().getZooKeeper(),experimentNode,name,event.getTokenId());
submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
} catch (InterruptedException e) {
- System.err.println(name + " was interrupted.");
+ logger.error(name + " was interrupted.");
Thread.currentThread().interrupt();
} finally {
- System.out.println(name + " relinquishing leadership.\n");
+ Thread.sleep(5);
+ logger.info(name + " relinquishing leadership.: "+ new Date().toString());
+ rabbitMQTaskLaunchConsumer.stopListen(listenId);
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index 056dcac..4bc7468 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -190,7 +190,6 @@ public class RabbitMQTaskLaunchConsumer {
for (String key : details.getRoutingKeys()) {
channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
}
- channel.queueDelete(details.getQueueName(), true, true);
} catch (IOException e) {
String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
log.debug(msg);
http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 78cc6b7..b5e25b1 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -102,7 +102,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException {
ZooKeeper zk = orchestratorContext.getZk();
- GfacService.Client gfacClient = null;
try {
if (zk == null || !zk.getState().isConnected()) {
String zkhostPort = AiravataZKUtils.getZKhostPort();
@@ -151,8 +150,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new OrchestratorException(e);
- }finally {
- gfacClient.getOutputProtocol().getTransport().close();
}
return true;
@@ -167,7 +164,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
*/
public boolean terminate(String experimentID, String taskID) throws OrchestratorException {
ZooKeeper zk = orchestratorContext.getZk();
- GfacService.Client localhost = null;
try {
if (zk == null || !zk.getState().isConnected()) {
String zkhostPort = AiravataZKUtils.getZKhostPort();
@@ -189,7 +185,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null));
logger.info("GFAC instance node data: " + gfacNodeData);
String[] split = gfacNodeData.split(":");
- localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
// before submitting the job we check again the state of the node
if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, null)) {