You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/20 00:55:56 UTC

airavata git commit: Save and Ack Cancel deliveryTags.

Repository: airavata
Updated Branches:
  refs/heads/master 419cd6a3c -> a8a0fcea2


Save and Ack Cancel deliveryTags.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a8a0fcea
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a8a0fcea
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a8a0fcea

Branch: refs/heads/master
Commit: a8a0fcea2d1a3a4cbff27c8e0b663ae07b59eb15
Parents: 419cd6a
Author: shamrath <sh...@gmail.com>
Authored: Tue May 19 18:55:54 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Tue May 19 18:55:54 2015 -0400

----------------------------------------------------------------------
 .../AiravataExperimentStatusUpdator.java        | 27 +++++++++++-
 .../airavata/common/utils/AiravataZKUtils.java  | 16 +++++++
 .../airavata/gfac/server/GfacServerHandler.java | 27 ++++++------
 .../airavata/gfac/core/utils/GFacUtils.java     | 44 ++++++++++++--------
 4 files changed, 80 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a8a0fcea/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index 934d4e5..0755052 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -133,7 +133,8 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 
     private void cleanup(WorkflowNodeStatusChangeEvent nodeStatus, String experimentNode, String experimentPath) throws KeeperException, InterruptedException, AiravataException {
         int count = 0;
-        long deliveryTag = AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
+        long deliveryTag = AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), zk,
+                experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
         if(deliveryTag>0) {
             if (ServerSettings.isGFacPassiveMode()) {
                 while (!consumer.isOpen() && count < 3) {
@@ -158,6 +159,30 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
         if (zk.exists(experimentPath, false) != null) {
             ZKUtil.deleteRecursive(zk, experimentPath);
         }
+
+        // ack cancel operation if exist
+        long cancelDT = AiravataZKUtils.getCancelDeliveryTagIfExist(nodeStatus.getWorkflowNodeIdentity().getExperimentId(),
+                zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
+        count  = 0;
+        if (cancelDT > 0) {
+            while (!consumer.isOpen() && count < 3) {
+                try {
+                    consumer.reconnect();
+                } catch (AiravataException e) {
+                    count++;
+                }
+            }
+            try {
+                if (consumer.isOpen()) {
+                    consumer.sendAck(cancelDT);
+                }
+            } catch (Exception e) {
+                logger.error("Error sending the Ack for cancel operation, cancel experiment path : " + experimentPath);
+            }
+        }
+        if (cancelDT > 0) {
+            ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
+        }
     }
 
     public  ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws Exception {

http://git-wip-us.apache.org/repos/asf/airavata/blob/a8a0fcea/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index 9b42df9..ac617d9 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -45,6 +45,8 @@ public class AiravataZKUtils implements Watcher {
 
     public static final String DELIVERY_TAG_POSTFIX = "-deliveryTag";
 
+    public static final String CANCEL_DELIVERY_TAG_POSTFIX = "-cancel-deliveryTag";
+
     @Override
     public void process(WatchedEvent event) {
 
@@ -219,4 +221,18 @@ public class AiravataZKUtils implements Watcher {
     public static double toDouble(byte[] bytes) {
         return ByteBuffer.wrap(bytes).getDouble();
     }
+
+    public static long getCancelDeliveryTagIfExist(String experimentId, ZooKeeper zk, String experimentNode, String pickedChild) throws KeeperException, InterruptedException {
+        // Reads the cancel delivery tag stored for experimentId under experimentNode/pickedChild (joined with File.separator); returns -1 when that node does not exist.
+        String cancelDeliveryTagPath = experimentNode + File.separator + pickedChild + File.separator + experimentId +
+                AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX; // NOTE(review): File.separator is platform-specific while ZooKeeper paths use '/' - confirm this only runs where the two match
+        Stat exists = zk.exists(cancelDeliveryTagPath, false); // second argument false: no watch is requested
+        if (exists == null) {
+            return -1; // no cancel delivery tag found
+        } else {
+            return bytesToLong(zk.getData(cancelDeliveryTagPath, false, exists)); // node content is converted with bytesToLong
+        }
+
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a8a0fcea/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index f944d91..a7b1799 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -33,6 +33,7 @@ import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
@@ -403,21 +404,17 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                     TBase messageEvent = message.getEvent();
                     byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-                    GFacUtils.setExperimentCancel(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
-                    AiravataZKUtils.getExpStatePath(event.getExperimentId());
-                    cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
-                    System.out.println(" Message Received with message id '" + message.getMessageId()
-                            + "' and with message type '" + message.getType());
-                } catch (TException e) {
-                    logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
-                    rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
-                } catch (InterruptedException e) {
-                    logger.error(e.getMessage(), e);
-                    rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
-                } catch (ApplicationSettingsException e) {
-                    logger.error(e.getMessage(), e);
-                    rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
-                } catch (KeeperException e) {
+                    boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
+                    if (saveDeliveryTagSuccess) {
+                        cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
+                        System.out.println(" Message Received with message id '" + message.getMessageId()
+                                + "' and with message type '" + message.getType());
+                        rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                    } else {
+                        throw new GFacException("Terminate Task fail to save delivery tag : " + String.valueOf(message.getDeliveryTag()) + " \n" +
+                                "This happens when another cancel operation is being processed or experiment is in one of final states, complete|failed|cancelled.");
+                    }
+                } catch (Exception e) {
                     logger.error(e.getMessage(), e);
                     rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
                 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a8a0fcea/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 8966091..09adb7a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -38,6 +38,7 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.states.GfacExperimentState;
 import org.apache.airavata.gfac.core.states.GfacHandlerState;
+import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
 import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
 import org.apache.airavata.model.appcatalog.computeresource.*;
@@ -444,11 +445,11 @@ public class GFacUtils {
 			zk.create(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE,  // here we store the value of delivery message
 					CreateMode.PERSISTENT);
 		} else {
-			log.error("ExperimentID: " + experimentID + " taskID: " + taskID
-					+ " was running by some Gfac instance,but it failed");
-			if(newExperimentPath.equals(oldExperimentPath)){
-				log.info("Re-launch experiment came to the same GFac instance");
-			}else {
+			log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
+            removeCancelDeliveryTagNode(oldExperimentPath, zk); // remove previous cancel deliveryTagNode
+            if(newExperimentPath.equals(oldExperimentPath)){
+                log.info("Re-launch experiment came to the same GFac instance");
+            }else {
 				log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
 				zk.create(newExperimentPath, zk.getData(oldExperimentPath, false, exists1),
 						ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // recursively copy children
@@ -470,6 +471,13 @@ public class GFacUtils {
 		return true;
 	}
 
+    private static void removeCancelDeliveryTagNode(String experimentPath, ZooKeeper zk) throws KeeperException, InterruptedException {
+        Stat exists = zk.exists(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, false); // look up the node named experimentPath + CANCEL_DELIVERY_TAG_POSTFIX, without a watch
+        if (exists != null) { // delete only when that node is present; otherwise this method does nothing
+            ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
+        }
+    }
+
     private static void copyChildren(ZooKeeper zk, String oldPath, String newPath, int depth) throws KeeperException, InterruptedException {
         for (String childNode : zk.getChildren(oldPath, false)) {
             String oldChildPath = oldPath + File.separator + childNode;
@@ -537,29 +545,29 @@ public class GFacUtils {
 		return null;
 	}
 
-    public static void setExperimentCancel(String experimentId, String taskId, ZooKeeper zk, String experimentNode,
+    public static boolean setExperimentCancel(String experimentId, String taskId, ZooKeeper zk, String experimentNode,
                                            String pickedChild, String tokenId, long deliveryTag)throws KeeperException,
             InterruptedException {
-        // TODO : remove this if all went well
- /*       String experimentPath = experimentNode + File.separator + pickedChild;
+        String experimentPath = experimentNode + File.separator + pickedChild; // NOTE(review): only feeds newExpNode, which is no longer read in the new version of this method - confirm both can be dropped
         String newExpNode = experimentPath + File.separator + experimentId;
         String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
         if (experimentEntry == null) {
             // This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
             log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
                     "This happen when experiment completed and already removed from the zookeeper");
+            return false; // no experiment entry was found, so no cancel delivery tag is stored
         } else {
-            if (newExpNode.equals(experimentEntry)) {
-                log.info("Cancel experiment come to ");
-            }
-            Stat operation = zk.exists(experimentEntry + File.separator + "operation", false);
-            if (operation == null) { // if there is no entry, this will come when a user immediately cancel a job
-                zk.create(experimentEntry + File.separator + "operation", "cancel".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.PERSISTENT);
-            } else { // if user submit the job to gfac then cancel during execution
-                zk.setData(experimentEntry + File.separator + "operation", "cancel".getBytes(), operation.getVersion());
+            // Check whether a cancel delivery tag node already exists for the same experiment.
+            Stat cancelState = zk.exists(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, false);
+            if (cancelState != null) {
+                // Another cancel operation is being processed; only one cancel operation can exist for a given experiment.
+                return false; // the existing cancel delivery tag node is left untouched
             }
-        }*/
+
+            zk.create(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag),
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // save the cancel delivery tag so it can be acknowledged at the end. NOTE(review): exists() and create() are separate calls - confirm a concurrent cancel cannot reach create() too
+            return true; // the delivery tag was stored in a persistent node
+        }
 
     }
     public static boolean isCancelled(String experimentID, ZooKeeper zk