You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ant.apache.org by co...@apache.org on 2003/02/11 12:26:44 UTC
cvs commit: ant/src/testcases/org/apache/tools/ant/taskdefs ParallelTest.java
conor 2003/02/11 03:26:44
Modified: src/etc/testcases/taskdefs parallel.xml
src/main/org/apache/tools/ant/taskdefs Parallel.java
src/testcases/org/apache/tools/ant/taskdefs
ParallelTest.java
Log:
Add a thread count to the parallel task to stop it using too many threads
PR: 16906
Submitted by: Danno Ferrin
Revision Changes Path
1.2 +95 -0 ant/src/etc/testcases/taskdefs/parallel.xml
Index: parallel.xml
===================================================================
RCS file: /home/cvs/ant/src/etc/testcases/taskdefs/parallel.xml,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -w -u -r1.1 -r1.2
--- parallel.xml 21 Feb 2002 15:38:16 -0000 1.1
+++ parallel.xml 11 Feb 2003 11:26:44 -0000 1.2
@@ -21,6 +21,101 @@
</parallel>
</target>
+ <target name="testThreadCount">
+ <parallel threadCount='1' pollInterval="30">
+ <!-- expected start 1, end 1, start 2, end 2, start 3, end 3 -->
+ <sequential>
+ <echo message="+1"/>
+ <sleep seconds="1"/>
+ <echo message="-1"/>
+ </sequential>
+ <sequential>
+ <echo message="+2"/>
+ <sleep seconds="2"/>
+ <echo message="-2"/>
+ </sequential>
+ <sequential>
+ <echo message="+3"/>
+ <sleep seconds="3"/>
+ <echo message="-3"/>
+ </sequential>
+ </parallel>
+ <parallel threadCount='2' pollInterval="30">
+ <!-- expected start 1, end 1, start 2, end 2, start 3, end 3 -->
+ <sequential>
+ <echo message="+1"/>
+ <sleep seconds="1"/>
+ <echo message="-1"/>
+ </sequential>
+ <sequential>
+ <echo message="+2"/>
+ <sleep seconds="2"/>
+ <echo message="-2"/>
+ </sequential>
+ <sequential>
+ <echo message="+3"/>
+ <sleep seconds="3"/>
+ <echo message="-3"/>
+ </sequential>
+ </parallel>
+ <parallel threadCount='3' pollInterval="30">
+ <!-- expected start 1, start 2, end 1, start 3, end 2, end 3 -->
+ <sequential>
+ <echo message="+1"/>
+ <sleep seconds="1"/>
+ <echo message="-1"/>
+ </sequential>
+ <sequential>
+ <echo message="+2"/>
+ <sleep seconds="2"/>
+ <echo message="-2"/>
+ </sequential>
+ <sequential>
+ <echo message="+3"/>
+ <sleep seconds="3"/>
+ <echo message="-3"/>
+ </sequential>
+ </parallel>
+ <parallel threadCount='4' pollInterval="30">
+ <!-- expected start 1, start 2, start 3, end 1, end 2, end 3 -->
+ <sequential>
+ <echo message="+1"/>
+ <sleep seconds="1"/>
+ <echo message="-1"/>
+ </sequential>
+ <sequential>
+ <echo message="+2"/>
+ <sleep seconds="2"/>
+ <echo message="-2"/>
+ </sequential>
+ <sequential>
+ <echo message="+3"/>
+ <sleep seconds="3"/>
+ <echo message="-3"/>
+ </sequential>
+ </parallel>
+ <parallel threadsPerProcessor='1' pollInterval="30">
+ <!-- expected result varies, depends on setup -->
+ <!-- this is a smoke test for threadsPerProcessor -->
+ <sequential>
+ <!--echo message="+1"/-->
+ <sleep seconds="1"/>
+ <!--echo message="-1"/-->
+ </sequential>
+ <sequential>
+ <!--echo message="+2"/-->
+ <sleep seconds="2"/>
+ <!--echo message="-2"/-->
+ </sequential>
+ <sequential>
+ <!--echo message="+3"/-->
+ <sleep seconds="3"/>
+ <!--echo message="-3"/-->
+ </sequential>
+ </parallel>
+
+ </target>
+
<target name="testDemux">
<parallel>
<demuxtest/>
1.15 +190 -8 ant/src/main/org/apache/tools/ant/taskdefs/Parallel.java
Index: Parallel.java
===================================================================
RCS file: /home/cvs/ant/src/main/org/apache/tools/ant/taskdefs/Parallel.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -w -u -r1.14 -r1.15
--- Parallel.java 10 Feb 2003 14:13:36 -0000 1.14
+++ Parallel.java 11 Feb 2003 11:26:44 -0000 1.15
@@ -53,6 +53,7 @@
*/
package org.apache.tools.ant.taskdefs;
+import java.lang.reflect.Method;
import java.util.Enumeration;
import java.util.Vector;
import org.apache.tools.ant.BuildException;
@@ -61,14 +62,22 @@
import org.apache.tools.ant.TaskContainer;
import org.apache.tools.ant.util.StringUtils;
-
-
/**
* Executes the contained tasks in separate threads, continuing
- * once all are completed.
+ * once all are completed.<br/>
+ * New behavior allows for the ant script to specify a maximum number of
+ * threads that will be executed in parallel. One should be very careful about
+ * using the <code>waitFor</code> task when specifying <code>threadCount</code>
+ * as it can cause deadlocks if the number of threads is too small or if one of
+ * the nested tasks fails to execute completely. The task selection algorithm
+ * will insure that the tasks listed before a task have started before that
+ * task is started, but it will not insure a successful completion of those
+ * tasks or that those tasks will finish first (i.e. it's a classic race
+ * condition).
* <p>
* @author Thomas Christen <a href="mailto:chr@active.ch">chr@active.ch</a>
* @author Conor MacNeill
+ * @author Danno Ferrin
* @since Ant 1.4
*
* @ant.task category="control"
@@ -79,6 +88,17 @@
/** Collection holding the nested tasks */
private Vector nestedTasks = new Vector();
+ /** Semaphore to notify of completed threads */
+ private final Object semaphore = new Object();
+
+ /** Total number of threads to run */
+ int numThreads = 0;
+
+ /** Total number of threads per processor to run. */
+ int numThreadsPerProcessor = 0;
+
+ /** Interval (in ms) to poll for finished threads. */
+ int pollInterval = 1000; // default is once a second
/**
* Add a nested task to execute in parallel.
@@ -89,11 +109,154 @@
}
/**
- * Block execution until the specified time or for a
- * specified amount of milliseconds and if defined,
- * execute the wait status.
+ * Dynamically generates the number of threads to execute based on the
+ * number of available processors (via
+ * <code>java.lang.Runtime.availableProcessors()</code>). Requires a J2SE
+ * 1.4 VM, and it will overwrite the value set in threadCount.
+ * If used in a 1.1, 1.2, or 1.3 VM then the task will defer to
+ * <code>threadCount</code>.; optional
+ * @param numThreadsPerProcessor Number of threads to create per available
+ * processor.
+ *
+ */
+ public void setThreadsPerProcessor(int numThreadsPerProcessor) {
+ this.numThreadsPerProcessor = numThreadsPerProcessor;
+ }
+
+ /**
+ * Statically determine the maximum number of tasks to execute
+ * simultaneously. If there are less tasks than threads then all will be
+ * executed at once, if there are more then only <code>threadCount</code>
+ * tasks will be executed at one time. If <code>threadsPerProcessor</code>
+ * is set and the JVM is at least a 1.4 VM then this value is ignormed.; optional
+ *
+ * @param numThreads total number of therads.
+ *
+ */
+ public void setThreadCount(int numThreads) {
+ this.numThreads = numThreads;
+ }
+
+ /**
+ * Interval to poll for completed threads when threadCount or
+ * threadsPerProcessor is specified. Integer in milliseconds.; optional
+ *
+ * @param pollInterval New value of property pollInterval.
*/
+ public void setPollInterval(int pollInterval) {
+ this.pollInterval = pollInterval;
+ }
+
public void execute() throws BuildException {
+ updateThreadCounts();
+ if (numThreads == 0) {
+ spinAllThreads();
+ } else {
+ spinNumThreads();
+ }
+ }
+
+ public void updateThreadCounts() {
+ if (numThreadsPerProcessor != 0) {
+ int numProcessors = getNumProcessors();
+ if (numProcessors != 0) {
+ numThreads = numProcessors * numThreadsPerProcessor;
+ }
+ }
+ }
+
+ /**
+ * Spin up threadCount threads.
+ */
+ public void spinNumThreads() throws BuildException {
+ final int maxThreads = nestedTasks.size();
+ Thread[] threads = new Thread[maxThreads];
+ TaskThread[] taskThreads = new TaskThread[maxThreads];
+ int threadNumber = 0;
+ for (Enumeration e = nestedTasks.elements(); e.hasMoreElements();
+ threadNumber++) {
+ Task nestedTask = (Task) e.nextElement();
+ ThreadGroup group = new ThreadGroup("parallel");
+ TaskThread taskThread = new TaskThread(threadNumber, nestedTask);
+ taskThreads[threadNumber] = taskThread;
+ threads[threadNumber] = new Thread(group, taskThread);
+ }
+
+ final int maxRunning = numThreads;
+ Thread[] running = new Thread[maxRunning];
+ threadNumber = 0;
+
+ // now run them in limited numbers...
+ outer:
+ while (threadNumber < maxThreads) {
+ synchronized(semaphore) {
+ for (int i = 0; i < maxRunning; i++) {
+ if (running[i] == null || !running[i].isAlive()) {
+ running[i] = threads[threadNumber++];
+ running[i].start();
+ // countinue on outer while loop in case we used our last thread
+ continue outer;
+ }
+ }
+ // if we got here all are running, so sleep a little
+ try {
+ semaphore.wait(pollInterval);
+ } catch (InterruptedException ie) {
+ // dosen't java know interruptions are rude?
+ // just pretend it didn't happen and go aobut out business.
+ // sheesh!
+ }
+ }
+ }
+
+ // now join to all the threads
+ for (int i = 0; i < maxRunning; ++i) {
+ try {
+ if (running[i] != null) {
+ running[i].join();
+ }
+ } catch (InterruptedException ie) {
+ // who would interrupt me at a time like this?
+ }
+ }
+
+ // now did any of the threads throw an exception
+ StringBuffer exceptionMessage = new StringBuffer();
+ int numExceptions = 0;
+ Throwable firstException = null;
+ Location firstLocation = Location.UNKNOWN_LOCATION;;
+ for (int i = 0; i < maxThreads; ++i) {
+ Throwable t = taskThreads[i].getException();
+ if (t != null) {
+ numExceptions++;
+ if (firstException == null) {
+ firstException = t;
+ }
+ if (t instanceof BuildException &&
+ firstLocation == Location.UNKNOWN_LOCATION) {
+ firstLocation = ((BuildException) t).getLocation();
+ }
+ exceptionMessage.append(StringUtils.LINE_SEP);
+ exceptionMessage.append(t.getMessage());
+ }
+ }
+
+ if (numExceptions == 1) {
+ if (firstException instanceof BuildException) {
+ throw (BuildException) firstException;
+ } else {
+ throw new BuildException(firstException);
+ }
+ } else if (numExceptions > 1) {
+ throw new BuildException(exceptionMessage.toString(),
+ firstLocation);
+ }
+ }
+
+ /**
+ * Spin up one thread per task.
+ */
+ public void spinAllThreads() throws BuildException {
int numTasks = nestedTasks.size();
Thread[] threads = new Thread[numTasks];
TaskThread[] taskThreads = new TaskThread[numTasks];
@@ -154,10 +317,25 @@
}
}
+ public int getNumProcessors() {
+ try {
+ Class[] paramTypes = {};
+ Method availableProcessors =
+ Runtime.class.getMethod("availableProcessors", paramTypes);
+
+ Object[] args = {};
+ Integer ret = (Integer) availableProcessors.invoke(Runtime.getRuntime(), args);
+ return ret.intValue();
+ } catch (Exception e) {
+ // return a bogus number
+ return 0;
+ }
+ }
+
/**
* thread that execs a task
*/
- private static class TaskThread implements Runnable {
+ private class TaskThread implements Runnable {
private Throwable exception;
private Task task;
private int taskNumber;
@@ -181,6 +359,10 @@
task.perform();
} catch (Throwable t) {
exception = t;
+ } finally {
+ synchronized (semaphore) {
+ semaphore.notifyAll();
+ }
}
}
1.4 +13 -0 ant/src/testcases/org/apache/tools/ant/taskdefs/ParallelTest.java
Index: ParallelTest.java
===================================================================
RCS file: /home/cvs/ant/src/testcases/org/apache/tools/ant/taskdefs/ParallelTest.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -w -u -r1.3 -r1.4
--- ParallelTest.java 10 Feb 2003 14:14:45 -0000 1.3
+++ ParallelTest.java 11 Feb 2003 11:26:44 -0000 1.4
@@ -103,6 +103,19 @@
}
+ /** tests basic operation of the parallel task */
+ public void testTreadCount() {
+ // should get no output at all
+ Project project = getProject();
+ project.setUserProperty("test.direct", DIRECT_MESSAGE);
+ project.setUserProperty("test.delayed", DELAYED_MESSAGE);
+ expectOutputAndError("testThreadCount", "", "");
+ String log = getLog();
+ assertEquals("parallel tasks did't block on threads properly", log,
+ "+1-1+2-2+3-3+1+2-1+3-2-3+1+2+3-1-2-3+1+2+3-1-2-3");
+
+ }
+
/** tests the failure of a task within a parallel construction */
public void testFail() {
// should get no output at all