You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/28 17:37:39 UTC
[2/4] airavata git commit: Integrated Apache Curator with GFac. With
this fix, every ZooKeeper call goes through the CuratorFramework client.
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 0810bfd..f41f29c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -38,22 +38,37 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.states.GfacExperimentState;
import org.apache.airavata.gfac.core.states.GfacHandlerState;
-import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
import org.apache.airavata.model.messaging.event.JobIdentifier;
import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
import org.apache.airavata.model.messaging.event.TaskIdentifier;
import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
-import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.model.workspace.experiment.ActionableGroup;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.ErrorDetails;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.JobStatus;
+import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.registry.cpi.CompositeIdentifier;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.RegistryModelType;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,19 +77,33 @@ import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
-import javax.xml.xpath.*;
-
-import java.io.*;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
//import org.apache.airavata.commons.gfac.type.ActualParameter;
public class GFacUtils {
private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
+ public static final ArrayList<ACL> OPEN_ACL_UNSAFE = ZooDefs.Ids.OPEN_ACL_UNSAFE;
private GFacUtils() {
}
@@ -268,11 +297,10 @@ public class GFacUtils {
return map;
}
- public static GfacExperimentState getZKExperimentState(ZooKeeper zk,
+ public static GfacExperimentState getZKExperimentState(CuratorFramework curatorClient,
JobExecutionContext jobExecutionContext)
- throws ApplicationSettingsException, KeeperException,
- InterruptedException {
- String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext
+ throws Exception {
+ String expState = AiravataZKUtils.getExpState(curatorClient, jobExecutionContext
.getExperimentID());
if (expState == null || expState.isEmpty()) {
return GfacExperimentState.UNKNOWN;
@@ -280,136 +308,86 @@ public class GFacUtils {
return GfacExperimentState.findByValue(Integer.valueOf(expState));
}
- public static int getZKExperimentStateValue(ZooKeeper zk,
- JobExecutionContext jobExecutionContext)
- throws ApplicationSettingsException, KeeperException,
- InterruptedException {
- String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext
- .getExperimentID());
- if (expState == null) {
- return -1;
- }
- return Integer.parseInt(expState);
- }
-
- public static int getZKExperimentStateValue(ZooKeeper zk,String fullPath)throws ApplicationSettingsException,
- KeeperException, InterruptedException {
- Stat exists = zk.exists(fullPath+File.separator+"state", false);
- if (exists != null) {
- return Integer.parseInt(new String(zk.getData(fullPath+File.separator+"state", false, exists)));
- }
- return -1;
- }
-
- public static boolean createHandlerZnode(ZooKeeper zk,
+ public static boolean createHandlerZnode(CuratorFramework curatorClient,
JobExecutionContext jobExecutionContext, String className)
- throws ApplicationSettingsException, KeeperException,
- InterruptedException {
+ throws Exception {
String expState = AiravataZKUtils.getExpZnodeHandlerPath(
jobExecutionContext.getExperimentID(), className);
- Stat exists = zk.exists(expState, false);
+ Stat exists = curatorClient.checkExists().forPath(expState);
if (exists == null) {
- zk.create(expState, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- zk.create(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0],
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(expState, new byte[0]);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
} else {
- exists = zk.exists(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
if (exists == null) {
- zk.create(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
}
}
- exists = zk.exists(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
if (exists != null) {
- zk.setData(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(GfacHandlerState.INVOKING.getValue())
- .getBytes(), exists.getVersion());
+ curatorClient.setData().withVersion(exists.getVersion())
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(GfacHandlerState.INVOKING.getValue()).getBytes());
}
return true;
}
- public static boolean createHandlerZnode(ZooKeeper zk,
+ public static boolean createHandlerZnode(CuratorFramework curatorClient,
JobExecutionContext jobExecutionContext, String className,
- GfacHandlerState state) throws ApplicationSettingsException,
- KeeperException, InterruptedException {
+ GfacHandlerState state) throws Exception {
String expState = AiravataZKUtils.getExpZnodeHandlerPath(
jobExecutionContext.getExperimentID(), className);
- Stat exists = zk.exists(expState, false);
+ Stat exists = curatorClient.checkExists().forPath(expState);
if (exists == null) {
- zk.create(expState, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- zk.create(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0],
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState, new byte[0]);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
} else {
- exists = zk.exists(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
if (exists == null) {
- zk.create(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
}
}
- exists = zk.exists(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
if (exists != null) {
- zk.setData(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(state.getValue()).getBytes(),
- exists.getVersion());
+ curatorClient.setData().withVersion(exists.getVersion())
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(state.getValue()).getBytes());
}
return true;
}
- public static boolean updateHandlerState(ZooKeeper zk,
+ public static boolean updateHandlerState(CuratorFramework curatorClient,
JobExecutionContext jobExecutionContext, String className,
- GfacHandlerState state) throws ApplicationSettingsException,
- KeeperException, InterruptedException {
- if(zk.getState().isConnected()) {
- String expState = AiravataZKUtils.getExpZnodeHandlerPath(
- jobExecutionContext.getExperimentID(), className);
-
- Stat exists = zk.exists(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
- if (exists != null) {
- zk.setData(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(state.getValue()).getBytes(),
- exists.getVersion());
- } else {
- createHandlerZnode(zk, jobExecutionContext, className, state);
- }
- return true;
+ GfacHandlerState state) throws Exception {
+ String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
+ jobExecutionContext.getExperimentID(), className);
+ Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ if (exists != null) {
+ curatorClient.setData().withVersion(exists.getVersion())
+ .forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes());
+ } else {
+ createHandlerZnode(curatorClient, jobExecutionContext, className, state);
}
return false;
}
- public static GfacHandlerState getHandlerState(ZooKeeper zk,
+ public static GfacHandlerState getHandlerState(CuratorFramework curatorClient,
JobExecutionContext jobExecutionContext, String className) {
try {
- String expState = AiravataZKUtils.getExpZnodeHandlerPath(
- jobExecutionContext.getExperimentID(), className);
-
- Stat exists = zk.exists(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath( jobExecutionContext.getExperimentID(), className);
+ Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
if (exists != null) {
- String stateVal = new String(zk.getData(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false,
- exists));
- return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
- }
+ String stateVal = new String(curatorClient.getData().storingStatIn(exists)
+ .forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE));
+ return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
+ }
return GfacHandlerState.UNKNOWN; // if the node doesn't exist or any other error we
// return false
} catch (Exception e) {
@@ -420,136 +398,104 @@ public class GFacUtils {
// This method is dangerous because of moving the experiment data
public static boolean createExperimentEntryForPassive(String experimentID,
- String taskID, ZooKeeper zk, String experimentNode,
- String pickedChild, String tokenId, long deliveryTag) throws KeeperException,
- InterruptedException, ApplicationSettingsException {
+ String taskID, CuratorFramework curatorClient, String experimentNode,
+ String pickedChild, String tokenId, long deliveryTag) throws Exception {
String experimentPath = experimentNode + File.separator + pickedChild;
String newExperimentPath = experimentPath + File.separator + experimentID;
- Stat exists1 = zk.exists(newExperimentPath, false);
- String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, zk);
+ Stat exists1 = curatorClient.checkExists().forPath(newExperimentPath);
+ String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, curatorClient);
if (oldExperimentPath == null) { // this means this is a very new experiment
// are going to create a new node
log.info("This is a new Job, so creating all the experiment docs from the scratch");
-
- zk.create(newExperimentPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- String s = zk.create(newExperimentPath + File.separator + "state", String
- .valueOf(GfacExperimentState.LAUNCHED.getValue())
- .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- if(zk.exists(s,false)!=null){
- log.info("Created the node: "+s+" successfully !");
- }else{
- log.error("Error creating node: "+s+" successfully !");
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath, new byte[0]);
+ String stateNodePath = curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newExperimentPath + File.separator + "state",
+ String .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes());
+
+ if(curatorClient.checkExists().forPath(stateNodePath)!=null) {
+ log.info("Created the node: " + stateNodePath + " successfully !");
+ }else {
+ log.error("Error creating node: " + stateNodePath + " successfully !");
}
- zk.create(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message
- CreateMode.PERSISTENT);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
} else {
log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
- removeCancelDeliveryTagNode(oldExperimentPath, zk); // remove previous cancel deliveryTagNode
+ removeCancelDeliveryTagNode(oldExperimentPath, curatorClient); // remove previous cancel deliveryTagNode
if(newExperimentPath.equals(oldExperimentPath)){
log.info("Re-launch experiment came to the same GFac instance");
}else {
log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
- zk.create(newExperimentPath, zk.getData(oldExperimentPath, false, exists1),
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // recursively copy children
- copyChildren(zk, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath,
+ curatorClient.getData().storingStatIn(exists1).forPath(oldExperimentPath)); // recursively copy children
+ copyChildren(curatorClient, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
- Stat exists = zk.exists(oldDeliveryTag, false);
+ Stat exists = curatorClient.checkExists().forPath(oldDeliveryTag);
if(exists!=null) {
- zk.create(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
- zk.getData(oldDeliveryTag,null,exists),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- ZKUtil.deleteRecursive(zk,oldDeliveryTag);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
+ curatorClient.getData().storingStatIn(exists).forPath(oldDeliveryTag));
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldDeliveryTag, true);
}
// After all the files are successfully transfered we delete the // old experiment,otherwise we do
// not delete a single file
log.info("After a successful copying of experiment data for an old experiment we delete the old data");
log.info("Deleting experiment data: " + oldExperimentPath);
- ZKUtil.deleteRecursive(zk, oldExperimentPath);
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldExperimentPath, true);
}
}
return true;
}
- private static void removeCancelDeliveryTagNode(String experimentPath, ZooKeeper zk) throws KeeperException, InterruptedException {
- Stat exists = zk.exists(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, false);
+ private static void removeCancelDeliveryTagNode(String experimentPath, CuratorFramework curatorClient) throws Exception {
+ Stat exists = curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
if (exists != null) {
- ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
- }
- }
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
+ }
+ }
- private static void copyChildren(ZooKeeper zk, String oldPath, String newPath, int depth) throws KeeperException, InterruptedException {
- for (String childNode : zk.getChildren(oldPath, false)) {
+ private static void copyChildren(CuratorFramework curatorClient, String oldPath, String newPath, int depth) throws Exception {
+ for (String childNode : curatorClient.getChildren().forPath(oldPath)) {
String oldChildPath = oldPath + File.separator + childNode;
- Stat stat = zk.exists(oldChildPath, false); // no need to check exists
+ Stat stat = curatorClient.checkExists().forPath(oldChildPath); // no need to check exists
String newChildPath = newPath + File.separator + childNode;
log.info("Creating new znode: " + newChildPath);
- zk.create(newChildPath, zk.getData(oldChildPath, false, stat), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- if (--depth > 0) {
- copyChildren(zk , oldChildPath, newChildPath, depth );
- }
- }
- }
-
- /**
- * This will return a value if the server is down because we iterate through exisiting experiment nodes, not
- * through gfac-server nodes
- * @param experimentID
- * @param zk
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
- public static String findExperimentEntry(String experimentID, ZooKeeper zk
- ) throws KeeperException,
- InterruptedException {
- String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- List<String> children = zk.getChildren(experimentNode, false);
- for(String pickedChild:children) {
- String experimentPath = experimentNode + File.separator + pickedChild;
- String newExpNode = experimentPath + File.separator + experimentID;
- Stat exists = zk.exists(newExpNode, false);
- if(exists == null){
- continue;
- }else{
- return newExpNode;
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newChildPath, curatorClient.getData().storingStatIn(stat).forPath(oldChildPath));
+ if (--depth > 0) {
+ copyChildren(curatorClient , oldChildPath, newChildPath, depth );
}
}
- return null;
}
/**
* This will return a value if the server is down because we iterate through exisiting experiment nodes, not
* through gfac-server nodes
+ *
* @param experimentID
- * @param zk
+ * @param curatorClient
* @return
* @throws KeeperException
* @throws InterruptedException
*/
- public static String findExperimentEntryPassive(String experimentID, ZooKeeper zk
- ) throws KeeperException,
- InterruptedException {
+ public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- List<String> children = zk.getChildren(experimentNode, false);
- for(String pickedChild:children) {
+ List<String> children = curatorClient.getChildren().forPath(experimentNode);
+ for (String pickedChild : children) {
String experimentPath = experimentNode + File.separator + pickedChild;
String newExpNode = experimentPath + File.separator + experimentID;
- Stat exists = zk.exists(newExpNode, false);
- if(exists == null){
+ Stat exists = curatorClient.checkExists().forPath(newExpNode);
+ if (exists == null) {
continue;
- }else{
+ } else {
return newExpNode;
}
}
return null;
}
- public static boolean setExperimentCancel(String experimentId, ZooKeeper zk, long deliveryTag) throws KeeperException,
- InterruptedException {
- String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
+ public static boolean setExperimentCancel(String experimentId, CuratorFramework curatorClient, long deliveryTag) throws Exception {
+ String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
if (experimentEntry == null) {
// This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
@@ -557,51 +503,47 @@ public class GFacUtils {
return false;
} else {
// check cancel operation is being processed for the same experiment.
- Stat cancelState = zk.exists(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, false);
+ Stat cancelState = curatorClient.checkExists().forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
if (cancelState != null) {
// another cancel operation is being processed. only one cancel operation can exist for a given experiment.
return false;
}
- zk.create(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag),
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // save cancel delivery tag to be acknowledge at the end.
- return true;
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag)); // save cancel delivery tag to be acknowledge at the end.
+ return true;
}
}
- public static boolean isCancelled(String experimentID, ZooKeeper zk
- ) throws KeeperException,
- InterruptedException {
- String experimentEntry = GFacUtils.findExperimentEntry(experimentID, zk);
-
+ public static boolean isCancelled(String experimentID, CuratorFramework curatorClient ) throws Exception {
+ String experimentEntry = GFacUtils.findExperimentEntry(experimentID, curatorClient);
if(experimentEntry == null){
return false;
}else {
- Stat exists = zk.exists(experimentEntry, false);
+ Stat exists = curatorClient.checkExists().forPath(experimentEntry);
if (exists != null) {
- String operation = new String(zk.getData(experimentEntry+File.separator+"operation", false, exists));
- if ("cancel".equals(operation)) {
- return true;
- }
- }
- }
+ String operation = new String(curatorClient.getData().storingStatIn(exists).forPath(experimentEntry + File.separator + "operation"));
+ if ("cancel".equals(operation)) {
+ return true;
+ }
+ }
+ }
return false;
}
public static void saveHandlerData(JobExecutionContext jobExecutionContext,
StringBuffer data, String className) throws GFacHandlerException {
try {
- ZooKeeper zk = jobExecutionContext.getZk();
- if (zk != null) {
+ CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+ if (curatorClient != null) {
String expZnodeHandlerPath = AiravataZKUtils
.getExpZnodeHandlerPath(
jobExecutionContext.getExperimentID(),
className);
- Stat exists = zk.exists(expZnodeHandlerPath, false);
+ Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
if (exists != null) {
- zk.setData(expZnodeHandlerPath, data.toString().getBytes(),
- exists.getVersion());
- } else {
+ curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath, data.toString().getBytes());
+ } else {
log.error("Saving Handler data failed, Stat is null");
}
}
@@ -610,18 +552,15 @@ public class GFacUtils {
}
}
- public static String getHandlerData(JobExecutionContext jobExecutionContext,
- String className) throws ApplicationSettingsException,
- KeeperException, InterruptedException {
- ZooKeeper zk = jobExecutionContext.getZk();
- if (zk != null) {
+ public static String getHandlerData(JobExecutionContext jobExecutionContext, String className) throws Exception {
+ CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+ if (curatorClient != null) {
String expZnodeHandlerPath = AiravataZKUtils
.getExpZnodeHandlerPath(
jobExecutionContext.getExperimentID(),
className);
- Stat exists = zk.exists(expZnodeHandlerPath, false);
- return new String(jobExecutionContext.getZk().getData(
- expZnodeHandlerPath, false, exists));
+ Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
+ return new String(jobExecutionContext.getCuratorClient().getData().storingStatIn(exists).forPath(expZnodeHandlerPath));
}
return null;
}
@@ -664,18 +603,6 @@ public class GFacUtils {
}
}
- public static GlobusJobSubmission getGlobusJobSubmission (String submissionId) throws AppCatalogException{
- return null;
-// try {
-// AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
-// return appCatalog.getComputeResource().getGlobus(submissionId);
-// }catch (Exception e){
-// String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
-// log.error(errorMsg, e);
-// throw new AppCatalogException(errorMsg, e);
-// }
- }
-
public static SSHJobSubmission getSSHJobSubmission (String submissionId) throws AppCatalogException{
try {
AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
@@ -687,17 +614,6 @@ public class GFacUtils {
}
}
- public static CloudJobSubmission getCloudJobSubmission (String submissionId) throws AppCatalogException{
- try {
- AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
- return appCatalog.getComputeResource().getCloudJobSubmission(submissionId);
- }catch (Exception e){
- String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
- log.error(errorMsg, e);
- throw new AppCatalogException(errorMsg, e);
- }
- }
-
/**
* To convert list to separated value
* @param listOfStrings
@@ -764,21 +680,21 @@ public class GFacUtils {
return false;
}
- public static boolean ackCancelRequest(String experimentId, ZooKeeper zk) throws KeeperException, InterruptedException {
- String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
+ public static boolean ackCancelRequest(String experimentId, CuratorFramework curatorClient) throws Exception {
+ String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
String cancelNodePath = experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
if (experimentEntry == null) {
// This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
- "This happen when experiment completed and already removed from the zookeeper");
+ "This happen when experiment completed and already removed from the CuratorFramework");
} else {
// check cancel operation is being processed for the same experiment.
- Stat cancelState = zk.exists(cancelNodePath, false);
+ Stat cancelState = curatorClient.checkExists().forPath(cancelNodePath);
if (cancelState != null) {
- ZKUtil.deleteRecursive(zk,cancelNodePath);
- return true;
- }
- }
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), cancelNodePath, true);
+ return true;
+ }
+ }
return false;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index c0784fb..540eaac 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -264,11 +264,7 @@ public class GSISSHProvider extends AbstractProvider {
}
return;
}
- } catch (ApplicationSettingsException e) {
- log.error("Error while recovering provider", e);
- } catch (KeeperException e) {
- log.error("Error while recovering provider", e);
- } catch (InterruptedException e) {
+ } catch (Exception e) {
log.error("Error while recovering provider", e);
}
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index 15b7241..e53fe09 100644
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -20,10 +20,8 @@
*/
package org.apache.airavata.gfac.monitor.util;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.logger.AiravataLogger;
import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -34,15 +32,16 @@ import org.apache.airavata.gfac.monitor.HostMonitorData;
import org.apache.airavata.gfac.monitor.UserMonitorData;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
public class CommonUtils {
private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(CommonUtils.class);
@@ -189,53 +188,38 @@ public class CommonUtils {
/**
* Update job count for a given set of paths.
- * @param zk - zookeeper instance
+ * @param curatorClient - CuratorFramework instance
* @param changeCountMap - map of change job count with relevant path
* @param isAdd - Should add or reduce existing job count by the given job count.
*/
- public static void updateZkWithJobCount(ZooKeeper zk, final Map<String, Integer> changeCountMap, boolean isAdd) {
+ public static void updateZkWithJobCount(CuratorFramework curatorClient, final Map<String, Integer> changeCountMap, boolean isAdd) {
StringBuilder changeZNodePaths = new StringBuilder();
try {
- if (zk == null || !zk.getState().isConnected()) {
- try {
- final CountDownLatch countDownLatch = new CountDownLatch(1);
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- countDownLatch.countDown();
- }
- });
- countDownLatch.await();
- } catch (ApplicationSettingsException e) {
- logger.error("Error while reading zookeeper hostport string");
- } catch (IOException e) {
- logger.error("Error while reconnect attempt to zookeeper where zookeeper connection loss state");
- }
- }
-
for (String path : changeCountMap.keySet()) {
if (isAdd) {
- CommonUtils.checkAndCreateZNode(zk, path);
+ CommonUtils.checkAndCreateZNode(curatorClient, path);
}
- byte[] byteData = zk.getData(path, null, null);
+ byte[] byteData = curatorClient.getData().forPath(path);
String nodeData;
if (byteData == null) {
if (isAdd) {
- zk.setData(path, String.valueOf(changeCountMap.get(path)).getBytes(), -1);
+ curatorClient.setData().withVersion(-1).forPath(path, String.valueOf(changeCountMap.get(path)).getBytes());
} else {
// This is not possible, but we handle in case there any data zookeeper communication failure
logger.warn("Couldn't reduce job count in " + path + " as it returns null data. Hence reset the job count to 0");
- zk.setData(path, "0".getBytes(), -1);
+ curatorClient.setData().withVersion(-1).forPath(path, "0".getBytes());
}
} else {
nodeData = new String(byteData);
if (isAdd) {
- zk.setData(path, String.valueOf(changeCountMap.get(path) + Integer.parseInt(nodeData)).getBytes(), -1);
+ curatorClient.setData().withVersion(-1).forPath(path,
+ String.valueOf(changeCountMap.get(path) + Integer.parseInt(nodeData)).getBytes());
} else {
int previousCount = Integer.parseInt(nodeData);
int removeCount = changeCountMap.get(path);
if (previousCount >= removeCount) {
- zk.setData(path, String.valueOf(previousCount - removeCount).getBytes(), -1);
+ curatorClient.setData().withVersion(-1).forPath(path,
+ String.valueOf(previousCount - removeCount).getBytes());
} else {
// This is not possible, do we need to reset the job count to 0 ?
logger.error("Requested remove job count is " + removeCount +
@@ -250,11 +234,9 @@ public class CommonUtils {
// update stat node to trigger orchestrator watchers
if (changeCountMap.size() > 0) {
changeZNodePaths.deleteCharAt(changeZNodePaths.length() - 1);
- zk.setData("/" + Constants.STAT, changeZNodePaths.toString().getBytes(), -1);
+ curatorClient.setData().withVersion(-1).forPath("/" + Constants.STAT, changeZNodePaths.toString().getBytes());
}
- } catch (KeeperException e) {
- logger.error("Error while writing job count to zookeeper", e);
- } catch (InterruptedException e) {
+ } catch (Exception e) {
logger.error("Error while writing job count to zookeeper", e);
}
@@ -267,7 +249,7 @@ public class CommonUtils {
public static void increaseZkJobCount(MonitorID monitorID) {
Map<String, Integer> addMap = new HashMap<String, Integer>();
addMap.put(CommonUtils.getJobCountUpdatePath(monitorID), 1);
- updateZkWithJobCount(monitorID.getJobExecutionContext().getZk(), addMap, true);
+ updateZkWithJobCount(monitorID.getJobExecutionContext().getCuratorClient(), addMap, true);
}
/**
@@ -282,17 +264,17 @@ public class CommonUtils {
/**
* Check whether znode is exist in given path if not create a new znode
- * @param zk - zookeeper instance
+ * @param curatorClient - CuratorFramework instance
* @param path - path to check znode
* @throws KeeperException
* @throws InterruptedException
*/
- private static void checkAndCreateZNode(ZooKeeper zk , String path) throws KeeperException, InterruptedException {
- if (zk.exists(path, null) == null) { // if znode doesn't exist
+ private static void checkAndCreateZNode(CuratorFramework curatorClient , String path) throws Exception {
+ if (curatorClient.checkExists().forPath(path) == null) { // if znode doesn't exist
if (path.lastIndexOf("/") > 1) { // recursively traverse to parent znode and check parent exist
- checkAndCreateZNode(zk, (path.substring(0, path.lastIndexOf("/"))));
+ checkAndCreateZNode(curatorClient, (path.substring(0, path.lastIndexOf("/"))));
}
- zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// create a znode
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 19ea3ac..f967dbf 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -410,11 +410,7 @@ public class SSHProvider extends AbstractProvider {
this.execute(jobExecutionContext);
return;
}
- } catch (ApplicationSettingsException e) {
- log.error("Error while recovering provider", e);
- } catch (KeeperException e) {
- log.error("Error while recovering provider", e);
- } catch (InterruptedException e) {
+ } catch (Exception e) {
log.error("Error while recovering provider", e);
}
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
deleted file mode 100644
index 9d40557..0000000
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.orchestrator.util;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.orchestrator.server.OrchestratorServerHandler;
-import org.apache.thrift.TException;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-
-public class OrchestratorRecoveryHandler implements Watcher {
- private static Logger log = LoggerFactory.getLogger(OrchestratorRecoveryHandler.class);
-
- private ZooKeeper zk;
-
- private String gfacId;
-
- private static Integer mutex = -1;
-
- private OrchestratorServerHandler serverHandler;
-
- public OrchestratorRecoveryHandler(OrchestratorServerHandler handler, String zkExpPath) {
- this.zk = zk;
- int index = zkExpPath.split(File.separator).length - 1;
- this.gfacId = zkExpPath.split(File.separator)[index];
- this.serverHandler = handler;
- }
-
- /**
- * This method return the list of experimentId
- *
- * @return
- * @throws OrchestratorException
- * @throws ApplicationSettingsException
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- public void recover() throws OrchestratorException, ApplicationSettingsException, IOException, KeeperException, InterruptedException {
- String zkhostPort = AiravataZKUtils.getZKhostPort();
- zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
- log.info("Waiting for zookeeper to connect to the server");
- synchronized (mutex) {
- mutex.wait(5000);
- }
- List<String> children = zk.getChildren(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE)
- + File.separator + gfacId, false);
- log.info("------------------ Recovering Experiments started ----------------------------------");
- for (String expId : children) {
- log.info("Recovering Experiment: " + expId.split("\\+")[0]);
- log.info("------------------------------------------------------------------------------------");
- try {
- if(GFacUtils.isCancelled(expId.split("\\+")[0], zk)) {// during relaunching we check the operation and then launch
- serverHandler.terminateExperiment(expId.split("\\+")[0], null);
- }else {
- serverHandler.launchExperiment(expId.split("\\+")[0], null);
- }
- // we do not move the old experiment in to new gfac node, gfac will do it
- } catch (Exception e) { // we attempt all the experiments
- log.error(e.getMessage(), e);
- }
- log.info("------------------------------------------------------------------------------------");
- }
- }
-
- synchronized public void process(WatchedEvent watchedEvent) {
- log.info(watchedEvent.getPath());
- synchronized (mutex) {
- Event.KeeperState state = watchedEvent.getState();
- switch (state) {
- case SyncConnected:
- mutex.notify();
- break;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 76b2a75..d57d85a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@
<axiom.version>1.2.8</axiom.version>
<surefire.version>2.18.1</surefire.version>
<junit.version>4.12</junit.version>
- <curator.version>2.7.1</curator.version>
+ <curator.version>2.8.0</curator.version>
<xmlbeans.version>2.5.0</xmlbeans.version>
<xpp3.version>1.1.6</xpp3.version>
<xpp5.version>1.2.8</xpp5.version>
@@ -426,6 +426,11 @@
<artifactId>zookeeper</artifactId>
<version>${zk.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>