You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ant.apache.org by co...@apache.org on 2003/02/11 12:26:44 UTC

cvs commit: ant/src/testcases/org/apache/tools/ant/taskdefs ParallelTest.java

conor       2003/02/11 03:26:44

  Modified:    src/etc/testcases/taskdefs parallel.xml
               src/main/org/apache/tools/ant/taskdefs Parallel.java
               src/testcases/org/apache/tools/ant/taskdefs
                        ParallelTest.java
  Log:
  Add a thread count to the parallel task to stop it using too many threads
  
  PR:	16906
  Submitted by:	Danno Ferrin
  
  Revision  Changes    Path
  1.2       +95 -0     ant/src/etc/testcases/taskdefs/parallel.xml
  
  Index: parallel.xml
  ===================================================================
  RCS file: /home/cvs/ant/src/etc/testcases/taskdefs/parallel.xml,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -w -u -r1.1 -r1.2
  --- parallel.xml	21 Feb 2002 15:38:16 -0000	1.1
  +++ parallel.xml	11 Feb 2003 11:26:44 -0000	1.2
  @@ -21,6 +21,101 @@
       </parallel>
     </target>
   
  +  <target name="testThreadCount">
  +    <parallel threadCount='1' pollInterval="30">
  +      <!-- expected start 1, end 1, start 2, end 2, start 3, end 3 -->
  +      <sequential>
  +        <echo message="+1"/>
  +        <sleep seconds="1"/>
  +        <echo message="-1"/>
  +      </sequential>
  +      <sequential>
  +        <echo message="+2"/>
  +        <sleep seconds="2"/>
  +        <echo message="-2"/>
  +      </sequential>
  +      <sequential>
  +        <echo message="+3"/>
  +        <sleep seconds="3"/>
  +        <echo message="-3"/>
  +      </sequential>
  +    </parallel>
  +    <parallel threadCount='2' pollInterval="30">
  +      <!-- expected start 1, start 2, end 1, start 3, end 2, end 3 -->
  +      <sequential>
  +        <echo message="+1"/>
  +        <sleep seconds="1"/>
  +        <echo message="-1"/>
  +      </sequential>
  +      <sequential>
  +        <echo message="+2"/>
  +        <sleep seconds="2"/>
  +        <echo message="-2"/>
  +      </sequential>
  +      <sequential>
  +        <echo message="+3"/>
  +        <sleep seconds="3"/>
  +        <echo message="-3"/>
  +      </sequential>
  +    </parallel>
  +    <parallel threadCount='3' pollInterval="30">
  +      <!-- expected start 1, start 2, start 3, end 1, end 2, end 3 -->
  +      <sequential>
  +        <echo message="+1"/>
  +        <sleep seconds="1"/>
  +        <echo message="-1"/>
  +      </sequential>
  +      <sequential>
  +        <echo message="+2"/>
  +        <sleep seconds="2"/>
  +        <echo message="-2"/>
  +      </sequential>
  +      <sequential>
  +        <echo message="+3"/>
  +        <sleep seconds="3"/>
  +        <echo message="-3"/>
  +      </sequential>
  +    </parallel>
  +    <parallel threadCount='4' pollInterval="30">
  +      <!-- expected start 1, start 2, start 3, end 1, end 2, end 3 -->
  +      <sequential>
  +        <echo message="+1"/>
  +        <sleep seconds="1"/>
  +        <echo message="-1"/>
  +      </sequential>
  +      <sequential>
  +        <echo message="+2"/>
  +        <sleep seconds="2"/>
  +        <echo message="-2"/>
  +      </sequential>
  +      <sequential>
  +        <echo message="+3"/>
  +        <sleep seconds="3"/>
  +        <echo message="-3"/>
  +      </sequential>
  +    </parallel>
  +    <parallel threadsPerProcessor='1' pollInterval="30">
  +      <!-- expected result varies, depends on setup -->
  +      <!-- this is a smoke test for threadsPerProcessor -->
  +      <sequential>
  +        <!--echo message="+1"/-->
  +        <sleep seconds="1"/>
  +        <!--echo message="-1"/-->
  +      </sequential>
  +      <sequential>
  +        <!--echo message="+2"/-->
  +        <sleep seconds="2"/>
  +        <!--echo message="-2"/-->
  +      </sequential>
  +      <sequential>
  +        <!--echo message="+3"/-->
  +        <sleep seconds="3"/>
  +        <!--echo message="-3"/-->
  +      </sequential>
  +    </parallel>
  +    
  +  </target>
  +
     <target name="testDemux">
       <parallel>
         <demuxtest/>
  
  
  
  1.15      +190 -8    ant/src/main/org/apache/tools/ant/taskdefs/Parallel.java
  
  Index: Parallel.java
  ===================================================================
  RCS file: /home/cvs/ant/src/main/org/apache/tools/ant/taskdefs/Parallel.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -w -u -r1.14 -r1.15
  --- Parallel.java	10 Feb 2003 14:13:36 -0000	1.14
  +++ Parallel.java	11 Feb 2003 11:26:44 -0000	1.15
  @@ -53,6 +53,7 @@
    */
   package org.apache.tools.ant.taskdefs;
   
  +import java.lang.reflect.Method;
   import java.util.Enumeration;
   import java.util.Vector;
   import org.apache.tools.ant.BuildException;
  @@ -61,14 +62,22 @@
   import org.apache.tools.ant.TaskContainer;
   import org.apache.tools.ant.util.StringUtils;
   
  -
  -
   /**
    * Executes the contained tasks in separate threads, continuing
  - * once all are completed.
  + * once all are completed.<br/>
  + * New behavior allows for the ant script to specify a maximum number of 
  + * threads that will be executed in parallel.  One should be very careful about
  + * using the <code>waitFor</code> task when specifying <code>threadCount</code>
  + * as it can cause deadlocks if the number of threads is too small or if one of 
  + * the nested tasks fails to execute completely.  The task selection algorithm 
  + * will ensure that the tasks listed before a task have started before that 
  + * task is started, but it will not ensure a successful completion of those 
  + * tasks or that those tasks will finish first (i.e. it's a classic race 
  + * condition).
    * <p>
    * @author Thomas Christen <a href="mailto:chr@active.ch">chr@active.ch</a>
    * @author Conor MacNeill
  + * @author Danno Ferrin
    * @since Ant 1.4
    *
    * @ant.task category="control"
  @@ -79,6 +88,17 @@
       /** Collection holding the nested tasks */
       private Vector nestedTasks = new Vector();
   
  +    /** Semaphore to notify of completed threads */
  +    private final Object semaphore = new Object();
  +    
  +    /** Total number of threads to run */
  +    int numThreads = 0;
  +    
  +    /** Total number of threads per processor to run.  */
  +    int numThreadsPerProcessor = 0;
  +    
  +    /** Interval (in ms) to poll for finished threads. */
  +    int pollInterval = 1000; // default is once a second
   
       /**
        * Add a nested task to execute in parallel.
  @@ -89,11 +109,154 @@
       }
   
       /**
  -     * Block execution until the specified time or for a
  -     * specified amount of milliseconds and if defined,
  -     * execute the wait status.
  +     * Dynamically generates the number of threads to execute based on the 
  +     * number of available processors (via 
  +     * <code>java.lang.Runtime.availableProcessors()</code>). Requires a J2SE 
  +     * 1.4 VM, and it will overwrite the value set in threadCount.  
  +     * If used in a 1.1, 1.2, or 1.3 VM then the task will defer to 
  +     * <code>threadCount</code>.; optional
  +     * @param numThreadsPerProcessor Number of threads to create per available 
  +     *        processor.
  +     *
  +     */
  +    public void setThreadsPerProcessor(int numThreadsPerProcessor) {
  +        this.numThreadsPerProcessor = numThreadsPerProcessor;
  +    }
  +    
  +    /** 
  +     * Statically determine the maximum number of tasks to execute 
  +     * simultaneously.  If there are fewer tasks than threads then all will be 
  +     * executed at once, if there are more then only <code>threadCount</code> 
  +     * tasks will be executed at one time.  If <code>threadsPerProcessor</code> 
  +     * is set and the JVM is at least a 1.4 VM then this value is ignored.; optional
  +     *
  +     * @param numThreads total number of threads.
  +     *
  +     */
  +    public void setThreadCount(int numThreads) {
  +        this.numThreads = numThreads;
  +    }
  +
  +    /** 
  +     * Interval to poll for completed threads when threadCount or 
  +     * threadsPerProcessor is specified.  Integer in milliseconds.; optional
  +     *
  +     * @param pollInterval New value of property pollInterval.
        */
  +    public void setPollInterval(int pollInterval) {
  +        this.pollInterval = pollInterval;
  +    }
  +    
       public void execute() throws BuildException {
  +        updateThreadCounts();
  +        if (numThreads == 0) {
  +            spinAllThreads();
  +        } else {
  +            spinNumThreads();
  +        }
  +    }
  +    
  +    public void updateThreadCounts() {
  +        if (numThreadsPerProcessor != 0) {
  +            int numProcessors = getNumProcessors();
  +            if (numProcessors != 0) {
  +                numThreads = numProcessors * numThreadsPerProcessor;
  +            }
  +        }
  +    }
  +        
  +    /**
  +     * Spin up threadCount threads.
  +     */
  +    public void spinNumThreads() throws BuildException {
  +        final int maxThreads = nestedTasks.size();
  +        Thread[] threads = new Thread[maxThreads];
  +        TaskThread[] taskThreads = new TaskThread[maxThreads];
  +        int threadNumber = 0;
  +        for (Enumeration e = nestedTasks.elements(); e.hasMoreElements(); 
  +             threadNumber++) {
  +            Task nestedTask = (Task) e.nextElement();
  +            ThreadGroup group = new ThreadGroup("parallel");
  +            TaskThread taskThread = new TaskThread(threadNumber, nestedTask);
  +            taskThreads[threadNumber] = taskThread;
  +            threads[threadNumber] = new Thread(group, taskThread);
  +        }
  +
  +        final int maxRunning = numThreads;
  +        Thread[] running = new Thread[maxRunning];
  +        threadNumber = 0;
  +        
  +        // now run them in limited numbers...
  +        outer:
  +        while (threadNumber < maxThreads) {
  +            synchronized(semaphore) {
  +                for (int i = 0; i < maxRunning; i++) {
  +                    if (running[i] == null || !running[i].isAlive()) {
  +                        running[i] = threads[threadNumber++];
  +                        running[i].start();
  +                        // continue on outer while loop in case we used our last thread
  +                        continue outer;
  +                    }
  +                }
  +                // if we got here all are running, so sleep a little
  +                try {
  +                    semaphore.wait(pollInterval);
  +                } catch (InterruptedException ie) {
  +                    // doesn't java know interruptions are rude?
  +                    // just pretend it didn't happen and go about our business.
  +                    // sheesh!
  +                }
  +            }
  +        }
  +            
  +        // now join to all the threads 
  +        for (int i = 0; i < maxRunning; ++i) {
  +            try {
  +                if (running[i] != null) {
  +                    running[i].join();
  +                }
  +            } catch (InterruptedException ie) {
  +                // who would interrupt me at a time like this?
  +            }
  +        }
  +        
  +        // now did any of the threads throw an exception
  +        StringBuffer exceptionMessage = new StringBuffer();
  +        int numExceptions = 0;
  +        Throwable firstException = null;
  +        Location firstLocation = Location.UNKNOWN_LOCATION;;
  +        for (int i = 0; i < maxThreads; ++i) {
  +            Throwable t = taskThreads[i].getException();
  +            if (t != null) {
  +                numExceptions++;
  +                if (firstException == null) {
  +                    firstException = t;
  +                }
  +                if (t instanceof BuildException && 
  +                        firstLocation == Location.UNKNOWN_LOCATION) {
  +                    firstLocation = ((BuildException) t).getLocation();
  +                }
  +                exceptionMessage.append(StringUtils.LINE_SEP);
  +                exceptionMessage.append(t.getMessage());
  +            }
  +        }
  +        
  +        if (numExceptions == 1) {
  +            if (firstException instanceof BuildException) {
  +                throw (BuildException) firstException;
  +            } else {
  +                throw new BuildException(firstException);
  +            }
  +        } else if (numExceptions > 1) {
  +            throw new BuildException(exceptionMessage.toString(), 
  +                                     firstLocation);
  +        }
  +    }
  +        
  +    /**
  +     * Spin up one thread per task.
  +     */
  +    public void spinAllThreads() throws BuildException {
           int numTasks = nestedTasks.size();
           Thread[] threads = new Thread[numTasks];
           TaskThread[] taskThreads = new TaskThread[numTasks];
  @@ -154,10 +317,25 @@
           }
       }
   
  +    public int getNumProcessors() {
  +        try {
  +            Class[] paramTypes = {};
  +            Method availableProcessors =
  +                Runtime.class.getMethod("availableProcessors", paramTypes);
  +
  +            Object[] args = {};
  +            Integer ret = (Integer) availableProcessors.invoke(Runtime.getRuntime(), args);
  +            return ret.intValue();
  +        } catch (Exception e) {
  +            // return a bogus number
  +            return 0;
  +        }
  +    }
  +
       /**
        * thread that execs a task
        */
  -    private static class TaskThread implements Runnable {
  +    private class TaskThread implements Runnable {
           private Throwable exception;
           private Task task;
           private int taskNumber;
  @@ -181,6 +359,10 @@
                   task.perform();
               } catch (Throwable t) {
                   exception = t;
  +            } finally {
  +                synchronized (semaphore) {
  +                    semaphore.notifyAll();
  +                }
               }
           }
   
  
  
  
  1.4       +13 -0     ant/src/testcases/org/apache/tools/ant/taskdefs/ParallelTest.java
  
  Index: ParallelTest.java
  ===================================================================
  RCS file: /home/cvs/ant/src/testcases/org/apache/tools/ant/taskdefs/ParallelTest.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -w -u -r1.3 -r1.4
  --- ParallelTest.java	10 Feb 2003 14:14:45 -0000	1.3
  +++ ParallelTest.java	11 Feb 2003 11:26:44 -0000	1.4
  @@ -103,6 +103,19 @@
   
       }
   
  +    /** tests that threadCount limits how many nested tasks run at once, by checking the log order */
  +    public void testTreadCount() {
  +        // should get no output at all
  +        Project project = getProject();
  +        project.setUserProperty("test.direct", DIRECT_MESSAGE);
  +        project.setUserProperty("test.delayed", DELAYED_MESSAGE);
  +        expectOutputAndError("testThreadCount", "", "");
  +        String log = getLog();
  +        assertEquals("parallel tasks did't block on threads properly", log,
  +            "+1-1+2-2+3-3+1+2-1+3-2-3+1+2+3-1-2-3+1+2+3-1-2-3");
  +
  +    }
  +
       /** tests the failure of a task within a parallel construction */
       public void testFail() {
           // should get no output at all