You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/25 21:05:44 UTC
[1/2] airavata git commit: Fixed issues with cancel experiment operation
Repository: airavata
Updated Branches:
refs/heads/develop 42f071cab -> a6b1785a3
Fixed issues with cancel experiment operation
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/497178de
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/497178de
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/497178de
Branch: refs/heads/develop
Commit: 497178dea0deb5b9e8911c6bb869da4b32facdd1
Parents: bb5be4b
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 25 15:05:19 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 25 15:05:19 2015 -0500
----------------------------------------------------------------------
.../airavata/gfac/core/monitor/JobMonitor.java | 6 +++
.../airavata/gfac/impl/GFacEngineImpl.java | 28 ++++++-------
.../apache/airavata/gfac/impl/GFacWorker.java | 15 -------
.../impl/task/DefaultJobSubmissionTask.java | 17 ++++++--
.../impl/watcher/CancelRequestWatcherImpl.java | 37 ++++++++++++-----
.../gfac/monitor/email/EmailBasedMonitor.java | 42 ++++++++++++++++----
6 files changed, 95 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/497178de/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
index 4b2ecb2..5e83feb 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobMonitor.java
@@ -40,4 +40,10 @@ public interface JobMonitor {
* Return <code>true</code> if jobId is already monitoring by this Monitor, <code>false</code> if not
*/
boolean isMonitoring(String jobId);
+
+ /**
+ * Make the monitor service aware of cancelled jobs; in case job monitor details don't come within a predefined time,
+ * it will move the job to CANCELED state and call output
+ */
+ void canceledJob(String jobId);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/497178de/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index db2fb50..380349e 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -255,11 +255,7 @@ public class GFacEngineImpl implements GFacEngine {
processContext.setProcessStatus(status);
GFacUtils.saveAndPublishProcessStatus(processContext);
executeJobSubmission(taskContext, processContext.isRecovery());
- // checkpoint
- if (processContext.isInterrupted()) {
- GFacUtils.handleProcessInterrupt(processContext);
- return;
- }
+ // Don't put any checkpoint in between JobSubmission and Monitoring tasks
JobStatus jobStatus = processContext.getJobModel().getJobStatus();
if (jobStatus != null && (jobStatus.getJobState() == JobState.SUBMITTED
@@ -583,9 +579,8 @@ public class GFacEngineImpl implements GFacEngine {
@Override
public void cancelProcess(ProcessContext processContext) throws GFacException {
if (processContext != null) {
- processContext.setCancel(true);
switch (processContext.getProcessState()) {
- case MONITORING:
+ case MONITORING: case EXECUTING:
// get job submission task and invoke cancel
JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
TaskContext taskCtx = getJobSubmissionTaskContext(processContext);
@@ -614,16 +609,21 @@ public class GFacEngineImpl implements GFacEngine {
try {
JobStatus oldJobStatus = jSTask.cancel(taskContext);
- if (oldJobStatus != null && oldJobStatus.getJobState() == JobState.QUEUED) {
- JobMonitor monitorService = Factory.getMonitorService(taskContext.getParentProcessContext().getMonitorMode());
- monitorService.stopMonitor(taskContext.getParentProcessContext().getJobModel().getJobId(), true);
+/* if (oldJobStatus != null && oldJobStatus.getJobState() == JobState.QUEUED) {
+ ProcessContext pc = taskContext.getParentProcessContext();
JobStatus newJobStatus = new JobStatus(JobState.CANCELED);
newJobStatus.setReason("Job cancelled");
newJobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- taskContext.getParentProcessContext().getJobModel().setJobStatus(newJobStatus);
- GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), taskContext.getParentProcessContext()
- .getJobModel());
- }
+ pc.getJobModel().setJobStatus(newJobStatus);
+ GFacUtils.saveJobStatus(pc, pc.getJobModel());
+ JobMonitor monitorService = Factory.getMonitorService(pc.getMonitorMode());
+ monitorService.stopMonitor(pc.getJobModel().getJobId(), true);
+ }*/
+
+ ProcessContext pc = taskContext.getParentProcessContext();
+ JobMonitor monitorService = Factory.getMonitorService(pc.getMonitorMode());
+ monitorService.canceledJob(pc.getJobModel().getJobId());
+
} catch (TaskException e) {
throw new GFacException("Error while cancelling job");
} catch (AiravataException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/497178de/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 970cbf0..596baab 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -193,21 +193,6 @@ public class GFacWorker implements Runnable {
}
private void recoverProcess() throws GFacException {
-
- String taskDag = processContext.getProcessModel().getTaskDag();
- List<String> taskExecutionOrder = GFacUtils.parseTaskDag(taskDag);
- processContext.setTaskExecutionOrder(taskExecutionOrder);
- Map<String, TaskModel> taskMap = processContext.getTaskMap();
- String recoverTaskId = null;
- for (String taskId : taskExecutionOrder) {
- TaskModel taskModel = taskMap.get(taskId);
- TaskState state = taskModel.getTaskStatus().getState();
- if (state == TaskState.CREATED || state == TaskState.EXECUTING) {
- recoverTaskId = taskId;
- break;
- }
- }
-
engine.recoverProcess(processContext);
if (processContext.isInterrupted()) {
return;
http://git-wip-us.apache.org/repos/asf/airavata/blob/497178de/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
index 9ccaa94..d435b5f 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
@@ -34,11 +34,9 @@ import org.apache.airavata.gfac.impl.Factory;
import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.status.JobState;
-import org.apache.airavata.model.status.JobStatus;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.status.*;
import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.core.experiment.catalog.model.Process;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -262,6 +260,17 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
JobModel jobModel = processContext.getJobModel();
int retryCount = 0;
if (jobModel != null) {
+ if (processContext.getProcessState() == ProcessState.EXECUTING) {
+ while (jobModel.getJobId() == null) {
+ log.info("Cancellation pause until process get jobId");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
try {
JobStatus oldJobStatus = remoteCluster.getJobStatus(jobModel.getJobId());
while (oldJobStatus == null && retryCount <= 5) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/497178de/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
index 8a2dce3..595380f 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
@@ -21,6 +21,7 @@
package org.apache.airavata.gfac.impl.watcher;
import org.apache.airavata.common.utils.ZkConstants;
+import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
import org.apache.airavata.gfac.impl.Factory;
@@ -35,6 +36,7 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
private static final Logger log = LoggerFactory.getLogger(CancelRequestWatcherImpl.class);
private final String processId;
private final String experimentId;
+ private final int max_retry = 3;
public CancelRequestWatcherImpl(String experimentId, String processId) {
this.experimentId = experimentId;
@@ -47,20 +49,13 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
String path = watchedEvent.getPath();
Watcher.Event.EventType type = watchedEvent.getType();
CuratorFramework curatorClient = Factory.getCuratorClient();
+ log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>> cancel watcher triggered process id {}<<<<<<<<<<<<<<<<<<<<<<<<<<<", processId);
switch (type) {
case NodeDataChanged:
byte[] bytes = curatorClient.getData().forPath(path);
String action = new String(bytes);
if (action.equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) {
- ProcessContext processContext = Factory.getGfacContext().getProcess(processId);
- if (processContext != null) {
- processContext.setCancel(true);
- Factory.getGFacEngine().cancelProcess(processContext);
- log.info("expId {}, processId : {}, Cancelling process", experimentId, processId);
- } else {
- log.info("expId: {}, Cancel request came for processId {} but couldn't find process context",
- experimentId, processId);
- }
+ cancelProcess(0);
} else {
curatorClient.getData().usingWatcher(this).forPath(path);
}
@@ -88,4 +83,28 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
break;
}
}
+
+ private void cancelProcess(int retryAttempt) throws GFacException {
+ ProcessContext processContext = Factory.getGfacContext().getProcess(processId);
+ if (processContext != null) {
+ processContext.setCancel(true);
+ log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>> calling process cancelling operation <<<<<<<<<<<<<<<<<<<<<<<<<<<");
+ Factory.getGFacEngine().cancelProcess(processContext);
+ log.info("expId {}, processId : {}, Cancelling process", experimentId, processId);
+ } else {
+ if (retryAttempt < max_retry) {
+ log.info("expId: {}, Cancel request came for processId {} but couldn't find process context. " +
+ "retry in {} ms ", experimentId, processId, retryAttempt);
+ try {
+ Thread.sleep(retryAttempt++*1000);
+ } catch (InterruptedException e) {
+ // ignore; we don't care about this exception.
+ }
+ cancelProcess(retryAttempt);
+ } else {
+ log.info("expId: {}, Cancel request came for processId {} but couldn't find process context.",
+ experimentId, processId);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/497178de/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 7e9e505..64b9be7 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -51,12 +51,7 @@ import javax.mail.Session;
import javax.mail.Store;
import javax.mail.search.FlagTerm;
import javax.mail.search.SearchTerm;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class EmailBasedMonitor implements JobMonitor, Runnable{
@@ -77,9 +72,11 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>();
private Map<String, ResourceJobManagerType> addressMap = new HashMap<>();
private Message[] flushUnseenMessages;
+ private Map<String, Boolean> canceledJobs = new ConcurrentHashMap<>();
+ private Timer timer;
- public EmailBasedMonitor(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException {
+ public EmailBasedMonitor(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException {
init();
populateAddressAndParserMap(resourceConfigs);
}
@@ -96,6 +93,9 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
}
properties = new Properties();
properties.put("mail.store.protocol", storeProtocol);
+ timer = new Timer("CancelJobHandler", true);
+ long period = 1000*60*5; // five minute delay between successive task executions.
+ timer.schedule(new CancelTimerTask(), 0 , period);
}
private void populateAddressAndParserMap(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException {
@@ -142,6 +142,11 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
return jobMonitorMap.containsKey(jobId);
}
+ @Override
+ public void canceledJob(String jobId) {
+ canceledJobs.put(jobId, Boolean.FALSE);
+ }
+
private JobStatusResult parse(Message message) throws MessagingException, AiravataException {
Address fromAddress = message.getFrom()[0];
String addressStr = fromAddress.toString();
@@ -309,6 +314,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
JobStatus jobStatus = new JobStatus();
JobModel jobModel = taskContext.getParentProcessContext().getJobModel();
String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", JobId : " + jobStatusResult.getJobId();
+ canceledJobs.remove(jobStatusResult.getJobId());
// TODO - Handle all other valid JobStates
if (resultState == JobState.COMPLETE) {
jobMonitorMap.remove(jobStatusResult.getJobId());
@@ -338,7 +344,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
log.info("[EJM]: Job failed email received , removed job from job monitoring. " + jobDetails);
}else if (resultState == JobState.CANCELED) {
jobMonitorMap.remove(jobStatusResult.getJobId());
- jobStatus.setJobState(JobState.CANCELED);
+ jobStatus.setJobState(JobState.CANCELED);
jobStatus.setReason("Canceled email received");
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
log.info("[EJM]: Job canceled mail received, removed job from job monitoring. " + jobDetails);
@@ -395,5 +401,25 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
this.monitorStartDate = date;
}
+ private class CancelTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ if (!canceledJobs.isEmpty()) {
+ Iterator<Map.Entry<String, Boolean>> cancelJobIter = canceledJobs.entrySet().iterator();
+ while (cancelJobIter.hasNext()) {
+ Map.Entry<String, Boolean> cancelJobIdWithFlag = cancelJobIter.next();
+ if (!cancelJobIdWithFlag.getValue()) {
+ cancelJobIdWithFlag.setValue(Boolean.TRUE);
+ } else {
+ TaskContext taskContext = jobMonitorMap.get(cancelJobIdWithFlag.getKey());
+ if (taskContext != null) {
+ stopMonitor(cancelJobIdWithFlag.getKey(), true);
+ }
+ cancelJobIter.remove();
+ }
+ }
+ }
+ }
+ }
}
[2/2] airavata git commit: Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/airavata into develop
Posted by sh...@apache.org.
Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/airavata into develop
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a6b1785a
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a6b1785a
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a6b1785a
Branch: refs/heads/develop
Commit: a6b1785a3ca3998e766a64e3bfb1301f7ba2a7c2
Parents: 497178d 42f071c
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 25 15:05:30 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 25 15:05:30 2015 -0500
----------------------------------------------------------------------
.../model/util/ExperimentModelUtil.java | 1 +
.../store/cpi/CredentialStoreService.java | 2 +-
.../store/datamodel/CertificateCredential.java | 2 +-
.../store/datamodel/CommunityUser.java | 2 +-
.../store/datamodel/PasswordCredential.java | 2 +-
.../store/datamodel/SSHCredential.java | 70 +++++++++++---------
.../exception/CredentialStoreException.java | 2 +-
.../credentialStoreDataModel.thrift | 2 +-
.../airavata/gfac/impl/GFacEngineImpl.java | 12 +++-
.../gfac/impl/task/SCPDataStageTask.java | 14 ++++
.../catalog/impl/ExperimentRegistry.java | 2 +
.../core/experiment/catalog/model/Process.java | 10 +++
.../catalog/model/UserConfigurationData.java | 2 +-
.../catalog/resources/ProcessResource.java | 10 +++
.../UserConfigurationDataResource.java | 1 +
.../experiment/catalog/resources/Utils.java | 2 +
.../utils/ThriftDataModelConversion.java | 2 +
.../src/main/resources/expcatalog-derby.sql | 2 +
.../src/main/resources/expcatalog-mysql.sql | 3 +
19 files changed, 103 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/a6b1785a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------