You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/08/20 08:32:45 UTC

git commit: fixing stampede job cancel issue

Repository: airavata
Updated Branches:
  refs/heads/master 83cb4bf21 -> 4fbe57dac


fixing stampede job cancel issue


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4fbe57da
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4fbe57da
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4fbe57da

Branch: refs/heads/master
Commit: 4fbe57dac658f0b24141c85f43ae41c8d1feaa44
Parents: 83cb4bf
Author: lahiru <la...@apache.org>
Authored: Wed Aug 20 12:02:18 2014 +0530
Committer: lahiru <la...@apache.org>
Committed: Wed Aug 20 12:02:18 2014 +0530

----------------------------------------------------------------------
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 24 +++++++++----
 .../gsissh/provider/impl/GSISSHProvider.java    |  4 +--
 .../handlers/GridPullMonitorHandler.java        | 36 +++++++++++++++----
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 24 +++++++++++++
 .../gfac/ssh/provider/impl/SSHProvider.java     | 38 +++++++++++++++++++-
 .../gsi/ssh/impl/GSISSHAbstractCluster.java     | 17 +++++----
 6 files changed, 121 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 9415625..b917542 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -689,8 +689,14 @@ public class BetterGfacImpl implements GFac,Watcher {
 			jobExecutionContext.setProperty(ERROR_SENT, "true");
 			jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
 			throw new GFacException(e.getMessage(), e);
-		}
-	}
+		}finally {
+            try {
+                zk.close();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
 
 	private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
 		// Scheduler will decide the execution flow of handlers and provider
@@ -756,8 +762,14 @@ public class BetterGfacImpl implements GFac,Watcher {
 			jobExecutionContext.setProperty(ERROR_SENT, "true");
 			jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
 			throw new GFacException(e.getMessage(), e);
-		}
-	}
+		}finally {
+            try {
+                zk.close();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
 
     private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException {
         GFacProvider provider = jobExecutionContext.getProvider();
@@ -1176,8 +1188,8 @@ public class BetterGfacImpl implements GFac,Watcher {
     public void process(WatchedEvent watchedEvent) {
         if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
             // node data is changed, this means node is cancelled.
-            System.out.println(watchedEvent.getPath());
-            System.out.println("Experiment is cancelled with this path");
+            log.info("Experiment is cancelled with this path:");
+            log.info(watchedEvent.getPath());
             this.cancelled = true;
         }
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index d6981f3..28c792d 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -184,7 +184,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
             if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
                 pullMonitorHandler = threadedHandler;
                 if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) {
-                    log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned:  " + jobID);
                     jobExecutionContext.setProperty("cancel","true");
                     pullMonitorHandler.invoke(jobExecutionContext);
                 } else {
@@ -194,7 +193,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
             } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
                 pushMonitorHandler = threadedHandler;
                 if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) {
-                    log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned:  " + jobID);
                     pushMonitorHandler.invoke(jobExecutionContext);
                 } else {
                     log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
@@ -218,7 +216,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
         log.info("canceling the job status in GSISSHProvider!!!!!");
         HostDescriptionType host = jobExecutionContext.getApplicationContext().
                 getHostDescription().getType();
-        StringBuffer data = new StringBuffer();
         JobDetails jobDetails = jobExecutionContext.getJobDetails();
         try {
             Cluster cluster = null;
@@ -242,6 +239,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
                 log.error("No Job Id is set, so cannot perform the cancel operation !!!");
                 return;
             }
+            removeFromMonitorHandlers(jobExecutionContext, (GsisshHostType)host, jobDetails.getJobID());
             GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
             // we know this host is type GsiSSHHostType
         } catch (SSHApiException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index 3899538..5cd929d 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -28,15 +28,22 @@ import org.apache.airavata.gfac.core.cpi.GFacImpl;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.monitor.HPCMonitorID;
 import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
 import org.apache.airavata.gfac.monitor.util.CommonUtils;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.Properties;
 
 /**
@@ -45,7 +52,7 @@ import java.util.Properties;
  * commands like qstat,squeue and this supports sun grid enging monitoring too
  * which is a slight variation of qstat monitoring.
  */
-public class GridPullMonitorHandler extends ThreadedHandler {
+public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
     private final static Logger logger = LoggerFactory.getLogger(GridPullMonitorHandler.class);
 
     private HPCPullMonitor hpcPullMonitor;
@@ -83,6 +90,19 @@ public class GridPullMonitorHandler extends ThreadedHandler {
             if ("true".equals(jobExecutionContext.getProperty("cancel"))) {
                 removeJobFromMonitoring(jobExecutionContext);
             } else {
+                ZooKeeper zk = jobExecutionContext.getZk();
+                try {
+                    String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), zk);
+                    String path = experimentEntry + File.separator + "operation";
+                    Stat exists = zk.exists(path, this);
+                    if(exists != null){
+                        zk.getData(path,this,exists); // watching the operations node
+                    }
+                } catch (KeeperException e) {
+                    e.printStackTrace();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
                 CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID);
             }
         } catch (AiravataMonitorException e) {
@@ -92,11 +112,7 @@ public class GridPullMonitorHandler extends ThreadedHandler {
 
     public void removeJobFromMonitoring(JobExecutionContext jobExecutionContext)throws GFacHandlerException {
         MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(),jobExecutionContext);
-        try {
-            CommonUtils.removeMonitorFromQueue(hpcPullMonitor.getQueue(),monitorID);
-        } catch (AiravataMonitorException e) {
-            throw new GFacHandlerException(e);
-        }
+        hpcPullMonitor.getCancelJobList().add(monitorID);
     }
     public AuthenticationInfo getAuthenticationInfo() {
         return authenticationInfo;
@@ -117,4 +133,12 @@ public class GridPullMonitorHandler extends ThreadedHandler {
     public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
         this.removeJobFromMonitoring(jobExecutionContext);
     }
+
+    public void process(WatchedEvent watchedEvent) {
+        if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
+            // node data is changed, this means node is cancelled.
+            logger.info("Experiment is cancelled with this path:");
+            logger.info(watchedEvent.getPath());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 2fea154..a2ead4d 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -71,6 +71,8 @@ public class HPCPullMonitor extends PullMonitor {
 
     private MonitorPublisher publisher;
 
+    private List<MonitorID> cancelJobList;
+
 
     private GFac gfac;
 
@@ -80,6 +82,7 @@ public class HPCPullMonitor extends PullMonitor {
         connections = new HashMap<String, ResourceConnection>();
         this.queue = new LinkedBlockingDeque<UserMonitorData>();
         publisher = new MonitorPublisher(new EventBus());
+        cancelJobList = new ArrayList<MonitorID>();
     }
 
     public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) {
@@ -87,12 +90,14 @@ public class HPCPullMonitor extends PullMonitor {
         this.queue = new LinkedBlockingDeque<UserMonitorData>();
         publisher = monitorPublisher;
         authenticationInfo = authInfo;
+        cancelJobList = new ArrayList<MonitorID>();
     }
 
     public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
         this.queue = queue;
         this.publisher = publisher;
         connections = new HashMap<String, ResourceConnection>();
+        cancelJobList = new ArrayList<MonitorID>();
     }
 
 
@@ -159,7 +164,18 @@ public class HPCPullMonitor extends PullMonitor {
                         connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo());
                         connections.put(hostName, connection);
                     }
+                    // before we get the statuses, we check the cancel job list and remove them permanently
                     List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
+                    for(MonitorID iMonitorID:monitorID){
+                        for(MonitorID cancelMId:cancelJobList){
+                            if(iMonitorID.getJobID().equals(cancelMId.getJobID())
+                                    && iMonitorID.getExperimentID().equals(cancelMId.getExperimentID())
+                                    && iMonitorID.getTaskID().equals(cancelMId.getTaskID())){
+                                completedJobs.add(iMonitorID);
+                                cancelJobList.remove(cancelMId); // once found, we delete the cancel job so we don't have to do this check again and again
+                            }
+                        }
+                    }
                     Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
                     for (MonitorID iMonitorID : monitorID) {
                         currentMonitorID = iMonitorID;
@@ -340,4 +356,12 @@ public class HPCPullMonitor extends PullMonitor {
     public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
         this.authenticationInfo = authenticationInfo;
     }
+
+    public List<MonitorID> getCancelJobList() {
+        return cancelJobList;
+    }
+
+    public void setCancelJobList(List<MonitorID> cancelJobList) {
+        this.cancelJobList = cancelJobList;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 4db72a4..2ad26a3 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -37,6 +37,7 @@ import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.context.MessageContext;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
@@ -231,6 +232,7 @@ public class SSHProvider extends AbstractProvider {
                     log.error("No Job Id is set, so cannot perform the cancel operation !!!");
                     return;
                 }
+                removeFromMonitorHandlers(jobExecutionContext, (GsisshHostType)host, jobDetails.getJobID());
                 GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
             } catch (SSHApiException e) {
                 String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
@@ -251,7 +253,41 @@ public class SSHProvider extends AbstractProvider {
         }
     }
 
-
+    public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException {
+        List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
+        if (daemonHandlers == null) {
+            daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        }
+        ThreadedHandler pullMonitorHandler = null;
+        ThreadedHandler pushMonitorHandler = null;
+        String monitorMode = host.getMonitorMode();
+        for (ThreadedHandler threadedHandler : daemonHandlers) {
+            if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pullMonitorHandler = threadedHandler;
+                if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) {
+                    jobExecutionContext.setProperty("cancel","true");
+                    pullMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
+                            " to handle by the GridPullMonitorHandler");
+                }
+            } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pushMonitorHandler = threadedHandler;
+                if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) {
+                    log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned:  " + jobID);
+                    pushMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
+                            " to handle by the GridPushMonitorHandler");
+                }
+            }
+            // have to handle the GridPushMonitorHandler logic
+        }
+        if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+            log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+                    ", execution is configured as asynchronous, so Outhandler will not be invoked");
+        }
+    }
     private File createShellScript(JobExecutionContext context) throws IOException {
         ApplicationDeploymentDescriptionType app = context.getApplicationContext()
                 .getApplicationDeploymentDescription().getType();

http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
index 855c9dc..3639ddd 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
@@ -225,12 +225,17 @@ public class GSISSHAbstractCluster implements Cluster {
         String outputifAvailable = getOutputifAvailable(stdOutReader, "Error reading output of job submission",rawCommandInfo.getBaseCommand(jobManagerConfiguration.getInstalledPath()));
         // this might not be the case for all teh resources, if so Cluster implementation can override this method
         // because here after cancelling we try to get the job description and return it back
-        JobDescriptor jobById = this.getJobDescriptorById(jobID);
-        if (CommonUtils.isJobFinished(jobById)) {
-            log.debug("Job Cancel operation was successful !");
-            return jobById;
-        } else {
-            log.debug("Job Cancel operation was not successful !");
+        try {
+            JobDescriptor jobById = this.getJobDescriptorById(jobID);
+            if (CommonUtils.isJobFinished(jobById)) {
+                log.debug("Job Cancel operation was successful !");
+                return jobById;
+            } else {
+                log.debug("Job Cancel operation was not successful !");
+                return null;
+            }
+        }catch (Exception e){
+            // it's OK to fail to get the status when the job is already gone
             return null;
         }
     }