You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/20 21:14:12 UTC
airavata git commit: deleting delivery token
Repository: airavata
Updated Branches:
refs/heads/queue-gfac-rabbitmq cb6c4ccf2 -> 5310bb4b5
deleting delivery token
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5310bb4b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5310bb4b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5310bb4b
Branch: refs/heads/queue-gfac-rabbitmq
Commit: 5310bb4b570bd0b09208e710f121e88418261c5f
Parents: cb6c4cc
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Fri Mar 20 16:14:07 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Fri Mar 20 16:14:07 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/server/GfacServerHandler.java | 3 +
.../core/monitor/GfacInternalStatusUpdator.java | 2 +
.../airavata/gfac/core/utils/GFacUtils.java | 134 ++++++++-----------
3 files changed, 61 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/5310bb4b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index d45710e..855bfc5 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -59,6 +59,7 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
public class GfacServerHandler implements GfacService.Iface, Watcher {
@@ -79,6 +80,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
private static Integer mutex = -1;
+ private static Lock lock;
+
private MonitorPublisher publisher;
private String gfacServer;
http://git-wip-us.apache.org/repos/asf/airavata/blob/5310bb4b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index eaa3c5f..6c456b0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -98,6 +98,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(),
monitorID.getTaskID(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)));
}
+ ZKUtil.deleteRecursive(zk,experimentPath+GFacUtils.DELIVERY_TAG_POSTFIX);
ZKUtil.deleteRecursive(zk, experimentPath);
break;
case FAILED:
@@ -107,6 +108,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
consumer.sendAck(GFacUtils.getDeliveryTag(statusChangeRequest.getMonitorID().getExperimentID(),
monitorID.getTaskID(), zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)));
}
+ ZKUtil.deleteRecursive(zk,experimentPath+GFacUtils.DELIVERY_TAG_POSTFIX);
ZKUtil.deleteRecursive(zk, experimentPath);
break;
default:
http://git-wip-us.apache.org/repos/asf/airavata/blob/5310bb4b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 707cf97..a0fc2cb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -1157,94 +1157,72 @@ public class GFacUtils {
// This method is dangerous because of moving the experiment data
public static boolean createExperimentEntryForPassive(String experimentID,
- String taskID, ZooKeeper zk, String experimentNode,
- String pickedChild, String tokenId,long deliveryTag) throws KeeperException,
+ String taskID, ZooKeeper zk, String experimentNode,
+ String pickedChild, String tokenId, long deliveryTag) throws KeeperException,
InterruptedException, ApplicationSettingsException {
String experimentPath = experimentNode + File.separator + pickedChild;
String newExpNode = experimentPath + File.separator + experimentID
+ "+" + taskID;
Stat exists1 = zk.exists(newExpNode, false);
String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk);
- if (exists1 == null && experimentEntry == null) { // this means this is a very new experiment
- // are going to create a new node
- log.info("This is a new Job, so creating all the experiment docs from the scratch");
- Stat expParent = zk.exists(newExpNode, false);
- zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- if (tokenId != null && expParent != null) {
- zk.setData(newExpNode, tokenId.getBytes(),
- expParent.getVersion());
- }
- String s = zk.create(newExpNode + File.separator + "state", String
- .valueOf(GfacExperimentState.LAUNCHED.getValue())
- .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- zk.exists(s1, true);// we want to know when this node get deleted
- String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message
- CreateMode.PERSISTENT);
- }else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,taskID,zk) ){
- // this happens when a cancel request comes to a differnt gfac node, in this case we do not move gfac experiment
- // node to gfac node specific location, because original request execution will fail with errors
- log.error("This experiment is already cancelled and its already executing the cancel operation so cannot submit again !");
+ if (exists1 != null) {
+ log.error("This request is wrong because its already running in the same instance");
return false;
- } else if(experimentEntry != null && !GFacUtils.isCancelled(experimentID,taskID,zk)){
- if(ServerSettings.isGFacPassiveMode()){
- log.error("ExperimentID: " + experimentID + " taskID: " + taskID
- + " was running by some Gfac instance,but it failed");
- log.info("This is an old Job, so copying data from old experiment location");
- zk.create(newExpNode,
- zk.getData(experimentEntry, false, exists1),
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } else if (experimentEntry == null) { // this means this is a very new experiment
+ // are going to create a new node
+ log.info("This is a new Job, so creating all the experiment docs from the scratch");
+ Stat expParent = zk.exists(newExpNode, false);
+ zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
- List<String> children = zk.getChildren(experimentEntry,
- false);
- for (String childNode1 : children) {
- String level1 = experimentEntry + File.separator
- + childNode1;
- Stat exists2 = zk.exists(level1, false); // no need to check exists
- String newLeve1 = newExpNode + File.separator + childNode1;
- log.info("Creating new znode: " + newLeve1); // these has to be info logs
- zk.create(newLeve1, zk.getData(level1, false, exists2),
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- for (String childNode2 : zk.getChildren(level1, false)) {
- String level2 = level1 + File.separator + childNode2;
- Stat exists3 = zk.exists(level2, false); // no need to check exists
- String newLeve2 = newLeve1 + File.separator
- + childNode2;
- log.info("Creating new znode: " + newLeve2);
- zk.create(newLeve2, zk.getData(level2, false, exists3),
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
- }
- // After all the files are successfully transfered we delete the
- // old experiment,otherwise we do
- // not delete a single file
- log.info("After a successful copying of experiment data for an old experiment we delete the old data");
- log.info("Deleting experiment data: " + experimentEntry);
- ZKUtil.deleteRecursive(zk, experimentEntry);
- }else {
- log.error("ExperimentID: " + experimentID + " taskID: " + taskID
- + " is already running by this Gfac instance");
- List<String> runningGfacNodeNames = AiravataZKUtils
- .getAllGfacNodeNames(zk); // here we take old gfac servers
- // too
- for (String gfacServerNode : runningGfacNodeNames) {
- if (!gfacServerNode.equals(pickedChild)) {
- experimentEntry = experimentNode + File.separator
- + gfacServerNode + File.separator + experimentID
- + "+" + taskID;
- break;
- }
- }
- if(experimentEntry!=null) {
- ZKUtil.deleteRecursive(zk, experimentEntry);
- }
+ if (tokenId != null && expParent != null) {
+ zk.setData(newExpNode, tokenId.getBytes(),
+ expParent.getVersion());
}
+ String s = zk.create(newExpNode + File.separator + "state", String
+ .valueOf(GfacExperimentState.LAUNCHED.getValue())
+ .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ String s1 = zk.create(newExpNode + File.separator + "operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ zk.exists(s1, true);// we want to know when this node get deleted
+ String s2 = zk.create(newExpNode + DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message
+ CreateMode.PERSISTENT);
+ } else {
+ log.error("ExperimentID: " + experimentID + " taskID: " + taskID
+ + " was running by some Gfac instance,but it failed");
+ log.info("This is an old Job, so copying data from old experiment location");
+ zk.create(newExpNode,
+ zk.getData(experimentEntry, false, exists1),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ List<String> children = zk.getChildren(experimentEntry,
+ false);
+ for (String childNode1 : children) {
+ String level1 = experimentEntry + File.separator
+ + childNode1;
+ Stat exists2 = zk.exists(level1, false); // no need to check exists
+ String newLeve1 = newExpNode + File.separator + childNode1;
+ log.info("Creating new znode: " + newLeve1); // these has to be info logs
+ zk.create(newLeve1, zk.getData(level1, false, exists2),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ for (String childNode2 : zk.getChildren(level1, false)) {
+ String level2 = level1 + File.separator + childNode2;
+ Stat exists3 = zk.exists(level2, false); // no need to check exists
+ String newLeve2 = newLeve1 + File.separator
+ + childNode2;
+ log.info("Creating new znode: " + newLeve2);
+ zk.create(newLeve2, zk.getData(level2, false, exists3),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ }
+ // After all the files are successfully transfered we delete the
+ // old experiment,otherwise we do
+ // not delete a single file
+ log.info("After a successful copying of experiment data for an old experiment we delete the old data");
+ log.info("Deleting experiment data: " + experimentEntry);
+ ZKUtil.deleteRecursive(zk, experimentEntry);
}
return true;
}