You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/08/15 06:57:07 UTC
svn commit: r1373212 - in /oodt/trunk/workflow/src:
main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java
test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java
Author: mattmann
Date: Wed Aug 15 04:57:06 2012
New Revision: 1373212
URL: http://svn.apache.org/viewvc?rev=1373212&view=rev
Log:
- OODT-310 WIP: test harness for the task runner
Added:
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java?rev=1373212&r1=1373211&r2=1373212&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java Wed Aug 15 04:57:06 2012
@@ -89,25 +89,26 @@ public class TaskRunner implements Runna
TaskProcessor nextTaskProcessor = null;
while (running) {
- try {
- if (nextTaskProcessor == null){
- nextTaskProcessor = taskQuerier.getNext();
- nextTask = extractTaskFromProcessor(nextTaskProcessor);
- }
- while (running && !pause && nextTask != null
+ try {
+ nextTaskProcessor = taskQuerier.getNext();
+ nextTask = nextTaskProcessor != null ?
+ extractTaskFromProcessor(nextTaskProcessor):null;
+
+ while (running && !pause && nextTask != null
&& runner.hasOpenSlots(nextTask)) {
- // TODO: set Workflow met here?
+
+ // TODO: set Workflow met here?
runner.execute(nextTask, nextTaskProcessor.getDynamicMetadata());
nextTaskProcessor = taskQuerier.getNext();
- nextTask = extractTaskFromProcessor(nextTaskProcessor);
-
- // take a breather
- try{
- Thread.currentThread().sleep(1000); //FIXME: make this configurable
- }
- catch (Exception ignore) {}
+ nextTask = nextTaskProcessor != null ?
+ extractTaskFromProcessor(nextTaskProcessor):null;
}
- } catch (Exception e) {
+ }
+ catch(InterruptedException e){
+ this.running = false;
+ break;
+ }
+ catch (Exception e) {
LOG.log(
Level.SEVERE,
"Engine failed while submitting jobs to its runner : "
@@ -132,6 +133,7 @@ public class TaskRunner implements Runna
} catch (Exception ignore) {
}
}
+
}
/**
Added: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java?rev=1373212&view=auto
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java (added)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MetSetterTaskQuerier.java Wed Aug 15 04:57:06 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.Calendar;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessorQueue;
+import org.apache.oodt.cas.workflow.structs.PrioritySorter;
+import org.apache.oodt.commons.date.DateUtils;
+
+/**
+ *
+ * Intercepts the calls to {@link TaskQuerier#getNext()} and injects
+ * StartDateTime (and potentially other met fields into the
+ * {@link TaskProcessor#getDynamicMetadata()}.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class MetSetterTaskQuerier extends TaskQuerier {
+
+ /**
+ * @param processorQueue
+ * @param prioritizer
+ */
+ public MetSetterTaskQuerier(WorkflowProcessorQueue processorQueue,
+ PrioritySorter prioritizer) {
+ super(processorQueue, prioritizer);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oodt.cas.workflow.engine.TaskQuerier#getNext()
+ */
+ @Override
+ public TaskProcessor getNext() {
+ TaskProcessor taskProcessor = super.getNext();
+ if(taskProcessor == null) return null;
+ Metadata met = new Metadata();
+ met.addMetadata("StartDateTime", DateUtils.toString(Calendar.getInstance()));
+ taskProcessor.setDynamicMetadata(met);
+ return taskProcessor;
+ }
+
+}
Added: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java?rev=1373212&view=auto
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java (added)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java Wed Aug 15 04:57:06 2012
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.io.File;
+import java.util.List;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessor;
+import org.apache.oodt.cas.workflow.engine.runner.AsynchronousLocalEngineRunner;
+import org.apache.oodt.cas.workflow.engine.runner.EngineRunner;
+import org.apache.oodt.cas.workflow.structs.FILOPrioritySorter;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+
+//Junit imports
+import junit.framework.TestCase;
+
+/**
+ *
+ * Exercises the {@link TaskRunner}.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class TestTaskRunner extends TestCase {
+
+ private File testDir;
+
+ private EngineRunner runner;
+
+ private TaskRunner taskRunner;
+
+ private TaskQuerier querier;
+
+ public void testExecuteTasks() {
+ FILOPrioritySorter prioritizer = new FILOPrioritySorter();
+ MockProcessorQueue processorQueue = new MockProcessorQueue();
+ querier = new MetSetterTaskQuerier(processorQueue, prioritizer);
+ Thread querierThread = new Thread(querier);
+ querierThread.start();
+ while (querier.getRunnableProcessors().size() != 2) {
+ assertNotNull(querier.getRunnableProcessors());
+ }
+ List<WorkflowProcessor> runnables = querier.getRunnableProcessors();
+ assertNotNull(runnables);
+ assertEquals(2, runnables.size());
+ runner = new AsynchronousLocalEngineRunner();
+ taskRunner = new TaskRunner(querier, runner, 2);
+ assertNotNull(taskRunner);
+ Thread runnerThread = new Thread(taskRunner);
+ WorkflowTask task = taskRunner
+ .extractTaskFromProcessor((TaskProcessor) runnables.get(0));
+ assertNotNull(task);
+ testDir = new File(task.getTaskConfig().getProperty("TestDirPath"));
+ assertNotNull(testDir);
+ runnerThread.start();
+
+ while (!testDir.exists()
+ || (testDir.exists() && testDir.listFiles().length < 2)) {
+ }
+
+ querier.setRunning(false);
+ runnerThread.interrupt();
+
+ // get the test dir path
+ assertTrue(testDir.exists());
+ assertNotNull(testDir.listFiles());
+ assertEquals(2, testDir.listFiles().length);
+ taskRunner.setRunning(false);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see junit.framework.TestCase#setUp()
+ */
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see junit.framework.TestCase#tearDown()
+ */
+ @Override
+ protected void tearDown() throws Exception {
+ // blow away test file
+ deleteAllFiles(testDir.getAbsolutePath());
+ testDir.delete();
+ testDir = null;
+ this.runner = null;
+ this.querier = null;
+ this.taskRunner = null;
+ }
+
+ private void deleteAllFiles(String startDir) {
+ File startDirFile = new File(startDir);
+ File[] delFiles = startDirFile.listFiles();
+
+ if (delFiles != null && delFiles.length > 0) {
+ for (int i = 0; i < delFiles.length; i++) {
+ delFiles[i].delete();
+ }
+ }
+
+ startDirFile.delete();
+
+ }
+
+}