You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/08/15 06:57:07 UTC

svn commit: r1373212 - in /oodt/trunk/workflow/src: main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java

Author: mattmann
Date: Wed Aug 15 04:57:06 2012
New Revision: 1373212

URL: http://svn.apache.org/viewvc?rev=1373212&view=rev
Log:
- OODT-310 WIP: test harness for the task runner

Added:
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java
Modified:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java?rev=1373212&r1=1373211&r2=1373212&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java Wed Aug 15 04:57:06 2012
@@ -89,25 +89,26 @@ public class TaskRunner implements Runna
     TaskProcessor nextTaskProcessor = null;
 
     while (running) {
-      try {
-        if (nextTaskProcessor == null){
-          nextTaskProcessor = taskQuerier.getNext();
-          nextTask = extractTaskFromProcessor(nextTaskProcessor);
-        }
-        while (running && !pause && nextTask != null
+      try {          
+        nextTaskProcessor = taskQuerier.getNext();
+        nextTask = nextTaskProcessor != null ? 
+            extractTaskFromProcessor(nextTaskProcessor):null;        
+        
+        while (running && !pause && nextTask != null 
             && runner.hasOpenSlots(nextTask)) {
-          // TODO: set Workflow met here?
+              
+          // TODO: set Workflow met here?          
           runner.execute(nextTask, nextTaskProcessor.getDynamicMetadata());
           nextTaskProcessor = taskQuerier.getNext();
-          nextTask = extractTaskFromProcessor(nextTaskProcessor);
-
-          // take a breather
-          try{
-            Thread.currentThread().sleep(1000); //FIXME: make this configurable
-          }
-          catch (Exception ignore) {}
+          nextTask = nextTaskProcessor != null ? 
+              extractTaskFromProcessor(nextTaskProcessor):null;
         }
-      } catch (Exception e) {
+      } 
+       catch(InterruptedException e){
+         this.running = false;
+         break;
+       }
+      catch (Exception e) {
         LOG.log(
             Level.SEVERE,
             "Engine failed while submitting jobs to its runner : "
@@ -132,6 +133,7 @@ public class TaskRunner implements Runna
       } catch (Exception ignore) {
       }
     }
+    
   }
 
   /**

Added: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java?rev=1373212&view=auto
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java (added)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java Wed Aug 15 04:57:06 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.Calendar;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessorQueue;
+import org.apache.oodt.cas.workflow.structs.PrioritySorter;
+import org.apache.oodt.commons.date.DateUtils;
+
+/**
+ * 
+ * Intercepts the calls to {@link TaskQuerier#getNext()} and injects
+ * StartDateTime (and potentially other met fields) into the
+ * {@link TaskProcessor#getDynamicMetadata()}.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public class MetSetterTaskQuerier extends TaskQuerier {
+
+  /**
+   * @param processorQueue
+   * @param prioritizer
+   */
+  public MetSetterTaskQuerier(WorkflowProcessorQueue processorQueue,
+      PrioritySorter prioritizer) {
+    super(processorQueue, prioritizer);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.oodt.cas.workflow.engine.TaskQuerier#getNext()
+   */
+  @Override
+  public TaskProcessor getNext() {
+    TaskProcessor taskProcessor = super.getNext();
+    if(taskProcessor == null) return null;
+    Metadata met = new Metadata();
+    met.addMetadata("StartDateTime", DateUtils.toString(Calendar.getInstance()));
+    taskProcessor.setDynamicMetadata(met);
+    return taskProcessor;
+  }
+
+}

Added: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java?rev=1373212&view=auto
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java (added)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java Wed Aug 15 04:57:06 2012
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.io.File;
+import java.util.List;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessor;
+import org.apache.oodt.cas.workflow.engine.runner.AsynchronousLocalEngineRunner;
+import org.apache.oodt.cas.workflow.engine.runner.EngineRunner;
+import org.apache.oodt.cas.workflow.structs.FILOPrioritySorter;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+
+//Junit imports
+import junit.framework.TestCase;
+
+/**
+ * 
+ * Exercises the {@link TaskRunner}.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public class TestTaskRunner extends TestCase {
+
+  private File testDir;
+
+  private EngineRunner runner;
+
+  private TaskRunner taskRunner;
+
+  private TaskQuerier querier;
+
+  public void testExecuteTasks() {
+    FILOPrioritySorter prioritizer = new FILOPrioritySorter();
+    MockProcessorQueue processorQueue = new MockProcessorQueue();
+    querier = new MetSetterTaskQuerier(processorQueue, prioritizer);
+    Thread querierThread = new Thread(querier);
+    querierThread.start();
+    while (querier.getRunnableProcessors().size() != 2) {
+      assertNotNull(querier.getRunnableProcessors());
+    }
+    List<WorkflowProcessor> runnables = querier.getRunnableProcessors();
+    assertNotNull(runnables);
+    assertEquals(2, runnables.size());
+    runner = new AsynchronousLocalEngineRunner();
+    taskRunner = new TaskRunner(querier, runner, 2);
+    assertNotNull(taskRunner);
+    Thread runnerThread = new Thread(taskRunner);
+    WorkflowTask task = taskRunner
+        .extractTaskFromProcessor((TaskProcessor) runnables.get(0));
+    assertNotNull(task);
+    testDir = new File(task.getTaskConfig().getProperty("TestDirPath"));
+    assertNotNull(testDir);
+    runnerThread.start();
+
+    while (!testDir.exists()
+        || (testDir.exists() && testDir.listFiles().length < 2)) {
+    }
+
+    querier.setRunning(false);
+    runnerThread.interrupt();
+
+    // the test dir should now exist and hold exactly two files
+    assertTrue(testDir.exists());
+    assertNotNull(testDir.listFiles());
+    assertEquals(2, testDir.listFiles().length);
+    taskRunner.setRunning(false);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see junit.framework.TestCase#setUp()
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see junit.framework.TestCase#tearDown()
+   */
+  @Override
+  protected void tearDown() throws Exception {
+    // blow away the test dir and the files in it
+    deleteAllFiles(testDir.getAbsolutePath());
+    testDir.delete();
+    testDir = null;
+    this.runner = null;
+    this.querier = null;
+    this.taskRunner = null;
+  }
+
+  private void deleteAllFiles(String startDir) {
+    File startDirFile = new File(startDir);
+    File[] delFiles = startDirFile.listFiles();
+
+    if (delFiles != null && delFiles.length > 0) {
+      for (int i = 0; i < delFiles.length; i++) {
+        delFiles[i].delete();
+      }
+    }
+
+    startDirFile.delete();
+
+  }
+
+}