You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@avalon.apache.org by pr...@apache.org on 2002/05/24 16:52:45 UTC
cvs commit: jakarta-avalon-excalibur/event/src/test/org/apache/excalibur/event/command/test TPCThreadManagerTestCase.java
proyal 02/05/24 07:52:45
Modified: event/src/java/org/apache/excalibur/event/command
TPCThreadManager.java
Added: event/src/test/org/apache/excalibur/event/command/test
TPCThreadManagerTestCase.java
Log:
Patch and TestCase from Gregory Steuck <gr...@nest.cx>
* Fixes http://nagoya.apache.org/bugzilla/show_bug.cgi?id=9177
Revision Changes Path
1.15 +62 -46 jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java
Index: TPCThreadManager.java
===================================================================
RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- TPCThreadManager.java 13 May 2002 12:17:40 -0000 1.14
+++ TPCThreadManager.java 24 May 2002 14:52:45 -0000 1.15
@@ -9,6 +9,7 @@
import java.util.HashMap;
import java.util.Iterator;
+
import org.apache.avalon.excalibur.concurrent.Mutex;
import org.apache.avalon.excalibur.thread.ThreadControl;
import org.apache.avalon.excalibur.thread.ThreadPool;
@@ -27,7 +28,7 @@
* @author <a href="mailto:bloritsch@apache.org">Berin Loritsch</a>
*/
public final class TPCThreadManager
- implements Runnable, ThreadManager, Disposable
+ implements Runnable, ThreadManager, Disposable
{
private final ThreadPool m_threadPool;
private final Mutex m_mutex = new Mutex();
@@ -90,21 +91,24 @@
{
m_mutex.acquire();
- m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );
+ try
+ {
+ m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );
- if( m_done )
+ if( m_done )
+ {
+ m_threadControl = m_threadPool.execute( this );
+ }
+ }
+ finally
{
- m_threadControl = m_threadPool.execute( this );
+ m_mutex.release();
}
}
catch( InterruptedException ie )
{
// ignore for now
}
- finally
- {
- m_mutex.release();
- }
}
/**
@@ -116,22 +120,25 @@
{
m_mutex.acquire();
- m_pipelines.remove( pipeline );
+ try
+ {
+ m_pipelines.remove( pipeline );
- if( m_pipelines.isEmpty() )
+ if( m_pipelines.isEmpty() )
+ {
+ m_done = true;
+ m_threadControl.join( 1000 );
+ }
+ }
+ finally
{
- m_done = true;
- m_threadControl.join( 1000 );
+ m_mutex.release();
}
}
catch( InterruptedException ie )
{
// ignore for now
}
- finally
- {
- m_mutex.release();
- }
}
/**
@@ -142,28 +149,31 @@
try
{
m_mutex.acquire();
+ try
+ {
+ m_done = true;
+ m_pipelines.clear();
- m_done = true;
- m_pipelines.clear();
-
- m_threadControl.join( 1000 );
+ m_threadControl.join( 1000 );
+ }
+ finally
+ {
+ m_mutex.release();
+ }
}
catch( InterruptedException ie )
{
// ignore for now
}
- finally
- {
- m_mutex.release();
- }
}
public final void dispose()
{
deregisterAll();
+
if( m_threadPool instanceof Disposable )
{
- ( (Disposable)m_threadPool ).dispose();
+ ( ( Disposable ) m_threadPool ).dispose();
}
m_threadControl = null;
@@ -171,36 +181,42 @@
public void run()
{
- while( !m_done )
+ try
{
- try
+ while( !m_done )
{
m_mutex.acquire();
- Iterator i = m_pipelines.values().iterator();
+ try
+ {
+ Iterator i = m_pipelines.values().iterator();
- while( i.hasNext() )
+ while( i.hasNext() )
+ {
+ try
+ {
+ m_threadPool.execute( ( PipelineRunner ) i.next() );
+ }
+ catch( IllegalStateException e )
+ {
+ // that's the way ResourceLimitingThreadPool reports
+ // that it has no threads available, will still try
+ // to go on, hopefully at one point there will be
+ // a thread to execute our runner
+ }
+ }
+ }
+ finally
{
- m_threadPool.execute( (PipelineRunner)i.next() );
+ m_mutex.release();
}
- }
- catch( InterruptedException ie )
- {
- // ignore for now
- }
- finally
- {
- m_mutex.release();
- }
- try
- {
Thread.sleep( m_sleepTime );
}
- catch( InterruptedException ie )
- {
- // ignore and continue processing
- }
+ }
+ catch( InterruptedException e )
+ {
+ Thread.interrupted();
}
}
@@ -220,7 +236,7 @@
for( int i = 0; i < sources.length; i++ )
{
- handler.handleEvents( sources[ i ].dequeueAll() );
+ handler.handleEvents( sources[i].dequeueAll() );
}
}
}
1.1 jakarta-avalon-excalibur/event/src/test/org/apache/excalibur/event/command/test/TPCThreadManagerTestCase.java
Index: TPCThreadManagerTestCase.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE.txt file.
*/
package org.apache.avalon.excalibur.event.command.test;
import java.io.PrintWriter;
import java.io.StringWriter;
import junit.framework.TestCase;
import org.apache.excalibur.event.DefaultQueue;
import org.apache.excalibur.event.EventHandler;
import org.apache.excalibur.event.Queue;
import org.apache.excalibur.event.QueueElement;
import org.apache.excalibur.event.Sink;
import org.apache.excalibur.event.SinkException;
import org.apache.excalibur.event.Source;
import org.apache.excalibur.event.command.EventPipeline;
import org.apache.excalibur.event.command.TPCThreadManager;
/**
* @author <a href="mailto:greg-tpcthreadmanager@nest.cx">Gregory Steuck</a>
*/
public class TPCThreadManagerTestCase extends TestCase
{
public TPCThreadManagerTestCase(String name) {
super(name);
}
// number of milliseconds it reasonably takes the JVM to switch threads
private final static int SCHEDULING_TIMEOUT = 1000; // ms
// number of times the handler should be called
private final static int MINIMAL_NUMBER_INVOCATIONS = 2;
/**
* Checks TPCThreadManager ability to survive the situation when
* it tries to schedule more tasks than it has threads. Originally
* it was dying due to hitting Pool limit and not catching the
* resulting runtime exception.
* <p>
* The test is not foolproof, it probably depends on preemtive
* threads management.
*/
public void testThreadContention() throws Exception
{
// enforces only 1 thread and no timeout which makes it
// fail quickly
final TPCThreadManager threadManager = new TPCThreadManager(1, 1, 0);
// an obviously syncronized component
final StringBuffer result = new StringBuffer();
final StringWriter exceptionBuffer = new StringWriter();
final PrintWriter errorOut = new PrintWriter(exceptionBuffer);
threadManager.register(new Pipeline(result, errorOut));
// sleeps for 1 more scheduling timeout to surely go over limit
Thread.sleep(SCHEDULING_TIMEOUT * (MINIMAL_NUMBER_INVOCATIONS + 1));
int numberCalls = result.length();
String msg =
"Number of calls to handler (" + numberCalls +
") is less than the expected number of calls (" +
MINIMAL_NUMBER_INVOCATIONS + ")";
assertTrue(msg, numberCalls >= MINIMAL_NUMBER_INVOCATIONS);
errorOut.flush(); // why not?
String stackTrace = exceptionBuffer.toString();
assertEquals("Exceptions while running the test",
"",
stackTrace);
}
private static class Pipeline implements EventPipeline, EventHandler
{
private final Queue m_queue = new DefaultQueue();
private final Source[] m_sources = new Source[]{m_queue};
private final StringBuffer m_result;
private final PrintWriter m_errorOut;
Pipeline(StringBuffer resultAccumulator, PrintWriter errorOut)
throws SinkException
{
m_result = resultAccumulator;
m_errorOut = errorOut;
// even though TPCThreadManager currently calls event handlers
// when there is nothing to do, that may change
m_queue.enqueue(new QueueElement() {});
}
public EventHandler getEventHandler() {
return this;
}
public final Source[] getSources()
{
return m_sources;
}
public final Sink getSink()
{
return m_queue;
}
public void handleEvent(QueueElement element) {
handleEvents(new QueueElement[] {element});
}
public void handleEvents(QueueElement[] elements) {
// records the fact that the handler was called
m_result.append('a');
try {
// sleeps to occupy the thread and let thread manager try to reschedule
Thread.sleep(SCHEDULING_TIMEOUT);
// enqueues another element to be called again
m_queue.enqueue(new QueueElement() {});
} catch (Exception e) {
// fails the test, no exceptions are expected
e.printStackTrace(m_errorOut);
}
}
}
}
--
To unsubscribe, e-mail: <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>