You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/30 22:33:36 UTC

airavata git commit: Fixed consumer unbind issue with cancel operation and Complete cancel operation support for new changes

Repository: airavata
Updated Branches:
  refs/heads/develop a6b1785a3 -> 3d16cbcce


Fixed consumer unbind issue with cancel operation and Complete cancel operation support for new changes


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3d16cbcc
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3d16cbcc
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3d16cbcc

Branch: refs/heads/develop
Commit: 3d16cbccec4cfedf7bbef6ebca3d3bba107e3855
Parents: a6b1785
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Mon Nov 30 16:33:29 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Mon Nov 30 16:33:29 2015 -0500

----------------------------------------------------------------------
 .../gfac/core/context/ProcessContext.java       | 11 +++++-
 .../apache/airavata/gfac/impl/GFacWorker.java   | 39 ++++++++++++--------
 .../impl/task/DefaultJobSubmissionTask.java     | 10 +++--
 .../impl/watcher/CancelRequestWatcherImpl.java  |  9 ++---
 4 files changed, 44 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/3d16cbcc/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 88bd178..49b323c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -94,8 +94,9 @@ public class ProcessContext {
     private boolean complete = false; // all tasks executed?
     private boolean recovery = false; // is process in recovery mode?
     private TaskModel currentExecutingTaskModel; // current execution task model in case we pause process execution we need this to continue process exectuion again
+	private boolean acknowledge;
 
-    /**
+	/**
 	 * Note: process context property use lazy loading approach. In runtime you will see some properties as null
 	 * unless you have access it previously. Once that property access using the api,it will be set to correct value.
 	 */
@@ -482,4 +483,12 @@ public class ProcessContext {
     public void setStorageResource(StorageResourceDescription storageResource) {
         this.storageResource = storageResource;
     }
+
+	public void setAcknowledge(boolean acknowledge) {
+		this.acknowledge = acknowledge;
+	}
+
+	public boolean isAcknowledge() {
+		return acknowledge;
+	}
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/3d16cbcc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 596baab..c701ed5 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -29,8 +29,6 @@ import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.task.TaskModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +36,6 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.text.MessageFormat;
 import java.util.List;
-import java.util.Map;
 
 public class GFacWorker implements Runnable {
 
@@ -122,8 +119,13 @@ public class GFacWorker implements Runnable {
 					throw new GFacException("process Id : " + processId + " Couldn't identify process type");
 			}
 			if (processContext.isCancel()) {
-				sendAck();
-				Factory.getGfacContext().removeProcess(processContext.getProcessId());
+				if (processContext.getProcessState() == ProcessState.MONITORING
+						|| processContext.getProcessState() == ProcessState.EXECUTING) {
+					// don't send ack if the process is in MONITORING state, wait until cancel email comes to airavata.
+				} else {
+					sendAck();
+					Factory.getGfacContext().removeProcess(processContext.getProcessId());
+				}
 			}
 		} catch (GFacException e) {
 			log.error("GFac Worker throws an exception", e);
@@ -233,16 +235,23 @@ public class GFacWorker implements Runnable {
 //	}
 
 	private void sendAck() {
-		try {
-			long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),
-					processContext.getExperimentId(), processId);
-			Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);
-			log.info("expId: {}, procesId: {} :- Sent ack for deliveryTag {}", processContext.getExperimentId(),
-					processId, processDeliveryTag);
-		} catch (Exception e1) {
-			String format = MessageFormat.format("expId: {0}, processId: {1} :- Couldn't send ack for deliveryTag ",
-					processContext .getExperimentId(), processId);
-			log.error(format, e1);
+		// this ensure, gfac doesn't send ack more than once for a process. which cause to remove gfac rabbitmq consumer from rabbitmq server.
+		if (!processContext.isAcknowledge()) {
+			try {
+                long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),
+                        processContext.getExperimentId(), processId);
+                Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);
+                processContext.setAcknowledge(true);
+                log.info("expId: {}, processId: {} :- Sent ack for deliveryTag {}", processContext.getExperimentId(),
+                        processId, processDeliveryTag);
+            } catch (Exception e1) {
+                processContext.setAcknowledge(false);
+                String format = MessageFormat.format("expId: {0}, processId: {1} :- Couldn't send ack for deliveryTag ",
+                        processContext .getExperimentId(), processId);
+                log.error(format, e1);
+            }
+		} else {
+			log.info("expId: {}, processId: {} :- already acknowledged ", processContext.getExperimentId(), processId);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/3d16cbcc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
index d435b5f..b9805f8 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java
@@ -36,7 +36,6 @@ import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.*;
 import org.apache.airavata.model.task.TaskTypes;
-import org.apache.airavata.registry.core.experiment.catalog.model.Process;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -48,7 +47,10 @@ import java.util.Map;
 
 public class DefaultJobSubmissionTask implements JobSubmissionTask {
     private static final Logger log = LoggerFactory.getLogger(DefaultJobSubmissionTask.class);
-    @Override
+	private static int waitForProcessIdmillis = 5000;
+	private static int pauseTimeInSec = waitForProcessIdmillis / 1000;
+
+	@Override
     public void init(Map<String, String> propertyMap) throws TaskException {
 
     }
@@ -262,9 +264,9 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask {
 		if (jobModel != null) {
 			if (processContext.getProcessState() == ProcessState.EXECUTING) {
 				while (jobModel.getJobId() == null) {
-					log.info("Cancellation pause until process get jobId");
+					log.info("Cancellation pause {} secs until process get jobId", pauseTimeInSec);
 					try {
-						Thread.sleep(1000);
+						Thread.sleep(waitForProcessIdmillis);
 					} catch (InterruptedException e) {
 						// ignore
 					}

http://git-wip-us.apache.org/repos/asf/airavata/blob/3d16cbcc/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
index 595380f..e4a24d3 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
@@ -88,15 +88,14 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher {
 		ProcessContext processContext = Factory.getGfacContext().getProcess(processId);
 		if (processContext != null) {
             processContext.setCancel(true);
-            log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>> calling process cancelling operation <<<<<<<<<<<<<<<<<<<<<<<<<<<");
-            Factory.getGFacEngine().cancelProcess(processContext);
-            log.info("expId {}, processId : {}, Cancelling process", experimentId, processId);
+			log.info("expId {}, processId : {}, Cancelling process", experimentId, processId);
+			Factory.getGFacEngine().cancelProcess(processContext);
         } else {
 			if (retryAttempt < max_retry) {
 				log.info("expId: {}, Cancel request came for processId {} but couldn't find process context. " +
-						"retry in {} ms ", experimentId, processId, retryAttempt);
+						"retry in {} s ", experimentId, processId, retryAttempt*3);
 				try {
-					Thread.sleep(retryAttempt++*1000);
+					Thread.sleep(retryAttempt++*3000);
 				} catch (InterruptedException e) {
 					// ignore we don't care this exception.
 				}