You are viewing a plain-text version of this content. The canonical link for it was labeled "here" in the original page; the link target is not preserved in this plain-text copy.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/05/11 21:52:25 UTC

airavata git commit: saving data in ZooKeeper when terminating

Repository: airavata
Updated Branches:
  refs/heads/master e9a451dc0 -> 4a978d4f1


saving data in ZooKeeper when terminating


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4a978d4f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4a978d4f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4a978d4f

Branch: refs/heads/master
Commit: 4a978d4f1347aa683189f7ea5e814d125daaecb1
Parents: e9a451d
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Mon May 11 15:52:17 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Mon May 11 15:52:17 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java | 11 +++
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 84 +++++++-------------
 2 files changed, 39 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4a978d4f/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 76497ba..b90c731 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -403,12 +403,23 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                     TBase messageEvent = message.getEvent();
                     byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
+                    GFacUtils.setExperimentCancel(event.getExperimentId(), event.getTaskId(), zk);
+                    AiravataZKUtils.getExpStatePath(event.getExperimentId());
                     cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
                     System.out.println(" Message Received with message id '" + message.getMessageId()
                             + "' and with message type '" + message.getType());
                 } catch (TException e) {
                     logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
                     rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                } catch (InterruptedException e) {
+                    logger.error(e.getMessage(), e);
+                    rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                } catch (ApplicationSettingsException e) {
+                    logger.error(e.getMessage(), e);
+                    rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                } catch (KeeperException e) {
+                    logger.error(e.getMessage(), e);
+                    rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a978d4f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 6eeef28..dd82fa7 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -564,97 +564,69 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
     private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException {
-        // We need to check whether this job is submitted as a part of a large workflow. If yes,
-        // we need to setup workflow tracking listener.
         try {
-        	// we cannot call GFacUtils.getZKExperimentStateValue because experiment might be running in some other node
-//            String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
-//            int stateVal = 0;
-//            if(expPath != null){
-//            Stat exists = zk.exists(expPath + File.separator + "operation", false);
-//            zk.getData(expPath + File.separator + "operation", this, exists);
-//            stateVal = GFacUtils.getZKExperimentStateValue(zk, expPath);   // this is the original state came, if we query again it might be different,so we preserve this state in the environment
-//            }
+            // we cannot call GFacUtils.getZKExperimentStateValue because experiment might be running in some other node
+            String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
+            Stat exists = zk.exists(expPath + File.separator + "operation", false);
+            zk.getData(expPath + File.separator + "operation", this, exists);
+            GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext);   // this is the original state came, if we query again it might be different,so we preserve this state in the environment
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.ACCEPTED));                  // immediately we get the request we update the status
             String workflowInstanceID = null;
             if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
-                // This mean we need to register workflow tracking listener.
                 //todo implement WorkflowTrackingListener properly
-//                registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext);
             }
             // Register log event listener. This is required in all scenarios.
             jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
-//            if (stateVal < 2) {
-//                // In this scenario We do everything from the beginning
-//                log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " +
-//                        " and stop the execution chain");
-//            } else if (stateVal >= 8) {
-//                log.error("This experiment is almost finished, so cannot cancel this experiment");
-//                ZKUtil.deleteRecursive(zk,
-//                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID()));
-//            } else {
+            if (isNewJob(gfacExpState)) {
+                log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " +
+                        " and stop the execution chain");
+            } else if (isCompletedJob(gfacExpState)) {
+                log.error("This experiment is almost finished, so cannot cancel this experiment");
+                ZKUtil.deleteRecursive(zk,
+                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()));
+            } else {
                 log.info("Job is in a position to perform a proper cancellation");
                 try {
                     Scheduler.schedule(jobExecutionContext);
-
                     invokeProviderCancel(jobExecutionContext);
-
                 } catch (Exception e) {
                     try {
                         // we make the experiment as failed due to exception scenario
                         monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
-                        // monitorPublisher.publish(new
-                        // ExperimentStatusChangedEvent(new
-                        // ExperimentIdentity(jobExecutionContext.getExperimentID()),
-                        // ExperimentState.FAILED));
-                        // Updating the task status if there's any task associated
-                        // monitorPublisher.publish(new TaskStatusChangeRequest(
-                        // new TaskIdentity(jobExecutionContext.getExperimentID(),
-                        // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        // jobExecutionContext.getTaskData().getTaskID()),
-                        // TaskState.FAILED
-                        // ));
                         JobStatusChangeRequestEvent changeRequestEvent = new JobStatusChangeRequestEvent();
                         changeRequestEvent.setState(JobState.FAILED);
                         JobIdentifier jobIdentifier = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
-                                                                        jobExecutionContext.getTaskData().getTaskID(),
-                                                                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                                                                        jobExecutionContext.getExperimentID(),
-                                                                        jobExecutionContext.getGatewayID());
+                                jobExecutionContext.getTaskData().getTaskID(),
+                                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                jobExecutionContext.getExperimentID(),
+                                jobExecutionContext.getGatewayID());
                         changeRequestEvent.setJobIdentity(jobIdentifier);
                         monitorPublisher.publish(changeRequestEvent);
                     } catch (NullPointerException e1) {
                         log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
                                 + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
-                        //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
                         // Updating the task status if there's any task associated
                         monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED,
-                                                                                  new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                                                                                                     jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                                                                                                     jobExecutionContext.getExperimentID(),
-                                                                                                     jobExecutionContext.getGatewayID())));
+                                new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+                                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                        jobExecutionContext.getExperimentID(),
+                                        jobExecutionContext.getGatewayID())));
 
                     }
                     jobExecutionContext.setProperty(ERROR_SENT, "true");
                     jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
                     throw new GFacException(e.getMessage(), e);
                 }
-//            }
+            }
             return true;
-//        } catch (ApplicationSettingsException e) {
-//            log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
-//            throw new GFacException(e.getMessage(), e);
-//        } catch (KeeperException e) {
-//            log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
-//            throw new GFacException(e.getMessage(), e);
-        } catch (Exception e) {
-            log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
-            throw new GFacException(e.getMessage(), e);
-        }finally {
-            closeZK(jobExecutionContext);
+            }catch(Exception e){
+                log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
+                throw new GFacException(e.getMessage(), e);
+            }finally{
+                closeZK(jobExecutionContext);
+            }
         }
-    }
 
 	private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
 		// Scheduler will decide the execution flow of handlers and provider