You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by bf...@apache.org on 2011/03/23 01:12:30 UTC
svn commit: r1084421 -
/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
Author: bfoster
Date: Wed Mar 23 00:12:30 2011
New Revision: 1084421
URL: http://svn.apache.org/viewvc?rev=1084421&view=rev
Log:
- improved synchronization of QueueManager
-------------------
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java?rev=1084421&r1=1084420&r2=1084421&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java Wed Mar 23 00:12:30 2011
@@ -197,14 +197,19 @@ public class QueueManager {
}
if (stub != null) {
CachedWorkflowProcessor cachedWP = this.processorQueue.get(stub.getInstanceId());
- cachedWP.uncache();
- processorLock.lock(cachedWP.getInstanceId());
- TaskProcessor taskProcessor = (TaskProcessor) WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), stub.getModelId());
- TaskInstance taskInstance = this.makeInstance(taskProcessor);
- this.executingTasks.put(taskProcessor.getInstanceId() + ":" + taskProcessor.getModelId(), taskProcessor.getStub());
- processorLock.unlock(cachedWP.getInstanceId());
- cachedWP.cache();
- return taskInstance;
+ try {
+ cachedWP.uncache();
+ processorLock.lock(cachedWP.getInstanceId());
+ TaskProcessor taskProcessor = (TaskProcessor) WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), stub.getModelId());
+ TaskInstance taskInstance = this.makeInstance(taskProcessor);
+ this.executingTasks.put(taskProcessor.getInstanceId() + ":" + taskProcessor.getModelId(), taskProcessor.getStub());
+ return taskInstance;
+ }catch (Exception e) {
+ throw e;
+ }finally {
+ processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
+ }
}else {
return null;
}
@@ -226,15 +231,20 @@ public class QueueManager {
try {
CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
if (cachedWP != null) {
- cachedWP.uncache();
- processorLock.lock(cachedWP.getInstanceId());
- if (modelId != null)
- WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId).revertState();
- else
- cachedWP.getWorkflowProcessor().revertState();
- WorkflowUtils.validateWorkflowProcessor(cachedWP.getWorkflowProcessor());
- processorLock.unlock(cachedWP.getInstanceId());
- cachedWP.cache();
+ try {
+ cachedWP.uncache();
+ processorLock.lock(cachedWP.getInstanceId());
+ if (modelId != null)
+ WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId).revertState();
+ else
+ cachedWP.getWorkflowProcessor().revertState();
+ WorkflowUtils.validateWorkflowProcessor(cachedWP.getWorkflowProcessor());
+ }catch (Exception e) {
+ throw e;
+ }finally {
+ processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
+ }
}
}catch (Exception e) {
LOG.log(Level.SEVERE, "Failed to revert state for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
@@ -245,13 +255,18 @@ public class QueueManager {
try {
CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
if (cachedWP != null) {
- cachedWP.uncache();
- processorLock.lock(cachedWP.getInstanceId());
- WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
- if (wp instanceof TaskProcessor)
- ((TaskProcessor) wp).setJobId(jobId);
- processorLock.unlock(cachedWP.getInstanceId());
- cachedWP.cache();
+ try {
+ cachedWP.uncache();
+ processorLock.lock(cachedWP.getInstanceId());
+ WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+ if (wp instanceof TaskProcessor)
+ ((TaskProcessor) wp).setJobId(jobId);
+ }catch (Exception e) {
+ throw e;
+ }finally {
+ processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
+ }
}
}catch (Exception e) {
LOG.log(Level.SEVERE, "Failed to set state for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
@@ -262,24 +277,29 @@ public class QueueManager {
try {
CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
if (cachedWP != null) {
- cachedWP.uncache();
- processorLock.lock(cachedWP.getInstanceId());
- WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
- if (state instanceof RevertableWorkflowState)
- ((RevertableWorkflowState) state).setPrevState(wp.getState());
- wp.setState(state);
- if (wp instanceof TaskProcessor) {
- if (this.executingTasks.containsKey(instanceId + ":" + modelId)) {
- if (!(state instanceof ExecutingState))
- this.executingTasks.remove(instanceId + ":" + modelId);
- else
- this.executingTasks.put(instanceId + ":" + modelId, wp.getStub());
- }else {
- this.updateRunnableStub(wp);
+ try {
+ cachedWP.uncache();
+ processorLock.lock(cachedWP.getInstanceId());
+ WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+ if (state instanceof RevertableWorkflowState)
+ ((RevertableWorkflowState) state).setPrevState(wp.getState());
+ wp.setState(state);
+ if (wp instanceof TaskProcessor) {
+ if (this.executingTasks.containsKey(instanceId + ":" + modelId)) {
+ if (!(state instanceof ExecutingState))
+ this.executingTasks.remove(instanceId + ":" + modelId);
+ else
+ this.executingTasks.put(instanceId + ":" + modelId, wp.getStub());
+ }else {
+ this.updateRunnableStub(wp);
+ }
}
+ }catch (Exception e) {
+ throw e;
+ }finally {
+ processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
}
- processorLock.unlock(cachedWP.getInstanceId());
- cachedWP.cache();
}
}catch (Exception e) {
LOG.log(Level.SEVERE, "Failed to set state for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
@@ -294,14 +314,19 @@ public class QueueManager {
}
CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
if (cachedWP != null) {
- cachedWP.uncache();
- processorLock.lock(cachedWP.getInstanceId());
- WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
- wp.setPriorityRecur(priority);
- if (wp instanceof TaskProcessor)
- this.updateRunnableStub(wp);
- processorLock.unlock(cachedWP.getInstanceId());
- cachedWP.cache();
+ try {
+ cachedWP.uncache();
+ processorLock.lock(cachedWP.getInstanceId());
+ WorkflowProcessor wp = (modelId == null) ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+ wp.setPriorityRecur(priority);
+ if (wp instanceof TaskProcessor)
+ this.updateRunnableStub(wp);
+ }catch (Exception e) {
+ throw e;
+ }finally {
+ processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
+ }
}
}catch (Exception e) {
LOG.log(Level.SEVERE, "Failed to set priority for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
@@ -312,12 +337,17 @@ public class QueueManager {
try {
CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
if (cachedWP != null) {
- cachedWP.uncache();
- processorLock.lock(cachedWP.getInstanceId());
- WorkflowProcessor wp = modelId == null ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
- wp.setDynamicMetadata(metadata);
- processorLock.unlock(cachedWP.getInstanceId());
- cachedWP.cache();
+ try {
+ cachedWP.uncache();
+ processorLock.lock(cachedWP.getInstanceId());
+ WorkflowProcessor wp = modelId == null ? cachedWP.getWorkflowProcessor() : WorkflowUtils.findProcessor(cachedWP.getWorkflowProcessor(), modelId);
+ wp.setDynamicMetadata(metadata);
+ }catch (Exception e) {
+ throw e;
+ }finally {
+ processorLock.unlock(cachedWP.getInstanceId());
+ cachedWP.cache();
+ }
}
}catch (Exception e) {
LOG.log(Level.SEVERE, "Failed to set metadata for workflow [InstanceId = '" + instanceId + "', ModelId = '" + modelId + "'] : " + e.getMessage(), e);
@@ -328,11 +358,16 @@ public class QueueManager {
CachedWorkflowProcessor cachedWP = this.processorQueue.get(instanceId);
WorkflowProcessor returnProcessor = null;
if (cachedWP != null) {
- cachedWP.uncache();
- processorLock.lock(instanceId);
- returnProcessor = cachedWP.getWorkflowProcessor();
- processorLock.unlock(instanceId);
- cachedWP.cache();
+ try {
+ cachedWP.uncache();
+ processorLock.lock(instanceId);
+ returnProcessor = cachedWP.getWorkflowProcessor();
+ }catch (RuntimeException e) {
+ throw e;
+ }finally {
+ processorLock.unlock(instanceId);
+ cachedWP.cache();
+ }
}
return returnProcessor;
}
@@ -566,6 +601,10 @@ public class QueueManager {
try {
if (QueueManager.this.processorRepo != null)
QueueManager.this.processorRepo.delete(this.instanceId);
+ this.wp = null;
+ this.processorStub = null;
+ this.cachedMetadata = null;
+ this.instanceId = null;
}catch (Exception e) {
LOG.log(Level.WARNING, "Failed to delete " + this.instanceId + " : " + e.getMessage(), e);
}