You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ak...@apache.org on 2004/03/20 04:51:57 UTC
svn commit: rev 9645 - in incubator/directory/eve/trunk/eve/frontend/common/api: . src/java/org/apache/eve/seda src/test/org/apache/eve/event
Author: akarasulu
Date: Fri Mar 19 19:51:56 2004
New Revision: 9645
Added:
incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/StderrLoggingStageMonitor.java
incubator/directory/eve/trunk/eve/frontend/common/api/src/test/org/apache/eve/event/DefaultStageTest.java
Modified:
incubator/directory/eve/trunk/eve/frontend/common/api/project.xml
incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/DefaultStage.java
incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/StageMonitor.java
incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/StageMonitorAdapter.java
Log:
o Found bug where exceptions kept us from removing worker threads from
the active worker list, which prevented graceful shutdowns of the stage.
o Also cleaned up the monitor situation. And created a real logging monitor
that for the time being logs to the console but I think this can be
changed using logger settings so the name is a misnomer: will have to
change that on subsequent checkins.
Modified: incubator/directory/eve/trunk/eve/frontend/common/api/project.xml
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/common/api/project.xml (original)
+++ incubator/directory/eve/trunk/eve/frontend/common/api/project.xml Fri Mar 19 19:51:56 2004
@@ -26,6 +26,20 @@
</dependency>
<dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>1.0.3</version>
+ <url>http://jakarta.apache.org/commons/logging</url>
+ </dependency>
+
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.7</version>
+ <url>http://logging.apache.org/log4j/docs</url>
+ </dependency>
+
+ <dependency>
<groupId>incubator-directory</groupId>
<artifactId>ldap-common</artifactId>
<version>SNAPSHOT</version>
Modified: incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/DefaultStage.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/DefaultStage.java (original)
+++ incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/DefaultStage.java Fri Mar 19 19:51:56 2004
@@ -32,6 +32,12 @@
*/
public class DefaultStage implements Stage
{
+ /**
+ * time to sleep in milliseconds waiting for this stage to stop
+ * @todo might want to add this to config and use this as the default value
+ */
+ private static final long STOP_WAIT = 100 ;
+
/** the configuration bean */
protected final StageConfig config ;
/** this Stage's event queue */
@@ -143,11 +149,15 @@
{
synchronized ( queue )
{
+ monitor.lockedQueue( DefaultStage.this ) ;
+
if( queue.isEmpty() )
{
try
{
+ monitor.waiting( DefaultStage.this ) ;
queue.wait() ;
+ monitor.notified( DefaultStage.this ) ;
}
catch( InterruptedException e )
{
@@ -162,6 +172,7 @@
monitor.eventDequeued( DefaultStage.this, event ) ;
Runnable l_runnable = new ExecutableHandler( event ) ;
config.getThreadPool().execute( l_runnable ) ;
+ monitor.eventHandled( DefaultStage.this, event ) ;
}
}
}
@@ -187,26 +198,25 @@
public void run()
{
- synchronized( activeWorkers )
- {
- activeWorkers.add( Thread.currentThread() ) ;
- }
+ activeWorkers.add( Thread.currentThread() ) ;
try
{
- config.getHandler().handleEvent( m_event ) ;
+ if ( config.getHandler() == null )
+ {
+ monitor.handlerMissing( DefaultStage.this ) ; // only when no handler is set
+ return ; // the finally block below still deregisters this worker
+ }
+ config.getHandler().handleEvent( m_event ) ;
}
catch( Throwable t )
{
monitor.handlerFailed( DefaultStage.this, m_event, t ) ;
}
-
- synchronized( activeWorkers )
+ finally
{
activeWorkers.remove( Thread.currentThread() ) ;
}
-
- monitor.eventHandled( DefaultStage.this, m_event ) ;
}
}
@@ -221,18 +231,15 @@
*/
public void start()
{
- synchronized( hasStarted )
+ if ( hasStarted.booleanValue() )
{
- if ( hasStarted.booleanValue() )
- {
- throw new IllegalStateException( "Already started!" ) ;
- }
-
- hasStarted = new Boolean( true ) ;
- thread = new Thread( new StageDriver() ) ;
- thread.start() ;
+ throw new IllegalStateException( "Already started!" ) ;
}
+ hasStarted = new Boolean( true ) ;
+ thread = new Thread( new StageDriver() ) ;
+ thread.start() ;
+
monitor.started( this ) ;
}
@@ -244,10 +251,12 @@
public void stop() throws InterruptedException
{
hasStarted = new Boolean( false ) ;
+ monitor.stopping( this ) ;
while ( thread.isAlive() || ! activeWorkers.isEmpty() )
{
- Thread.sleep( 100 ) ;
+ monitor.stopping( this, STOP_WAIT ) ;
+ Thread.sleep( STOP_WAIT ) ;
synchronized( queue )
{
Modified: incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/StageMonitor.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/StageMonitor.java (original)
+++ incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/StageMonitor.java Fri Mar 19 19:51:56 2004
@@ -30,6 +30,45 @@
public interface StageMonitor
{
/**
+ * Reports when the Stage is missing its handler.
+ *
+ * @param stage the stage reporting a missing handler
+ */
+ void handlerMissing( Stage stage ) ;
+
+ /**
+ * A request has been made to gracefully stop the Stage.
+ *
+ * @param stage the Stage being stopped
+ */
+ void stopping( Stage stage ) ;
+
+ /**
+ * A request has been made to gracefully stop the Stage but we're waiting
+ * for some active workers to complete.
+ *
+ * @param stage the Stage being stopped
+ * @param millis the time in milliseconds waiting for workers to complete
+ */
+ void stopping( Stage stage, long millis ) ;
+
+ /**
+ * The stage driver thread was notified out of the wait state due to an
+ * enqueue operation or to the driver being requested to stop.
+ *
+ * @param stage the notified Stage
+ */
+ void notified( Stage stage ) ;
+
+ /**
+ * Notification of stage driver thread going into wait state due to an
+ * empty queue.
+ *
+ * @param stage the waiting Stage
+ */
+ void waiting( Stage stage ) ;
+
+ /**
* Monitors Stage has starts.
*
* @param stage the started Stage
@@ -73,6 +112,13 @@
* @param event the event to be enqueued
*/
void lockedQueue( Stage stage, EventObject event ) ;
+
+ /**
+ * Queue lock acquired by awoken Stage driver thread.
+ *
+ * @param stage the Stage whose queue lock was acquired
+ */
+ void lockedQueue( Stage stage ) ;
/**
* Monitor for dequeue operations.
Modified: incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/StageMonitorAdapter.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/StageMonitorAdapter.java (original)
+++ incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/StageMonitorAdapter.java Fri Mar 19 19:51:56 2004
@@ -19,10 +19,13 @@
import java.util.EventObject ;
+import org.apache.commons.lang.exception.ExceptionUtils ;
+
/**
- * A do nothing adapter for a stage. For safty's sake this adapter throws
- * runtime exceptions wrapping failure exception notifications.
+ * A do nothing adapter for a stage. For safety's sake this adapter reports
+ * exceptions that occur on failure exception notifications to stderr. This
+ * is just for safety since we do not want to ignore these exceptions.
*
* @author <a href="mailto:directory-dev@incubator.apache.org">
* Apache Directory Project</a>
@@ -31,6 +34,48 @@
public class StageMonitorAdapter implements StageMonitor
{
/* (non-Javadoc)
+ * @see org.apache.eve.seda.StageMonitor#handlerMissing(
+ * org.apache.eve.seda.Stage)
+ */
+ public void handlerMissing( Stage stage )
+ {
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.eve.seda.StageMonitor#stopping(org.apache.eve.seda.Stage)
+ */
+ public void stopping( Stage stage )
+ {
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.eve.seda.StageMonitor#stopping(org.apache.eve.seda.Stage,
+ * long)
+ */
+ public void stopping( Stage stage, long millis )
+ {
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.eve.seda.StageMonitor#notified(org.apache.eve.seda.Stage)
+ */
+ public void notified( Stage stage )
+ {
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.eve.seda.StageMonitor#waiting( Stage )
+ */
+ public void waiting( Stage stage )
+ {
+ }
+
+
+ /* (non-Javadoc)
* @see org.apache.eve.seda.StageMonitor#started(org.apache.eve.seda.Stage)
*/
public void started( Stage stage )
@@ -75,6 +120,15 @@
/* (non-Javadoc)
* @see org.apache.eve.seda.StageMonitor#lockedQueue(
+ * org.apache.eve.seda.Stage)
+ */
+ public void lockedQueue( Stage stage )
+ {
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.eve.seda.StageMonitor#lockedQueue(
* org.apache.eve.seda.Stage, java.util.EventObject)
*/
public void lockedQueue( Stage stage, EventObject event )
@@ -106,7 +160,7 @@
*/
public void driverFailed( Stage stage, InterruptedException fault )
{
- throw new RuntimeException( fault ) ;
+ System.err.println( ExceptionUtils.getFullStackTrace( fault ) ) ;
}
@@ -116,6 +170,6 @@
*/
public void handlerFailed( Stage stage, EventObject event, Throwable fault )
{
- throw new RuntimeException( fault ) ;
+ System.err.println( ExceptionUtils.getFullStackTrace( fault ) ) ;
}
}
Added: incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/StderrLoggingStageMonitor.java
==============================================================================
--- (empty file)
+++ incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/seda/StderrLoggingStageMonitor.java Fri Mar 19 19:51:56 2004
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.eve.seda ;
+
+
+import java.util.EventObject ;
+
+import org.apache.commons.lang.Validate ;
+import org.apache.commons.lang.ClassUtils ;
+
+import org.apache.commons.logging.Log ;
+import org.apache.commons.logging.LogFactory ;
+
+
+/**
+ * A StageMonitor that reports every stage notification through a commons
+ * logging Log: failures at error level, rejected enqueues at warn level,
+ * shutdown progress at info level and everything else at debug level.
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">
+ * Apache Directory Project</a>
+ * @version $Rev: 9537 $
+ */
+public class StderrLoggingStageMonitor implements StageMonitor
+{
+    private final Log log ; // destination of every notification
+
+
+    /**
+     * Logs under the category of the DefaultStage class.
+     */
+    public StderrLoggingStageMonitor()
+    {
+        log = LogFactory.getLog( DefaultStage.class ) ;
+    }
+
+
+    /**
+     * Logs under the category of a specific Stage implementing class.
+     *
+     * @param clazz the class of the stage
+     * @throws IllegalArgumentException if clazz does not implement Stage
+     */
+    public StderrLoggingStageMonitor( Class clazz )
+    {
+        Validate.isTrue( ClassUtils.isAssignable( clazz, Stage.class ),
+            clazz.getName() + " does not implement the Stage interface" ) ;
+        log = LogFactory.getLog( clazz ) ;
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#
+     * handlerMissing(org.apache.eve.seda.Stage)
+     */
+    public void handlerMissing( Stage stage )
+    {
+        if ( log.isErrorEnabled() )
+        {
+            log.error( "Stage " + stage.getConfig().getName()
+                + " does not have a handler assigned" ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#started(org.apache.eve.seda.Stage)
+     */
+    public void started( Stage stage )
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( stage.getConfig().getName() + " has started!" ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#stopped(org.apache.eve.seda.Stage)
+     */
+    public void stopped( Stage stage )
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( stage.getConfig().getName() + " has stopped!" ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#startedDriver(
+     * org.apache.eve.seda.Stage)
+     */
+    public void startedDriver( Stage stage )
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( stage.getConfig().getName()
+                + "'s driver started execution!" ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#enqueueOccurred(
+     * org.apache.eve.seda.Stage, java.util.EventObject)
+     */
+    public void enqueueOccurred( Stage stage, EventObject event )
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( stage.getConfig().getName() + " had event " + event
+                + " enqueued!" ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#enqueueRejected(
+     * org.apache.eve.seda.Stage, java.util.EventObject)
+     */
+    public void enqueueRejected( Stage stage, EventObject event )
+    {
+        if ( log.isWarnEnabled() )
+        {
+            log.warn( stage.getConfig().getName() + " had event " + event
+                + " enqueue REJECTED!" ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#lockedQueue(
+     * org.apache.eve.seda.Stage, java.util.EventObject)
+     */
+    public void lockedQueue( Stage stage, EventObject event )
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( stage.getConfig().getName()
+                + "'s queue locked for processing " + event ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#lockedQueue(
+     * org.apache.eve.seda.Stage)
+     */
+    public void lockedQueue( Stage stage )
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( stage.getConfig().getName()
+                + "'s queue locked by awoken stage driver thread" ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#notified(org.apache.eve.seda.Stage)
+     */
+    public void notified( Stage stage )
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( stage.getConfig().getName()
+                + "'s stage driver thread notified out of waiting" ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#stopping(org.apache.eve.seda.Stage)
+     */
+    public void stopping( Stage stage )
+    {
+        if ( log.isInfoEnabled() )
+        {
+            log.info( "Graceful shutdown of stage "
+                + stage.getConfig().getName() + " was requested" ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#stopping(org.apache.eve.seda.Stage,
+     * long)
+     */
+    public void stopping( Stage stage, long millis )
+    {
+        if ( log.isInfoEnabled() )
+        {
+            log.info( "Waiting " + millis + " for graceful shutdown of stage "
+                + stage.getConfig().getName() ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#waiting(
+     * org.apache.eve.seda.Stage)
+     */
+    public void waiting( Stage stage )
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( stage.getConfig().getName()
+                + "'s stage queue is empty, driver thread is waiting" ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#eventDequeued(
+     * org.apache.eve.seda.Stage, java.util.EventObject)
+     */
+    public void eventDequeued( Stage stage, EventObject event )
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( stage.getConfig().getName() + " had event " + event
+                + " dequeued!" ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#eventHandled(
+     * org.apache.eve.seda.Stage, java.util.EventObject)
+     */
+    public void eventHandled( Stage stage, EventObject event )
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( stage.getConfig().getName() + " handled " + event ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#driverFailed(
+     * org.apache.eve.seda.Stage, java.lang.InterruptedException)
+     */
+    public void driverFailed( Stage stage, InterruptedException fault )
+    {
+        if ( log.isErrorEnabled() )
+        {
+            log.error( stage.getConfig().getName()
+                + "'s driver failed", fault ) ;
+        }
+    }
+
+
+    /* (non-Javadoc)
+     * @see org.apache.eve.seda.StageMonitor#handlerFailed(
+     * org.apache.eve.seda.Stage, java.util.EventObject, java.lang.Throwable)
+     */
+    public void handlerFailed( Stage stage, EventObject event, Throwable fault )
+    {
+        if ( log.isErrorEnabled() )
+        {
+            log.error( stage.getConfig().getName()
+                + "'s handler failed", fault ) ;
+        }
+    }
+}
Added: incubator/directory/eve/trunk/eve/frontend/common/api/src/test/org/apache/eve/event/DefaultStageTest.java
==============================================================================
--- (empty file)
+++ incubator/directory/eve/trunk/eve/frontend/common/api/src/test/org/apache/eve/event/DefaultStageTest.java Fri Mar 19 19:51:56 2004
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.eve.event;
+
+import java.util.ArrayList;
+import java.util.EventObject;
+
+import org.apache.eve.seda.DefaultStage;
+import org.apache.eve.seda.DefaultStageConfig;
+import org.apache.eve.seda.EnqueuePredicate;
+import org.apache.eve.seda.StageHandler;
+import org.apache.eve.seda.StderrLoggingStageMonitor;
+import org.apache.eve.thread.ThreadPool;
+
+import junit.framework.TestCase;
+
+/**
+ * Exercises DefaultStage: predicate based enqueue filtering and config access.
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">
+ * Apache Directory Project</a>
+ * @version $Rev$
+ */
+public class DefaultStageTest extends TestCase
+{
+    DefaultStage stage ;
+    ThreadPool pool ;
+    DefaultStageConfig config ;
+    StageHandler handler ;
+    ArrayList events ;
+
+
+    /* (non-Javadoc)
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception
+    {
+        super.tearDown() ;
+
+        // drop the fixture references, then shut the stage down
+        pool = null ;
+        config = null ;
+        handler = null ;
+        events = null ;
+        stage.stop() ;
+        stage = null ;
+    }
+
+
+    /* (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception
+    {
+        super.setUp() ;
+
+        events = new ArrayList() ;
+
+        handler = new StageHandler()
+        {
+            public void handleEvent( EventObject handled )
+            {
+                events.add( handled ) ;
+            }
+        } ;
+
+        pool = new ThreadPool()
+        {
+            /* (non-Javadoc)
+             * @see org.apache.eve.thread.ThreadPool#execute(java.lang.Runnable)
+             */
+            public void execute( Runnable task )
+            {
+                task.run() ;
+            }
+        } ;
+
+        config = new DefaultStageConfig( "test", pool ) ;
+        config.setHandler( handler ) ;
+        stage = new DefaultStage( config ) ;
+        stage.setMonitor( new StderrLoggingStageMonitor( stage.getClass() ) ) ;
+        stage.start() ;
+    }
+
+
+    public void testAddPredicateAccept() throws Exception
+    {
+        // an accept-all predicate must let the event through to the handler
+        stage.addPredicate( new EnqueuePredicate()
+        {
+            public boolean accept( EventObject candidate )
+            {
+                return true ;
+            }
+        } ) ;
+        stage.enqueue( new EventObject( this ) ) ;
+        stage.stop() ;
+        assertEquals( 1, events.size() ) ;
+    }
+
+
+    public void testAddPredicateDeny() throws Exception
+    {
+        // a deny-all predicate must keep the event away from the handler
+        stage.addPredicate( new EnqueuePredicate()
+        {
+            public boolean accept( EventObject candidate )
+            {
+                return false ;
+            }
+        } ) ;
+        stage.enqueue( new EventObject( this ) ) ;
+        stage.stop() ;
+        assertEquals( 0, events.size() ) ;
+    }
+
+
+    public void testGetConfig()
+    {
+        assertEquals( config, stage.getConfig() ) ;
+    }
+
+
+    public void testEnqueue()
+    {
+        // placeholder: no assertions yet
+    }
+
+
+    public void testSetMonitor()
+    {
+        // placeholder: no assertions yet
+    }
+}