You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ak...@apache.org on 2004/02/19 09:00:11 UTC
svn commit: rev 6770 - in incubator/directory/eve/trunk/eve/frontend: common/api/src/java/org/apache/eve/event event/pojo-impl/src/java/org/apache/eve/event input/merlin-impl/src/java/org/apache/eve/input input/pojo-impl/src/java/org/apache/eve/input input/spi/src/java/org/apache/eve/input listener/pojo-impl/src/java/org/apache/eve/listener
Author: akarasulu
Date: Thu Feb 19 00:00:08 2004
New Revision: 6770
Added:
incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/event/AbstractSubscriber.java
incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/event/SubscriberMonitor.java
Modified:
incubator/directory/eve/trunk/eve/frontend/event/pojo-impl/src/java/org/apache/eve/event/DefaultEventRouter.java
incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/AvalonInputManagerMonitor.java
incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/MerlinInputManager.java
incubator/directory/eve/trunk/eve/frontend/input/pojo-impl/src/java/org/apache/eve/input/DefaultInputManager.java
incubator/directory/eve/trunk/eve/frontend/input/spi/src/java/org/apache/eve/input/InputManager.java
incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java
Log:
Added some extra subscriber functionality to the common api. Also
moved out interfaces for pub/sub functionality to the implementation
rather than having the service interface extend them.
Added: incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/event/AbstractSubscriber.java
==============================================================================
--- (empty file)
+++ incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/event/AbstractSubscriber.java Thu Feb 19 00:00:08 2004
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.eve.event ;
+
+
+import java.util.EventObject ;
+import java.lang.reflect.Method ;
+
+
+/**
+ * An abstract Subscriber that calls the provided type-specific inform method
+ * of Subscriber sub-interface. This way their is no need to downcast the
+ * event. Reflection is used by the abstract subscriber's inform method to
+ * determine at run time which inform method of a concrete subscriber to invoke.
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">
+ * Apache Directory Project</a>
+ * @version $Rev$
+ */
+public class AbstractSubscriber implements Subscriber
+{
+ /** monitor for this Subscriber */
+ private final SubscriberMonitor m_monitor ;
+
+
+ /**
+ * Creates a Subscriber that does not monitor failures to inform.
+ */
+ public AbstractSubscriber()
+ {
+ m_monitor = null ;
+ }
+
+
+ /**
+ * Creates a Subscriber that does monitor failures on inform.
+ *
+ * @param a_monitor the monitor to use on failures
+ */
+ public AbstractSubscriber( SubscriberMonitor a_monitor )
+ {
+ m_monitor = a_monitor ;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.eve.event.Subscriber#inform(java.util.EventObject)
+ */
+ public void inform( EventObject a_event )
+ {
+ if ( a_event == null )
+ {
+ return ;
+ }
+
+ Method l_method = null ;
+ Class l_paramTypes[] = new Class[1] ;
+ l_paramTypes[0] = a_event.getClass() ;
+
+ try {
+ /*
+ * Look for an inform method in the current object that takes the
+ * event subtype as a parameter
+ */
+ l_method = getClass().getDeclaredMethod( "inform", l_paramTypes ) ;
+ Object l_paramList[] = new Object[1] ;
+ l_paramList[0] = a_event ;
+ l_method.invoke( this, l_paramList ) ;
+ }
+ catch ( Throwable t )
+ {
+ if ( m_monitor != null )
+ {
+ m_monitor.failedOnInform( this, a_event, t ) ;
+ }
+ }
+ }
+}
Added: incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/event/SubscriberMonitor.java
==============================================================================
--- (empty file)
+++ incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/event/SubscriberMonitor.java Thu Feb 19 00:00:08 2004
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.eve.event ;
+
+
+import java.util.EventObject ;
+
+
+/**
+ * Monitors noteworthy Subscriber activities.
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">
+ * Apache Directory Project</a>
+ * @version $Rev$
+ */
+public interface SubscriberMonitor
+{
+ /**
+ * Monitors failures occuring while handling events.
+ *
+ * @param a_subscriber the Subscriber that failed on inform
+ * @param a_eventObject the EventObject fired
+ * @param a_throwable the resulting failure exception if any
+ */
+ void failedOnInform( Subscriber a_subscriber, EventObject a_eventObject,
+ Throwable a_throwable ) ;
+}
Modified: incubator/directory/eve/trunk/eve/frontend/event/pojo-impl/src/java/org/apache/eve/event/DefaultEventRouter.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/event/pojo-impl/src/java/org/apache/eve/event/DefaultEventRouter.java (original)
+++ incubator/directory/eve/trunk/eve/frontend/event/pojo-impl/src/java/org/apache/eve/event/DefaultEventRouter.java Thu Feb 19 00:00:08 2004
@@ -195,8 +195,11 @@
continue ;
}
- if ( l_subscriptions[ii].getFilter() != null &&
- l_subscriptions[ii].getFilter().apply( a_event ) )
+ if ( l_subscriptions[ii].getFilter() == null )
+ {
+ l_subscriptions[ii].getSubscriber().inform( a_event ) ;
+ }
+ else if ( l_subscriptions[ii].getFilter().apply( a_event ) )
{
l_subscriptions[ii].getSubscriber().inform( a_event ) ;
}
Modified: incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/AvalonInputManagerMonitor.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/AvalonInputManagerMonitor.java (original)
+++ incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/AvalonInputManagerMonitor.java Thu Feb 19 00:00:08 2004
@@ -227,8 +227,13 @@
}
+ /*
+ * Generates a hex string for a buffer.
+ */
public String toHexString( ByteBuffer a_buf )
{
- return new String ( a_buf.array() ) ;
+ byte[] l_bites = new byte[a_buf.remaining()] ;
+ a_buf.get( l_bites ) ;
+ return new String ( l_bites ) ;
}
}
Modified: incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/MerlinInputManager.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/MerlinInputManager.java (original)
+++ incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/MerlinInputManager.java Thu Feb 19 00:00:08 2004
@@ -17,23 +17,19 @@
package org.apache.eve.input ;
-import java.util.EventObject ;
-
import java.nio.channels.Selector ;
+import org.apache.avalon.framework.logger.Logger ;
import org.apache.avalon.framework.activity.Startable ;
import org.apache.avalon.framework.service.Serviceable ;
import org.apache.avalon.framework.activity.Initializable ;
import org.apache.avalon.framework.service.ServiceManager ;
import org.apache.avalon.framework.service.ServiceException ;
import org.apache.avalon.framework.logger.AbstractLogEnabled ;
-import org.apache.avalon.framework.logger.Logger;
import org.apache.avalon.cornerstone.services.threads.ThreadManager ;
import org.apache.eve.buffer.BufferPool ;
import org.apache.eve.event.EventRouter ;
-import org.apache.eve.event.ConnectEvent ;
-import org.apache.eve.event.DisconnectEvent ;
/**
@@ -67,40 +63,6 @@
private AvalonInputManagerMonitor m_monitor = null ;
- // ------------------------------------------------------------------------
- // Listener Interfaces
- // ------------------------------------------------------------------------
-
-
- /*
- * @see org.apache.eve.event.ConnectListener#
- * connectPerformed(org.apache.eve.event.ConnectEvent)
- */
- public void inform( ConnectEvent an_event )
- {
- m_delegate.inform( an_event ) ;
- }
-
-
- /*
- * @see org.apache.eve.event.DisconnectListener#
- * inform(org.apache.eve.event.DisconnectEvent)
- */
- public void inform( DisconnectEvent an_event )
- {
- m_delegate.inform( an_event ) ;
- }
-
-
- /*
- * @see org.apache.eve.event.Subscriber#inform(java.util.EventObject)
- */
- public void inform( EventObject an_event )
- {
- m_delegate.inform( an_event ) ;
- }
-
-
// ------------------------------------------------------------------------
// Life Cycle Methods
// ------------------------------------------------------------------------
Modified: incubator/directory/eve/trunk/eve/frontend/input/pojo-impl/src/java/org/apache/eve/input/DefaultInputManager.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/input/pojo-impl/src/java/org/apache/eve/input/DefaultInputManager.java (original)
+++ incubator/directory/eve/trunk/eve/frontend/input/pojo-impl/src/java/org/apache/eve/input/DefaultInputManager.java Thu Feb 19 00:00:08 2004
@@ -18,7 +18,7 @@
import java.util.Iterator ;
-import java.util.EventObject ;
+import java.util.ArrayList ;
import java.io.IOException ;
import java.nio.ByteBuffer ;
@@ -33,6 +33,9 @@
import org.apache.eve.listener.ClientKey ;
import org.apache.eve.event.ConnectEvent ;
import org.apache.eve.event.DisconnectEvent ;
+import org.apache.eve.event.ConnectSubscriber ;
+import org.apache.eve.event.AbstractSubscriber ;
+import org.apache.eve.event.DisconnectSubscriber ;
import org.apache.eve.listener.KeyExpiryException ;
@@ -43,7 +46,8 @@
* Apache Directory Project</a>
* @version $Rev: 1452 $
*/
-public class DefaultInputManager implements InputManager
+public class DefaultInputManager extends AbstractSubscriber
+ implements InputManager, ConnectSubscriber, DisconnectSubscriber
{
/** the thread driving this Runnable */
private Thread m_thread = null ;
@@ -55,6 +59,10 @@
private EventRouter m_router = null ;
/** selector used to select a ready socket channel */
private Selector m_selector = null ;
+ /** contains the batch of new connect events and channels to register */
+ private final ArrayList m_connectEvents = new ArrayList() ;
+ /** contains the batch of disconnect events & selection keys to cancel */
+ private final ArrayList m_disconnectEvents = new ArrayList() ;
/** the input manager's monitor */
private InputManagerMonitor m_monitor = new InputManagerMonitorAdapter() ;
@@ -74,9 +82,12 @@
throws IOException
{
m_bp = a_bp ;
- m_router = a_router ;
m_hasStarted = new Boolean( false ) ;
m_selector = Selector.open() ;
+
+ m_router = a_router ;
+ m_router.subscribe( ConnectEvent.class, null, this ) ;
+ m_router.subscribe( DisconnectEvent.class, null, this ) ;
}
@@ -107,6 +118,8 @@
try
{
m_monitor.enteringSelect( m_selector ) ;
+ maintainConnections() ;
+
if ( 0 == ( l_count = m_selector.select() ) )
{
m_monitor.selectTimedOut( m_selector ) ;
@@ -173,33 +186,12 @@
*/
public void inform( ConnectEvent an_event )
{
- ClientKey l_key = null ;
- SocketChannel l_channel = null ;
-
- try
- {
- l_key = an_event.getClientKey() ;
- l_channel = l_key.getSocket().getChannel() ;
-
- // hands-off blocking sockets!
- if ( null == l_channel )
- {
- return ;
- }
-
- l_channel.configureBlocking( false ) ;
- l_channel.register( m_selector, SelectionKey.OP_READ, l_key ) ;
- m_monitor.registeredChannel( l_key, m_selector ) ;
- }
- catch ( KeyExpiryException e )
+ synchronized ( m_connectEvents )
{
- m_monitor.keyExpiryFailure( l_key, e ) ;
- }
- catch ( IOException e )
- {
- m_monitor.channelRegistrationFailure( m_selector, l_channel,
- SelectionKey.OP_READ, e ) ;
+ m_connectEvents.add( an_event ) ;
}
+
+ m_selector.wakeup() ;
}
@@ -209,65 +201,139 @@
*/
public void inform( DisconnectEvent an_event )
{
- SelectionKey l_key = null ;
- Iterator l_keys = m_selector.keys().iterator() ;
-
- while ( l_keys.hasNext() )
- {
- l_key = ( SelectionKey ) l_keys.next() ;
- if ( l_key.attachment().equals( an_event.getClientKey() ) )
- {
- break ;
- }
- }
-
- if ( null == l_key )
+ synchronized ( m_disconnectEvents )
{
- return ;
+ m_connectEvents.add( an_event ) ;
}
- try
- {
- l_key.channel().close() ;
- }
- catch ( IOException e )
- {
- m_monitor.channelCloseFailure(
- ( SocketChannel ) l_key.channel(), e ) ;
- }
-
- l_key.cancel() ;
- m_monitor.disconnectedClient( an_event.getClientKey() ) ;
+ m_selector.wakeup() ;
}
+ // ------------------------------------------------------------------------
+ // private utilities
+ // ------------------------------------------------------------------------
+
+
/**
- * @see org.apache.eve.event.Subscriber#inform(java.util.EventObject)
+ * Maintains connections by registering newly established connections within
+ * ConnectEvents and cancelling the selection keys of dropped connections.
+ *
+ * @see created in response to a <a href=
+ * "http://nagoya.apache.org/jira/secure/ViewIssue.jspa?id=13574">JIRA Issue
+ * </a>
*/
- public void inform( EventObject an_event )
+ private void maintainConnections()
{
- Class l_clazz = an_event.getClass() ;
+ /* Register New Connections
+ * ========================
+ *
+ * Here we perform a synchronized transfer of newly arrived events
+ * which are batched in the list of ConnectEvents. This is done to
+ * minimize the chances of contention. Next we cycle through each
+ * event registering the new connection's channel with the selector.
+ */
- if ( l_clazz.isAssignableFrom( ConnectEvent.class ) )
- {
- inform( ( ConnectEvent ) an_event ) ;
+ // copy all events into a separate list first and clear
+ ConnectEvent[] l_connectEvents = null ;
+ synchronized( m_connectEvents )
+ {
+ l_connectEvents = new ConnectEvent[m_connectEvents.size()] ;
+ l_connectEvents = ( ConnectEvent[] )
+ m_connectEvents.toArray( l_connectEvents ) ;
+ m_connectEvents.clear() ;
+ }
+
+ // cycle through connections and register them with the selector
+ for ( int ii = 0; ii < l_connectEvents.length ; ii++ )
+ {
+ ClientKey l_key = null ;
+ SocketChannel l_channel = null ;
+
+ try
+ {
+ l_key = l_connectEvents[ii].getClientKey() ;
+ l_channel = l_key.getSocket().getChannel() ;
+
+ // hands-off blocking sockets!
+ if ( null == l_channel )
+ {
+ continue ;
+ }
+
+ l_channel.configureBlocking( false ) ;
+ l_channel.register( m_selector, SelectionKey.OP_READ, l_key ) ;
+ m_monitor.registeredChannel( l_key, m_selector ) ;
+ }
+ catch ( KeyExpiryException e )
+ {
+ m_monitor.keyExpiryFailure( l_key, e ) ;
+ }
+ catch ( IOException e )
+ {
+ m_monitor.channelRegistrationFailure( m_selector, l_channel,
+ SelectionKey.OP_READ, e ) ;
+ }
+ }
+
+
+ /* Cancel/Unregister Dropped Connections
+ * =====================================
+ *
+ * To do this we simply cancel the selection key for the client the
+ * disconnect event is associated with.
+ */
+
+ // copy all events into a separate list first and clear
+ DisconnectEvent[] l_disconnectEvents = null ;
+ synchronized( m_disconnectEvents )
+ {
+ l_disconnectEvents = new DisconnectEvent[m_disconnectEvents.size()] ;
+ l_disconnectEvents = ( DisconnectEvent[] )
+ m_disconnectEvents.toArray( l_disconnectEvents ) ;
+ m_disconnectEvents.clear() ;
}
- else if ( l_clazz.isAssignableFrom( DisconnectEvent.class ) ) ;
+
+ SelectionKey l_key = null ;
+ for ( int ii = 0; ii < l_disconnectEvents.length; ii++ )
{
- inform( ( DisconnectEvent ) an_event ) ;
+ Iterator l_keys = m_selector.keys().iterator() ;
+ ClientKey l_clientKey = l_disconnectEvents[ii].getClientKey() ;
+
+ while ( l_keys.hasNext() )
+ {
+ l_key = ( SelectionKey ) l_keys.next() ;
+ if ( l_key.attachment().equals( l_clientKey ) )
+ {
+ break ;
+ }
+ }
+
+ if ( null == l_key )
+ {
+ return ;
+ }
+
+ try
+ {
+ l_key.channel().close() ;
+ }
+ catch ( IOException e )
+ {
+ m_monitor.channelCloseFailure(
+ ( SocketChannel ) l_key.channel(), e ) ;
+ }
+
+ l_key.cancel() ;
+ m_monitor.disconnectedClient( l_clientKey ) ;
}
}
- // ------------------------------------------------------------------------
- // private utilities
- // ------------------------------------------------------------------------
-
-
/**
* Processes input on channels of the read ready selected keys.
*/
- void processInput()
+ private void processInput()
{
/*
* Process the selectors that are ready. For each selector that
Modified: incubator/directory/eve/trunk/eve/frontend/input/spi/src/java/org/apache/eve/input/InputManager.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/input/spi/src/java/org/apache/eve/input/InputManager.java (original)
+++ incubator/directory/eve/trunk/eve/frontend/input/spi/src/java/org/apache/eve/input/InputManager.java Thu Feb 19 00:00:08 2004
@@ -50,10 +50,6 @@
package org.apache.eve.input ;
-import org.apache.eve.event.ConnectSubscriber ;
-import org.apache.eve.event.DisconnectSubscriber ;
-
-
/**
* Service interface for server modules that monitor incomming PDU requests on
* a client's inputs.
@@ -63,7 +59,6 @@
* @version $Rev$
*/
public interface InputManager
- extends ConnectSubscriber, DisconnectSubscriber
{
/** Role played by this service as specified by Avalon */
public static final String ROLE = InputManager.class.getName() ;
Modified: incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java (original)
+++ incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java Thu Feb 19 00:00:08 2004
@@ -106,10 +106,11 @@
*/
public DefaultListenerManager( EventRouter a_router ) throws IOException
{
- m_router = a_router ;
m_clients = new HashSet() ;
m_selector = Selector.open() ;
m_hasStarted = new Boolean( false ) ;
+
+ m_router = a_router ;
m_router.subscribe( DisconnectEvent.class, null, this ) ;
}
@@ -150,6 +151,7 @@
a_listener.getPort() ) ;
l_channel.socket().bind( l_address, a_listener.getBacklog() ) ;
l_channel.configureBlocking( false ) ;
+ m_selector.wakeup() ;
l_channel.register( m_selector, SelectionKey.OP_ACCEPT,
a_listener ) ;
}