You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@avalon.apache.org by pr...@apache.org on 2002/05/24 16:52:45 UTC

cvs commit: jakarta-avalon-excalibur/event/src/test/org/apache/excalibur/event/command/test TPCThreadManagerTestCase.java

proyal      02/05/24 07:52:45

  Modified:    event/src/java/org/apache/excalibur/event/command
                        TPCThreadManager.java
  Added:       event/src/test/org/apache/excalibur/event/command/test
                        TPCThreadManagerTestCase.java
  Log:
  Patch and TestCase from Gregory Steuck <gr...@nest.cx>
   * Fixes http://nagoya.apache.org/bugzilla/show_bug.cgi?id=9177
  
  Revision  Changes    Path
  1.15      +62 -46    jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java
  
  Index: TPCThreadManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -r1.14 -r1.15
  --- TPCThreadManager.java	13 May 2002 12:17:40 -0000	1.14
  +++ TPCThreadManager.java	24 May 2002 14:52:45 -0000	1.15
  @@ -9,6 +9,7 @@
   
   import java.util.HashMap;
   import java.util.Iterator;
  +
   import org.apache.avalon.excalibur.concurrent.Mutex;
   import org.apache.avalon.excalibur.thread.ThreadControl;
   import org.apache.avalon.excalibur.thread.ThreadPool;
  @@ -27,7 +28,7 @@
    * @author <a href="mailto:bloritsch@apache.org">Berin Loritsch</a>
    */
   public final class TPCThreadManager
  -    implements Runnable, ThreadManager, Disposable
  +  implements Runnable, ThreadManager, Disposable
   {
       private final ThreadPool m_threadPool;
       private final Mutex m_mutex = new Mutex();
  @@ -90,21 +91,24 @@
           {
               m_mutex.acquire();
   
  -            m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );
  +            try
  +            {
  +                m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );
   
  -            if( m_done )
  +                if( m_done )
  +                {
  +                    m_threadControl = m_threadPool.execute( this );
  +                }
  +            }
  +            finally
               {
  -                m_threadControl = m_threadPool.execute( this );
  +                m_mutex.release();
               }
           }
           catch( InterruptedException ie )
           {
               // ignore for now
           }
  -        finally
  -        {
  -            m_mutex.release();
  -        }
       }
   
       /**
  @@ -116,22 +120,25 @@
           {
               m_mutex.acquire();
   
  -            m_pipelines.remove( pipeline );
  +            try
  +            {
  +                m_pipelines.remove( pipeline );
   
  -            if( m_pipelines.isEmpty() )
  +                if( m_pipelines.isEmpty() )
  +                {
  +                    m_done = true;
  +                    m_threadControl.join( 1000 );
  +                }
  +            }
  +            finally
               {
  -                m_done = true;
  -                m_threadControl.join( 1000 );
  +                m_mutex.release();
               }
           }
           catch( InterruptedException ie )
           {
               // ignore for now
           }
  -        finally
  -        {
  -            m_mutex.release();
  -        }
       }
   
       /**
  @@ -142,28 +149,31 @@
           try
           {
               m_mutex.acquire();
  +            try
  +            {
  +                m_done = true;
  +                m_pipelines.clear();
   
  -            m_done = true;
  -            m_pipelines.clear();
  -
  -            m_threadControl.join( 1000 );
  +                m_threadControl.join( 1000 );
  +            }
  +            finally
  +            {
  +                m_mutex.release();
  +            }
           }
           catch( InterruptedException ie )
           {
               // ignore for now
           }
  -        finally
  -        {
  -            m_mutex.release();
  -        }
       }
   
       public final void dispose()
       {
           deregisterAll();
  +
           if( m_threadPool instanceof Disposable )
           {
  -            ( (Disposable)m_threadPool ).dispose();
  +            ( ( Disposable ) m_threadPool ).dispose();
           }
   
           m_threadControl = null;
  @@ -171,36 +181,42 @@
   
       public void run()
       {
  -        while( !m_done )
  +        try
           {
  -            try
  +            while( !m_done )
               {
                   m_mutex.acquire();
   
  -                Iterator i = m_pipelines.values().iterator();
  +                try
  +                {
  +                    Iterator i = m_pipelines.values().iterator();
   
  -                while( i.hasNext() )
  +                    while( i.hasNext() )
  +                    {
  +                        try
  +                        {
  +                            m_threadPool.execute( ( PipelineRunner ) i.next() );
  +                        }
  +                        catch( IllegalStateException e )
  +                        {
  +                            // that's the way ResourceLimitingThreadPool reports
  +                            // that it has no threads available, will still try
  +                            // to go on, hopefully at one point there will be
  +                            // a thread to execute our runner
  +                        }
  +                    }
  +                }
  +                finally
                   {
  -                    m_threadPool.execute( (PipelineRunner)i.next() );
  +                    m_mutex.release();
                   }
  -            }
  -            catch( InterruptedException ie )
  -            {
  -                // ignore for now
  -            }
  -            finally
  -            {
  -                m_mutex.release();
  -            }
   
  -            try
  -            {
                   Thread.sleep( m_sleepTime );
               }
  -            catch( InterruptedException ie )
  -            {
  -                // ignore and continue processing
  -            }
  +        }
  +        catch( InterruptedException e )
  +        {
  +            Thread.interrupted();
           }
       }
   
  @@ -220,7 +236,7 @@
   
               for( int i = 0; i < sources.length; i++ )
               {
  -                handler.handleEvents( sources[ i ].dequeueAll() );
  +                handler.handleEvents( sources[i].dequeueAll() );
               }
           }
       }
  
  
  
  1.1                  jakarta-avalon-excalibur/event/src/test/org/apache/excalibur/event/command/test/TPCThreadManagerTestCase.java
  
  Index: TPCThreadManagerTestCase.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE.txt file.
   */
  package org.apache.avalon.excalibur.event.command.test;
  
  import java.io.PrintWriter;
  import java.io.StringWriter;
  import junit.framework.TestCase;
  import org.apache.excalibur.event.DefaultQueue;
  import org.apache.excalibur.event.EventHandler;
  import org.apache.excalibur.event.Queue;
  import org.apache.excalibur.event.QueueElement;
  import org.apache.excalibur.event.Sink;
  import org.apache.excalibur.event.SinkException;
  import org.apache.excalibur.event.Source;
  import org.apache.excalibur.event.command.EventPipeline;
  import org.apache.excalibur.event.command.TPCThreadManager;
  
  /**
   * @author <a href="mailto:greg-tpcthreadmanager@nest.cx">Gregory Steuck</a>
   */
  public class TPCThreadManagerTestCase extends TestCase
  {
      public TPCThreadManagerTestCase(String name) {
          super(name);
      }
  
      // number of milliseconds it reasonably takes the JVM to switch threads
      private final static int SCHEDULING_TIMEOUT = 1000; // ms
  
      // number of times the handler should be called
      private final static int MINIMAL_NUMBER_INVOCATIONS = 2;
  
      /**
       * Checks TPCThreadManager ability to survive the situation when
       * it tries to schedule more tasks than it has threads. Originally
       * it was dying due to hitting Pool limit and not catching the
       * resulting runtime exception.
       * <p>
       * The test is not foolproof, it probably depends on preemtive
       * threads management.
       */
      public void testThreadContention() throws Exception
      {
          // enforces only 1 thread and no timeout which makes it
          // fail quickly
          final TPCThreadManager threadManager = new TPCThreadManager(1, 1, 0);
          // an obviously syncronized component
          final StringBuffer result = new StringBuffer();
          final StringWriter exceptionBuffer = new StringWriter();
          final PrintWriter errorOut = new PrintWriter(exceptionBuffer);
          threadManager.register(new Pipeline(result, errorOut));
          // sleeps for 1 more scheduling timeout to surely go over limit
          Thread.sleep(SCHEDULING_TIMEOUT * (MINIMAL_NUMBER_INVOCATIONS + 1));
          int numberCalls = result.length();
          String msg =
              "Number of calls to handler (" + numberCalls +
              ") is less than the expected number of calls (" +
              MINIMAL_NUMBER_INVOCATIONS + ")";
          assertTrue(msg, numberCalls >= MINIMAL_NUMBER_INVOCATIONS);
          errorOut.flush(); // why not?
          String stackTrace = exceptionBuffer.toString();
          assertEquals("Exceptions while running the test",
                       "",
                       stackTrace);
      }
  
      private static class Pipeline implements EventPipeline, EventHandler
      {
          private final Queue m_queue = new DefaultQueue();
          private final Source[] m_sources = new Source[]{m_queue};
          private final StringBuffer m_result;
          private final PrintWriter m_errorOut;
  
          Pipeline(StringBuffer resultAccumulator, PrintWriter errorOut)
              throws SinkException
          {
              m_result = resultAccumulator;
              m_errorOut = errorOut;
              // even though TPCThreadManager currently calls event handlers
              // when there is nothing to do, that may change
              m_queue.enqueue(new QueueElement() {});
          }
  
          public EventHandler getEventHandler() {
              return this;
          }
  
          public final Source[] getSources()
          {
              return m_sources;
          }
  
          public final Sink getSink()
          {
              return m_queue;
          }
  
  
          public void handleEvent(QueueElement element) {
              handleEvents(new QueueElement[] {element});
          }
  
          public void handleEvents(QueueElement[] elements) {
              // records the fact that the handler was called
              m_result.append('a');
              try {
                  // sleeps to occupy the thread and let thread manager try to reschedule
                  Thread.sleep(SCHEDULING_TIMEOUT);
                  // enqueues another element to be called again
                  m_queue.enqueue(new QueueElement() {});
              } catch (Exception e) {
                  // fails the test, no exceptions are expected
                  e.printStackTrace(m_errorOut);
                  
              }
          }
      }
  }
  
  
  

--
To unsubscribe, e-mail:   <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>