You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/28 17:37:38 UTC
[1/4] airavata git commit: Remove Zookeeper code and dependency from
Orchestrator.
Repository: airavata
Updated Branches:
refs/heads/master 5002df55a -> 4a0617802
Remove Zookeeper code and dependency from Orchestrator.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/915ed04d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/915ed04d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/915ed04d
Branch: refs/heads/master
Commit: 915ed04d54c61e083ef0f2b7dccb7083ef3fee9e
Parents: 5002df5
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Wed May 27 16:48:49 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Wed May 27 16:48:49 2015 -0400
----------------------------------------------------------------------
.../server/OrchestratorServerHandler.java | 149 +------------------
modules/orchestrator/orchestrator-core/pom.xml | 6 -
.../core/context/OrchestratorContext.java | 13 --
.../core/impl/GFACPassiveJobSubmitter.java | 25 ----
4 files changed, 3 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/915ed04d/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 1b3e1f3..4ef9dbc 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -76,40 +76,21 @@ import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowN
import org.apache.airavata.workflow.core.WorkflowEnactmentService;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.File;
-import java.io.IOException;
+
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
-
-public class OrchestratorServerHandler implements OrchestratorService.Iface,
- Watcher {
- private static AiravataLogger log = AiravataLoggerFactory
- .getLogger(OrchestratorServerHandler.class);
+public class OrchestratorServerHandler implements OrchestratorService.Iface {
+ private static AiravataLogger log = AiravataLoggerFactory .getLogger(OrchestratorServerHandler.class);
private SimpleOrchestratorImpl orchestrator = null;
-
private Registry registry;
-
- private ZooKeeper zk;
-
private static Integer mutex = new Integer(-1);
-
private String airavataUserName;
private String gatewayName;
private Publisher publisher;
-
private RabbitMQProcessConsumer rabbitMQProcessConsumer;
private RabbitMQProcessPublisher rabbitMQProcessPublisher;
@@ -133,31 +114,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
// setGatewayName(ServerSettings.getDefaultUserGateway());
setAiravataUserName(ServerSettings.getDefaultUser());
- if(!ServerSettings.isGFacPassiveMode()) {
- try {
- zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); // no watcher is
- // required, this
- // will only use to
- // store some data
- log.info("Waiting for zookeeper to connect to the server");
-
- String OrchServer = ServerSettings
- .getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
- synchronized (mutex) {
- mutex.wait(5000); // waiting for the syncConnected event
- }
- registerOrchestratorService(airavataServerHostPort, OrchServer);
- // creating a watch in orchestrator to monitor the gfac
- // instances
- zk.getChildren(ServerSettings.getSetting(
- Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"),
- this);
- log.info("Finished starting ZK: " + zk);
- } catch (IOException|InterruptedException|KeeperException e) {
- log.error(e.getMessage(), e);
- throw new OrchestratorException("Error while initializing orchestrator service, Error in Zookeeper", e);
- }
- }
} catch (AiravataException e) {
log.error(e.getMessage(), e);
throw new OrchestratorException("Error while initializing orchestrator service", e);
@@ -169,7 +125,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
orchestrator = new SimpleOrchestratorImpl();
registry = RegistryFactory.getDefaultRegistry();
orchestrator.initialize();
- orchestrator.getOrchestratorContext().setZk(this.zk);
orchestrator.getOrchestratorContext().setPublisher(this.publisher);
startProcessConsumer();
} catch (OrchestratorException e) {
@@ -194,23 +149,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
}
- private void registerOrchestratorService(String airavataServerHostPort, String orchServer) throws KeeperException, InterruptedException {
- Stat zkStat = zk.exists(orchServer, false);
- if (zkStat == null) {
- zk.create(orchServer, new byte[0],
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- String instantNode = orchServer
- + File.separator
- + String.valueOf(new Random()
- .nextInt(Integer.MAX_VALUE));
- zkStat = zk.exists(instantNode, false);
- if (zkStat == null) {
- zk.create(instantNode, airavataServerHostPort.getBytes(),
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- }
- }
-
/**
* * After creating the experiment Data user have the * experimentID as the
* handler to the experiment, during the launchExperiment * We just have to
@@ -327,84 +265,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
return validateStatesAndCancel(experimentId, tokenId);
}
- /**
- * This method gracefully handler gfac node failures
- */
- synchronized public void process(WatchedEvent watchedEvent) {
- log.info(watchedEvent.getPath());
- synchronized (mutex) {
- try {
- Event.KeeperState state = watchedEvent.getState();
- switch (state) {
- case SyncConnected:
- mutex.notify();
- break;
- case Expired:
- case Disconnected:
- log.info("ZK Connection is "+ state.toString());
- break;
- }
- if (watchedEvent.getPath() != null
- && watchedEvent.getPath().startsWith(
- ServerSettings.getSetting(
- Constants.ZOOKEEPER_GFAC_SERVER_NODE,
- "/gfac-server"))) {
- List<String> children = zk.getChildren(ServerSettings
- .getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE,
- "/gfac-server"), true);
- for (String gfacNodes : children) {
- zk.exists(
- ServerSettings.getSetting(
- Constants.ZOOKEEPER_GFAC_SERVER_NODE,
- "/gfac-server")
- + File.separator + gfacNodes, this);
- }
- switch (watchedEvent.getType()) {
- case NodeCreated:
- mutex.notify();
- break;
- case NodeDeleted:
- // here we have to handle gfac node shutdown case
- if (children.size() == 0) {
- log.error("There are not gfac instances to route failed jobs");
- return;
- }
- // we recover one gfac node at a time
- final WatchedEvent event = watchedEvent;
- final OrchestratorServerHandler handler = this;
- /*(new Thread() { // disabling ft implementation with zk
- public void run() {
- int retry = 0;
- while (retry < 3) {
- try {
- (new OrchestratorRecoveryHandler(
- handler, event.getPath()))
- .recover();
- break;
- } catch (Exception e) {
- e.printStackTrace();
- log.error("error recovering the jobs for gfac-node: "
- + event.getPath());
- log.error("Retrying again to recover jobs and retry attempt: "
- + ++retry);
- }
- }
-
- }
- }).start();*/
- break;
- }
-
-
- }
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
private String getAiravataUserName() {
return airavataUserName;
}
@@ -513,9 +373,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
try {
Experiment experiment = (Experiment) registry.get(
RegistryModelType.EXPERIMENT, experimentId);
- if (zk == null || !zk.getState().isConnected()){
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this);
- }
log.info("Waiting for zookeeper to connect to the server");
synchronized (mutex){
mutex.wait(5000);
http://git-wip-us.apache.org/repos/asf/airavata/blob/915ed04d/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index 6cfc17f..a5c41ba 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -116,12 +116,6 @@ the License. -->
<artifactId>airavata-server-configuration</artifactId>
<scope>test</scope>
</dependency>
- <!-- zookeeper and curator dependencies -->
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>${zk.version}</version>
- </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/915ed04d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
index b77087b..f7f5969 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
@@ -34,13 +34,8 @@ import org.apache.zookeeper.ZooKeeper;
*/
public class OrchestratorContext {
private List<GFACInstance> gfacInstanceList;
-
private OrchestratorConfiguration orchestratorConfiguration;
-
private Registry newRegistry;
-
- private static ZooKeeper zk; // this instance can be accessed by the Validators and other components
-
private Publisher publisher;
public OrchestratorContext(List<GFACInstance> gfacInstanceList) {
@@ -90,12 +85,4 @@ public class OrchestratorContext {
public void setGfacInstanceList(List<GFACInstance> gfacInstanceList) {
this.gfacInstanceList.addAll(gfacInstanceList);
}
-
- public void setZk(ZooKeeper zk) {
- this.zk = zk;
- }
-
- public static ZooKeeper getZk() {
- return zk;
- }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/915ed04d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 1faef9e..36282a0 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -21,17 +21,11 @@
package org.apache.airavata.orchestrator.core.impl;
import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.gfac.client.GFACInstance;
-import org.apache.airavata.gfac.client.GFacClientFactory;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.cpi.GfacService;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.PublisherFactory;
@@ -41,17 +35,11 @@ import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
import java.util.UUID;
/**
@@ -59,13 +47,9 @@ import java.util.UUID;
*/
public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
private final static Logger logger = LoggerFactory.getLogger(GFACPassiveJobSubmitter.class);
-
public static final String IP = "ip";
-
private OrchestratorContext orchestratorContext;
-
private static Integer mutex = -1;
-
private Publisher publisher;
public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
@@ -160,15 +144,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
}
}
- private void closeZK(OrchestratorContext orchestratorContext) {
- try {
- if(orchestratorContext!=null && orchestratorContext.getZk()!=null) {
- orchestratorContext.getZk().close();
- }
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- }
- }
synchronized public void process(WatchedEvent event) {
logger.info(getClass().getName() + event.getPath());
logger.info(getClass().getName()+event.getType());
[3/4] airavata git commit: Integrated Apache Curator with GFac. With
this fix, every ZooKeeper call goes through the CuratorFramework client.
Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 2cd9ecb..200ffbe 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -61,6 +61,8 @@ import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -80,57 +82,44 @@ import java.util.*;
* This is the GFac CPI class for external usage, this simply have a single method to submit a job to
* the resource, required data for the job has to be stored in registry prior to invoke this object.
*/
-public class BetterGfacImpl implements GFac,Watcher {
+public class BetterGfacImpl implements GFac {
private static final Logger log = LoggerFactory.getLogger(BetterGfacImpl.class);
public static final String ERROR_SENT = "ErrorSent";
private Registry registry;
- // we are not storing zk instance in to jobExecution context
- private ZooKeeper zk;
+ private CuratorFramework curatorClient;
private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
private static File gfacConfigFile;
private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
private static MonitorPublisher monitorPublisher;
- private static Integer mutex = -1;
/**
* Constructor for GFac
*
* @param registry
- * @param zooKeeper
+ * @param curatorClient
*/
- public BetterGfacImpl(Registry registry, AppCatalog appCatalog, ZooKeeper zooKeeper,
+ public BetterGfacImpl(Registry registry, AppCatalog appCatalog, CuratorFramework curatorClient,
MonitorPublisher publisher) throws ApplicationSettingsException, IOException, InterruptedException {
this.registry = registry;
monitorPublisher = publisher; // This is a EventBus common for gfac
- this.zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
- log.info("Waiting until zookeeper client connect to the server...");
- synchronized (mutex) {
- mutex.wait(5000); // waiting for the syncConnected event
- }
+ this.curatorClient = curatorClient;
}
- public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher,RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
+ public static void startStatusUpdators(Registry registry, CuratorFramework curatorClient, MonitorPublisher publisher,
+
+ RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
try {
String[] listenerClassList = ServerSettings.getActivityListeners();
Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
-
for (String listenerClass : listenerClassList) {
Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
AbstractActivityListener abstractActivityListener = aClass.newInstance();
activityListeners.add(abstractActivityListener);
- abstractActivityListener.setup(publisher, registry, zk, rabbitMQPublisher,rabbitMQTaskLaunchConsumer);
+ abstractActivityListener.setup(publisher, registry, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer);
log.info("Registering listener: " + listenerClass);
publisher.registerListener(abstractActivityListener);
}
- } catch (ClassNotFoundException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties", e);
- } catch (InstantiationException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties", e);
- } catch (IllegalAccessException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties", e);
- } catch (ApplicationSettingsException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties", e);
- } catch (AiravataException e) {
+ } catch (Exception e) {
log.error("Error loading the listener classes configured in airavata-server.properties", e);
}
}
@@ -151,26 +140,9 @@ public class BetterGfacImpl implements GFac,Watcher {
threadedHandler.initProperties(handlerConfig.getProperties());
daemonHandlers.add(threadedHandler);
}
- } catch (ParserConfigurationException e) {
- log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
- } catch (IOException e) {
- log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
- } catch (SAXException e) {
- log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
- } catch (XPathExpressionException e) {
+ } catch (ParserConfigurationException | IOException | XPathExpressionException | ClassNotFoundException |
+ InstantiationException | IllegalAccessException | GFacHandlerException | SAXException e) {
log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
- } catch (ClassNotFoundException e) {
- log.error("Error initializing the handler: " + className);
- log.error(className + " class has to implement " + ThreadedHandler.class);
- } catch (InstantiationException e) {
- log.error("Error initializing the handler: " + className);
- log.error(className + " class has to implement " + ThreadedHandler.class);
- } catch (GFacHandlerException e) {
- log.error("Error initializing the handler: " + className);
- log.error(className + " class has to implement " + ThreadedHandler.class);
- } catch (IllegalAccessException e) {
- log.error("Error initializing the handler: " + className);
- log.error(className + " class has to implement " + ThreadedHandler.class);
}
for (ThreadedHandler tHandler : daemonHandlers) {
(new Thread(tHandler)).start();
@@ -184,13 +156,13 @@ public class BetterGfacImpl implements GFac,Watcher {
daemonHandlers = new ArrayList<ThreadedHandler>();
startDaemonHandlers();
}
-
+
public BetterGfacImpl(Registry registry) {
- this();
- this.registry = registry;
+ this();
+ this.registry = registry;
}
-
+
/**
* This is the job launching method outsiders of GFac can use, this will invoke the GFac handler chain and providers
* And update the registry occordingly, so the users can query the database to retrieve status and output from Registry
@@ -206,13 +178,13 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setCredentialStoreToken(tokenId);
return submitJob(jobExecutionContext);
} catch (Exception e) {
- log.error("Error inovoking the job with experiment ID: " + experimentID + ":"+e.getMessage());
+ log.error("Error inovoking the job with experiment ID: " + experimentID + ":" + e.getMessage());
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
// FIXME: Here we need to update Experiment status to Failed, as we used chained update approach updating
// task status will cause to update Experiment status. Remove this chained update approach and fix this correctly (update experiment status)
- if(jobExecutionContext!=null){
+ if (jobExecutionContext != null) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
@@ -222,8 +194,6 @@ public class BetterGfacImpl implements GFac,Watcher {
monitorPublisher.publish(event);
}
throw new GFacException(e);
- }finally {
- closeZK(jobExecutionContext);
}
}
@@ -237,7 +207,7 @@ public class BetterGfacImpl implements GFac,Watcher {
* 1. Get the Task from the task ID and construct the Job object and save it in to registry
* 2. Add properties of description documents to jobExecutionContext which will be used inside the providers.
*/
-
+
//Fetch the Task details for the requested experimentID from the registry. Extract required pointers from the Task object.
TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
@@ -289,11 +259,11 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setTaskData(taskData);
jobExecutionContext.setGatewayID(gatewayID);
jobExecutionContext.setAppCatalog(appCatalog);
-
-
+
+
List<JobDetails> jobDetailsList = taskData.getJobDetailsList();
//FIXME: Following for loop only set last jobDetails element to the jobExecutionContext
- for(JobDetails jDetails:jobDetailsList){
+ for (JobDetails jDetails : jobDetailsList) {
jobExecutionContext.setJobDetails(jDetails);
}
// setting the registry
@@ -314,11 +284,11 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
jobExecutionContext.setGfac(this);
- jobExecutionContext.setZk(zk);
+ jobExecutionContext.setCuratorClient(curatorClient);
// handle job submission protocol
List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
- if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){
+ if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) {
Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
@Override
public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
@@ -327,7 +297,7 @@ public class BetterGfacImpl implements GFac,Watcher {
});
jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces);
- }else {
+ } else {
throw new GFacException("Compute resource should have at least one job submission interface defined...");
}
// handle data movement protocol
@@ -346,7 +316,7 @@ public class BetterGfacImpl implements GFac,Watcher {
populateDefaultComputeResourceConfiguration(jobExecutionContext, applicationInterface, computeResource);
populateResourceJobManager(jobExecutionContext);
// if gateway resource preference is set
- if (gatewayResourcePreferences != null ) {
+ if (gatewayResourcePreferences != null) {
if (gatewayResourcePreferences.getScratchLocation() == null) {
gatewayResourcePreferences.setScratchLocation("/tmp");
}
@@ -365,7 +335,7 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
- if(gatewayResourcePreferences.getLoginUserName() != null){
+ if (gatewayResourcePreferences.getLoginUserName() != null) {
jobExecutionContext.setLoginUserName(gatewayResourcePreferences.getLoginUserName());
}
@@ -375,45 +345,45 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setPreferredDataMovementInterface(jobExecutionContext.getHostPrioritizedDataMovementInterfaces().get(0));
jobExecutionContext.setPreferredDataMovementProtocol(jobExecutionContext.getPreferredDataMovementInterface().getDataMovementProtocol());
} else {
- // this check is to avoid NPE when job submission endpoints do
- // not contain any data movement interfaces.
- if((dataMovementInterfaces != null) && (!dataMovementInterfaces.isEmpty())) {
- for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
- if (gatewayResourcePreferences.getPreferredDataMovementProtocol() == dataMovementInterface.getDataMovementProtocol()) {
- jobExecutionContext.setPreferredDataMovementInterface(dataMovementInterface);
- break;
- }
- }
- }
+ // this check is to avoid NPE when job submission endpoints do
+ // not contain any data movement interfaces.
+ if ((dataMovementInterfaces != null) && (!dataMovementInterfaces.isEmpty())) {
+ for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
+ if (gatewayResourcePreferences.getPreferredDataMovementProtocol() == dataMovementInterface.getDataMovementProtocol()) {
+ jobExecutionContext.setPreferredDataMovementInterface(dataMovementInterface);
+ break;
+ }
+ }
+ }
}
- } else {
+ } else {
setUpWorkingLocation(jobExecutionContext, applicationInterface, "/tmp");
}
List<OutputDataObjectType> taskOutputs = taskData.getApplicationOutputs();
- if (taskOutputs == null || taskOutputs.isEmpty() ){
+ if (taskOutputs == null || taskOutputs.isEmpty()) {
taskOutputs = applicationInterface.getApplicationOutputs();
}
- for (OutputDataObjectType objectType : taskOutputs){
- if (objectType.getType() == DataType.URI && objectType.getValue() != null){
+ for (OutputDataObjectType objectType : taskOutputs) {
+ if (objectType.getType() == DataType.URI && objectType.getValue() != null) {
String filePath = objectType.getValue();
// if output is not in working folder
if (objectType.getLocation() != null && !objectType.getLocation().isEmpty()) {
- if(objectType.getLocation().startsWith(File.separator)){
- filePath = objectType.getLocation() + File.separator + filePath;
- }else{
- filePath = jobExecutionContext.getOutputDir() + File.separator + objectType.getLocation() + File.separator + filePath;
+ if (objectType.getLocation().startsWith(File.separator)) {
+ filePath = objectType.getLocation() + File.separator + filePath;
+ } else {
+ filePath = jobExecutionContext.getOutputDir() + File.separator + objectType.getLocation() + File.separator + filePath;
}
- }else{
- filePath = jobExecutionContext.getOutputDir() + File.separator + filePath;
+ } else {
+ filePath = jobExecutionContext.getOutputDir() + File.separator + filePath;
}
objectType.setValue(filePath);
-
+
}
- if (objectType.getType() == DataType.STDOUT){
+ if (objectType.getType() == DataType.STDOUT) {
objectType.setValue(jobExecutionContext.getOutputDir() + File.separator + jobExecutionContext.getApplicationName() + ".stdout");
}
- if (objectType.getType() == DataType.STDERR){
+ if (objectType.getType() == DataType.STDERR) {
objectType.setValue(jobExecutionContext.getOutputDir() + File.separator + jobExecutionContext.getApplicationName() + ".stderr");
}
}
@@ -464,23 +434,23 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
- private void populateResourceJobManager (JobExecutionContext jobExecutionContext) {
+ private void populateResourceJobManager(JobExecutionContext jobExecutionContext) {
try {
JobSubmissionProtocol submissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
if (submissionProtocol == JobSubmissionProtocol.SSH) {
SSHJobSubmission sshJobSubmission = GFacUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
- if (sshJobSubmission != null){
+ if (sshJobSubmission != null) {
jobExecutionContext.setResourceJobManager(sshJobSubmission.getResourceJobManager());
}
- } else if (submissionProtocol == JobSubmissionProtocol.LOCAL){
+ } else if (submissionProtocol == JobSubmissionProtocol.LOCAL) {
LOCALSubmission localJobSubmission = GFacUtils.getLocalJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
- if (localJobSubmission != null){
+ if (localJobSubmission != null) {
jobExecutionContext.setResourceJobManager(localJobSubmission.getResourceJobManager());
}
}
} catch (AppCatalogException e) {
- log.error("Error occured while retrieving job submission interface", e);
+ log.error("Error occured while retrieving job submission interface", e);
}
}
@@ -488,7 +458,7 @@ public class BetterGfacImpl implements GFac,Watcher {
// We need to check whether this job is submitted as a part of a large workflow. If yes,
// we need to setup workflow tracking listerner.
try {
- GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
+ GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
// Register log event listener. This is required in all scenarios.
if (isNewJob(gfacExpState)) {
// In this scenario We do everything from the beginning
@@ -497,8 +467,8 @@ public class BetterGfacImpl implements GFac,Watcher {
launch(jobExecutionContext);
} else if (isCompletedJob(gfacExpState)) {
log.info("There is nothing to recover in this job so we do not re-submit");
- ZKUtil.deleteRecursive(zk,
- AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()));
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
+ AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()), true);
} else {
// Now we know this is an old Job, so we have to handle things gracefully
log.info("Re-launching the job in GFac because this is re-submitted to GFac");
@@ -547,7 +517,7 @@ public class BetterGfacImpl implements GFac,Watcher {
private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException {
try {
- GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
+ GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
String workflowInstanceID = null;
if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
//todo implement WorkflowTrackingListener properly
@@ -575,27 +545,25 @@ public class BetterGfacImpl implements GFac,Watcher {
//
// }
return true;
- }catch(Exception e){
- log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
- throw new GFacException(e.getMessage(), e);
- }finally{
- closeZK(jobExecutionContext);
- }
+ } catch (Exception e) {
+ log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
+ throw new GFacException(e.getMessage(), e);
}
+ }
- private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
- // Scheduler will decide the execution flow of handlers and provider
- // which handles
- // the job.
- String experimentID = jobExecutionContext.getExperimentID();
- try {
- Scheduler.schedule(jobExecutionContext);
+ private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
+ // Scheduler will decide the execution flow of handlers and provider
+ // which handles
+ // the job.
+ String experimentID = jobExecutionContext.getExperimentID();
+ try {
+ Scheduler.schedule(jobExecutionContext);
- // Executing in handlers in the order as they have configured in
- // GFac configuration
- // here we do not skip handler if some handler does not have to be
- // run again during re-run it can implement
- // that logic in to the handler
+ // Executing in handlers in the order as they have configured in
+ // GFac configuration
+ // here we do not skip handler if some handler does not have to be
+ // run again during re-run it can implement
+ // that logic in to the handler
// After executing the in handlers provider instance should be set
// to job execution context.
@@ -637,31 +605,31 @@ public class BetterGfacImpl implements GFac,Watcher {
default:
throw new GFacException("Un-handled GfacExperimentState : " + state.name());
}
- } catch (Exception e) {
- log.error(e.getMessage(),e);
- try {
- // we make the experiment as failed due to exception scenario
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ try {
+ // we make the experiment as failed due to exception scenario
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
JobIdentifier jobIdentity = new JobIdentifier(
jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
- } catch (NullPointerException e1) {
- log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
- + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+ monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (NullPointerException e1) {
+ log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+ + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
+ monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- }
- jobExecutionContext.setProperty(ERROR_SENT, "true");
- jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+ }
+ jobExecutionContext.setProperty(ERROR_SENT, "true");
+ jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
throw new GFacException(e.getMessage(), e);
}
}
@@ -678,34 +646,34 @@ public class BetterGfacImpl implements GFac,Watcher {
}
private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
- // Scheduler will decide the execution flow of handlers and provider
- // which handles
- // the job.
- try {
- Scheduler.schedule(jobExecutionContext);
-
- // Executing in handlers in the order as they have configured in
- // GFac configuration
- // here we do not skip handler if some handler does not have to be
- // run again during re-run it can implement
- // that logic in to the handler
+ // Scheduler will decide the execution flow of handlers and provider
+ // which handles
+ // the job.
+ try {
+ Scheduler.schedule(jobExecutionContext);
+
+ // Executing in handlers in the order as they have configured in
+ // GFac configuration
+ // here we do not skip handler if some handler does not have to be
+ // run again during re-run it can implement
+ // that logic in to the handler
if (!isCancelling(jobExecutionContext)) {
invokeInFlowHandlers(jobExecutionContext); // to keep the
// consistency we always
// try to re-run to
// avoid complexity
- }else{
+ } else {
log.info("Experiment is cancelled, so launch operation is stopping immediately");
GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
return; // if the job is cancelled, status change is handled in cancel operation this thread simply has to be returned
}
// if (experimentID != null){
- // registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
- // }
+ // registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
+ // }
- // After executing the in handlers provider instance should be set
- // to job execution context.
- // We get the provider instance and execute it.
+ // After executing the in handlers provider instance should be set
+ // to job execution context.
+ // We get the provider instance and execute it.
if (!isCancelling(jobExecutionContext)) {
invokeProviderExecute(jobExecutionContext);
} else {
@@ -713,53 +681,53 @@ public class BetterGfacImpl implements GFac,Watcher {
GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
return;
}
- } catch (Exception e) {
- try {
- // we make the experiment as failed due to exception scenario
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
- // monitorPublisher.publish(new
- // ExperimentStatusChangedEvent(new
- // ExperimentIdentity(jobExecutionContext.getExperimentID()),
- // ExperimentState.FAILED));
- // Updating the task status if there's any task associated
- // monitorPublisher.publish(new TaskStatusChangeRequest(
- // new TaskIdentity(jobExecutionContext.getExperimentID(),
- // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- // jobExecutionContext.getTaskData().getTaskID()),
- // TaskState.FAILED
- // ));
+ } catch (Exception e) {
+ try {
+ // we make the experiment as failed due to exception scenario
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ // monitorPublisher.publish(new
+ // ExperimentStatusChangedEvent(new
+ // ExperimentIdentity(jobExecutionContext.getExperimentID()),
+ // ExperimentState.FAILED));
+ // Updating the task status if there's any task associated
+ // monitorPublisher.publish(new TaskStatusChangeRequest(
+ // new TaskIdentity(jobExecutionContext.getExperimentID(),
+ // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ // jobExecutionContext.getTaskData().getTaskID()),
+ // TaskState.FAILED
+ // ));
JobIdentifier jobIdentity = new JobIdentifier(
- jobExecutionContext.getJobDetails().getJobID(),jobExecutionContext.getTaskData().getTaskID(),jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
- } catch (NullPointerException e1) {
- log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
- + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
- //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
- // Updating the task status if there's any task associated
+ monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+ } catch (NullPointerException e1) {
+ log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+ + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+ //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
+ // Updating the task status if there's any task associated
TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
- }
- jobExecutionContext.setProperty(ERROR_SENT, "true");
- jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+ }
+ jobExecutionContext.setProperty(ERROR_SENT, "true");
+ jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
throw new GFacException(e.getMessage(), e);
- }
+ }
}
- private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException {
+ private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws Exception {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
- GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
initProvider(provider, jobExecutionContext);
executeProvider(provider, jobExecutionContext);
disposeProvider(provider, jobExecutionContext);
- GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
@@ -767,13 +735,13 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
- private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext, boolean submit) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
+ private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext, boolean submit) throws Exception {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
if (submit) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
- GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName());
- GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+ GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
if (plState != null && plState == GfacHandlerState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
initProvider(provider, jobExecutionContext);
executeProvider(provider, jobExecutionContext);
@@ -781,18 +749,16 @@ public class BetterGfacImpl implements GFac,Watcher {
} else {
provider.recover(jobExecutionContext);
}
- GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
} else {
disposeProvider(provider, jobExecutionContext);
- GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
}
- if (GFacUtils.isSynchronousMode(jobExecutionContext))
-
- {
+ if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
invokeOutFlowHandlers(jobExecutionContext);
}
@@ -812,12 +778,12 @@ public class BetterGfacImpl implements GFac,Watcher {
}
// TODO - Did refactoring, but need to recheck the logic again.
- private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
+ private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws Exception {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
- GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName());
- GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+ GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
initProvider(provider, jobExecutionContext);
cancelProvider(provider, jobExecutionContext);
@@ -825,7 +791,7 @@ public class BetterGfacImpl implements GFac,Watcher {
} else {
provider.recover(jobExecutionContext);
}
- GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
@@ -885,11 +851,11 @@ public class BetterGfacImpl implements GFac,Watcher {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.INHANDLERSINVOKING));
for (GFacHandlerConfig handlerClassName : handlers) {
- if(!isCancelling(jobExecutionContext)) {
+ if (!isCancelling(jobExecutionContext)) {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
try {
- GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName());
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
handler.initProperties(handlerClassName.getProperties());
@@ -902,12 +868,12 @@ public class BetterGfacImpl implements GFac,Watcher {
}
try {
handler.invoke(jobExecutionContext);
- GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
// if exception thrown before that we do not make it finished
} catch (GFacHandlerException e) {
throw new GFacException("Error Executing a InFlow Handler", e.getCause());
}
- }else{
+ } else {
log.info("Experiment execution is cancelled, so InHandler invocation is going to stop");
GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
break;
@@ -916,38 +882,18 @@ public class BetterGfacImpl implements GFac,Watcher {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.INHANDLERSINVOKED));
} catch (Exception e) {
- throw new GFacException("Error Invoking Handlers:"+e.getMessage(), e);
+ throw new GFacException("Error Invoking Handlers:" + e.getMessage(), e);
}
}
public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
String experimentPath = null;
try {
- try {
- if(jobExecutionContext.getZk()!=null){
- closeZK(jobExecutionContext);
- }
- jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
- zk = jobExecutionContext.getZk();
- log.info("Waiting until zookeeper client connect to the server...");
- synchronized (mutex) {
- mutex.wait(5000); // waiting for the syncConnected event
- }
- experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
- if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
- log.error("Experiment is already finalized so no output handlers will be invoked");
- return;
- }
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage(), e);
- } catch (InterruptedException e) {
- log.error(e.getMessage(), e);
- } catch (KeeperException e) {
- log.error(e.getMessage(), e);
+ experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
+ if (curatorClient.checkExists().forPath(experimentPath) == null) {
+ log.error("Experiment is already finalized so no output handlers will be invoked");
+ return;
}
-
GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
List<GFacHandlerConfig> handlers = null;
if (gFacConfiguration != null) {
@@ -968,23 +914,20 @@ public class BetterGfacImpl implements GFac,Watcher {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
try {
- GFacUtils.createHandlerZnode(jobExecutionContext.getZk(), jobExecutionContext, handlerClassName.getClassName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName());
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
handler.initProperties(handlerClassName.getProperties());
} catch (ClassNotFoundException e) {
log.error(e.getMessage());
throw new GFacException("Cannot load handler class " + handlerClassName, e);
- } catch (InstantiationException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- } catch (IllegalAccessException e) {
+ } catch (InstantiationException | IllegalAccessException e) {
log.error(e.getMessage());
throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
}
try {
handler.invoke(jobExecutionContext);
- GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
} catch (Exception e) {
GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED);
try {
@@ -1008,10 +951,8 @@ public class BetterGfacImpl implements GFac,Watcher {
} catch (Exception e) {
throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
}
- }catch (Exception e){
+ } catch (Exception e) {
throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
- } finally{
- closeZK(jobExecutionContext);
}
// At this point all the execution is finished so we update the task and experiment statuses.
@@ -1029,16 +970,6 @@ public class BetterGfacImpl implements GFac,Watcher {
}
- private void closeZK(JobExecutionContext jobExecutionContext) {
- try {
- if(jobExecutionContext!=null && jobExecutionContext.getZk()!=null) {
- jobExecutionContext.getZk().close();
- }
- } catch (InterruptedException e) {
- log.error(e.getMessage(), e);
- }
- }
-
/**
* If handlers ran successfully we re-run only recoverable handlers
* If handler never ran we run the normal invoke method
@@ -1058,8 +989,8 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
- GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, handlerClassName.getClassName());
- GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
+ GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
handler.initProperties(handlerClassName.getProperties());
if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {
log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode");
@@ -1069,7 +1000,7 @@ public class BetterGfacImpl implements GFac,Watcher {
log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
handler.recover(jobExecutionContext);
}
- GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
} catch (GFacHandlerException e) {
throw new GFacException("Error Executing a InFlow Handler", e.getCause());
} catch (ClassNotFoundException e) {
@@ -1086,7 +1017,7 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
} catch (GFacException e1) {
log.error(e1.getLocalizedMessage());
}
@@ -1117,8 +1048,8 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
- GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, handlerClassName.getClassName());
- GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
+ GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {
log.info(handlerClassName.getClassName() + " never ran so we run this in normal mode");
handler.initProperties(handlerClassName.getProperties());
@@ -1127,12 +1058,12 @@ public class BetterGfacImpl implements GFac,Watcher {
// if these already ran we re-run only recoverable handlers
handler.recover(jobExecutionContext);
}
- GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
} catch (ClassNotFoundException e) {
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
} catch (GFacException e1) {
log.error(e1.getLocalizedMessage());
}
@@ -1142,7 +1073,7 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
} catch (GFacException e1) {
log.error(e1.getLocalizedMessage());
}
@@ -1152,7 +1083,7 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
} catch (GFacException e1) {
log.error(e1.getLocalizedMessage());
}
@@ -1163,13 +1094,11 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
} catch (GFacException e1) {
log.error(e1.getLocalizedMessage());
}
throw new GFacException("Error Executing a OutFlow Handler", e);
- }finally {
- closeZK(jobExecutionContext);
}
}
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
@@ -1214,23 +1143,14 @@ public class BetterGfacImpl implements GFac,Watcher {
return registry;
}
- public ZooKeeper getZk() {
- return zk;
- }
-
- public void setZk(ZooKeeper zk) {
- this.zk = zk;
- }
-
-
- public boolean isCancelled(JobExecutionContext executionContext){
+ public boolean isCancelled(JobExecutionContext executionContext) {
// we should check whether experiment is cancelled using registry
try {
- ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
- if (status != null){
+ ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+ if (status != null) {
ExperimentState experimentState = status.getExperimentState();
- if (experimentState != null){
- if(experimentState == ExperimentState.CANCELED){
+ if (experimentState != null) {
+ if (experimentState == ExperimentState.CANCELED) {
return true;
}
}
@@ -1241,14 +1161,14 @@ public class BetterGfacImpl implements GFac,Watcher {
return false;
}
- public boolean isCancelling(JobExecutionContext executionContext){
+ public boolean isCancelling(JobExecutionContext executionContext) {
// check whether cancelling request came
try {
- ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
- if (status != null){
+ ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+ if (status != null) {
ExperimentState experimentState = status.getExperimentState();
- if (experimentState != null){
- if(experimentState == ExperimentState.CANCELING){
+ if (experimentState != null) {
+ if (experimentState == ExperimentState.CANCELING) {
return true;
}
}
@@ -1276,43 +1196,4 @@ public class BetterGfacImpl implements GFac,Watcher {
return false;
}
- public void process(WatchedEvent watchedEvent) {
- log.info(watchedEvent.getPath());
- if (Event.EventType.NodeDataChanged.equals(watchedEvent.getType())) {
- // node data is changed, this means node is cancelled.
- log.info("Experiment is cancelled with this path:" + watchedEvent.getPath());
- }
- synchronized (mutex) {
- Event.KeeperState state = watchedEvent.getState();
- log.info(state.name());
- switch (state) {
- case SyncConnected:
- mutex.notify();
- break;
- case Expired:
- try {
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage(), e);
- }
-// synchronized (mutex) {
-// mutex.wait(5000); // waiting for the syncConnected event
-// }
- case Disconnected:
-// try {
-// zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-// } catch (IOException e) {
-// log.error(e.getMessage(), e);
-// } catch (ApplicationSettingsException e) {
-// log.error(e.getMessage(), e);
-// }
-// synchronized (mutex) {
-// mutex.wait(5000); // waiting for the syncConnected event
-// }
- log.info("ZK Connection is " + state.toString());
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
index ee28c1d..eef0a33 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
@@ -48,7 +48,7 @@ public abstract class AbstractHandler implements GFacHandler {
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
try {
- GFacUtils.updateHandlerState(jobExecutionContext.getZk(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
+ GFacUtils.updateHandlerState(jobExecutionContext.getCuratorClient(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
} catch (Exception e) {
logger.error("Error saving Recoverable provider state", e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
index 58ef855..c6ada52 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
@@ -34,7 +34,7 @@ public class AppDescriptorCheckHandler implements GFacHandler {
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
logger.info("Invoking ApplicationDescriptorCheckHandler ...");
try {
- GFacUtils.updateHandlerState(jobExecutionContext.getZk(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
+ GFacUtils.updateHandlerState(jobExecutionContext.getCuratorClient(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
} catch (Exception e) {
logger.info("Error saving plugin status to ZK");
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 836e2c6..84d72fa 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -20,31 +20,28 @@
*/
package org.apache.airavata.gfac.core.monitor;
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
+import com.google.common.eventbus.Subscribe;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
import org.apache.airavata.gfac.core.states.GfacExperimentState;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQProducer;
-import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
public class GfacInternalStatusUpdator implements AbstractActivityListener, Watcher {
private final static Logger logger = LoggerFactory.getLogger(GfacInternalStatusUpdator.class);
- private ZooKeeper zk;
+ private CuratorFramework curatorClient;
private static Integer mutex = -1;
@@ -57,39 +54,20 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
+ File.separator + statusChangeRequest.getMonitorID().getExperimentID();
Stat exists = null;
if(!(GfacExperimentState.COMPLETED.equals(statusChangeRequest.getState()) || GfacExperimentState.FAILED.equals(statusChangeRequest.getState()))) {
- try {
- if (!zk.getState().isConnected()) {
- String zkhostPort = AiravataZKUtils.getZKhostPort();
- zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
- logger.info("Waiting for zookeeper to connect to the server");
- synchronized (mutex) {
- mutex.wait(5000);
- }
- }
- exists = zk.exists(experimentPath, false);
- if (exists == null) {
- logger.error("ZK path: " + experimentPath + " does not exists !!");
- logger.error("Zookeeper is in an inconsistent state !!! ");
- return;
- }
- } catch (KeeperException e) {
- logger.error("Error while updating zk", e);
- throw new Exception(e.getMessage(), e);
- } catch (InterruptedException e) {
- logger.error("Error while updating zk", e);
- throw new Exception(e.getMessage(), e);
- } catch (IOException e) {
- logger.error("Error while updating zk", e);
- throw new Exception(e.getMessage(), e);
+ exists = curatorClient.checkExists().forPath(experimentPath);
+ if (exists == null) {
+ logger.error("ZK path: " + experimentPath + " does not exists !!");
+ return;
}
- Stat state = zk.exists(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ Stat state = curatorClient.checkExists().forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
if (state == null) {
// state znode has to be created
- zk.create(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).
+ forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
} else {
- zk.setData(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), state.getVersion());
+ curatorClient.setData().withVersion(state.getVersion()).forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
}
}
switch (statusChangeRequest.getState()) {
@@ -107,8 +85,8 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
public void setup(Object... configurations) {
for (Object configuration : configurations) {
- if (configuration instanceof ZooKeeper) {
- this.zk = (ZooKeeper) configuration;
+ if (configuration instanceof CuratorFramework) {
+ this.curatorClient = (CuratorFramework) configuration;
}
}
}
[2/4] airavata git commit: Integrated Apache curator with Gfac. With
this fix every zookeeper call goes through CuratorFramework client.
Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 0810bfd..f41f29c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -38,22 +38,37 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.states.GfacExperimentState;
import org.apache.airavata.gfac.core.states.GfacHandlerState;
-import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
import org.apache.airavata.model.messaging.event.JobIdentifier;
import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
import org.apache.airavata.model.messaging.event.TaskIdentifier;
import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
-import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.model.workspace.experiment.ActionableGroup;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.ErrorDetails;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.JobStatus;
+import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.registry.cpi.CompositeIdentifier;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.RegistryModelType;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,19 +77,33 @@ import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
-import javax.xml.xpath.*;
-
-import java.io.*;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
//import org.apache.airavata.commons.gfac.type.ActualParameter;
public class GFacUtils {
private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
+ public static final ArrayList<ACL> OPEN_ACL_UNSAFE = ZooDefs.Ids.OPEN_ACL_UNSAFE;
private GFacUtils() {
}
@@ -268,11 +297,10 @@ public class GFacUtils {
return map;
}
- public static GfacExperimentState getZKExperimentState(ZooKeeper zk,
+ public static GfacExperimentState getZKExperimentState(CuratorFramework curatorClient,
JobExecutionContext jobExecutionContext)
- throws ApplicationSettingsException, KeeperException,
- InterruptedException {
- String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext
+ throws Exception {
+ String expState = AiravataZKUtils.getExpState(curatorClient, jobExecutionContext
.getExperimentID());
if (expState == null || expState.isEmpty()) {
return GfacExperimentState.UNKNOWN;
@@ -280,136 +308,86 @@ public class GFacUtils {
return GfacExperimentState.findByValue(Integer.valueOf(expState));
}
- public static int getZKExperimentStateValue(ZooKeeper zk,
- JobExecutionContext jobExecutionContext)
- throws ApplicationSettingsException, KeeperException,
- InterruptedException {
- String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext
- .getExperimentID());
- if (expState == null) {
- return -1;
- }
- return Integer.parseInt(expState);
- }
-
- public static int getZKExperimentStateValue(ZooKeeper zk,String fullPath)throws ApplicationSettingsException,
- KeeperException, InterruptedException {
- Stat exists = zk.exists(fullPath+File.separator+"state", false);
- if (exists != null) {
- return Integer.parseInt(new String(zk.getData(fullPath+File.separator+"state", false, exists)));
- }
- return -1;
- }
-
- public static boolean createHandlerZnode(ZooKeeper zk,
+ public static boolean createHandlerZnode(CuratorFramework curatorClient,
JobExecutionContext jobExecutionContext, String className)
- throws ApplicationSettingsException, KeeperException,
- InterruptedException {
+ throws Exception {
String expState = AiravataZKUtils.getExpZnodeHandlerPath(
jobExecutionContext.getExperimentID(), className);
- Stat exists = zk.exists(expState, false);
+ Stat exists = curatorClient.checkExists().forPath(expState);
if (exists == null) {
- zk.create(expState, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- zk.create(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0],
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(expState, new byte[0]);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
} else {
- exists = zk.exists(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
if (exists == null) {
- zk.create(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
}
}
- exists = zk.exists(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
if (exists != null) {
- zk.setData(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(GfacHandlerState.INVOKING.getValue())
- .getBytes(), exists.getVersion());
+ curatorClient.setData().withVersion(exists.getVersion())
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(GfacHandlerState.INVOKING.getValue()).getBytes());
}
return true;
}
- public static boolean createHandlerZnode(ZooKeeper zk,
+ public static boolean createHandlerZnode(CuratorFramework curatorClient,
JobExecutionContext jobExecutionContext, String className,
- GfacHandlerState state) throws ApplicationSettingsException,
- KeeperException, InterruptedException {
+ GfacHandlerState state) throws Exception {
String expState = AiravataZKUtils.getExpZnodeHandlerPath(
jobExecutionContext.getExperimentID(), className);
- Stat exists = zk.exists(expState, false);
+ Stat exists = curatorClient.checkExists().forPath(expState);
if (exists == null) {
- zk.create(expState, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- zk.create(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0],
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState, new byte[0]);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
} else {
- exists = zk.exists(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
if (exists == null) {
- zk.create(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
}
}
- exists = zk.exists(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
if (exists != null) {
- zk.setData(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(state.getValue()).getBytes(),
- exists.getVersion());
+ curatorClient.setData().withVersion(exists.getVersion())
+ .forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(state.getValue()).getBytes());
}
return true;
}
- public static boolean updateHandlerState(ZooKeeper zk,
+ public static boolean updateHandlerState(CuratorFramework curatorClient,
JobExecutionContext jobExecutionContext, String className,
- GfacHandlerState state) throws ApplicationSettingsException,
- KeeperException, InterruptedException {
- if(zk.getState().isConnected()) {
- String expState = AiravataZKUtils.getExpZnodeHandlerPath(
- jobExecutionContext.getExperimentID(), className);
-
- Stat exists = zk.exists(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
- if (exists != null) {
- zk.setData(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(state.getValue()).getBytes(),
- exists.getVersion());
- } else {
- createHandlerZnode(zk, jobExecutionContext, className, state);
- }
- return true;
+ GfacHandlerState state) throws Exception {
+ String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
+ jobExecutionContext.getExperimentID(), className);
+ Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+ if (exists != null) {
+ curatorClient.setData().withVersion(exists.getVersion())
+ .forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes());
+ } else {
+ createHandlerZnode(curatorClient, jobExecutionContext, className, state);
}
return false;
}
- public static GfacHandlerState getHandlerState(ZooKeeper zk,
+ public static GfacHandlerState getHandlerState(CuratorFramework curatorClient,
JobExecutionContext jobExecutionContext, String className) {
try {
- String expState = AiravataZKUtils.getExpZnodeHandlerPath(
- jobExecutionContext.getExperimentID(), className);
-
- Stat exists = zk.exists(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath( jobExecutionContext.getExperimentID(), className);
+ Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
if (exists != null) {
- String stateVal = new String(zk.getData(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false,
- exists));
- return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
- }
+ String stateVal = new String(curatorClient.getData().storingStatIn(exists)
+ .forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE));
+ return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
+ }
return GfacHandlerState.UNKNOWN; // if the node doesn't exist or any other error we
// return false
} catch (Exception e) {
@@ -420,136 +398,104 @@ public class GFacUtils {
// This method is dangerous because of moving the experiment data
public static boolean createExperimentEntryForPassive(String experimentID,
- String taskID, ZooKeeper zk, String experimentNode,
- String pickedChild, String tokenId, long deliveryTag) throws KeeperException,
- InterruptedException, ApplicationSettingsException {
+ String taskID, CuratorFramework curatorClient, String experimentNode,
+ String pickedChild, String tokenId, long deliveryTag) throws Exception {
String experimentPath = experimentNode + File.separator + pickedChild;
String newExperimentPath = experimentPath + File.separator + experimentID;
- Stat exists1 = zk.exists(newExperimentPath, false);
- String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, zk);
+ Stat exists1 = curatorClient.checkExists().forPath(newExperimentPath);
+ String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, curatorClient);
if (oldExperimentPath == null) { // this means this is a very new experiment
// are going to create a new node
log.info("This is a new Job, so creating all the experiment docs from the scratch");
-
- zk.create(newExperimentPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- String s = zk.create(newExperimentPath + File.separator + "state", String
- .valueOf(GfacExperimentState.LAUNCHED.getValue())
- .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- if(zk.exists(s,false)!=null){
- log.info("Created the node: "+s+" successfully !");
- }else{
- log.error("Error creating node: "+s+" successfully !");
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath, new byte[0]);
+ String stateNodePath = curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newExperimentPath + File.separator + "state",
+ String .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes());
+
+ if(curatorClient.checkExists().forPath(stateNodePath)!=null) {
+ log.info("Created the node: " + stateNodePath + " successfully !");
+ }else {
+ log.error("Error creating node: " + stateNodePath + " successfully !");
}
- zk.create(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE, // here we store the value of delivery message
- CreateMode.PERSISTENT);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
} else {
log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
- removeCancelDeliveryTagNode(oldExperimentPath, zk); // remove previous cancel deliveryTagNode
+ removeCancelDeliveryTagNode(oldExperimentPath, curatorClient); // remove previous cancel deliveryTagNode
if(newExperimentPath.equals(oldExperimentPath)){
log.info("Re-launch experiment came to the same GFac instance");
}else {
log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
- zk.create(newExperimentPath, zk.getData(oldExperimentPath, false, exists1),
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // recursively copy children
- copyChildren(zk, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath,
+ curatorClient.getData().storingStatIn(exists1).forPath(oldExperimentPath)); // recursively copy children
+ copyChildren(curatorClient, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
- Stat exists = zk.exists(oldDeliveryTag, false);
+ Stat exists = curatorClient.checkExists().forPath(oldDeliveryTag);
if(exists!=null) {
- zk.create(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
- zk.getData(oldDeliveryTag,null,exists),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- ZKUtil.deleteRecursive(zk,oldDeliveryTag);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
+ curatorClient.getData().storingStatIn(exists).forPath(oldDeliveryTag));
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldDeliveryTag, true);
}
// After all the files are successfully transfered we delete the // old experiment,otherwise we do
// not delete a single file
log.info("After a successful copying of experiment data for an old experiment we delete the old data");
log.info("Deleting experiment data: " + oldExperimentPath);
- ZKUtil.deleteRecursive(zk, oldExperimentPath);
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldExperimentPath, true);
}
}
return true;
}
- private static void removeCancelDeliveryTagNode(String experimentPath, ZooKeeper zk) throws KeeperException, InterruptedException {
- Stat exists = zk.exists(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, false);
+ private static void removeCancelDeliveryTagNode(String experimentPath, CuratorFramework curatorClient) throws Exception {
+ Stat exists = curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
if (exists != null) {
- ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
- }
- }
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
+ }
+ }
- private static void copyChildren(ZooKeeper zk, String oldPath, String newPath, int depth) throws KeeperException, InterruptedException {
- for (String childNode : zk.getChildren(oldPath, false)) {
+ private static void copyChildren(CuratorFramework curatorClient, String oldPath, String newPath, int depth) throws Exception {
+ for (String childNode : curatorClient.getChildren().forPath(oldPath)) {
String oldChildPath = oldPath + File.separator + childNode;
- Stat stat = zk.exists(oldChildPath, false); // no need to check exists
+ Stat stat = curatorClient.checkExists().forPath(oldChildPath); // no need to check exists
String newChildPath = newPath + File.separator + childNode;
log.info("Creating new znode: " + newChildPath);
- zk.create(newChildPath, zk.getData(oldChildPath, false, stat), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- if (--depth > 0) {
- copyChildren(zk , oldChildPath, newChildPath, depth );
- }
- }
- }
-
- /**
- * This will return a value if the server is down because we iterate through exisiting experiment nodes, not
- * through gfac-server nodes
- * @param experimentID
- * @param zk
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
- public static String findExperimentEntry(String experimentID, ZooKeeper zk
- ) throws KeeperException,
- InterruptedException {
- String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- List<String> children = zk.getChildren(experimentNode, false);
- for(String pickedChild:children) {
- String experimentPath = experimentNode + File.separator + pickedChild;
- String newExpNode = experimentPath + File.separator + experimentID;
- Stat exists = zk.exists(newExpNode, false);
- if(exists == null){
- continue;
- }else{
- return newExpNode;
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(newChildPath, curatorClient.getData().storingStatIn(stat).forPath(oldChildPath));
+ if (--depth > 0) {
+ copyChildren(curatorClient , oldChildPath, newChildPath, depth );
}
}
- return null;
}
/**
* This will return a value if the server is down because we iterate through exisiting experiment nodes, not
* through gfac-server nodes
+ *
* @param experimentID
- * @param zk
+ * @param curatorClient
* @return
* @throws KeeperException
* @throws InterruptedException
*/
- public static String findExperimentEntryPassive(String experimentID, ZooKeeper zk
- ) throws KeeperException,
- InterruptedException {
+ public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- List<String> children = zk.getChildren(experimentNode, false);
- for(String pickedChild:children) {
+ List<String> children = curatorClient.getChildren().forPath(experimentNode);
+ for (String pickedChild : children) {
String experimentPath = experimentNode + File.separator + pickedChild;
String newExpNode = experimentPath + File.separator + experimentID;
- Stat exists = zk.exists(newExpNode, false);
- if(exists == null){
+ Stat exists = curatorClient.checkExists().forPath(newExpNode);
+ if (exists == null) {
continue;
- }else{
+ } else {
return newExpNode;
}
}
return null;
}
- public static boolean setExperimentCancel(String experimentId, ZooKeeper zk, long deliveryTag) throws KeeperException,
- InterruptedException {
- String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
+ public static boolean setExperimentCancel(String experimentId, CuratorFramework curatorClient, long deliveryTag) throws Exception {
+ String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
if (experimentEntry == null) {
// This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
@@ -557,51 +503,47 @@ public class GFacUtils {
return false;
} else {
// check cancel operation is being processed for the same experiment.
- Stat cancelState = zk.exists(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, false);
+ Stat cancelState = curatorClient.checkExists().forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
if (cancelState != null) {
// another cancel operation is being processed. only one cancel operation can exist for a given experiment.
return false;
}
- zk.create(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag),
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // save cancel delivery tag to be acknowledge at the end.
- return true;
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+ .forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag)); // save cancel delivery tag to be acknowledge at the end.
+ return true;
}
}
- public static boolean isCancelled(String experimentID, ZooKeeper zk
- ) throws KeeperException,
- InterruptedException {
- String experimentEntry = GFacUtils.findExperimentEntry(experimentID, zk);
-
+ public static boolean isCancelled(String experimentID, CuratorFramework curatorClient ) throws Exception {
+ String experimentEntry = GFacUtils.findExperimentEntry(experimentID, curatorClient);
if(experimentEntry == null){
return false;
}else {
- Stat exists = zk.exists(experimentEntry, false);
+ Stat exists = curatorClient.checkExists().forPath(experimentEntry);
if (exists != null) {
- String operation = new String(zk.getData(experimentEntry+File.separator+"operation", false, exists));
- if ("cancel".equals(operation)) {
- return true;
- }
- }
- }
+ String operation = new String(curatorClient.getData().storingStatIn(exists).forPath(experimentEntry + File.separator + "operation"));
+ if ("cancel".equals(operation)) {
+ return true;
+ }
+ }
+ }
return false;
}
public static void saveHandlerData(JobExecutionContext jobExecutionContext,
StringBuffer data, String className) throws GFacHandlerException {
try {
- ZooKeeper zk = jobExecutionContext.getZk();
- if (zk != null) {
+ CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+ if (curatorClient != null) {
String expZnodeHandlerPath = AiravataZKUtils
.getExpZnodeHandlerPath(
jobExecutionContext.getExperimentID(),
className);
- Stat exists = zk.exists(expZnodeHandlerPath, false);
+ Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
if (exists != null) {
- zk.setData(expZnodeHandlerPath, data.toString().getBytes(),
- exists.getVersion());
- } else {
+ curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath, data.toString().getBytes());
+ } else {
log.error("Saving Handler data failed, Stat is null");
}
}
@@ -610,18 +552,15 @@ public class GFacUtils {
}
}
- public static String getHandlerData(JobExecutionContext jobExecutionContext,
- String className) throws ApplicationSettingsException,
- KeeperException, InterruptedException {
- ZooKeeper zk = jobExecutionContext.getZk();
- if (zk != null) {
+ public static String getHandlerData(JobExecutionContext jobExecutionContext, String className) throws Exception {
+ CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+ if (curatorClient != null) {
String expZnodeHandlerPath = AiravataZKUtils
.getExpZnodeHandlerPath(
jobExecutionContext.getExperimentID(),
className);
- Stat exists = zk.exists(expZnodeHandlerPath, false);
- return new String(jobExecutionContext.getZk().getData(
- expZnodeHandlerPath, false, exists));
+ Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
+ return new String(jobExecutionContext.getCuratorClient().getData().storingStatIn(exists).forPath(expZnodeHandlerPath));
}
return null;
}
@@ -664,18 +603,6 @@ public class GFacUtils {
}
}
- public static GlobusJobSubmission getGlobusJobSubmission (String submissionId) throws AppCatalogException{
- return null;
-// try {
-// AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
-// return appCatalog.getComputeResource().getGlobus(submissionId);
-// }catch (Exception e){
-// String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
-// log.error(errorMsg, e);
-// throw new AppCatalogException(errorMsg, e);
-// }
- }
-
public static SSHJobSubmission getSSHJobSubmission (String submissionId) throws AppCatalogException{
try {
AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
@@ -687,17 +614,6 @@ public class GFacUtils {
}
}
- public static CloudJobSubmission getCloudJobSubmission (String submissionId) throws AppCatalogException{
- try {
- AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
- return appCatalog.getComputeResource().getCloudJobSubmission(submissionId);
- }catch (Exception e){
- String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
- log.error(errorMsg, e);
- throw new AppCatalogException(errorMsg, e);
- }
- }
-
/**
* To convert list to separated value
* @param listOfStrings
@@ -764,21 +680,21 @@ public class GFacUtils {
return false;
}
- public static boolean ackCancelRequest(String experimentId, ZooKeeper zk) throws KeeperException, InterruptedException {
- String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
+ public static boolean ackCancelRequest(String experimentId, CuratorFramework curatorClient) throws Exception {
+ String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
String cancelNodePath = experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
if (experimentEntry == null) {
// This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
- "This happen when experiment completed and already removed from the zookeeper");
+ "This happen when experiment completed and already removed from the CuratorFramework");
} else {
// check cancel operation is being processed for the same experiment.
- Stat cancelState = zk.exists(cancelNodePath, false);
+ Stat cancelState = curatorClient.checkExists().forPath(cancelNodePath);
if (cancelState != null) {
- ZKUtil.deleteRecursive(zk,cancelNodePath);
- return true;
- }
- }
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), cancelNodePath, true);
+ return true;
+ }
+ }
return false;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index c0784fb..540eaac 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -264,11 +264,7 @@ public class GSISSHProvider extends AbstractProvider {
}
return;
}
- } catch (ApplicationSettingsException e) {
- log.error("Error while recovering provider", e);
- } catch (KeeperException e) {
- log.error("Error while recovering provider", e);
- } catch (InterruptedException e) {
+ } catch (Exception e) {
log.error("Error while recovering provider", e);
}
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index 15b7241..e53fe09 100644
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -20,10 +20,8 @@
*/
package org.apache.airavata.gfac.monitor.util;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.logger.AiravataLogger;
import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -34,15 +32,16 @@ import org.apache.airavata.gfac.monitor.HostMonitorData;
import org.apache.airavata.gfac.monitor.UserMonitorData;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
public class CommonUtils {
private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(CommonUtils.class);
@@ -189,53 +188,38 @@ public class CommonUtils {
/**
* Update job count for a given set of paths.
- * @param zk - zookeeper instance
+ * @param curatorClient - CuratorFramework instance
* @param changeCountMap - map of change job count with relevant path
* @param isAdd - Should add or reduce existing job count by the given job count.
*/
- public static void updateZkWithJobCount(ZooKeeper zk, final Map<String, Integer> changeCountMap, boolean isAdd) {
+ public static void updateZkWithJobCount(CuratorFramework curatorClient, final Map<String, Integer> changeCountMap, boolean isAdd) {
StringBuilder changeZNodePaths = new StringBuilder();
try {
- if (zk == null || !zk.getState().isConnected()) {
- try {
- final CountDownLatch countDownLatch = new CountDownLatch(1);
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- countDownLatch.countDown();
- }
- });
- countDownLatch.await();
- } catch (ApplicationSettingsException e) {
- logger.error("Error while reading zookeeper hostport string");
- } catch (IOException e) {
- logger.error("Error while reconnect attempt to zookeeper where zookeeper connection loss state");
- }
- }
-
for (String path : changeCountMap.keySet()) {
if (isAdd) {
- CommonUtils.checkAndCreateZNode(zk, path);
+ CommonUtils.checkAndCreateZNode(curatorClient, path);
}
- byte[] byteData = zk.getData(path, null, null);
+ byte[] byteData = curatorClient.getData().forPath(path);
String nodeData;
if (byteData == null) {
if (isAdd) {
- zk.setData(path, String.valueOf(changeCountMap.get(path)).getBytes(), -1);
+ curatorClient.setData().withVersion(-1).forPath(path, String.valueOf(changeCountMap.get(path)).getBytes());
} else {
// This is not possible, but we handle in case there any data zookeeper communication failure
logger.warn("Couldn't reduce job count in " + path + " as it returns null data. Hence reset the job count to 0");
- zk.setData(path, "0".getBytes(), -1);
+ curatorClient.setData().withVersion(-1).forPath(path, "0".getBytes());
}
} else {
nodeData = new String(byteData);
if (isAdd) {
- zk.setData(path, String.valueOf(changeCountMap.get(path) + Integer.parseInt(nodeData)).getBytes(), -1);
+ curatorClient.setData().withVersion(-1).forPath(path,
+ String.valueOf(changeCountMap.get(path) + Integer.parseInt(nodeData)).getBytes());
} else {
int previousCount = Integer.parseInt(nodeData);
int removeCount = changeCountMap.get(path);
if (previousCount >= removeCount) {
- zk.setData(path, String.valueOf(previousCount - removeCount).getBytes(), -1);
+ curatorClient.setData().withVersion(-1).forPath(path,
+ String.valueOf(previousCount - removeCount).getBytes());
} else {
// This is not possible, do we need to reset the job count to 0 ?
logger.error("Requested remove job count is " + removeCount +
@@ -250,11 +234,9 @@ public class CommonUtils {
// update stat node to trigger orchestrator watchers
if (changeCountMap.size() > 0) {
changeZNodePaths.deleteCharAt(changeZNodePaths.length() - 1);
- zk.setData("/" + Constants.STAT, changeZNodePaths.toString().getBytes(), -1);
+ curatorClient.setData().withVersion(-1).forPath("/" + Constants.STAT, changeZNodePaths.toString().getBytes());
}
- } catch (KeeperException e) {
- logger.error("Error while writing job count to zookeeper", e);
- } catch (InterruptedException e) {
+ } catch (Exception e) {
logger.error("Error while writing job count to zookeeper", e);
}
@@ -267,7 +249,7 @@ public class CommonUtils {
public static void increaseZkJobCount(MonitorID monitorID) {
Map<String, Integer> addMap = new HashMap<String, Integer>();
addMap.put(CommonUtils.getJobCountUpdatePath(monitorID), 1);
- updateZkWithJobCount(monitorID.getJobExecutionContext().getZk(), addMap, true);
+ updateZkWithJobCount(monitorID.getJobExecutionContext().getCuratorClient(), addMap, true);
}
/**
@@ -282,17 +264,17 @@ public class CommonUtils {
/**
* Check whether znode is exist in given path if not create a new znode
- * @param zk - zookeeper instance
+ * @param curatorClient - zookeeper instance
* @param path - path to check znode
* @throws KeeperException
* @throws InterruptedException
*/
- private static void checkAndCreateZNode(ZooKeeper zk , String path) throws KeeperException, InterruptedException {
- if (zk.exists(path, null) == null) { // if znode doesn't exist
+ private static void checkAndCreateZNode(CuratorFramework curatorClient , String path) throws Exception {
+ if (curatorClient.checkExists().forPath(path) == null) { // if znode doesn't exist
if (path.lastIndexOf("/") > 1) { // recursively traverse to parent znode and check parent exist
- checkAndCreateZNode(zk, (path.substring(0, path.lastIndexOf("/"))));
+ checkAndCreateZNode(curatorClient, (path.substring(0, path.lastIndexOf("/"))));
}
- zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// create a znode
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 19ea3ac..f967dbf 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -410,11 +410,7 @@ public class SSHProvider extends AbstractProvider {
this.execute(jobExecutionContext);
return;
}
- } catch (ApplicationSettingsException e) {
- log.error("Error while recovering provider", e);
- } catch (KeeperException e) {
- log.error("Error while recovering provider", e);
- } catch (InterruptedException e) {
+ } catch (Exception e) {
log.error("Error while recovering provider", e);
}
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
deleted file mode 100644
index 9d40557..0000000
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.orchestrator.util;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.orchestrator.server.OrchestratorServerHandler;
-import org.apache.thrift.TException;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-
-public class OrchestratorRecoveryHandler implements Watcher {
- private static Logger log = LoggerFactory.getLogger(OrchestratorRecoveryHandler.class);
-
- private ZooKeeper zk;
-
- private String gfacId;
-
- private static Integer mutex = -1;
-
- private OrchestratorServerHandler serverHandler;
-
- public OrchestratorRecoveryHandler(OrchestratorServerHandler handler, String zkExpPath) {
- this.zk = zk;
- int index = zkExpPath.split(File.separator).length - 1;
- this.gfacId = zkExpPath.split(File.separator)[index];
- this.serverHandler = handler;
- }
-
- /**
- * This method return the list of experimentId
- *
- * @return
- * @throws OrchestratorException
- * @throws ApplicationSettingsException
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- public void recover() throws OrchestratorException, ApplicationSettingsException, IOException, KeeperException, InterruptedException {
- String zkhostPort = AiravataZKUtils.getZKhostPort();
- zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
- log.info("Waiting for zookeeper to connect to the server");
- synchronized (mutex) {
- mutex.wait(5000);
- }
- List<String> children = zk.getChildren(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE)
- + File.separator + gfacId, false);
- log.info("------------------ Recovering Experiments started ----------------------------------");
- for (String expId : children) {
- log.info("Recovering Experiment: " + expId.split("\\+")[0]);
- log.info("------------------------------------------------------------------------------------");
- try {
- if(GFacUtils.isCancelled(expId.split("\\+")[0], zk)) {// during relaunching we check the operation and then launch
- serverHandler.terminateExperiment(expId.split("\\+")[0], null);
- }else {
- serverHandler.launchExperiment(expId.split("\\+")[0], null);
- }
- // we do not move the old experiment in to new gfac node, gfac will do it
- } catch (Exception e) { // we attempt all the experiments
- log.error(e.getMessage(), e);
- }
- log.info("------------------------------------------------------------------------------------");
- }
- }
-
- synchronized public void process(WatchedEvent watchedEvent) {
- log.info(watchedEvent.getPath());
- synchronized (mutex) {
- Event.KeeperState state = watchedEvent.getState();
- switch (state) {
- case SyncConnected:
- mutex.notify();
- break;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 76b2a75..d57d85a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@
<axiom.version>1.2.8</axiom.version>
<surefire.version>2.18.1</surefire.version>
<junit.version>4.12</junit.version>
- <curator.version>2.7.1</curator.version>
+ <curator.version>2.8.0</curator.version>
<xmlbeans.version>2.5.0</xmlbeans.version>
<xpp3.version>1.1.6</xpp3.version>
<xpp5.version>1.2.8</xpp5.version>
@@ -426,6 +426,11 @@
<artifactId>zookeeper</artifactId>
<version>${zk.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
[4/4] airavata git commit: Integrated Apache curator with Gfac. With
this fix every zookeeper call goes through CuratorFramework client.
Posted by sh...@apache.org.
Integrated Apache curator with Gfac. With this fix every zookeeper call goes through CuratorFramework client.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4a061780
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4a061780
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4a061780
Branch: refs/heads/master
Commit: 4a06178022c0cd1a215c47e3c65a7b6ffd342bbb
Parents: 915ed04
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu May 28 11:26:36 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu May 28 11:26:36 2015 -0400
----------------------------------------------------------------------
airavata-api/airavata-api-server/pom.xml | 6 +-
.../AiravataExperimentStatusUpdator.java | 44 +-
modules/commons/utils/pom.xml | 6 +-
.../airavata/common/utils/AiravataZKUtils.java | 61 +--
modules/distribution/server/pom.xml | 5 +
modules/gfac/airavata-gfac-service/pom.xml | 5 -
.../airavata/gfac/server/GfacServerHandler.java | 153 ++----
modules/gfac/gfac-core/pom.xml | 7 +-
.../gfac/core/context/JobExecutionContext.java | 46 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 483 +++++++------------
.../gfac/core/handler/AbstractHandler.java | 2 +-
.../core/handler/AppDescriptorCheckHandler.java | 2 +-
.../core/monitor/GfacInternalStatusUpdator.java | 62 +--
.../airavata/gfac/core/utils/GFacUtils.java | 408 +++++++---------
.../gsissh/provider/impl/GSISSHProvider.java | 6 +-
.../airavata/gfac/monitor/util/CommonUtils.java | 62 +--
.../gfac/ssh/provider/impl/SSHProvider.java | 6 +-
.../util/OrchestratorRecoveryHandler.java | 109 -----
pom.xml | 7 +-
19 files changed, 509 insertions(+), 971 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/airavata-api/airavata-api-server/pom.xml
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/pom.xml b/airavata-api/airavata-api-server/pom.xml
index df87344..28b9ef9 100644
--- a/airavata-api/airavata-api-server/pom.xml
+++ b/airavata-api/airavata-api-server/pom.xml
@@ -81,7 +81,11 @@
<artifactId>slf4j-log4j12</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index dd7bcd0..f59179e 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -23,11 +23,13 @@ package org.apache.airavata.api.server.listener;
import com.google.common.eventbus.Subscribe;
import org.apache.airavata.api.server.util.DataModelUtils;
import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.*;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
@@ -38,9 +40,8 @@ import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryModelType;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,18 +50,12 @@ import java.util.Calendar;
public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
-
private Registry airavataRegistry;
-
private MonitorPublisher monitorPublisher;
-
private Publisher publisher;
-
- private ZooKeeper zk;
-
+ private CuratorFramework curatorClient;
private RabbitMQTaskLaunchConsumer consumer;
-
public Registry getAiravataRegistry() {
return airavataRegistry;
}
@@ -130,9 +125,9 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
}
}
- private void cleanup(WorkflowNodeStatusChangeEvent nodeStatus, String experimentNode, String experimentPath) throws KeeperException, InterruptedException, AiravataException {
+ private void cleanup(WorkflowNodeStatusChangeEvent nodeStatus, String experimentNode, String experimentPath) throws Exception {
int count = 0;
- long deliveryTag = AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), zk,
+ long deliveryTag = AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), curatorClient,
experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
if(deliveryTag>0) {
if (ServerSettings.isGFacPassiveMode()) {
@@ -152,16 +147,18 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
}
}
}
- if (zk.exists(experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, false) != null) {
- ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX);
+ if (curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX) != null) {
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
+ experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, true);
}
- if (zk.exists(experimentPath, false) != null) {
- ZKUtil.deleteRecursive(zk, experimentPath);
+
+ if (curatorClient.checkExists().forPath(experimentPath) != null) {
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath, true);
}
// ack cancel operation if exist
long cancelDT = AiravataZKUtils.getCancelDeliveryTagIfExist(nodeStatus.getWorkflowNodeIdentity().getExperimentId(),
- zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
+ curatorClient, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
count = 0;
if (cancelDT > 0) {
while (!consumer.isOpen() && count < 3) {
@@ -180,7 +177,8 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
}
}
if (cancelDT > 0) {
- ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
+ experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
}
}
@@ -211,8 +209,8 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
this.publisher=(Publisher) configuration;
}else if (configuration instanceof RabbitMQTaskLaunchConsumer) {
this.consumer = (RabbitMQTaskLaunchConsumer) configuration;
- }else if (configuration instanceof ZooKeeper) {
- this.zk = (ZooKeeper) configuration;
+ }else if (configuration instanceof CuratorFramework) {
+ this.curatorClient = (CuratorFramework) configuration;
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/commons/utils/pom.xml
----------------------------------------------------------------------
diff --git a/modules/commons/utils/pom.xml b/modules/commons/utils/pom.xml
index 44d7465..09766af 100644
--- a/modules/commons/utils/pom.xml
+++ b/modules/commons/utils/pom.xml
@@ -130,9 +130,9 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.0</version>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index ac617d9..f753bc1 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -20,9 +20,10 @@
*/
package org.apache.airavata.common.utils;
-import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig;
@@ -36,15 +37,12 @@ import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
-import java.util.List;
public class AiravataZKUtils implements Watcher {
private final static Logger logger = LoggerFactory.getLogger(AiravataZKUtils.class);
public static final String ZK_EXPERIMENT_STATE_NODE = "state";
-
public static final String DELIVERY_TAG_POSTFIX = "-deliveryTag";
-
public static final String CANCEL_DELIVERY_TAG_POSTFIX = "-cancel-deliveryTag";
@Override
@@ -81,35 +79,14 @@ public class AiravataZKUtils implements Watcher {
"state";
}
- public static String getExpState(ZooKeeper zk, String expId) throws ApplicationSettingsException,
- KeeperException, InterruptedException {
- Stat exists = zk.exists(getExpStatePath(expId), false);
+ public static String getExpState(CuratorFramework curatorClient, String expId) throws Exception {
+ Stat exists = curatorClient.checkExists().forPath(getExpStatePath(expId));
if (exists != null) {
- return new String(zk.getData(getExpStatePath(expId), false, exists));
+ return new String(curatorClient.getData().storingStatIn(exists).forPath(getExpStatePath(expId)));
}
return null;
}
-
- public static int getExpStateValueWithGivenPath(ZooKeeper zk,String fullPath)throws ApplicationSettingsException,
- KeeperException, InterruptedException {
- Stat exists = zk.exists(fullPath, false);
- if (exists != null) {
- return Integer.parseInt(new String(zk.getData(fullPath, false, exists)));
- }
- return -1;
- }
- public static List<String> getRunningGfacNodeNames(ZooKeeper zk) throws KeeperException, InterruptedException {
- String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_API_SERVER_NODE, "/gfac-server");
- return zk.getChildren(gfacServer, null);
- }
-
-
- public static List<String> getAllGfacNodeNames(ZooKeeper zk) throws KeeperException, InterruptedException {
- String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- return zk.getChildren(gfacServer, null);
- }
-
public static void runZKFromConfig(ServerConfig config,ServerCnxnFactory cnxnFactory) throws IOException {
AiravataZKUtils.logger.info("Starting Zookeeper server...");
FileTxnSnapLog txnLog = null;
@@ -177,33 +154,22 @@ public class AiravataZKUtils implements Watcher {
}
}
- public static void storeDeliveryTag(ZooKeeper zk,String newExpNode,Double deliveryTag) throws KeeperException, InterruptedException {
- String s = zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- Stat expParent = zk.exists(newExpNode, false);
- if (expParent != null) {
- zk.setData(newExpNode, toByteArray(deliveryTag),
- expParent.getVersion());
- }
- }
-
public static byte[] toByteArray(double value) {
byte[] bytes = new byte[8];
ByteBuffer.wrap(bytes).putDouble(value);
return bytes;
}
- public static long getDeliveryTag(String experimentID, ZooKeeper zk, String experimentNode,
- String pickedChild) throws KeeperException, InterruptedException,AiravataException {
+ public static long getDeliveryTag(String experimentID, CuratorFramework curatorClient, String experimentNode,
+ String pickedChild) throws Exception {
String deliveryTagPath = experimentNode + File.separator + pickedChild + File.separator + experimentID
+ DELIVERY_TAG_POSTFIX;
- Stat exists = zk.exists(deliveryTagPath, false);
+ Stat exists = curatorClient.checkExists().forPath(deliveryTagPath);
if(exists==null) {
logger.error("Cannot find delivery Tag in path:" + deliveryTagPath + " for this experiment");
return -1;
}
- return bytesToLong(zk.getData(deliveryTagPath, false, exists));
+ return bytesToLong(curatorClient.getData().storingStatIn(exists).forPath(deliveryTagPath));
}
public static byte[] longToBytes(long x) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
@@ -222,15 +188,16 @@ public class AiravataZKUtils implements Watcher {
return ByteBuffer.wrap(bytes).getDouble();
}
- public static long getCancelDeliveryTagIfExist(String experimentId, ZooKeeper zk, String experimentNode, String pickedChild) throws KeeperException, InterruptedException {
+ public static long getCancelDeliveryTagIfExist(String experimentId, CuratorFramework curatorClient,
+ String experimentNode, String pickedChild) throws Exception {
String cancelDeliveryTagPath = experimentNode + File.separator + pickedChild + File.separator + experimentId +
AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
- Stat exists = zk.exists(cancelDeliveryTagPath, false);
+ Stat exists = curatorClient.checkExists().forPath(cancelDeliveryTagPath);
if (exists == null) {
return -1; // no cancel deliverytag found
} else {
- return bytesToLong(zk.getData(cancelDeliveryTagPath, false, exists));
+ return bytesToLong(curatorClient.getData().storingStatIn(exists).forPath(cancelDeliveryTagPath));
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 9a8d2c7..6c2f213 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -605,6 +605,11 @@
<artifactId>amqp-client</artifactId>
<version>${amqp.client.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
<!-- ======================== Sample =================== -->
<dependency>
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/airavata-gfac-service/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/pom.xml b/modules/gfac/airavata-gfac-service/pom.xml
index 5a178bd..3e8b423 100644
--- a/modules/gfac/airavata-gfac-service/pom.xml
+++ b/modules/gfac/airavata-gfac-service/pom.xml
@@ -82,11 +82,6 @@
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>${curator.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 5d747aa..20926d7 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -22,7 +22,6 @@ package org.apache.airavata.gfac.server;
import com.google.common.eventbus.EventBus;
import org.airavata.appcatalog.cpi.AppCatalog;
-import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -45,71 +44,65 @@ import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingConstants;
import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
-import org.apache.zookeeper.*;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import java.io.File;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Lock;
-public class GfacServerHandler implements GfacService.Iface, Watcher {
+public class GfacServerHandler implements GfacService.Iface {
private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class);
-
private static RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
-
private static int requestCount=0;
-
private Registry registry;
private AppCatalog appCatalog;
-
private String gatewayName;
-
private String airavataUserName;
-
- private ZooKeeper zk;
-
+// private ZooKeeper zk;
+ private CuratorFramework curatorClient;
private static Integer mutex = -1;
-
private static Lock lock;
-
private MonitorPublisher publisher;
-
private String gfacServer;
-
private String gfacExperiments;
-
private String airavataServerHostPort;
-
-
private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
public GfacServerHandler() throws Exception {
- // registering with zk
try {
+ // start curator client
String zkhostPort = AiravataZKUtils.getZKhostPort();
- airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
- + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
- zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); // no watcher is required, this will only use to store some data
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+ curatorClient = CuratorFrameworkFactory.newClient(zkhostPort, retryPolicy);
+ curatorClient.start();
gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
- logger.info("Waiting for zookeeper to connect to the server");
- synchronized (mutex) {
- mutex.wait(5000); // waiting for the syncConnected event
- }
+ airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
+ + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
storeServerConfig();
- logger.info("Finished starting ZK: " + zk);
publisher = new MonitorPublisher(new EventBus());
BetterGfacImpl.setMonitorPublisher(publisher);
registry = RegistryFactory.getDefaultRegistry();
@@ -121,24 +114,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
}
- BetterGfacImpl.startStatusUpdators(registry, zk, publisher, rabbitMQTaskLaunchConsumer);
- } catch (ApplicationSettingsException e) {
- logger.error("Error initialising GFAC", e);
- throw new Exception("Error initialising GFAC", e);
- } catch (InterruptedException e) {
- logger.error("Error initialising GFAC", e);
- throw new Exception("Error initialising GFAC", e);
- } catch (AppCatalogException e) {
- logger.error("Error initialising GFAC", e);
- throw new Exception("Error initialising GFAC", e);
- } catch (RegistryException e) {
- logger.error("Error initialising GFAC", e);
- throw new Exception("Error initialising GFAC", e);
- } catch (KeeperException e) {
- logger.error("Error initialising GFAC", e);
- throw new Exception("Error initialising GFAC", e);
- } catch (IOException e) {
- logger.error("Error initialising GFAC", e);
+ BetterGfacImpl.startStatusUpdators(registry, curatorClient, publisher, rabbitMQTaskLaunchConsumer);
+ } catch (Exception e) {
throw new Exception("Error initialising GFAC", e);
}
}
@@ -152,60 +129,27 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
logger.error(e.getMessage(), e);
}
}
- private void storeServerConfig() throws KeeperException, InterruptedException, ApplicationSettingsException {
- Stat zkStat = zk.exists(gfacServer, false);
- if (zkStat == null) {
- zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ private void storeServerConfig() throws Exception {
+ Stat stat = curatorClient.checkExists().forPath(gfacServer);
+ if (stat == null) {
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+ .forPath(gfacServer, new byte[0]);
}
String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
String instanceNode = gfacServer + File.separator + instanceId;
- zkStat = zk.exists(instanceNode, true);
- if (zkStat == null) {
- zk.create(instanceNode,
- airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL); // other component will watch these childeren creation deletion to monitor the status of the node
- zk.getChildren(instanceNode, true);
+ stat = curatorClient.checkExists().forPath(instanceNode);
+ if (stat == null) {
+ curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(instanceNode, airavataServerHostPort.getBytes());
+ curatorClient.getChildren().watched().forPath(instanceNode);
}
- zkStat = zk.exists(gfacExperiments, false);
- if (zkStat == null) {
- zk.create(gfacExperiments,
- airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ stat = curatorClient.checkExists().forPath(gfacExperiments);
+ if (stat == null) {
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(gfacExperiments, airavataServerHostPort.getBytes());
}
- zkStat = zk.exists(gfacExperiments + File.separator + instanceId, false);
- if (zkStat == null) {
- zk.create(gfacExperiments + File.separator + instanceId,
- airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } else {
- logger.error(" Zookeeper is inconsistent state !!!!!");
- }
- }
-
- synchronized public void process(WatchedEvent watchedEvent) {
- logger.info(watchedEvent.getPath());
- logger.info(watchedEvent.getType().toString());
- synchronized (mutex) {
- Event.KeeperState state = watchedEvent.getState();
- logger.info(state.name());
- switch (state){
- case SyncConnected:
- mutex.notify();
- break;
- case Expired:case Disconnected:
- logger.info("ZK Connection is "+ state.toString());
- try {
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- } catch (ApplicationSettingsException e) {
- logger.error(e.getMessage(), e);
- }
-// synchronized (mutex) {
-// mutex.wait(5000); // waiting for the syncConnected event
-// }
- }
+ stat = curatorClient.checkExists().forPath(gfacExperiments + File.separator + instanceId);
+ if (stat == null) {
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+ .forPath(gfacExperiments + File.separator + instanceId, airavataServerHostPort.getBytes());
}
}
@@ -304,7 +248,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
private GFac getGfac() throws TException {
try {
- return new BetterGfacImpl(registry, appCatalog,null , publisher);
+ return new BetterGfacImpl(registry, appCatalog, curatorClient, publisher);
} catch (IOException e) {
logger.error(e.getMessage(), e);
@@ -379,16 +323,11 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
registry.update(RegistryModelType.EXPERIMENT_STATUS, status, event.getExperimentId());
experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
try {
- GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
+ GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), curatorClient,
+ experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
AiravataZKUtils.getExpStatePath(event.getExperimentId());
submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
- } catch (KeeperException e) {
- logger.error(nodeName + " was interrupted.");
- rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
- } catch (ApplicationSettingsException e) {
+ } catch (Exception e) {
logger.error(e.getMessage(), e);
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
}
@@ -404,7 +343,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
try {
byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
ThriftUtils.createThriftFromBytes(bytes, event);
- boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), zk, message.getDeliveryTag());
+ boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), curatorClient, message.getDeliveryTag());
if (saveDeliveryTagSuccess) {
cancelSuccess = cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
System.out.println(" Message Received with message id '" + message.getMessageId()
@@ -420,7 +359,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
// if cancel success , AiravataExperimentStatusUpdator will send an ack to this message.
} else {
try {
- if (GFacUtils.ackCancelRequest(event.getExperimentId(), zk)) {
+ if (GFacUtils.ackCancelRequest(event.getExperimentId(), curatorClient)) {
if (!rabbitMQTaskLaunchConsumer.isOpen()) {
rabbitMQTaskLaunchConsumer.reconnect();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index 465f00e..7eede18 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -116,11 +116,10 @@
<!-- this is the dependency for amqp implementation -->
<!-- zookeeper dependencies -->
<dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.0</version>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
</dependency>
-
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 6a8dd5f..0ca3828 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -42,6 +42,7 @@ import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.registry.cpi.Registry;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,29 +51,18 @@ public class JobExecutionContext extends AbstractContext implements Serializable
private static final Logger log = LoggerFactory.getLogger(JobExecutionContext.class);
private GFacConfiguration gfacConfiguration;
-
private ApplicationContext applicationContext;
-
private MessageContext inMessageContext;
-
private MessageContext outMessageContext;
-
private GFacNotifier notifier;
-
//FIXME : not needed for gfac
private Experiment experiment;
-
private TaskDetails taskData;
-
private JobDetails jobDetails;
-
// FIXME : not needed for gfac
private WorkflowNodeDetails workflowNodeDetails;
-
private GFac gfac;
-
- private ZooKeeper zk;
-
+ private CuratorFramework curatorClient;
private String credentialStoreToken;
/**
* User defined scratch/temp directory
@@ -150,10 +140,12 @@ public class JobExecutionContext extends AbstractContext implements Serializable
// which service description we should refer during the execution of the current job represented
// by this context instance.
private String applicationName;
-
private String experimentID;
-
private AppCatalog appCatalog;
+ private String gatewayID;
+ private String status;
+ private List<String> outputFileList;
+ private Registry registry;
public String getGatewayID() {
return gatewayID;
@@ -163,13 +155,6 @@ public class JobExecutionContext extends AbstractContext implements Serializable
this.gatewayID = gatewayID;
}
- private String gatewayID;
-
- private String status;
-
- private List<String> outputFileList;
-
- private Registry registry;
/**
* Security context is used to handle authentication for input handlers and providers.
@@ -377,15 +362,6 @@ public class JobExecutionContext extends AbstractContext implements Serializable
this.gfac = gfac;
}
- public ZooKeeper getZk() {
- return zk;
- }
-
- public void setZk(ZooKeeper zk) {
- this.zk = zk;
-
- }
-
public String getCredentialStoreToken() {
return credentialStoreToken;
}
@@ -494,6 +470,14 @@ public class JobExecutionContext extends AbstractContext implements Serializable
this.preferredDataMovementInterface = preferredDataMovementInterface;
}
+ public CuratorFramework getCuratorClient() {
+ return curatorClient;
+ }
+
+ public void setCuratorClient(CuratorFramework curatorClient) {
+ this.curatorClient = curatorClient;
+ }
+
public String getExecutablePath() {
if (applicationContext == null || applicationContext.getApplicationDeploymentDescription() == null) {
return null;
@@ -502,6 +486,8 @@ public class JobExecutionContext extends AbstractContext implements Serializable
}
}
+
+
public String getLoginUserName() {
return loginUserName;
}