You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/09/24 21:32:29 UTC

git commit: fixing a bug in completed job handling

Repository: airavata
Updated Branches:
  refs/heads/master 3f8986881 -> 21c9a9266


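Fix a bug in completed-job handling: collect completed jobs in a map keyed by job name instead of a list, so that a job marked as finished by more than one path (cancellation, push notification, status check, or repeated monitoring failure) is handed to the out-handler worker and removed from the monitor queue only once.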


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/21c9a926
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/21c9a926
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/21c9a926

Branch: refs/heads/master
Commit: 21c9a9266c9a509ba50990077b14d6f21fc5b619
Parents: 3f89868
Author: lahiru <la...@apache.org>
Authored: Wed Sep 24 15:32:19 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Sep 24 15:32:19 2014 -0400

----------------------------------------------------------------------
 .../gfac/monitor/impl/pull/qstat/HPCPullMonitor.java  | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/21c9a926/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 34a6065..bac27bf 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -160,7 +160,7 @@ public class HPCPullMonitor extends PullMonitor {
         HostDescription currentHostDescription = null;
         try {
             take = this.queue.take();
-            List<MonitorID> completedJobs = new ArrayList<MonitorID>();
+            Map<String,MonitorID> completedJobs = new HashMap<String,MonitorID>();
             List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
             for (HostMonitorData iHostMonitorData : hostMonitorData) {
                 if (iHostMonitorData.getHost().getType() instanceof GsisshHostType
@@ -191,7 +191,7 @@ public class HPCPullMonitor extends PullMonitor {
                             if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) {
                                 logger.info("Found a match in monitoring Queue, so marking this job to remove from monitor queue " + cancelMId);
                                 logger.info("ExperimentID: " + cancelMId.split("\\+")[0] + ",TaskID: " + cancelMId.split("\\+")[1] + "JobID" + iMonitorID.getJobID());
-                                completedJobs.add(iMonitorID);
+                                completedJobs.put(iMonitorID.getJobName(), iMonitorID);
                                 iMonitorID.setStatus(JobState.CANCELED);
                                 iterator1.remove();
                                 break;
@@ -207,7 +207,7 @@ public class HPCPullMonitor extends PullMonitor {
                                  completeId = iterator.next();
                                 if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
                                     logger.info("This job is finished because push notification came with <username,jobName> " + completeId);
-                                    completedJobs.add(iMonitorID);
+                                    completedJobs.put(iMonitorID.getJobName(), iMonitorID);
                                     iMonitorID.setStatus(JobState.COMPLETE);
                                     break;
                                 }
@@ -229,7 +229,7 @@ public class HPCPullMonitor extends PullMonitor {
                                 !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
                             iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName()));    //IMPORTANT this is NOT a simple setter we have a logic
                         }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){
-                            completedJobs.add(iMonitorID);
+                            completedJobs.put(iMonitorID.getJobName(), iMonitorID);
                         }
                         jobStatus = new JobStatusChangeRequest(iMonitorID);
                         // we have this JobStatus class to handle amqp monitoring
@@ -242,7 +242,7 @@ public class HPCPullMonitor extends PullMonitor {
                             logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" + iMonitorID.getFailedCount() +
                                     " 3 times, so skip this Job from Monitor");
                             iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
-                            completedJobs.add(iMonitorID);
+                            completedJobs.put(iMonitorID.getJobName(), iMonitorID);
                             try {
                                 logger.error("Launching outflow handlers to check output are genereated or not");
                                 gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext());
@@ -269,7 +269,9 @@ public class HPCPullMonitor extends PullMonitor {
             // they become empty
             Map<String, Integer> jobRemoveCountMap = new HashMap<String, Integer>();
             ZooKeeper zk = null;
-            for (MonitorID completedJob : completedJobs) {
+            Set<String> keys = completedJobs.keySet();
+            for (String jobName: keys) {
+                MonitorID completedJob = completedJobs.get(jobName);
                 GFacThreadPoolExecutor.getCachedThreadPool().submit(new OutHandlerWorker(gfac, completedJob, publisher));
                 CommonUtils.removeMonitorFromQueue(queue, completedJob);
                 if (zk == null) {