You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:36 UTC
[09/15] airavata git commit: adding support to proper acking for
messages
adding support to proper acking for messages
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1231c014
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1231c014
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1231c014
Branch: refs/heads/master
Commit: 1231c014bebd1d23c2bdd340b7d721abe279d45a
Parents: ffbb1b9
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Wed Feb 25 00:59:09 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Wed Feb 25 00:59:09 2015 -0500
----------------------------------------------------------------------
.../airavata/api/server/AiravataAPIServer.java | 1 +
.../client/samples/CreateLaunchExperiment.java | 23 +++--
.../airavata/gfac/server/GfacServerHandler.java | 48 +++++++++--
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 1 +
.../core/monitor/GfacInternalStatusUpdator.java | 3 +
.../airavata/gfac/core/utils/GFacUtils.java | 21 +++--
.../handlers/GridPullMonitorHandler.java | 1 +
.../messaging/client/RabbitMQListner.java | 4 +-
.../airavata/messaging/core/MessageContext.java | 17 ++++
.../core/impl/RabbitMQTaskLaunchConsumer.java | 10 ++-
.../server/OrchestratorServerHandler.java | 90 ++++++++++----------
.../util/OrchestratorRecoveryHandler.java | 1 +
.../core/impl/GFACPassiveJobSubmitter.java | 10 +--
13 files changed, 151 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
index 0e6da90..da42ce0 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/AiravataAPIServer.java
@@ -299,6 +299,7 @@ public class AiravataAPIServer implements IServer, Watcher{
@Override
synchronized public void process(WatchedEvent watchedEvent) {
+ logger.info(watchedEvent.getPath());
synchronized (mutex) {
Event.KeeperState state = watchedEvent.getState();
logger.info(state.name());
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index c4c303f..78c2d71 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -47,17 +47,17 @@ import java.util.*;
public class CreateLaunchExperiment {
//FIXME: Read from a config file
-// public static final String THRIFT_SERVER_HOST = "localhost";
-// public static final int THRIFT_SERVER_PORT = 8930;
- public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org";
- public static final int THRIFT_SERVER_PORT = 9930;
+ public static final String THRIFT_SERVER_HOST = "localhost";
+ public static final int THRIFT_SERVER_PORT = 8930;
+// public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org";
+// public static final int THRIFT_SERVER_PORT = 9930;
private final static Logger logger = LoggerFactory.getLogger(CreateLaunchExperiment.class);
private static final String DEFAULT_USER = "default.registry.user";
private static final String DEFAULT_GATEWAY = "default.registry.gateway";
private static Airavata.Client airavataClient;
- private static String echoAppId = "Echo_a8fc8511-7b8e-431a-ad0f-de5eb1a9c576";
+ private static String echoAppId = "Echo_1365a7fd-eae1-4575-b447-99afb4d79c82";
private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9";
private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
private static String amberAppId = "Amber_42124128-628b-484c-829d-aff8b584eb00";
@@ -93,7 +93,7 @@ public class CreateLaunchExperiment {
// final String expId = createEchoExperimentForFSD(airavataClient);
List<String> experimentIds = new ArrayList<String>();
try {
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 1; i++) {
// final String expId = createExperimentForSSHHost(airavata);
// final String expId = createEchoExperimentForFSD(airavataClient);
// final String expId = createMPIExperimentForFSD(airavataClient);
@@ -120,12 +120,11 @@ public class CreateLaunchExperiment {
launchExperiment(airavataClient, expId);
}
- Thread.sleep(10000);
-
- for(String exId:experimentIds) {
- Experiment experiment = airavataClient.getExperiment(exId);
- System.out.println(experiment.getExperimentStatus().toString());
- }
+ Thread.sleep(100);
+ for (String exId : experimentIds) {
+ Experiment experiment = airavataClient.getExperiment(exId);
+ System.out.println(experiment.getExperimentStatus().toString());
+ }
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 1c0f095..cca793e 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -65,8 +65,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
private Registry registry;
private AppCatalog appCatalog;
- private String registryURL;
-
private String gatewayName;
private String airavataUserName;
@@ -144,12 +142,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
CreateMode.PERSISTENT);
}
String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
- String instantNode = gfacServer + File.separator + instanceId;
- zkStat = zk.exists(instantNode, true);
+ String instanceNode = gfacServer + File.separator + instanceId;
+ zkStat = zk.exists(instanceNode, true);
if (zkStat == null) {
- zk.create(instantNode,
+ zk.create(instanceNode,
airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL); // other component will watch these childeren creation deletion to monitor the status of the node
+ zk.getChildren(instanceNode, true);
}
zkStat = zk.exists(gfacExperiments, false);
if (zkStat == null) {
@@ -168,6 +167,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
}
synchronized public void process(WatchedEvent watchedEvent) {
+ logger.info(watchedEvent.getPath());
+ logger.info(watchedEvent.getType().toString());
synchronized (mutex) {
Event.KeeperState state = watchedEvent.getState();
logger.info(state.name());
@@ -191,10 +192,39 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
} catch (KeeperException e) {
logger.error(e.getMessage(), e);
}
+ } else if (Event.EventType.NodeDeleted.equals(watchedEvent.getType())) {
+ String path = watchedEvent.getPath();
+ String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ if (path.startsWith(experimentNode)) {
+ // we got a watch when experiment is removed
+ String deliveryPath = path + GFacUtils.DELIVERY_TAG_POSTFIX;
+ try {
+ Stat exists = zk.exists(deliveryPath, false);
+ byte[] data = zk.getData(path + GFacUtils.DELIVERY_TAG_POSTFIX, false, exists);
+ long value = ByateArrayToLong(data);
+ logger.info("ExperimentId+taskId" + path);
+ logger.info("Sending Ack back to the Queue, because task is over");
+ rabbitMQTaskLaunchConsumer.sendAck(value);
+ ZKUtil.deleteRecursive(zk,deliveryPath);
+ } catch (KeeperException e) {
+ logger.error(e.getMessage(), e);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
}
}
}
+ /**
+ * Decodes a long from its big-endian byte representation.
+ *
+ * The delivery tag is written to ZooKeeper by GFacUtils.createExperimentEntryForPassive with
+ * ByteBuffer.allocate(8).putLong(deliveryTag).array(). ByteBuffer is big-endian by default, so the
+ * most significant byte comes first and has to be shifted in first; decoding least-significant-first
+ * would return a different number than the one that was stored.
+ *
+ * @param data bytes read from the delivery-tag node (8 bytes as written by the producer)
+ * @return the decoded value; 0 when data is empty
+ */
+ private long ByateArrayToLong(byte[] data) {
+ long value = 0;
+ for (int i = 0; i < data.length; i++)
+ {
+ // move the bytes read so far up by one position, then append the next byte
+ value = (value << 8) | ((long) data[i] & 0xffL);
+ }
+ return value;
+ }
+
public String getGFACServiceVersion() throws TException {
return gfac_cpi_serviceConstants.GFAC_CPI_VERSION;
}
@@ -314,12 +344,18 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
try {
- GFacUtils.createExperimentEntryForRPC(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId());
+ GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
+ AiravataZKUtils.getExpStatePath(event.getExperimentId(),event.getTaskId());
submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
} catch (KeeperException e) {
logger.error(nodeName + " was interrupted.");
+ rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
+ rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+ } catch (ApplicationSettingsException e) {
+ logger.error(e.getMessage(), e);
+ rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
}
System.out.println(" Message Received with message id '" + message.getMessageId()
+ "' and with message type '" + message.getType());
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index bb612a6..00930e5 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -1158,6 +1158,7 @@ public class BetterGfacImpl implements GFac,Watcher {
}
public void process(WatchedEvent watchedEvent) {
+ log.info(watchedEvent.getPath());
if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
// node data is changed, this means node is cancelled.
log.info("Experiment is cancelled with this path:"+watchedEvent.getPath());
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 7818da0..26902e7 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -29,6 +29,7 @@ import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -49,6 +50,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
MonitorID monitorID = statusChangeRequest.getMonitorID();
String experimentPath = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments") +
File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + File.separator + statusChangeRequest.getMonitorID().getExperimentID() + "+" + monitorID.getTaskID();
+ String deliveryTagPath = experimentPath + GFacUtils.DELIVERY_TAG_POSTFIX;
Stat exists = null;
try {
if (!zk.getState().isConnected()) {
@@ -107,6 +109,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
}
public void process(WatchedEvent watchedEvent) {
+ logger.info(watchedEvent.getPath());
synchronized (mutex) {
Event.KeeperState state = watchedEvent.getState();
if (state == Event.KeeperState.SyncConnected) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 9f104fa..c825ffd 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -60,12 +60,14 @@ import java.io.*;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.*;
//import org.apache.airavata.commons.gfac.type.ActualParameter;
public class GFacUtils {
private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
+ public static final String DELIVERY_TAG_POSTFIX = "-deliveryTag";
private GFacUtils() {
}
@@ -1156,7 +1158,7 @@ public class GFacUtils {
// This method is dangerous because of moving the experiment data
public static boolean createExperimentEntryForPassive(String experimentID,
String taskID, ZooKeeper zk, String experimentNode,
- String pickedChild, String tokenId) throws KeeperException,
+ String pickedChild, String tokenId,long deliveryTag) throws KeeperException,
InterruptedException {
String experimentPath = experimentNode + File.separator + pickedChild;
String newExpNode = experimentPath + File.separator + experimentID
@@ -1165,15 +1167,14 @@ public class GFacUtils {
String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk);
String foundExperimentPath = null;
if (exists1 == null && experimentEntry == null) { // this means this is a very new experiment
- List<String> runningGfacNodeNames = AiravataZKUtils
- .getAllGfacNodeNames(zk); // here we take old gfac servers
- // too
+ List<String> runningGfacNodeNames = AiravataZKUtils.getAllGfacNodeNames(zk); // here we take old gfac servers
+
for (String gfacServerNode : runningGfacNodeNames) {
if (!gfacServerNode.equals(pickedChild)) {
foundExperimentPath = experimentNode + File.separator
+ gfacServerNode + File.separator + experimentID
+ "+" + taskID;
- exists1 = zk.exists(foundExperimentPath, false);
+ exists1 = zk.exists(foundExperimentPath, true);
if (exists1 != null) { // when the experiment is found we
// break the loop
break;
@@ -1183,21 +1184,23 @@ public class GFacUtils {
if (exists1 == null) { // OK this is a pretty new experiment so we
// are going to create a new node
log.info("This is a new Job, so creating all the experiment docs from the scratch");
+ Stat expParent = zk.exists(newExpNode, false);
zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- Stat expParent = zk.exists(newExpNode, false);
if (tokenId != null && expParent != null) {
zk.setData(newExpNode, tokenId.getBytes(),
expParent.getVersion());
}
- zk.create(newExpNode + File.separator + "state", String
+ String s = zk.create(newExpNode + File.separator + "state", String
.valueOf(GfacExperimentState.LAUNCHED.getValue())
.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- zk.create(newExpNode + File.separator + "operation","submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ zk.exists(s1, true);// we want to know when this node get deleted
+ String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, ByteBuffer.allocate(8).putLong(deliveryTag).array(), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message
CreateMode.PERSISTENT);
-
} else {
// ohhh this node exists in some other failed gfac folder, we
// have to move it to this gfac experiment list,safely
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index e64f596..d5f9f90 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -125,6 +125,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
public void process(WatchedEvent watchedEvent) {
+ logger.info(watchedEvent.getPath());
if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
// node data is changed, this means node is cancelled.
logger.info("Experiment is cancelled with this path:"+watchedEvent.getPath());
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
----------------------------------------------------------------------
diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
index 601497a..48edbe8 100644
--- a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
+++ b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListner.java
@@ -28,7 +28,7 @@ import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
import org.apache.airavata.model.messaging.event.*;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.commons.cli.*;
@@ -67,7 +67,7 @@ public class RabbitMQListner {
String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
System.out.println("broker url " + brokerUrl);
final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
- RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName);
+ RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
consumer.listen(new MessageHandler() {
@Override
public Map<String, Object> getProperties() {
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
index 0a39d92..272f413 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
@@ -32,6 +32,7 @@ public class MessageContext {
private final String messageId;
private final String gatewayId;
private Timestamp updatedTime;
+ private long deliveryTag;
public MessageContext(TBase message, MessageType type, String messageId, String gatewayId) {
@@ -41,6 +42,14 @@ public class MessageContext {
this.gatewayId = gatewayId;
}
+ public MessageContext(TBase event, MessageType type, String messageId, String gatewayId, long deliveryTag) { // takes the same parameters as the four-argument constructor plus a delivery tag
+ this.event = event;
+ this.type = type;
+ this.messageId = messageId;
+ this.gatewayId = gatewayId;
+ this.deliveryTag = deliveryTag; // readable later through getDeliveryTag(); RabbitMQTaskLaunchConsumer passes its deliveryTag here
+ }
+
public TBase getEvent() {
return event;
}
@@ -64,4 +73,12 @@ public class MessageContext {
public String getGatewayId() {
return gatewayId;
}
+
+ public long getDeliveryTag() { // delivery tag stored on this context; GfacServerHandler hands it to sendAck and to the ZooKeeper experiment entry
+ return deliveryTag;
+ }
+
+ public void setDeliveryTag(long deliveryTag) { // overwrites the stored delivery tag (the field is not final)
+ this.deliveryTag = deliveryTag;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index 1c7b0e8..7c88a25 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -165,7 +165,7 @@ public class RabbitMQTaskLaunchConsumer {
event = taskTerminateEvent;
gatewayId = null;
}
- MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
+ MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId,deliveryTag);
messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
handler.onMessage(messageContext);
try {
@@ -241,4 +241,12 @@ public class RabbitMQTaskLaunchConsumer {
}
}
}
+
+ public void sendAck(long deliveryTag){ // calls basicAck on this consumer's channel for the given tag; an IOException is logged, not propagated
+ try {
+ channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done. NOTE(review): presumably RabbitMQ basicAck(tag, multiple), so false would ack only this tag - confirm
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e); // best-effort: the caller gets no signal that the ack failed
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index b200468..f430bc9 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -292,43 +292,45 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
* This method gracefully handles gfac node failures
*/
synchronized public void process(WatchedEvent watchedEvent) {
+ log.info(watchedEvent.getPath());
synchronized (mutex) {
try {
Event.KeeperState state = watchedEvent.getState();
switch (state) {
- case SyncConnected:
- mutex.notify();
- break;
- case Expired:case Disconnected:
- try {
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this);
- synchronized (mutex) {
- mutex.wait(); // waiting for the syncConnected event
- }
- String airavataServerHostPort = ServerSettings
- .getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
- + ":"
- + ServerSettings
- .getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
- String OrchServer = ServerSettings
- .getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
- registerOrchestratorService(airavataServerHostPort, OrchServer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (ApplicationSettingsException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
- break;
- }
+ case SyncConnected:
+ mutex.notify();
+ break;
+ case Expired:
+ case Disconnected:
+ try {
+ zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, this);
+ synchronized (mutex) {
+ mutex.wait(); // waiting for the syncConnected event
+ }
+ String airavataServerHostPort = ServerSettings
+ .getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
+ + ":"
+ + ServerSettings
+ .getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
+ String OrchServer = ServerSettings
+ .getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
+ registerOrchestratorService(airavataServerHostPort, OrchServer);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ }
+ break;
+ }
if (watchedEvent.getPath() != null
&& watchedEvent.getPath().startsWith(
- ServerSettings.getSetting(
- Constants.ZOOKEEPER_GFAC_SERVER_NODE,
- "/gfac-server"))) {
+ ServerSettings.getSetting(
+ Constants.ZOOKEEPER_GFAC_SERVER_NODE,
+ "/gfac-server"))) {
List<String> children = zk.getChildren(ServerSettings
.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE,
"/gfac-server"), true);
@@ -340,18 +342,18 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
+ File.separator + gfacNodes, this);
}
switch (watchedEvent.getType()) {
- case NodeCreated:
- mutex.notify();
- break;
- case NodeDeleted:
- // here we have to handle gfac node shutdown case
- if (children.size() == 0) {
- log.error("There are not gfac instances to route failed jobs");
- return;
- }
- // we recover one gfac node at a time
- final WatchedEvent event = watchedEvent;
- final OrchestratorServerHandler handler = this;
+ case NodeCreated:
+ mutex.notify();
+ break;
+ case NodeDeleted:
+ // here we have to handle gfac node shutdown case
+ if (children.size() == 0) {
+ log.error("There are not gfac instances to route failed jobs");
+ return;
+ }
+ // we recover one gfac node at a time
+ final WatchedEvent event = watchedEvent;
+ final OrchestratorServerHandler handler = this;
/*(new Thread() { // disabling ft implementation with zk
public void run() {
int retry = 0;
@@ -372,7 +374,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
}
}).start();*/
- break;
+ break;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
index fb3bd51..f19b949 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -95,6 +95,7 @@ public class OrchestratorRecoveryHandler implements Watcher {
}
synchronized public void process(WatchedEvent watchedEvent) {
+ log.info(watchedEvent.getPath());
synchronized (mutex) {
Event.KeeperState state = watchedEvent.getState();
switch (state) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/1231c014/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index b5e25b1..8066113 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -187,11 +187,9 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
String[] split = gfacNodeData.split(":");
if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
// before submitting the job we check again the state of the node
- if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, null)) {
- TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, null,null);
- MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK,"LAUNCH.TERMINATE-"+ UUID.randomUUID().toString(),null);
- publisher.publish(messageContext);
- }
+ TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, null, null);
+ MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), null);
+ publisher.publish(messageContext);
}
}
} catch (InterruptedException e) {
@@ -217,6 +215,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
}
synchronized public void process(WatchedEvent event) {
+ logger.info(getClass().getName() + event.getPath());
+ logger.info(getClass().getName()+event.getType());
synchronized (mutex) {
switch (event.getState()) {
case SyncConnected: