You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/20 00:55:56 UTC
airavata git commit: Save and Ack Cancel deliveryTags.
Repository: airavata
Updated Branches:
refs/heads/master 419cd6a3c -> a8a0fcea2
Save and Ack Cancel deliveryTags.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a8a0fcea
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a8a0fcea
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a8a0fcea
Branch: refs/heads/master
Commit: a8a0fcea2d1a3a4cbff27c8e0b663ae07b59eb15
Parents: 419cd6a
Author: shamrath <sh...@gmail.com>
Authored: Tue May 19 18:55:54 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Tue May 19 18:55:54 2015 -0400
----------------------------------------------------------------------
.../AiravataExperimentStatusUpdator.java | 27 +++++++++++-
.../airavata/common/utils/AiravataZKUtils.java | 16 +++++++
.../airavata/gfac/server/GfacServerHandler.java | 27 ++++++------
.../airavata/gfac/core/utils/GFacUtils.java | 44 ++++++++++++--------
4 files changed, 80 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/a8a0fcea/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index 934d4e5..0755052 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -133,7 +133,8 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
private void cleanup(WorkflowNodeStatusChangeEvent nodeStatus, String experimentNode, String experimentPath) throws KeeperException, InterruptedException, AiravataException {
int count = 0;
- long deliveryTag = AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
+ long deliveryTag = AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), zk,
+ experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
if(deliveryTag>0) {
if (ServerSettings.isGFacPassiveMode()) {
while (!consumer.isOpen() && count < 3) {
@@ -158,6 +159,30 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
if (zk.exists(experimentPath, false) != null) {
ZKUtil.deleteRecursive(zk, experimentPath);
}
+
+ // ack cancel operation if exist
+ long cancelDT = AiravataZKUtils.getCancelDeliveryTagIfExist(nodeStatus.getWorkflowNodeIdentity().getExperimentId(),
+ zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
+ count = 0;
+ if (cancelDT > 0) {
+ while (!consumer.isOpen() && count < 3) {
+ try {
+ consumer.reconnect();
+ } catch (AiravataException e) {
+ count++;
+ }
+ }
+ try {
+ if (consumer.isOpen()) {
+ consumer.sendAck(cancelDT);
+ }
+ } catch (Exception e) {
+ logger.error("Error sending the Ack for cancel operation, cancel experiment path : " + experimentPath);
+ }
+ }
+ if (cancelDT > 0) {
+ ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
+ }
}
public ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws Exception {
http://git-wip-us.apache.org/repos/asf/airavata/blob/a8a0fcea/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index 9b42df9..ac617d9 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -45,6 +45,8 @@ public class AiravataZKUtils implements Watcher {
public static final String DELIVERY_TAG_POSTFIX = "-deliveryTag";
+ public static final String CANCEL_DELIVERY_TAG_POSTFIX = "-cancel-deliveryTag";
+
@Override
public void process(WatchedEvent event) {
@@ -219,4 +221,18 @@ public class AiravataZKUtils implements Watcher {
public static double toDouble(byte[] bytes) {
return ByteBuffer.wrap(bytes).getDouble();
}
+
+ public static long getCancelDeliveryTagIfExist(String experimentId, ZooKeeper zk, String experimentNode, String pickedChild) throws KeeperException, InterruptedException {
+
+ String cancelDeliveryTagPath = experimentNode + File.separator + pickedChild + File.separator + experimentId +
+ AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
+ Stat exists = zk.exists(cancelDeliveryTagPath, false);
+ if (exists == null) {
+ return -1; // no cancel deliverytag found
+ } else {
+ return bytesToLong(zk.getData(cancelDeliveryTagPath, false, exists));
+ }
+
+
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a8a0fcea/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index f944d91..a7b1799 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -33,6 +33,7 @@ import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.cpi.GFac;
import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
@@ -403,21 +404,17 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
TBase messageEvent = message.getEvent();
byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
ThriftUtils.createThriftFromBytes(bytes, event);
- GFacUtils.setExperimentCancel(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
- AiravataZKUtils.getExpStatePath(event.getExperimentId());
- cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
- System.out.println(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getType());
- } catch (TException e) {
- logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
- rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
- } catch (ApplicationSettingsException e) {
- logger.error(e.getMessage(), e);
- rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
- } catch (KeeperException e) {
+ boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
+ if (saveDeliveryTagSuccess) {
+ cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType());
+ rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+ } else {
+ throw new GFacException("Terminate Task fail to save delivery tag : " + String.valueOf(message.getDeliveryTag()) + " \n" +
+ "This happens when another cancel operation is being processed or experiment is in one of final states, complete|failed|cancelled.");
+ }
+ } catch (Exception e) {
logger.error(e.getMessage(), e);
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a8a0fcea/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 8966091..09adb7a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -38,6 +38,7 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.states.GfacExperimentState;
import org.apache.airavata.gfac.core.states.GfacHandlerState;
+import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
import org.apache.airavata.model.appcatalog.computeresource.*;
@@ -444,11 +445,11 @@ public class GFacUtils {
zk.create(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message
CreateMode.PERSISTENT);
} else {
- log.error("ExperimentID: " + experimentID + " taskID: " + taskID
- + " was running by some Gfac instance,but it failed");
- if(newExperimentPath.equals(oldExperimentPath)){
- log.info("Re-launch experiment came to the same GFac instance");
- }else {
+ log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
+ removeCancelDeliveryTagNode(oldExperimentPath, zk); // remove previous cancel deliveryTagNode
+ if(newExperimentPath.equals(oldExperimentPath)){
+ log.info("Re-launch experiment came to the same GFac instance");
+ }else {
log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
zk.create(newExperimentPath, zk.getData(oldExperimentPath, false, exists1),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // recursively copy children
@@ -470,6 +471,13 @@ public class GFacUtils {
return true;
}
+ private static void removeCancelDeliveryTagNode(String experimentPath, ZooKeeper zk) throws KeeperException, InterruptedException {
+ Stat exists = zk.exists(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, false);
+ if (exists != null) {
+ ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
+ }
+ }
+
private static void copyChildren(ZooKeeper zk, String oldPath, String newPath, int depth) throws KeeperException, InterruptedException {
for (String childNode : zk.getChildren(oldPath, false)) {
String oldChildPath = oldPath + File.separator + childNode;
@@ -537,29 +545,29 @@ public class GFacUtils {
return null;
}
- public static void setExperimentCancel(String experimentId, String taskId, ZooKeeper zk, String experimentNode,
+ public static boolean setExperimentCancel(String experimentId, String taskId, ZooKeeper zk, String experimentNode,
String pickedChild, String tokenId, long deliveryTag)throws KeeperException,
InterruptedException {
- // TODO : remove this if all went well
- /* String experimentPath = experimentNode + File.separator + pickedChild;
+ String experimentPath = experimentNode + File.separator + pickedChild;
String newExpNode = experimentPath + File.separator + experimentId;
String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
if (experimentEntry == null) {
// This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
"This happen when experiment completed and already removed from the zookeeper");
+ return false;
} else {
- if (newExpNode.equals(experimentEntry)) {
- log.info("Cancel experiment come to ");
- }
- Stat operation = zk.exists(experimentEntry + File.separator + "operation", false);
- if (operation == null) { // if there is no entry, this will come when a user immediately cancel a job
- zk.create(experimentEntry + File.separator + "operation", "cancel".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } else { // if user submit the job to gfac then cancel during execution
- zk.setData(experimentEntry + File.separator + "operation", "cancel".getBytes(), operation.getVersion());
+ // check cancel operation is being processed for the same experiment.
+ Stat cancelState = zk.exists(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, false);
+ if (cancelState != null) {
+ // another cancel operation is being processed. only one cancel operation can exist for a given experiment.
+ return false;
}
- }*/
+
+ zk.create(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // save cancel delivery tag to be acknowledge at the end.
+ return true;
+ }
}
public static boolean isCancelled(String experimentID, ZooKeeper zk