You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/28 17:37:40 UTC
[3/4] airavata git commit: Integrated Apache curator with Gfac. With
this fix every zookeeper call goes through CuratorFramework client.
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 2cd9ecb..200ffbe 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -61,6 +61,8 @@ import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -80,57 +82,44 @@ import java.util.*;
* This is the GFac CPI class for external usage, this simply have a single method to submit a job to
* the resource, required data for the job has to be stored in registry prior to invoke this object.
*/
-public class BetterGfacImpl implements GFac,Watcher {
+public class BetterGfacImpl implements GFac {
private static final Logger log = LoggerFactory.getLogger(BetterGfacImpl.class);
public static final String ERROR_SENT = "ErrorSent";
private Registry registry;
- // we are not storing zk instance in to jobExecution context
- private ZooKeeper zk;
+ private CuratorFramework curatorClient;
private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
private static File gfacConfigFile;
private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
private static MonitorPublisher monitorPublisher;
- private static Integer mutex = -1;
/**
* Constructor for GFac
*
* @param registry
- * @param zooKeeper
+ * @param curatorClient
*/
- public BetterGfacImpl(Registry registry, AppCatalog appCatalog, ZooKeeper zooKeeper,
+ public BetterGfacImpl(Registry registry, AppCatalog appCatalog, CuratorFramework curatorClient,
MonitorPublisher publisher) throws ApplicationSettingsException, IOException, InterruptedException {
this.registry = registry;
monitorPublisher = publisher; // This is a EventBus common for gfac
- this.zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
- log.info("Waiting until zookeeper client connect to the server...");
- synchronized (mutex) {
- mutex.wait(5000); // waiting for the syncConnected event
- }
+ this.curatorClient = curatorClient;
}
- public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher,RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
+ public static void startStatusUpdators(Registry registry, CuratorFramework curatorClient, MonitorPublisher publisher,
+
+ RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
try {
String[] listenerClassList = ServerSettings.getActivityListeners();
Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
-
for (String listenerClass : listenerClassList) {
Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
AbstractActivityListener abstractActivityListener = aClass.newInstance();
activityListeners.add(abstractActivityListener);
- abstractActivityListener.setup(publisher, registry, zk, rabbitMQPublisher,rabbitMQTaskLaunchConsumer);
+ abstractActivityListener.setup(publisher, registry, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer);
log.info("Registering listener: " + listenerClass);
publisher.registerListener(abstractActivityListener);
}
- } catch (ClassNotFoundException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties", e);
- } catch (InstantiationException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties", e);
- } catch (IllegalAccessException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties", e);
- } catch (ApplicationSettingsException e) {
- log.error("Error loading the listener classes configured in airavata-server.properties", e);
- } catch (AiravataException e) {
+ } catch (Exception e) {
log.error("Error loading the listener classes configured in airavata-server.properties", e);
}
}
@@ -151,26 +140,9 @@ public class BetterGfacImpl implements GFac,Watcher {
threadedHandler.initProperties(handlerConfig.getProperties());
daemonHandlers.add(threadedHandler);
}
- } catch (ParserConfigurationException e) {
- log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
- } catch (IOException e) {
- log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
- } catch (SAXException e) {
- log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
- } catch (XPathExpressionException e) {
+ } catch (ParserConfigurationException | IOException | XPathExpressionException | ClassNotFoundException |
+ InstantiationException | IllegalAccessException | GFacHandlerException | SAXException e) {
log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
- } catch (ClassNotFoundException e) {
- log.error("Error initializing the handler: " + className);
- log.error(className + " class has to implement " + ThreadedHandler.class);
- } catch (InstantiationException e) {
- log.error("Error initializing the handler: " + className);
- log.error(className + " class has to implement " + ThreadedHandler.class);
- } catch (GFacHandlerException e) {
- log.error("Error initializing the handler: " + className);
- log.error(className + " class has to implement " + ThreadedHandler.class);
- } catch (IllegalAccessException e) {
- log.error("Error initializing the handler: " + className);
- log.error(className + " class has to implement " + ThreadedHandler.class);
}
for (ThreadedHandler tHandler : daemonHandlers) {
(new Thread(tHandler)).start();
@@ -184,13 +156,13 @@ public class BetterGfacImpl implements GFac,Watcher {
daemonHandlers = new ArrayList<ThreadedHandler>();
startDaemonHandlers();
}
-
+
public BetterGfacImpl(Registry registry) {
- this();
- this.registry = registry;
+ this();
+ this.registry = registry;
}
-
+
/**
* This is the job launching method outsiders of GFac can use, this will invoke the GFac handler chain and providers
* And update the registry occordingly, so the users can query the database to retrieve status and output from Registry
@@ -206,13 +178,13 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setCredentialStoreToken(tokenId);
return submitJob(jobExecutionContext);
} catch (Exception e) {
- log.error("Error inovoking the job with experiment ID: " + experimentID + ":"+e.getMessage());
+ log.error("Error inovoking the job with experiment ID: " + experimentID + ":" + e.getMessage());
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
// FIXME: Here we need to update Experiment status to Failed, as we used chained update approach updating
// task status will cause to update Experiment status. Remove this chained update approach and fix this correctly (update experiment status)
- if(jobExecutionContext!=null){
+ if (jobExecutionContext != null) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
@@ -222,8 +194,6 @@ public class BetterGfacImpl implements GFac,Watcher {
monitorPublisher.publish(event);
}
throw new GFacException(e);
- }finally {
- closeZK(jobExecutionContext);
}
}
@@ -237,7 +207,7 @@ public class BetterGfacImpl implements GFac,Watcher {
* 1. Get the Task from the task ID and construct the Job object and save it in to registry
* 2. Add properties of description documents to jobExecutionContext which will be used inside the providers.
*/
-
+
//Fetch the Task details for the requested experimentID from the registry. Extract required pointers from the Task object.
TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
@@ -289,11 +259,11 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setTaskData(taskData);
jobExecutionContext.setGatewayID(gatewayID);
jobExecutionContext.setAppCatalog(appCatalog);
-
-
+
+
List<JobDetails> jobDetailsList = taskData.getJobDetailsList();
//FIXME: Following for loop only set last jobDetails element to the jobExecutionContext
- for(JobDetails jDetails:jobDetailsList){
+ for (JobDetails jDetails : jobDetailsList) {
jobExecutionContext.setJobDetails(jDetails);
}
// setting the registry
@@ -314,11 +284,11 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
jobExecutionContext.setGfac(this);
- jobExecutionContext.setZk(zk);
+ jobExecutionContext.setCuratorClient(curatorClient);
// handle job submission protocol
List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
- if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){
+ if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) {
Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
@Override
public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
@@ -327,7 +297,7 @@ public class BetterGfacImpl implements GFac,Watcher {
});
jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces);
- }else {
+ } else {
throw new GFacException("Compute resource should have at least one job submission interface defined...");
}
// handle data movement protocol
@@ -346,7 +316,7 @@ public class BetterGfacImpl implements GFac,Watcher {
populateDefaultComputeResourceConfiguration(jobExecutionContext, applicationInterface, computeResource);
populateResourceJobManager(jobExecutionContext);
// if gateway resource preference is set
- if (gatewayResourcePreferences != null ) {
+ if (gatewayResourcePreferences != null) {
if (gatewayResourcePreferences.getScratchLocation() == null) {
gatewayResourcePreferences.setScratchLocation("/tmp");
}
@@ -365,7 +335,7 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
- if(gatewayResourcePreferences.getLoginUserName() != null){
+ if (gatewayResourcePreferences.getLoginUserName() != null) {
jobExecutionContext.setLoginUserName(gatewayResourcePreferences.getLoginUserName());
}
@@ -375,45 +345,45 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setPreferredDataMovementInterface(jobExecutionContext.getHostPrioritizedDataMovementInterfaces().get(0));
jobExecutionContext.setPreferredDataMovementProtocol(jobExecutionContext.getPreferredDataMovementInterface().getDataMovementProtocol());
} else {
- // this check is to avoid NPE when job submission endpoints do
- // not contain any data movement interfaces.
- if((dataMovementInterfaces != null) && (!dataMovementInterfaces.isEmpty())) {
- for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
- if (gatewayResourcePreferences.getPreferredDataMovementProtocol() == dataMovementInterface.getDataMovementProtocol()) {
- jobExecutionContext.setPreferredDataMovementInterface(dataMovementInterface);
- break;
- }
- }
- }
+ // this check is to avoid NPE when job submission endpoints do
+ // not contain any data movement interfaces.
+ if ((dataMovementInterfaces != null) && (!dataMovementInterfaces.isEmpty())) {
+ for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
+ if (gatewayResourcePreferences.getPreferredDataMovementProtocol() == dataMovementInterface.getDataMovementProtocol()) {
+ jobExecutionContext.setPreferredDataMovementInterface(dataMovementInterface);
+ break;
+ }
+ }
+ }
}
- } else {
+ } else {
setUpWorkingLocation(jobExecutionContext, applicationInterface, "/tmp");
}
List<OutputDataObjectType> taskOutputs = taskData.getApplicationOutputs();
- if (taskOutputs == null || taskOutputs.isEmpty() ){
+ if (taskOutputs == null || taskOutputs.isEmpty()) {
taskOutputs = applicationInterface.getApplicationOutputs();
}
- for (OutputDataObjectType objectType : taskOutputs){
- if (objectType.getType() == DataType.URI && objectType.getValue() != null){
+ for (OutputDataObjectType objectType : taskOutputs) {
+ if (objectType.getType() == DataType.URI && objectType.getValue() != null) {
String filePath = objectType.getValue();
// if output is not in working folder
if (objectType.getLocation() != null && !objectType.getLocation().isEmpty()) {
- if(objectType.getLocation().startsWith(File.separator)){
- filePath = objectType.getLocation() + File.separator + filePath;
- }else{
- filePath = jobExecutionContext.getOutputDir() + File.separator + objectType.getLocation() + File.separator + filePath;
+ if (objectType.getLocation().startsWith(File.separator)) {
+ filePath = objectType.getLocation() + File.separator + filePath;
+ } else {
+ filePath = jobExecutionContext.getOutputDir() + File.separator + objectType.getLocation() + File.separator + filePath;
}
- }else{
- filePath = jobExecutionContext.getOutputDir() + File.separator + filePath;
+ } else {
+ filePath = jobExecutionContext.getOutputDir() + File.separator + filePath;
}
objectType.setValue(filePath);
-
+
}
- if (objectType.getType() == DataType.STDOUT){
+ if (objectType.getType() == DataType.STDOUT) {
objectType.setValue(jobExecutionContext.getOutputDir() + File.separator + jobExecutionContext.getApplicationName() + ".stdout");
}
- if (objectType.getType() == DataType.STDERR){
+ if (objectType.getType() == DataType.STDERR) {
objectType.setValue(jobExecutionContext.getOutputDir() + File.separator + jobExecutionContext.getApplicationName() + ".stderr");
}
}
@@ -464,23 +434,23 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
- private void populateResourceJobManager (JobExecutionContext jobExecutionContext) {
+ private void populateResourceJobManager(JobExecutionContext jobExecutionContext) {
try {
JobSubmissionProtocol submissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
if (submissionProtocol == JobSubmissionProtocol.SSH) {
SSHJobSubmission sshJobSubmission = GFacUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
- if (sshJobSubmission != null){
+ if (sshJobSubmission != null) {
jobExecutionContext.setResourceJobManager(sshJobSubmission.getResourceJobManager());
}
- } else if (submissionProtocol == JobSubmissionProtocol.LOCAL){
+ } else if (submissionProtocol == JobSubmissionProtocol.LOCAL) {
LOCALSubmission localJobSubmission = GFacUtils.getLocalJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
- if (localJobSubmission != null){
+ if (localJobSubmission != null) {
jobExecutionContext.setResourceJobManager(localJobSubmission.getResourceJobManager());
}
}
} catch (AppCatalogException e) {
- log.error("Error occured while retrieving job submission interface", e);
+ log.error("Error occured while retrieving job submission interface", e);
}
}
@@ -488,7 +458,7 @@ public class BetterGfacImpl implements GFac,Watcher {
// We need to check whether this job is submitted as a part of a large workflow. If yes,
// we need to setup workflow tracking listerner.
try {
- GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
+ GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
// Register log event listener. This is required in all scenarios.
if (isNewJob(gfacExpState)) {
// In this scenario We do everything from the beginning
@@ -497,8 +467,8 @@ public class BetterGfacImpl implements GFac,Watcher {
launch(jobExecutionContext);
} else if (isCompletedJob(gfacExpState)) {
log.info("There is nothing to recover in this job so we do not re-submit");
- ZKUtil.deleteRecursive(zk,
- AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()));
+ ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
+ AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()), true);
} else {
// Now we know this is an old Job, so we have to handle things gracefully
log.info("Re-launching the job in GFac because this is re-submitted to GFac");
@@ -547,7 +517,7 @@ public class BetterGfacImpl implements GFac,Watcher {
private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException {
try {
- GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
+ GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
String workflowInstanceID = null;
if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
//todo implement WorkflowTrackingListener properly
@@ -575,27 +545,25 @@ public class BetterGfacImpl implements GFac,Watcher {
//
// }
return true;
- }catch(Exception e){
- log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
- throw new GFacException(e.getMessage(), e);
- }finally{
- closeZK(jobExecutionContext);
- }
+ } catch (Exception e) {
+ log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
+ throw new GFacException(e.getMessage(), e);
}
+ }
- private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
- // Scheduler will decide the execution flow of handlers and provider
- // which handles
- // the job.
- String experimentID = jobExecutionContext.getExperimentID();
- try {
- Scheduler.schedule(jobExecutionContext);
+ private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
+ // Scheduler will decide the execution flow of handlers and provider
+ // which handles
+ // the job.
+ String experimentID = jobExecutionContext.getExperimentID();
+ try {
+ Scheduler.schedule(jobExecutionContext);
- // Executing in handlers in the order as they have configured in
- // GFac configuration
- // here we do not skip handler if some handler does not have to be
- // run again during re-run it can implement
- // that logic in to the handler
+ // Executing in handlers in the order as they have configured in
+ // GFac configuration
+ // here we do not skip handler if some handler does not have to be
+ // run again during re-run it can implement
+ // that logic in to the handler
// After executing the in handlers provider instance should be set
// to job execution context.
@@ -637,31 +605,31 @@ public class BetterGfacImpl implements GFac,Watcher {
default:
throw new GFacException("Un-handled GfacExperimentState : " + state.name());
}
- } catch (Exception e) {
- log.error(e.getMessage(),e);
- try {
- // we make the experiment as failed due to exception scenario
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ try {
+ // we make the experiment as failed due to exception scenario
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
JobIdentifier jobIdentity = new JobIdentifier(
jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
- } catch (NullPointerException e1) {
- log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
- + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+ monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (NullPointerException e1) {
+ log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+ + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
+ monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- }
- jobExecutionContext.setProperty(ERROR_SENT, "true");
- jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+ }
+ jobExecutionContext.setProperty(ERROR_SENT, "true");
+ jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
throw new GFacException(e.getMessage(), e);
}
}
@@ -678,34 +646,34 @@ public class BetterGfacImpl implements GFac,Watcher {
}
private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
- // Scheduler will decide the execution flow of handlers and provider
- // which handles
- // the job.
- try {
- Scheduler.schedule(jobExecutionContext);
-
- // Executing in handlers in the order as they have configured in
- // GFac configuration
- // here we do not skip handler if some handler does not have to be
- // run again during re-run it can implement
- // that logic in to the handler
+ // Scheduler will decide the execution flow of handlers and provider
+ // which handles
+ // the job.
+ try {
+ Scheduler.schedule(jobExecutionContext);
+
+ // Executing in handlers in the order as they have configured in
+ // GFac configuration
+ // here we do not skip handler if some handler does not have to be
+ // run again during re-run it can implement
+ // that logic in to the handler
if (!isCancelling(jobExecutionContext)) {
invokeInFlowHandlers(jobExecutionContext); // to keep the
// consistency we always
// try to re-run to
// avoid complexity
- }else{
+ } else {
log.info("Experiment is cancelled, so launch operation is stopping immediately");
GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
return; // if the job is cancelled, status change is handled in cancel operation this thread simply has to be returned
}
// if (experimentID != null){
- // registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
- // }
+ // registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
+ // }
- // After executing the in handlers provider instance should be set
- // to job execution context.
- // We get the provider instance and execute it.
+ // After executing the in handlers provider instance should be set
+ // to job execution context.
+ // We get the provider instance and execute it.
if (!isCancelling(jobExecutionContext)) {
invokeProviderExecute(jobExecutionContext);
} else {
@@ -713,53 +681,53 @@ public class BetterGfacImpl implements GFac,Watcher {
GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
return;
}
- } catch (Exception e) {
- try {
- // we make the experiment as failed due to exception scenario
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
- // monitorPublisher.publish(new
- // ExperimentStatusChangedEvent(new
- // ExperimentIdentity(jobExecutionContext.getExperimentID()),
- // ExperimentState.FAILED));
- // Updating the task status if there's any task associated
- // monitorPublisher.publish(new TaskStatusChangeRequest(
- // new TaskIdentity(jobExecutionContext.getExperimentID(),
- // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- // jobExecutionContext.getTaskData().getTaskID()),
- // TaskState.FAILED
- // ));
+ } catch (Exception e) {
+ try {
+ // we make the experiment as failed due to exception scenario
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ // monitorPublisher.publish(new
+ // ExperimentStatusChangedEvent(new
+ // ExperimentIdentity(jobExecutionContext.getExperimentID()),
+ // ExperimentState.FAILED));
+ // Updating the task status if there's any task associated
+ // monitorPublisher.publish(new TaskStatusChangeRequest(
+ // new TaskIdentity(jobExecutionContext.getExperimentID(),
+ // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ // jobExecutionContext.getTaskData().getTaskID()),
+ // TaskState.FAILED
+ // ));
JobIdentifier jobIdentity = new JobIdentifier(
- jobExecutionContext.getJobDetails().getJobID(),jobExecutionContext.getTaskData().getTaskID(),jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
- } catch (NullPointerException e1) {
- log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
- + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
- //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
- // Updating the task status if there's any task associated
+ monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+ } catch (NullPointerException e1) {
+ log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+ + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+ //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
+ // Updating the task status if there's any task associated
TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
jobExecutionContext.getGatewayID());
monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
- }
- jobExecutionContext.setProperty(ERROR_SENT, "true");
- jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+ }
+ jobExecutionContext.setProperty(ERROR_SENT, "true");
+ jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
throw new GFacException(e.getMessage(), e);
- }
+ }
}
- private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException {
+ private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws Exception {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
- GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
initProvider(provider, jobExecutionContext);
executeProvider(provider, jobExecutionContext);
disposeProvider(provider, jobExecutionContext);
- GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
@@ -767,13 +735,13 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
- private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext, boolean submit) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
+ private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext, boolean submit) throws Exception {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
if (submit) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
- GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName());
- GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+ GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
if (plState != null && plState == GfacHandlerState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
initProvider(provider, jobExecutionContext);
executeProvider(provider, jobExecutionContext);
@@ -781,18 +749,16 @@ public class BetterGfacImpl implements GFac,Watcher {
} else {
provider.recover(jobExecutionContext);
}
- GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
} else {
disposeProvider(provider, jobExecutionContext);
- GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
}
- if (GFacUtils.isSynchronousMode(jobExecutionContext))
-
- {
+ if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
invokeOutFlowHandlers(jobExecutionContext);
}
@@ -812,12 +778,12 @@ public class BetterGfacImpl implements GFac,Watcher {
}
// TODO - Did refactoring, but need to recheck the logic again.
- private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
+ private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws Exception {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
- GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName());
- GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+ GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
initProvider(provider, jobExecutionContext);
cancelProvider(provider, jobExecutionContext);
@@ -825,7 +791,7 @@ public class BetterGfacImpl implements GFac,Watcher {
} else {
provider.recover(jobExecutionContext);
}
- GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
@@ -885,11 +851,11 @@ public class BetterGfacImpl implements GFac,Watcher {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.INHANDLERSINVOKING));
for (GFacHandlerConfig handlerClassName : handlers) {
- if(!isCancelling(jobExecutionContext)) {
+ if (!isCancelling(jobExecutionContext)) {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
try {
- GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName());
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
handler.initProperties(handlerClassName.getProperties());
@@ -902,12 +868,12 @@ public class BetterGfacImpl implements GFac,Watcher {
}
try {
handler.invoke(jobExecutionContext);
- GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
// if exception thrown before that we do not make it finished
} catch (GFacHandlerException e) {
throw new GFacException("Error Executing a InFlow Handler", e.getCause());
}
- }else{
+ } else {
log.info("Experiment execution is cancelled, so InHandler invocation is going to stop");
GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
break;
@@ -916,38 +882,18 @@ public class BetterGfacImpl implements GFac,Watcher {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.INHANDLERSINVOKED));
} catch (Exception e) {
- throw new GFacException("Error Invoking Handlers:"+e.getMessage(), e);
+ throw new GFacException("Error Invoking Handlers:" + e.getMessage(), e);
}
}
public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
String experimentPath = null;
try {
- try {
- if(jobExecutionContext.getZk()!=null){
- closeZK(jobExecutionContext);
- }
- jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
- zk = jobExecutionContext.getZk();
- log.info("Waiting until zookeeper client connect to the server...");
- synchronized (mutex) {
- mutex.wait(5000); // waiting for the syncConnected event
- }
- experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
- if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
- log.error("Experiment is already finalized so no output handlers will be invoked");
- return;
- }
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage(), e);
- } catch (InterruptedException e) {
- log.error(e.getMessage(), e);
- } catch (KeeperException e) {
- log.error(e.getMessage(), e);
+ experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
+ if (curatorClient.checkExists().forPath(experimentPath) == null) {
+ log.error("Experiment is already finalized so no output handlers will be invoked");
+ return;
}
-
GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
List<GFacHandlerConfig> handlers = null;
if (gFacConfiguration != null) {
@@ -968,23 +914,20 @@ public class BetterGfacImpl implements GFac,Watcher {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
try {
- GFacUtils.createHandlerZnode(jobExecutionContext.getZk(), jobExecutionContext, handlerClassName.getClassName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName());
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
handler.initProperties(handlerClassName.getProperties());
} catch (ClassNotFoundException e) {
log.error(e.getMessage());
throw new GFacException("Cannot load handler class " + handlerClassName, e);
- } catch (InstantiationException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- } catch (IllegalAccessException e) {
+ } catch (InstantiationException | IllegalAccessException e) {
log.error(e.getMessage());
throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
}
try {
handler.invoke(jobExecutionContext);
- GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
} catch (Exception e) {
GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED);
try {
@@ -1008,10 +951,8 @@ public class BetterGfacImpl implements GFac,Watcher {
} catch (Exception e) {
throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
}
- }catch (Exception e){
+ } catch (Exception e) {
throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
- } finally{
- closeZK(jobExecutionContext);
}
// At this point all the execution is finished so we update the task and experiment statuses.
@@ -1029,16 +970,6 @@ public class BetterGfacImpl implements GFac,Watcher {
}
- private void closeZK(JobExecutionContext jobExecutionContext) {
- try {
- if(jobExecutionContext!=null && jobExecutionContext.getZk()!=null) {
- jobExecutionContext.getZk().close();
- }
- } catch (InterruptedException e) {
- log.error(e.getMessage(), e);
- }
- }
-
/**
* If handlers ran successfully we re-run only recoverable handlers
* If handler never ran we run the normal invoke method
@@ -1058,8 +989,8 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
- GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, handlerClassName.getClassName());
- GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
+ GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
handler.initProperties(handlerClassName.getProperties());
if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {
log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode");
@@ -1069,7 +1000,7 @@ public class BetterGfacImpl implements GFac,Watcher {
log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
handler.recover(jobExecutionContext);
}
- GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
} catch (GFacHandlerException e) {
throw new GFacException("Error Executing a InFlow Handler", e.getCause());
} catch (ClassNotFoundException e) {
@@ -1086,7 +1017,7 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
} catch (GFacException e1) {
log.error(e1.getLocalizedMessage());
}
@@ -1117,8 +1048,8 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
- GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, handlerClassName.getClassName());
- GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
+ GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName());
+ GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {
log.info(handlerClassName.getClassName() + " never ran so we run this in normal mode");
handler.initProperties(handlerClassName.getProperties());
@@ -1127,12 +1058,12 @@ public class BetterGfacImpl implements GFac,Watcher {
// if these already ran we re-run only recoverable handlers
handler.recover(jobExecutionContext);
}
- GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+ GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
} catch (ClassNotFoundException e) {
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
} catch (GFacException e1) {
log.error(e1.getLocalizedMessage());
}
@@ -1142,7 +1073,7 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
} catch (GFacException e1) {
log.error(e1.getLocalizedMessage());
}
@@ -1152,7 +1083,7 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
} catch (GFacException e1) {
log.error(e1.getLocalizedMessage());
}
@@ -1163,13 +1094,11 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
} catch (GFacException e1) {
log.error(e1.getLocalizedMessage());
}
throw new GFacException("Error Executing a OutFlow Handler", e);
- }finally {
- closeZK(jobExecutionContext);
}
}
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
@@ -1214,23 +1143,14 @@ public class BetterGfacImpl implements GFac,Watcher {
return registry;
}
- public ZooKeeper getZk() {
- return zk;
- }
-
- public void setZk(ZooKeeper zk) {
- this.zk = zk;
- }
-
-
- public boolean isCancelled(JobExecutionContext executionContext){
+ public boolean isCancelled(JobExecutionContext executionContext) {
// we should check whether experiment is cancelled using registry
try {
- ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
- if (status != null){
+ ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+ if (status != null) {
ExperimentState experimentState = status.getExperimentState();
- if (experimentState != null){
- if(experimentState == ExperimentState.CANCELED){
+ if (experimentState != null) {
+ if (experimentState == ExperimentState.CANCELED) {
return true;
}
}
@@ -1241,14 +1161,14 @@ public class BetterGfacImpl implements GFac,Watcher {
return false;
}
- public boolean isCancelling(JobExecutionContext executionContext){
+ public boolean isCancelling(JobExecutionContext executionContext) {
// check whether cancelling request came
try {
- ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
- if (status != null){
+ ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+ if (status != null) {
ExperimentState experimentState = status.getExperimentState();
- if (experimentState != null){
- if(experimentState == ExperimentState.CANCELING){
+ if (experimentState != null) {
+ if (experimentState == ExperimentState.CANCELING) {
return true;
}
}
@@ -1276,43 +1196,4 @@ public class BetterGfacImpl implements GFac,Watcher {
return false;
}
- public void process(WatchedEvent watchedEvent) {
- log.info(watchedEvent.getPath());
- if (Event.EventType.NodeDataChanged.equals(watchedEvent.getType())) {
- // node data is changed, this means node is cancelled.
- log.info("Experiment is cancelled with this path:" + watchedEvent.getPath());
- }
- synchronized (mutex) {
- Event.KeeperState state = watchedEvent.getState();
- log.info(state.name());
- switch (state) {
- case SyncConnected:
- mutex.notify();
- break;
- case Expired:
- try {
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage(), e);
- }
-// synchronized (mutex) {
-// mutex.wait(5000); // waiting for the syncConnected event
-// }
- case Disconnected:
-// try {
-// zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-// } catch (IOException e) {
-// log.error(e.getMessage(), e);
-// } catch (ApplicationSettingsException e) {
-// log.error(e.getMessage(), e);
-// }
-// synchronized (mutex) {
-// mutex.wait(5000); // waiting for the syncConnected event
-// }
- log.info("ZK Connection is " + state.toString());
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
index ee28c1d..eef0a33 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
@@ -48,7 +48,7 @@ public abstract class AbstractHandler implements GFacHandler {
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
try {
- GFacUtils.updateHandlerState(jobExecutionContext.getZk(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
+ GFacUtils.updateHandlerState(jobExecutionContext.getCuratorClient(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
} catch (Exception e) {
logger.error("Error saving Recoverable provider state", e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
index 58ef855..c6ada52 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
@@ -34,7 +34,7 @@ public class AppDescriptorCheckHandler implements GFacHandler {
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
logger.info("Invoking ApplicationDescriptorCheckHandler ...");
try {
- GFacUtils.updateHandlerState(jobExecutionContext.getZk(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
+ GFacUtils.updateHandlerState(jobExecutionContext.getCuratorClient(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
} catch (Exception e) {
logger.info("Error saving plugin status to ZK");
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 836e2c6..84d72fa 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -20,31 +20,28 @@
*/
package org.apache.airavata.gfac.core.monitor;
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
+import com.google.common.eventbus.Subscribe;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
import org.apache.airavata.gfac.core.states.GfacExperimentState;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQProducer;
-import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
public class GfacInternalStatusUpdator implements AbstractActivityListener, Watcher {
private final static Logger logger = LoggerFactory.getLogger(GfacInternalStatusUpdator.class);
- private ZooKeeper zk;
+ private CuratorFramework curatorClient;
private static Integer mutex = -1;
@@ -57,39 +54,20 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
+ File.separator + statusChangeRequest.getMonitorID().getExperimentID();
Stat exists = null;
if(!(GfacExperimentState.COMPLETED.equals(statusChangeRequest.getState()) || GfacExperimentState.FAILED.equals(statusChangeRequest.getState()))) {
- try {
- if (!zk.getState().isConnected()) {
- String zkhostPort = AiravataZKUtils.getZKhostPort();
- zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
- logger.info("Waiting for zookeeper to connect to the server");
- synchronized (mutex) {
- mutex.wait(5000);
- }
- }
- exists = zk.exists(experimentPath, false);
- if (exists == null) {
- logger.error("ZK path: " + experimentPath + " does not exists !!");
- logger.error("Zookeeper is in an inconsistent state !!! ");
- return;
- }
- } catch (KeeperException e) {
- logger.error("Error while updating zk", e);
- throw new Exception(e.getMessage(), e);
- } catch (InterruptedException e) {
- logger.error("Error while updating zk", e);
- throw new Exception(e.getMessage(), e);
- } catch (IOException e) {
- logger.error("Error while updating zk", e);
- throw new Exception(e.getMessage(), e);
+ exists = curatorClient.checkExists().forPath(experimentPath);
+ if (exists == null) {
+ logger.error("ZK path: " + experimentPath + " does not exists !!");
+ return;
}
- Stat state = zk.exists(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ Stat state = curatorClient.checkExists().forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
if (state == null) {
// state znode has to be created
- zk.create(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).
+ forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
} else {
- zk.setData(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), state.getVersion());
+ curatorClient.setData().withVersion(state.getVersion()).forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
}
}
switch (statusChangeRequest.getState()) {
@@ -107,8 +85,8 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
public void setup(Object... configurations) {
for (Object configuration : configurations) {
- if (configuration instanceof ZooKeeper) {
- this.zk = (ZooKeeper) configuration;
+ if (configuration instanceof CuratorFramework) {
+ this.curatorClient = (CuratorFramework) configuration;
}
}
}