You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/09/24 20:51:30 UTC
git commit: fixing ZooKeeper error caused by completed jobs not being
handled properly
Repository: airavata
Updated Branches:
refs/heads/master c366adeee -> 3f8986881
fixing ZooKeeper error caused by completed jobs not being handled properly
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3f898688
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3f898688
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3f898688
Branch: refs/heads/master
Commit: 3f89868814759f62a743a118cec6f4a299954b8e
Parents: c366ade
Author: lahiru <la...@apache.org>
Authored: Wed Sep 24 14:51:25 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Sep 24 14:51:25 2014 -0400
----------------------------------------------------------------------
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 79 ++++++++++----------
1 file changed, 38 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/3f898688/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 9fbbb85..34a6065 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -194,6 +194,7 @@ public class HPCPullMonitor extends PullMonitor {
completedJobs.add(iMonitorID);
iMonitorID.setStatus(JobState.CANCELED);
iterator1.remove();
+ break;
}
}
iterator1 = cancelJobList.iterator();
@@ -201,67 +202,62 @@ public class HPCPullMonitor extends PullMonitor {
synchronized (completedJobsFromPush) {
Iterator<String> iterator = completedJobsFromPush.iterator();
for (MonitorID iMonitorID : monitorID) {
+ String completeId = null;
while (iterator.hasNext()) {
- String cancelMId = iterator.next();
- if (cancelMId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
- logger.info("This job is finished because push notification came with <username,jobName> " + cancelMId);
+ completeId = iterator.next();
+ if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
+ logger.info("This job is finished because push notification came with <username,jobName> " + completeId);
completedJobs.add(iMonitorID);
iMonitorID.setStatus(JobState.COMPLETE);
+ break;
}
- //we have to make this empty everytime we iterate, otherwise this list will accumilate and will
+ //we have to make this empty everytime we iterate, otherwise this list will accumulate and will
// lead to a memory leak
- iterator.remove();
+ }
+ if(completeId!=null) {
+ completedJobsFromPush.remove(completeId);
}
iterator = completedJobsFromPush.listIterator();
}
}
Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
- for (MonitorID iMonitorID : monitorID) {
+ Iterator<MonitorID> iterator = monitorID.iterator();
+ while (iterator.hasNext()) {
+ MonitorID iMonitorID = iterator.next();
currentMonitorID = iMonitorID;
if (!JobState.CANCELED.equals(iMonitorID.getStatus())&&
!JobState.COMPLETE.equals(iMonitorID.getStatus())) {
iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic
- }
-
- String id = iMonitorID.getUserName() + "," + iMonitorID.getJobName();
- if(completedJobsFromPush.contains(id)){
- iMonitorID.setStatus(JobState.COMPLETE);
+ }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){
+ completedJobs.add(iMonitorID);
}
jobStatus = new JobStatusChangeRequest(iMonitorID);
- // we have this JobStatus class to handle amqp monitoring
+ // we have this JobStatus class to handle amqp monitoring
- publisher.publish(jobStatus);
- // if the job is completed we do not have to put the job to the queue again
- iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+ publisher.publish(jobStatus);
+ // if the job is completed we do not have to put the job to the queue again
+ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
- // After successful monitoring perform follow ing actions to cleanup the queue, if necessary
- if (jobStatus.getState().equals(JobState.COMPLETE)) {
- if(completedJobs.contains(iMonitorID)) {
- completedJobs.add(iMonitorID);
- }
- // we run all the finished jobs in separate threads, because each job doesn't have to wait until
- // each one finish transfering files
- GFacThreadPoolExecutor.getCachedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher));
- } else if (iMonitorID.getFailedCount() > FAILED_COUNT) {
- logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" +iMonitorID.getFailedCount()+
- " 3 times, so skip this Job from Monitor");
- iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
- completedJobs.add(iMonitorID);
- try {
- logger.error("Launching outflow handlers to check output are genereated or not");
- gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext());
- } catch (GFacException e) {
- publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(iMonitorID.getExperimentID(), iMonitorID.getWorkflowNodeID(),
- iMonitorID.getTaskID()), TaskState.FAILED));
- logger.info(e.getLocalizedMessage(), e);
- }
- } else {
- // Evey
- iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
- // if the job is complete we remove it from the Map, if any of these maps
- // get empty this userMonitorData will get delete from the queue
+ if (iMonitorID.getFailedCount() > FAILED_COUNT) {
+ logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" + iMonitorID.getFailedCount() +
+ " 3 times, so skip this Job from Monitor");
+ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+ completedJobs.add(iMonitorID);
+ try {
+ logger.error("Launching outflow handlers to check output are genereated or not");
+ gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext());
+ } catch (GFacException e) {
+ publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(iMonitorID.getExperimentID(), iMonitorID.getWorkflowNodeID(),
+ iMonitorID.getTaskID()), TaskState.FAILED));
+ logger.info(e.getLocalizedMessage(), e);
}
+ } else {
+ // Evey
+ iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
+ // if the job is complete we remove it from the Map, if any of these maps
+ // get empty this userMonitorData will get delete from the queue
}
+ }
} else {
logger.debug("Qstat Monitor doesn't handle non-gsissh hosts");
}
@@ -274,6 +270,7 @@ public class HPCPullMonitor extends PullMonitor {
Map<String, Integer> jobRemoveCountMap = new HashMap<String, Integer>();
ZooKeeper zk = null;
for (MonitorID completedJob : completedJobs) {
+ GFacThreadPoolExecutor.getCachedThreadPool().submit(new OutHandlerWorker(gfac, completedJob, publisher));
CommonUtils.removeMonitorFromQueue(queue, completedJob);
if (zk == null) {
zk = completedJob.getJobExecutionContext().getZk();