You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by bf...@apache.org on 2011/02/17 19:53:36 UTC
svn commit: r1071749 - in
/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner:
LocalEngineRunner.java LocalEngineRunnerFactory.java
Author: bfoster
Date: Thu Feb 17 18:53:36 2011
New Revision: 1071749
URL: http://svn.apache.org/viewvc?rev=1071749&view=rev
Log:
- Update LocalEngineRunner to support a cache of task instances (with configurable size)
-----------------------------------
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java?rev=1071749&r1=1071748&r2=1071749&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java Thu Feb 17 18:53:36 2011
@@ -17,6 +17,10 @@
package org.apache.oodt.cas.workflow.engine.runner;
//OODT imports
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.oodt.cas.workflow.instance.TaskInstance;
/**
@@ -30,15 +34,43 @@ import org.apache.oodt.cas.workflow.inst
*/
public class LocalEngineRunner extends EngineRunner {
+ private static final Logger LOG = Logger.getLogger(LocalEngineRunner.class.getName());
+
+ private int cacheSize;
private int numOfSlots;
private int usedSlots = 0;
+ private List<TaskInstance> cache;
- public LocalEngineRunner(int numOfSlots) {
+ public LocalEngineRunner(int numOfSlots, int cacheSize) {
this.numOfSlots = numOfSlots;
+ this.cacheSize = cacheSize;
+ if (this.cacheSize > 0) {
+ new Thread(new Runnable() {
+ public void run() {
+ while (true) {
+ try {
+ if (LocalEngineRunner.this.numOfSlots > LocalEngineRunner.this.usedSlots && LocalEngineRunner.this.cache.size() > 0)
+ LocalEngineRunner.this.execute(LocalEngineRunner.this.cache.remove(0));
+ }catch (Exception e) {
+ LOG.log(Level.SEVERE, "Failed to submit job from cache : " + e.getMessage(), e);
+ }
+ try {
+ synchronized(this) {
+ this.wait(2000);
+ }
+ }catch (Exception e) {
+ LOG.log(Level.WARNING, "Local Runner cache submitter thread wait terminated : " + e.getMessage(), e);
+ }
+ }
+ }
+ }).start();
+ }
}
public void execute(final TaskInstance workflowInstance) throws Exception {
incrSlots();
+ if (this.cache.size() > 0)
+ cache.add(workflowInstance);
new Thread(new Runnable() {
public void run() {
try {
@@ -52,7 +84,7 @@ public class LocalEngineRunner extends E
@Override
public synchronized int getOpenSlots(TaskInstance workflowInstance) throws Exception {
- return numOfSlots - usedSlots;
+ return (numOfSlots - usedSlots) + this.cacheSize;
}
@Override
Modified: oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java
URL: http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java?rev=1071749&r1=1071748&r2=1071749&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java Thu Feb 17 18:53:36 2011
@@ -27,14 +27,19 @@ package org.apache.oodt.cas.workflow.eng
*/
public class LocalEngineRunnerFactory implements EngineRunnerFactory {
+ private int cacheSize = 0;
private int numOfSlots = 6;
public LocalEngineRunner createRunner() {
- return new LocalEngineRunner(this.numOfSlots);
+ return new LocalEngineRunner(this.numOfSlots, this.cacheSize);
}
public void setNumOfSlots(int numOfSlots) {
this.numOfSlots = numOfSlots;
}
+
+ public void setCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ }
}