You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:40 UTC

[13/15] airavata git commit: deleting delivery token

deleting delivery token


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5310bb4b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5310bb4b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5310bb4b

Branch: refs/heads/master
Commit: 5310bb4b570bd0b09208e710f121e88418261c5f
Parents: cb6c4cc
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Fri Mar 20 16:14:07 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Fri Mar 20 16:14:07 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java |   3 +
 .../core/monitor/GfacInternalStatusUpdator.java |   2 +
 .../airavata/gfac/core/utils/GFacUtils.java     | 134 ++++++++-----------
 3 files changed, 61 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/5310bb4b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index d45710e..855bfc5 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -59,6 +59,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
 
 
 public class GfacServerHandler implements GfacService.Iface, Watcher {
@@ -79,6 +80,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
 
     private static Integer mutex = -1;
 
+    private static Lock lock;
+
     private MonitorPublisher publisher;
 
     private String gfacServer;

http://git-wip-us.apache.org/repos/asf/airavata/blob/5310bb4b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index eaa3c5f..6c456b0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -98,6 +98,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
                     consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(),
                             monitorID.getTaskID(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)));
                 }
+                ZKUtil.deleteRecursive(zk,experimentPath+GFacUtils.DELIVERY_TAG_POSTFIX);
                 ZKUtil.deleteRecursive(zk, experimentPath);
                 break;
             case FAILED:
@@ -107,6 +108,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
                     consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(),
                             monitorID.getTaskID(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)));
                 }
+                ZKUtil.deleteRecursive(zk,experimentPath+GFacUtils.DELIVERY_TAG_POSTFIX);
                 ZKUtil.deleteRecursive(zk, experimentPath);
                 break;
             default:

http://git-wip-us.apache.org/repos/asf/airavata/blob/5310bb4b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 707cf97..a0fc2cb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -1157,94 +1157,72 @@ public class GFacUtils {
 
 	// This method is dangerous because of moving the experiment data
 	public static boolean createExperimentEntryForPassive(String experimentID,
-													  String taskID, ZooKeeper zk, String experimentNode,
-													  String pickedChild, String tokenId,long deliveryTag) throws KeeperException,
+														  String taskID, ZooKeeper zk, String experimentNode,
+														  String pickedChild, String tokenId, long deliveryTag) throws KeeperException,
 			InterruptedException, ApplicationSettingsException {
 		String experimentPath = experimentNode + File.separator + pickedChild;
 		String newExpNode = experimentPath + File.separator + experimentID
 				+ "+" + taskID;
 		Stat exists1 = zk.exists(newExpNode, false);
 		String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk);
-		if (exists1 == null && experimentEntry == null) {  // this means this is a very new experiment
-				// are going to create a new node
-				log.info("This is a new Job, so creating all the experiment docs from the scratch");
-				Stat expParent = zk.exists(newExpNode, false);
-				zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-						CreateMode.PERSISTENT);
-
-				if (tokenId != null && expParent != null) {
-					zk.setData(newExpNode, tokenId.getBytes(),
-							expParent.getVersion());
-				}
-				String s = zk.create(newExpNode + File.separator + "state", String
-								.valueOf(GfacExperimentState.LAUNCHED.getValue())
-								.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-						CreateMode.PERSISTENT);
-				String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-						CreateMode.PERSISTENT);
-				zk.exists(s1, true);// we want to know when this node get deleted
-				String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE,  // here we store the value of delivery message
-						CreateMode.PERSISTENT);
-		}else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,taskID,zk) ){
-			// this happens when a cancel request comes to a differnt gfac node, in this case we do not move gfac experiment
-			// node to gfac node specific location, because original request execution will fail with errors
-			log.error("This experiment is already cancelled and its already executing the cancel operation so cannot submit again !");
+		if (exists1 != null) {
+			log.error("This request is wrong because its already running in the same instance");
 			return false;
-		} else if(experimentEntry != null && !GFacUtils.isCancelled(experimentID,taskID,zk)){
-			if(ServerSettings.isGFacPassiveMode()){
-				log.error("ExperimentID: " + experimentID + " taskID: " + taskID
-						+ " was running by some Gfac instance,but it failed");
-				log.info("This is an old Job, so copying data from old experiment location");
-				zk.create(newExpNode,
-						zk.getData(experimentEntry, false, exists1),
-						ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+		} else if (experimentEntry == null) {  // this means this is a very new experiment
+			// are going to create a new node
+			log.info("This is a new Job, so creating all the experiment docs from the scratch");
+			Stat expParent = zk.exists(newExpNode, false);
+			zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+					CreateMode.PERSISTENT);
 
-				List<String> children = zk.getChildren(experimentEntry,
-						false);
-				for (String childNode1 : children) {
-					String level1 = experimentEntry + File.separator
-							+ childNode1;
-					Stat exists2 = zk.exists(level1, false); // no need to check exists
-					String newLeve1 = newExpNode + File.separator + childNode1;
-					log.info("Creating new znode: " + newLeve1); // these has to be info logs
-					zk.create(newLeve1, zk.getData(level1, false, exists2),
-							ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-					for (String childNode2 : zk.getChildren(level1, false)) {
-						String level2 = level1 + File.separator + childNode2;
-						Stat exists3 = zk.exists(level2, false); // no need to check exists
-						String newLeve2 = newLeve1 + File.separator
-								+ childNode2;
-						log.info("Creating new znode: " + newLeve2);
-						zk.create(newLeve2, zk.getData(level2, false, exists3),
-								ZooDefs.Ids.OPEN_ACL_UNSAFE,
-								CreateMode.PERSISTENT);
-					}
-				}
-				// After all the files are successfully transfered we delete the
-				// old experiment,otherwise we do
-				// not delete a single file
-				log.info("After a successful copying of experiment data for an old experiment we delete the old data");
-				log.info("Deleting experiment data: " + experimentEntry);
-				ZKUtil.deleteRecursive(zk, experimentEntry);
-			}else {
-				log.error("ExperimentID: " + experimentID + " taskID: " + taskID
-						+ " is already running by this Gfac instance");
-				List<String> runningGfacNodeNames = AiravataZKUtils
-						.getAllGfacNodeNames(zk); // here we take old gfac servers
-				// too
-				for (String gfacServerNode : runningGfacNodeNames) {
-					if (!gfacServerNode.equals(pickedChild)) {
-						experimentEntry = experimentNode + File.separator
-								+ gfacServerNode + File.separator + experimentID
-								+ "+" + taskID;
-						break;
-					}
-				}
-				if(experimentEntry!=null) {
-					ZKUtil.deleteRecursive(zk, experimentEntry);
-				}
+			if (tokenId != null && expParent != null) {
+				zk.setData(newExpNode, tokenId.getBytes(),
+						expParent.getVersion());
 			}
+			String s = zk.create(newExpNode + File.separator + "state", String
+							.valueOf(GfacExperimentState.LAUNCHED.getValue())
+							.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+					CreateMode.PERSISTENT);
+			String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+					CreateMode.PERSISTENT);
+			zk.exists(s1, true);// we want to know when this node get deleted
+			String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE,  // here we store the value of delivery message
+					CreateMode.PERSISTENT);
+		} else {
+			log.error("ExperimentID: " + experimentID + " taskID: " + taskID
+					+ " was running by some Gfac instance,but it failed");
+			log.info("This is an old Job, so copying data from old experiment location");
+			zk.create(newExpNode,
+					zk.getData(experimentEntry, false, exists1),
+					ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
+			List<String> children = zk.getChildren(experimentEntry,
+					false);
+			for (String childNode1 : children) {
+				String level1 = experimentEntry + File.separator
+						+ childNode1;
+				Stat exists2 = zk.exists(level1, false); // no need to check exists
+				String newLeve1 = newExpNode + File.separator + childNode1;
+				log.info("Creating new znode: " + newLeve1); // these has to be info logs
+				zk.create(newLeve1, zk.getData(level1, false, exists2),
+						ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+				for (String childNode2 : zk.getChildren(level1, false)) {
+					String level2 = level1 + File.separator + childNode2;
+					Stat exists3 = zk.exists(level2, false); // no need to check exists
+					String newLeve2 = newLeve1 + File.separator
+							+ childNode2;
+					log.info("Creating new znode: " + newLeve2);
+					zk.create(newLeve2, zk.getData(level2, false, exists3),
+							ZooDefs.Ids.OPEN_ACL_UNSAFE,
+							CreateMode.PERSISTENT);
+				}
+			}
+			// After all the files are successfully transfered we delete the
+			// old experiment,otherwise we do
+			// not delete a single file
+			log.info("After a successful copying of experiment data for an old experiment we delete the old data");
+			log.info("Deleting experiment data: " + experimentEntry);
+			ZKUtil.deleteRecursive(zk, experimentEntry);
 		}
 		return true;
 	}