You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/08/20 08:32:45 UTC
git commit: fixing stampede job cancel issue
Repository: airavata
Updated Branches:
refs/heads/master 83cb4bf21 -> 4fbe57dac
fixing stampede job cancel issue
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4fbe57da
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4fbe57da
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4fbe57da
Branch: refs/heads/master
Commit: 4fbe57dac658f0b24141c85f43ae41c8d1feaa44
Parents: 83cb4bf
Author: lahiru <la...@apache.org>
Authored: Wed Aug 20 12:02:18 2014 +0530
Committer: lahiru <la...@apache.org>
Committed: Wed Aug 20 12:02:18 2014 +0530
----------------------------------------------------------------------
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 24 +++++++++----
.../gsissh/provider/impl/GSISSHProvider.java | 4 +--
.../handlers/GridPullMonitorHandler.java | 36 +++++++++++++++----
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 24 +++++++++++++
.../gfac/ssh/provider/impl/SSHProvider.java | 38 +++++++++++++++++++-
.../gsi/ssh/impl/GSISSHAbstractCluster.java | 17 +++++----
6 files changed, 121 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 9415625..b917542 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -689,8 +689,14 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setProperty(ERROR_SENT, "true");
jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
throw new GFacException(e.getMessage(), e);
- }
- }
+ }finally {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
// Scheduler will decide the execution flow of handlers and provider
@@ -756,8 +762,14 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setProperty(ERROR_SENT, "true");
jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
throw new GFacException(e.getMessage(), e);
- }
- }
+ }finally {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException {
GFacProvider provider = jobExecutionContext.getProvider();
@@ -1176,8 +1188,8 @@ public class BetterGfacImpl implements GFac,Watcher {
public void process(WatchedEvent watchedEvent) {
if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
// node data is changed, this means node is cancelled.
- System.out.println(watchedEvent.getPath());
- System.out.println("Experiment is cancelled with this path");
+ log.info("Experiment is cancelled with this path:");
+ log.info(watchedEvent.getPath());
this.cancelled = true;
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index d6981f3..28c792d 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -184,7 +184,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
pullMonitorHandler = threadedHandler;
if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) {
- log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
jobExecutionContext.setProperty("cancel","true");
pullMonitorHandler.invoke(jobExecutionContext);
} else {
@@ -194,7 +193,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
} else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
pushMonitorHandler = threadedHandler;
if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) {
- log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID);
pushMonitorHandler.invoke(jobExecutionContext);
} else {
log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
@@ -218,7 +216,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
log.info("canceling the job status in GSISSHProvider!!!!!");
HostDescriptionType host = jobExecutionContext.getApplicationContext().
getHostDescription().getType();
- StringBuffer data = new StringBuffer();
JobDetails jobDetails = jobExecutionContext.getJobDetails();
try {
Cluster cluster = null;
@@ -242,6 +239,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
log.error("No Job Id is set, so cannot perform the cancel operation !!!");
return;
}
+ removeFromMonitorHandlers(jobExecutionContext, (GsisshHostType)host, jobDetails.getJobID());
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
// we know this host is type GsiSSHHostType
} catch (SSHApiException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index 3899538..5cd929d 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -28,15 +28,22 @@ import org.apache.airavata.gfac.core.cpi.GFacImpl;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.monitor.HPCMonitorID;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
import org.apache.airavata.gfac.monitor.util.CommonUtils;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.util.Properties;
/**
@@ -45,7 +52,7 @@ import java.util.Properties;
* commands like qstat,squeue and this supports sun grid enging monitoring too
* which is a slight variation of qstat monitoring.
*/
-public class GridPullMonitorHandler extends ThreadedHandler {
+public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
private final static Logger logger = LoggerFactory.getLogger(GridPullMonitorHandler.class);
private HPCPullMonitor hpcPullMonitor;
@@ -83,6 +90,19 @@ public class GridPullMonitorHandler extends ThreadedHandler {
if ("true".equals(jobExecutionContext.getProperty("cancel"))) {
removeJobFromMonitoring(jobExecutionContext);
} else {
+ ZooKeeper zk = jobExecutionContext.getZk();
+ try {
+ String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), zk);
+ String path = experimentEntry + File.separator + "operation";
+ Stat exists = zk.exists(path, this);
+ if(exists != null){
+ zk.getData(path,this,exists); // watching the operations node
+ }
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID);
}
} catch (AiravataMonitorException e) {
@@ -92,11 +112,7 @@ public class GridPullMonitorHandler extends ThreadedHandler {
public void removeJobFromMonitoring(JobExecutionContext jobExecutionContext)throws GFacHandlerException {
MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(),jobExecutionContext);
- try {
- CommonUtils.removeMonitorFromQueue(hpcPullMonitor.getQueue(),monitorID);
- } catch (AiravataMonitorException e) {
- throw new GFacHandlerException(e);
- }
+ hpcPullMonitor.getCancelJobList().add(monitorID);
}
public AuthenticationInfo getAuthenticationInfo() {
return authenticationInfo;
@@ -117,4 +133,12 @@ public class GridPullMonitorHandler extends ThreadedHandler {
public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
this.removeJobFromMonitoring(jobExecutionContext);
}
+
+ public void process(WatchedEvent watchedEvent) {
+ if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
+ // node data is changed, this means node is cancelled.
+ logger.info("Experiment is cancelled with this path:");
+ logger.info(watchedEvent.getPath());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 2fea154..a2ead4d 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -71,6 +71,8 @@ public class HPCPullMonitor extends PullMonitor {
private MonitorPublisher publisher;
+ private List<MonitorID> cancelJobList;
+
private GFac gfac;
@@ -80,6 +82,7 @@ public class HPCPullMonitor extends PullMonitor {
connections = new HashMap<String, ResourceConnection>();
this.queue = new LinkedBlockingDeque<UserMonitorData>();
publisher = new MonitorPublisher(new EventBus());
+ cancelJobList = new ArrayList<MonitorID>();
}
public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) {
@@ -87,12 +90,14 @@ public class HPCPullMonitor extends PullMonitor {
this.queue = new LinkedBlockingDeque<UserMonitorData>();
publisher = monitorPublisher;
authenticationInfo = authInfo;
+ cancelJobList = new ArrayList<MonitorID>();
}
public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
this.queue = queue;
this.publisher = publisher;
connections = new HashMap<String, ResourceConnection>();
+ cancelJobList = new ArrayList<MonitorID>();
}
@@ -159,7 +164,18 @@ public class HPCPullMonitor extends PullMonitor {
connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo());
connections.put(hostName, connection);
}
+ // before we get the statuses, we check the cancel job list and remove them permanently
List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
+ for(MonitorID iMonitorID:monitorID){
+ for(MonitorID cancelMId:cancelJobList){
+ if(iMonitorID.getJobID().equals(cancelMId.getJobID())
+ && iMonitorID.getExperimentID().equals(cancelMId.getExperimentID())
+ && iMonitorID.getTaskID().equals(cancelMId.getTaskID())){
+ completedJobs.add(iMonitorID);
+ cancelJobList.remove(cancelMId); // once found, we delete the entry from the cancel job list so we don't have to do this check again and again
+ }
+ }
+ }
Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
for (MonitorID iMonitorID : monitorID) {
currentMonitorID = iMonitorID;
@@ -340,4 +356,12 @@ public class HPCPullMonitor extends PullMonitor {
public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) {
this.authenticationInfo = authenticationInfo;
}
+
+ public List<MonitorID> getCancelJobList() {
+ return cancelJobList;
+ }
+
+ public void setCancelJobList(List<MonitorID> cancelJobList) {
+ this.cancelJobList = cancelJobList;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 4db72a4..2ad26a3 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -37,6 +37,7 @@ import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.context.MessageContext;
import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
@@ -231,6 +232,7 @@ public class SSHProvider extends AbstractProvider {
log.error("No Job Id is set, so cannot perform the cancel operation !!!");
return;
}
+ removeFromMonitorHandlers(jobExecutionContext, (GsisshHostType)host, jobDetails.getJobID());
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
} catch (SSHApiException e) {
String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
@@ -251,7 +253,41 @@ public class SSHProvider extends AbstractProvider {
}
}
-
+ public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException {
+ List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
+ if (daemonHandlers == null) {
+ daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ }
+ ThreadedHandler pullMonitorHandler = null;
+ ThreadedHandler pushMonitorHandler = null;
+ String monitorMode = host.getMonitorMode();
+ for (ThreadedHandler threadedHandler : daemonHandlers) {
+ if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pullMonitorHandler = threadedHandler;
+ if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) {
+ jobExecutionContext.setProperty("cancel","true");
+ pullMonitorHandler.invoke(jobExecutionContext);
+ } else {
+ log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
+ " to handle by the GridPullMonitorHandler");
+ }
+ } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pushMonitorHandler = threadedHandler;
+ if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) {
+ log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID);
+ pushMonitorHandler.invoke(jobExecutionContext);
+ } else {
+ log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
+ " to handle by the GridPushMonitorHandler");
+ }
+ }
+ // have to handle the GridPushMonitorHandler logic
+ }
+ if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+ log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+ ", execution is configured as asynchronous, so Outhandler will not be invoked");
+ }
+ }
private File createShellScript(JobExecutionContext context) throws IOException {
ApplicationDeploymentDescriptionType app = context.getApplicationContext()
.getApplicationDeploymentDescription().getType();
http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
index 855c9dc..3639ddd 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
@@ -225,12 +225,17 @@ public class GSISSHAbstractCluster implements Cluster {
String outputifAvailable = getOutputifAvailable(stdOutReader, "Error reading output of job submission",rawCommandInfo.getBaseCommand(jobManagerConfiguration.getInstalledPath()));
// this might not be the case for all teh resources, if so Cluster implementation can override this method
// because here after cancelling we try to get the job description and return it back
- JobDescriptor jobById = this.getJobDescriptorById(jobID);
- if (CommonUtils.isJobFinished(jobById)) {
- log.debug("Job Cancel operation was successful !");
- return jobById;
- } else {
- log.debug("Job Cancel operation was not successful !");
+ try {
+ JobDescriptor jobById = this.getJobDescriptorById(jobID);
+ if (CommonUtils.isJobFinished(jobById)) {
+ log.debug("Job Cancel operation was successful !");
+ return jobById;
+ } else {
+ log.debug("Job Cancel operation was not successful !");
+ return null;
+ }
+ }catch (Exception e){
+ // it's OK to fail to get the status when the job is already gone
return null;
}
}