You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/05/11 21:52:25 UTC
airavata git commit: saving data in ZooKeeper on termination
Repository: airavata
Updated Branches:
refs/heads/master e9a451dc0 -> 4a978d4f1
Saving data in ZooKeeper on termination
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4a978d4f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4a978d4f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4a978d4f
Branch: refs/heads/master
Commit: 4a978d4f1347aa683189f7ea5e814d125daaecb1
Parents: e9a451d
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Mon May 11 15:52:17 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Mon May 11 15:52:17 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/server/GfacServerHandler.java | 11 +++
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 84 +++++++-------------
2 files changed, 39 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a978d4f/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 76497ba..b90c731 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -403,12 +403,23 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
TBase messageEvent = message.getEvent();
byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
ThriftUtils.createThriftFromBytes(bytes, event);
+ GFacUtils.setExperimentCancel(event.getExperimentId(), event.getTaskId(), zk);
+ AiravataZKUtils.getExpStatePath(event.getExperimentId());
cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
System.out.println(" Message Received with message id '" + message.getMessageId()
+ "' and with message type '" + message.getType());
} catch (TException e) {
logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+ } catch (ApplicationSettingsException e) {
+ logger.error(e.getMessage(), e);
+ rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+ } catch (KeeperException e) {
+ logger.error(e.getMessage(), e);
+ rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a978d4f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 6eeef28..dd82fa7 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -564,97 +564,69 @@ public class BetterGfacImpl implements GFac,Watcher {
}
private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException {
- // We need to check whether this job is submitted as a part of a large workflow. If yes,
- // we need to setup workflow tracking listener.
try {
- // we cannot call GFacUtils.getZKExperimentStateValue because experiment might be running in some other node
-// String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
-// int stateVal = 0;
-// if(expPath != null){
-// Stat exists = zk.exists(expPath + File.separator + "operation", false);
-// zk.getData(expPath + File.separator + "operation", this, exists);
-// stateVal = GFacUtils.getZKExperimentStateValue(zk, expPath); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
-// }
+ // we cannot call GFacUtils.getZKExperimentStateValue because experiment might be running in some other node
+ String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
+ Stat exists = zk.exists(expPath + File.separator + "operation", false);
+ zk.getData(expPath + File.separator + "operation", this, exists);
+ GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status
String workflowInstanceID = null;
if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
- // This mean we need to register workflow tracking listener.
//todo implement WorkflowTrackingListener properly
-// registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext);
}
// Register log event listener. This is required in all scenarios.
jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
-// if (stateVal < 2) {
-// // In this scenario We do everything from the beginning
-// log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " +
-// " and stop the execution chain");
-// } else if (stateVal >= 8) {
-// log.error("This experiment is almost finished, so cannot cancel this experiment");
-// ZKUtil.deleteRecursive(zk,
-// AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID()));
-// } else {
+ if (isNewJob(gfacExpState)) {
+ log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " +
+ " and stop the execution chain");
+ } else if (isCompletedJob(gfacExpState)) {
+ log.error("This experiment is almost finished, so cannot cancel this experiment");
+ ZKUtil.deleteRecursive(zk,
+ AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()));
+ } else {
log.info("Job is in a position to perform a proper cancellation");
try {
Scheduler.schedule(jobExecutionContext);
-
invokeProviderCancel(jobExecutionContext);
-
} catch (Exception e) {
try {
// we make the experiment as failed due to exception scenario
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
- // monitorPublisher.publish(new
- // ExperimentStatusChangedEvent(new
- // ExperimentIdentity(jobExecutionContext.getExperimentID()),
- // ExperimentState.FAILED));
- // Updating the task status if there's any task associated
- // monitorPublisher.publish(new TaskStatusChangeRequest(
- // new TaskIdentity(jobExecutionContext.getExperimentID(),
- // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- // jobExecutionContext.getTaskData().getTaskID()),
- // TaskState.FAILED
- // ));
JobStatusChangeRequestEvent changeRequestEvent = new JobStatusChangeRequestEvent();
changeRequestEvent.setState(JobState.FAILED);
JobIdentifier jobIdentifier = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
- jobExecutionContext.getTaskData().getTaskID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getExperimentID(),
- jobExecutionContext.getGatewayID());
+ jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
changeRequestEvent.setJobIdentity(jobIdentifier);
monitorPublisher.publish(changeRequestEvent);
} catch (NullPointerException e1) {
log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+ "NullPointerException occurred because at this point there might not have Job Created", e1, e);
- //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
// Updating the task status if there's any task associated
monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED,
- new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getExperimentID(),
- jobExecutionContext.getGatewayID())));
+ new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID())));
}
jobExecutionContext.setProperty(ERROR_SENT, "true");
jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
throw new GFacException(e.getMessage(), e);
}
-// }
+ }
return true;
-// } catch (ApplicationSettingsException e) {
-// log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
-// throw new GFacException(e.getMessage(), e);
-// } catch (KeeperException e) {
-// log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
-// throw new GFacException(e.getMessage(), e);
- } catch (Exception e) {
- log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
- throw new GFacException(e.getMessage(), e);
- }finally {
- closeZK(jobExecutionContext);
+ }catch(Exception e){
+ log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
+ throw new GFacException(e.getMessage(), e);
+ }finally{
+ closeZK(jobExecutionContext);
+ }
}
- }
private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
// Scheduler will decide the execution flow of handlers and provider