You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by bf...@apache.org on 2011/03/23 01:12:30 UTC

svn commit: r1084421 - /oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java

Author: bfoster
Date: Wed Mar 23 00:12:30 2011
New Revision: 1084421

URL: http://svn.apache.org/viewvc?rev=1084421&view=rev
Log:

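- improved synchronization of QueueManager: the processor lock is now released and the cached processor re-cached inside finally blocks, so both happen even when an exception is thrown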

-------------------

Modified:
    oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java

Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java?rev=1084421&r1=1084420&r2=1084421&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java Wed Mar 23 00:12:30 2011
@@ -197,14 +197,19 @@ public class QueueManager {
 		}
 		if (stub != null) {
 			CachedWorkflowProcessor cachedWP = this.processorQueue.get(stub.getInstanceId());
-			cachedWP.uncache();
-			processorLock.lock(cachedWP.getInstanceId());
-			TaskProcessor taskProcessor = (TaskProcessor) WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), stub.getModelId());
-			TaskInstance taskInstance = this.makeInstance(taskProcessor);
-			this.executingTasks.put(taskProcessor.getInstanceId() + ":" + taskProcessor.getModelId(), taskProcessor.getStub());
-			processorLock.unlock(cachedWP.getInstanceId());
-			cachedWP.cache();
-			return taskInstance;
+			try {
+				cachedWP.uncache();
+				processorLock.lock(cachedWP.getInstanceId());
+				TaskProcessor taskProcessor = (TaskProcessor) WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), stub.getModelId());
+				TaskInstance taskInstance = this.makeInstance(taskProcessor);
+				this.executingTasks.put(taskProcessor.getInstanceId() + ":" + taskProcessor.getModelId(), taskProcessor.getStub());
+				return taskInstance;
+			}catch (Exception e) {
+				throw e;
+			}finally {
+				processorLock.unlock(cachedWP.getInstanceId());
+				cachedWP.cache();
+			}
 		}else { 
 			return null;
 		}
@@ -226,15 +231,20 @@ public class QueueManager {
 		try {
 			CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
 			if (cachedWP != null) {
-				cachedWP.uncache();
-				processorLock.lock(cachedWP.getInstanceId());
-				if (modelId != null)
-					WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId).revertState();
-				else
-					cachedWP.getWorkflowProcessor().revertState();
-				WorkflowUtils.validateWorkflowProcessor(cachedWP.getWorkflowProcessor());
-				processorLock.unlock(cachedWP.getInstanceId());
-				cachedWP.cache();
+				try {
+					cachedWP.uncache();
+					processorLock.lock(cachedWP.getInstanceId());
+					if (modelId != null)
+						WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId).revertState();
+					else
+						cachedWP.getWorkflowProcessor().revertState();
+					WorkflowUtils.validateWorkflowProcessor(cachedWP.getWorkflowProcessor());
+				}catch (Exception e) {
+					throw e;
+				}finally {
+					processorLock.unlock(cachedWP.getInstanceId());
+					cachedWP.cache();
+				}
 			}
 		}catch (Exception e) {
 			LOG.log(Level.SEVERE, "Failed to revert state for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
@@ -245,13 +255,18 @@ public class QueueManager {
 		try {
 			CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
 			if (cachedWP != null) {
-				cachedWP.uncache();
-				processorLock.lock(cachedWP.getInstanceId());
-				WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
-				if (wp instanceof TaskProcessor)
-					((TaskProcessor) wp).setJobId(jobId);
-				processorLock.unlock(cachedWP.getInstanceId());
-				cachedWP.cache();
+				try {
+					cachedWP.uncache();
+					processorLock.lock(cachedWP.getInstanceId());
+					WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+					if (wp instanceof TaskProcessor)
+						((TaskProcessor) wp).setJobId(jobId);
+				}catch (Exception e) {
+					throw e;
+				}finally {
+					processorLock.unlock(cachedWP.getInstanceId());
+					cachedWP.cache();
+				}
 			}
 		}catch (Exception e) {
 			LOG.log(Level.SEVERE, "Failed to set state for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
@@ -262,24 +277,29 @@ public class QueueManager {
 		try {
 			CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
 			if (cachedWP != null) {
-				cachedWP.uncache();
-				processorLock.lock(cachedWP.getInstanceId());
-				WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
-				if (state instanceof RevertableWorkflowState)
-					((RevertableWorkflowState) state).setPrevState(wp.getState());
-				wp.setState(state);
-				if (wp instanceof TaskProcessor) {
-					if (this.executingTasks.containsKey(instanceId + ":" + modelId)) {
-						if (!(state instanceof ExecutingState))
-							this.executingTasks.remove(instanceId + ":" + modelId);
-						else
-							this.executingTasks.put(instanceId + ":" + modelId, wp.getStub());
-					}else {
-						this.updateRunnableStub(wp);
+				try {
+					cachedWP.uncache();
+					processorLock.lock(cachedWP.getInstanceId());
+					WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+					if (state instanceof RevertableWorkflowState)
+						((RevertableWorkflowState) state).setPrevState(wp.getState());
+					wp.setState(state);
+					if (wp instanceof TaskProcessor) {
+						if (this.executingTasks.containsKey(instanceId + ":" + modelId)) {
+							if (!(state instanceof ExecutingState))
+								this.executingTasks.remove(instanceId + ":" + modelId);
+							else
+								this.executingTasks.put(instanceId + ":" + modelId, wp.getStub());
+						}else {
+							this.updateRunnableStub(wp);
+						}
 					}
+				}catch (Exception e) {
+					throw e;
+				}finally {
+					processorLock.unlock(cachedWP.getInstanceId());
+					cachedWP.cache();
 				}
-				processorLock.unlock(cachedWP.getInstanceId());
-				cachedWP.cache();
 			}
 		}catch (Exception e) {
 			LOG.log(Level.SEVERE, "Failed to set state for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
@@ -294,14 +314,19 @@ public class QueueManager {
 			}
 			CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
 			if (cachedWP != null) {
-				cachedWP.uncache();
-				processorLock.lock(cachedWP.getInstanceId());
-				WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
-				wp.setPriorityRecur(priority);
-				if (wp instanceof TaskProcessor) 
-					this.updateRunnableStub(wp);
-				processorLock.unlock(cachedWP.getInstanceId());
-				cachedWP.cache();
+				try {
+					cachedWP.uncache();
+					processorLock.lock(cachedWP.getInstanceId());
+					WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+					wp.setPriorityRecur(priority);
+					if (wp instanceof TaskProcessor) 
+						this.updateRunnableStub(wp);
+				}catch (Exception e) {
+					throw e;
+				}finally {
+					processorLock.unlock(cachedWP.getInstanceId());
+					cachedWP.cache();
+				}
 			}
 		}catch (Exception e) {
 			LOG.log(Level.SEVERE, "Failed to set priority for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
@@ -312,12 +337,17 @@ public class QueueManager {
 		try {
 			CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
 			if (cachedWP != null) {
-				cachedWP.uncache();
-				processorLock.lock(cachedWP.getInstanceId());
-				WorkflowProcessor wp = modelId == null ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
-				wp.setDynamicMetadata(metadata);
-				processorLock.unlock(cachedWP.getInstanceId());
-				cachedWP.cache();
+				try {
+					cachedWP.uncache();
+					processorLock.lock(cachedWP.getInstanceId());
+					WorkflowProcessor wp = modelId == null ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+					wp.setDynamicMetadata(metadata);
+				}catch (Exception e) {
+					throw e;
+				}finally {
+					processorLock.unlock(cachedWP.getInstanceId());
+					cachedWP.cache();
+				}
 			}
 		}catch (Exception e) {
 			LOG.log(Level.SEVERE, "Failed to set metadata for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
@@ -328,11 +358,16 @@ public class QueueManager {
 		CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
 		WorkflowProcessor returnProcessor = null;
 		if (cachedWP != null) {
-			cachedWP.uncache();
-			processorLock.lock(instanceId);
-			returnProcessor = cachedWP.getWorkflowProcessor();
-			processorLock.unlock(instanceId);
-			cachedWP.cache();
+			try {
+				cachedWP.uncache();
+				processorLock.lock(instanceId);
+				returnProcessor = cachedWP.getWorkflowProcessor();
+			}catch (RuntimeException e) {
+				throw e;
+			}finally {
+				processorLock.unlock(instanceId);
+				cachedWP.cache();
+			}
 		}		
 		return returnProcessor;
 	}
@@ -566,6 +601,10 @@ public class QueueManager {
 			try {
 				if (QueueManager.this.processorRepo != null)
 					QueueManager.this.processorRepo.delete(this.instanceId);
+				this.wp = null;
+				this.processorStub = null;
+				this.cachedMetadata = null;
+				this.instanceId = null;
 			}catch (Exception e) {
 				LOG.log(Level.WARNING, "Failed to delete " + this.instanceId + " : " + e.getMessage(), e);
 			}