You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/09/24 20:51:30 UTC

git commit: fixing zk error caused by completed jobs not being properly handled

Repository: airavata
Updated Branches:
  refs/heads/master c366adeee -> 3f8986881


fixing zk error caused by completed jobs not being properly handled


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3f898688
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3f898688
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3f898688

Branch: refs/heads/master
Commit: 3f89868814759f62a743a118cec6f4a299954b8e
Parents: c366ade
Author: lahiru <la...@apache.org>
Authored: Wed Sep 24 14:51:25 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Sep 24 14:51:25 2014 -0400

----------------------------------------------------------------------
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 79 ++++++++++----------
 1 file changed, 38 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/3f898688/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 9fbbb85..34a6065 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -194,6 +194,7 @@ public class HPCPullMonitor extends PullMonitor {
                                 completedJobs.add(iMonitorID);
                                 iMonitorID.setStatus(JobState.CANCELED);
                                 iterator1.remove();
+                                break;
                             }
                         }
                         iterator1 = cancelJobList.iterator();
@@ -201,67 +202,62 @@ public class HPCPullMonitor extends PullMonitor {
                     synchronized (completedJobsFromPush) {
                         Iterator<String> iterator = completedJobsFromPush.iterator();
                         for (MonitorID iMonitorID : monitorID) {
+                            String completeId = null;
                             while (iterator.hasNext()) {
-                                String cancelMId = iterator.next();
-                                if (cancelMId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
-                                    logger.info("This job is finished because push notification came with <username,jobName> " + cancelMId);
+                                 completeId = iterator.next();
+                                if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
+                                    logger.info("This job is finished because push notification came with <username,jobName> " + completeId);
                                     completedJobs.add(iMonitorID);
                                     iMonitorID.setStatus(JobState.COMPLETE);
+                                    break;
                                 }
-                                //we have to make this empty everytime we iterate, otherwise this list will accumilate and will
+                                //we have to make this empty everytime we iterate, otherwise this list will accumulate and will
                                 // lead to a memory leak
-                                iterator.remove();
+                            }
+                            if(completeId!=null) {
+                                completedJobsFromPush.remove(completeId);
                             }
                             iterator = completedJobsFromPush.listIterator();
                         }
                     }
                     Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
-                    for (MonitorID iMonitorID : monitorID) {
+                    Iterator<MonitorID> iterator = monitorID.iterator();
+                    while (iterator.hasNext()) {
+                        MonitorID iMonitorID = iterator.next();
                         currentMonitorID = iMonitorID;
                         if (!JobState.CANCELED.equals(iMonitorID.getStatus())&&
                                 !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
                             iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName()));    //IMPORTANT this is NOT a simple setter we have a logic
-                        }
-
-                        String id = iMonitorID.getUserName() + "," + iMonitorID.getJobName();
-                        if(completedJobsFromPush.contains(id)){
-                            iMonitorID.setStatus(JobState.COMPLETE);
+                        }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){
+                            completedJobs.add(iMonitorID);
                         }
                         jobStatus = new JobStatusChangeRequest(iMonitorID);
-                            // we have this JobStatus class to handle amqp monitoring
+                        // we have this JobStatus class to handle amqp monitoring
 
-                            publisher.publish(jobStatus);
-                            // if the job is completed we do not have to put the job to the queue again
-                            iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+                        publisher.publish(jobStatus);
+                        // if the job is completed we do not have to put the job to the queue again
+                        iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
 
-                            // After successful monitoring perform follow   ing actions to cleanup the queue, if necessary
-                            if (jobStatus.getState().equals(JobState.COMPLETE)) {
-                                if(completedJobs.contains(iMonitorID)) {
-                                    completedJobs.add(iMonitorID);
-                                }
-                                // we run all the finished jobs in separate threads, because each job doesn't have to wait until
-                                // each one finish transfering files
-                                GFacThreadPoolExecutor.getCachedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
-                            } else if (iMonitorID.getFailedCount() > FAILED_COUNT) {
-                                logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" +iMonitorID.getFailedCount()+
-                                        " 3 times, so skip this Job from Monitor");
-                                iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
-                                completedJobs.add(iMonitorID);
-                                try {
-                                    logger.error("Launching outflow handlers to check output are genereated or not");
-                                    gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext());
-                                } catch (GFacException e) {
-                                    publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(iMonitorID.getExperimentID(), iMonitorID.getWorkflowNodeID(),
-                                            iMonitorID.getTaskID()), TaskState.FAILED));
-                                    logger.info(e.getLocalizedMessage(), e);
-                                }
-                            } else {
-                                // Evey
-                                iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
-                                // if the job is complete we remove it from the Map, if any of these maps
-                                // get empty this userMonitorData will get delete from the queue
+                        if (iMonitorID.getFailedCount() > FAILED_COUNT) {
+                            logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" + iMonitorID.getFailedCount() +
+                                    " 3 times, so skip this Job from Monitor");
+                            iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+                            completedJobs.add(iMonitorID);
+                            try {
+                                logger.error("Launching outflow handlers to check output are genereated or not");
+                                gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext());
+                            } catch (GFacException e) {
+                                publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(iMonitorID.getExperimentID(), iMonitorID.getWorkflowNodeID(),
+                                        iMonitorID.getTaskID()), TaskState.FAILED));
+                                logger.info(e.getLocalizedMessage(), e);
                             }
+                        } else {
+                            // Evey
+                            iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+                            // if the job is complete we remove it from the Map, if any of these maps
+                            // get empty this userMonitorData will get delete from the queue
                         }
+                    }
                 } else {
                     logger.debug("Qstat Monitor doesn't handle non-gsissh hosts");
                 }
@@ -274,6 +270,7 @@ public class HPCPullMonitor extends PullMonitor {
             Map<String, Integer> jobRemoveCountMap = new HashMap<String, Integer>();
             ZooKeeper zk = null;
             for (MonitorID completedJob : completedJobs) {
+                GFacThreadPoolExecutor.getCachedThreadPool().submit(new OutHandlerWorker(gfac, completedJob, publisher));
                 CommonUtils.removeMonitorFromQueue(queue, completedJob);
                 if (zk == null) {
                     zk = completedJob.getJobExecutionContext().getZk();