You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/30 22:33:36 UTC
airavata git commit: Fixed consumer unbind issue with cancel
operation and Complete cancel operation support for new changes
Repository: airavata
Updated Branches:
refs/heads/develop a6b1785a3 -> 3d16cbcce
Fixed consumer unbind issue with cancel operation and Complete cancel operation support for new changes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3d16cbcc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3d16cbcc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3d16cbcc
Branch: refs/heads/develop
Commit: 3d16cbccec4cfedf7bbef6ebca3d3bba107e3855
Parents: a6b1785
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Nov 30 16:33:29 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Nov 30 16:33:29 2015 -0500
----------------------------------------------------------------------
.../gfac/core/context/ProcessContext.java | 11 +++++-
.../apache/airavata/gfac/impl/GFacWorker.java | 39 ++++++++++++--------
.../impl/task/DefaultJobSubmissionTask.java | 10 +++--
.../impl/watcher/CancelRequestWatcherImpl.java | 9 ++---
4 files changed, 44 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/3d16cbcc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 88bd178..49b323c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -94,8 +94,9 @@ public class ProcessContext {
private boolean complete = false; // all tasks executed?
private boolean recovery = false; // is process in recovery mode?
private TaskModel currentExecutingTaskModel; // current execution task model in case we pause process execution we need this to continue process exectuion again
+ private boolean acknowledge;
- /**
+ /**
* Note: process context property use lazy loading approach. In runtime you will see some properties as null
* unless you have access it previously. Once that property access using the api,it will be set to correct value.
*/
@@ -482,4 +483,12 @@ public class ProcessContext {
public void setStorageResource(StorageResourceDescription storageResource) {
this.storageResource = storageResource;
}
+
+ public void setAcknowledge(boolean acknowledge) {
+ this.acknowledge = acknowledge;
+ }
+
+ public boolean isAcknowledge() {
+ return acknowledge;
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/3d16cbcc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 596baab..c701ed5 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -29,8 +29,6 @@ import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.task.TaskModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +36,6 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.MessageFormat;
import java.util.List;
-import java.util.Map;
public class GFacWorker implements Runnable {
@@ -122,8 +119,13 @@ public class GFacWorker implements Runnable {
throw new GFacException("process Id : " + processId + " Couldn't identify process type");
}
if (processContext.isCancel()) {
- sendAck();
- Factory.getGfacContext().removeProcess(processContext.getProcessId());
+ if (processContext.getProcessState() == ProcessState.MONITORING
+ || processContext.getProcessState() == ProcessState.EXECUTING) {
+ // don't send ack while the process is in MONITORING or EXECUTING state; wait until the cancel email comes to airavata.
+ } else {
+ sendAck();
+ Factory.getGfacContext().removeProcess(processContext.getProcessId());
+ }
}
} catch (GFacException e) {
log.error("GFac Worker throws an exception", e);
@@ -233,16 +235,23 @@ public class GFacWorker implements Runnable {
// }
private void sendAck() {
- try {
- long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),
- processContext.getExperimentId(), processId);
- Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);
- log.info("expId: {}, procesId: {} :- Sent ack for deliveryTag {}", processContext.getExperimentId(),
- processId, processDeliveryTag);
- } catch (Exception e1) {
- String format = MessageFormat.format("expId: {0}, processId: {1} :- Couldn't send ack for deliveryTag ",
- processContext .getExperimentId(), processId);
- log.error(format, e1);
+ // This ensures gfac doesn't send ack more than once for a process, which would cause the gfac rabbitmq consumer to be removed from the rabbitmq server.
+ if (!processContext.isAcknowledge()) {
+ try {
+ long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),
+ processContext.getExperimentId(), processId);
+ Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);
+ processContext.setAcknowledge(true);
+ log.info("expId: {}, processId: {} :- Sent ack for deliveryTag {}", processContext.getExperimentId(),
+ processId, processDeliveryTag);
+ } catch (Exception e1) {
+ processContext.setAcknowledge(false);
+ String format = MessageFormat.format("expId: {0}, processId: {1} :- Couldn't send ack for deliveryTag ",
+ processContext .getExperimentId(), processId);
+ log.error(format, e1);
+ }
+ } else {
+ log.info("expId: {}, processId: {} :- already acknowledged ", processContext.getExperimentId(), processId);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/3d16cbcc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
index d435b5f..b9805f8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
@@ -36,7 +36,6 @@ import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.*;
import org.apache.airavata.model.task.TaskTypes;
-import org.apache.airavata.registry.core.experiment.catalog.model.Process;
import org.apache.airavata.registry.cpi.AppCatalogException;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -48,7 +47,10 @@ import java.util.Map;
public class DefaultJobSubmissionTask implements JobSubmissionTask {
private static final Logger log = LoggerFactory.getLogger(DefaultJobSubmissionTask.class);
- @Override
+ private static int waitForProcessIdmillis = 5000;
+ private static int pauseTimeInSec = waitForProcessIdmillis / 1000;
+
+ @Override
public void init(Map<String, String> propertyMap) throws TaskException {
}
@@ -262,9 +264,9 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
if (jobModel != null) {
if (processContext.getProcessState() == ProcessState.EXECUTING) {
while (jobModel.getJobId() == null) {
- log.info("Cancellation pause until process get jobId");
+ log.info("Cancellation pause {} secs until process get jobId", pauseTimeInSec);
try {
- Thread.sleep(1000);
+ Thread.sleep(waitForProcessIdmillis);
} catch (InterruptedException e) {
// ignore
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/3d16cbcc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
index 595380f..e4a24d3 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
@@ -88,15 +88,14 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
ProcessContext processContext = Factory.getGfacContext().getProcess(processId);
if (processContext != null) {
processContext.setCancel(true);
- log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>> calling process cancelling operation <<<<<<<<<<<<<<<<<<<<<<<<<<<");
- Factory.getGFacEngine().cancelProcess(processContext);
- log.info("expId {}, processId : {}, Cancelling process", experimentId, processId);
+ log.info("expId {}, processId : {}, Cancelling process", experimentId, processId);
+ Factory.getGFacEngine().cancelProcess(processContext);
} else {
if (retryAttempt < max_retry) {
log.info("expId: {}, Cancel request came for processId {} but couldn't find process context. " +
- "retry in {} ms ", experimentId, processId, retryAttempt);
+ "retry in {} s ", experimentId, processId, retryAttempt*3);
try {
- Thread.sleep(retryAttempt++*1000);
+ Thread.sleep(retryAttempt++*3000);
} catch (InterruptedException e) {
// ignore we don't care this exception.
}