You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by ak...@apache.org on 2004/02/19 09:00:11 UTC

svn commit: rev 6770 - in incubator/directory/eve/trunk/eve/frontend: common/api/src/java/org/apache/eve/event event/pojo-impl/src/java/org/apache/eve/event input/merlin-impl/src/java/org/apache/eve/input input/pojo-impl/src/java/org/apache/eve/input input/spi/src/java/org/apache/eve/input listener/pojo-impl/src/java/org/apache/eve/listener

Author: akarasulu
Date: Thu Feb 19 00:00:08 2004
New Revision: 6770

Added:
   incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/event/AbstractSubscriber.java
   incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/event/SubscriberMonitor.java
Modified:
   incubator/directory/eve/trunk/eve/frontend/event/pojo-impl/src/java/org/apache/eve/event/DefaultEventRouter.java
   incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/AvalonInputManagerMonitor.java
   incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/MerlinInputManager.java
   incubator/directory/eve/trunk/eve/frontend/input/pojo-impl/src/java/org/apache/eve/input/DefaultInputManager.java
   incubator/directory/eve/trunk/eve/frontend/input/spi/src/java/org/apache/eve/input/InputManager.java
   incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java
Log:
Added some extra subscriber functionality to the common api.  Also 
moved out interfaces for pub/sub functionality to the implementation
rather than having the service interface extend them.


Added: incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/event/AbstractSubscriber.java
==============================================================================
--- (empty file)
+++ incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/event/AbstractSubscriber.java	Thu Feb 19 00:00:08 2004
@@ -0,0 +1,92 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.eve.event ;
+
+
+import java.util.EventObject ;
+import java.lang.reflect.Method ;
+
+
+/**
+ * An abstract Subscriber that calls the provided type-specific inform method
+ * of Subscriber sub-interface.  This way their is no need to downcast the 
+ * event.  Reflection is used by the abstract subscriber's inform method to 
+ * determine at run time which inform method of a concrete subscriber to invoke.
+ * 
+ * @author <a href="mailto:directory-dev@incubator.apache.org">
+ * Apache Directory Project</a>
+ * @version $Rev$
+ */
+public class AbstractSubscriber implements Subscriber
+{
+    /** monitor for this Subscriber */
+    private final SubscriberMonitor m_monitor ;
+    
+    
+    /**
+     * Creates a Subscriber that does not monitor failures to inform.
+     */
+    public AbstractSubscriber()
+    {
+        m_monitor = null ;
+    }
+    
+    
+    /**
+     * Creates a Subscriber that does monitor failures on inform.
+     * 
+     * @param a_monitor the monitor to use on failures
+     */
+    public AbstractSubscriber( SubscriberMonitor a_monitor )
+    {
+        m_monitor = a_monitor ;
+    }
+    
+    
+    /* (non-Javadoc)
+     * @see org.apache.eve.event.Subscriber#inform(java.util.EventObject)
+     */
+    public void inform( EventObject a_event )
+    {
+        if ( a_event == null )
+        {    
+            return ;
+        }
+        
+        Method l_method = null ;
+        Class l_paramTypes[] = new Class[1] ;
+        l_paramTypes[0] = a_event.getClass() ;
+        
+        try { 
+          /* 
+           * Look for an inform method in the current object that takes the 
+           * event subtype as a parameter
+           */ 
+          l_method = getClass().getDeclaredMethod( "inform", l_paramTypes ) ; 
+          Object l_paramList[] = new Object[1] ; 
+          l_paramList[0] = a_event ; 
+          l_method.invoke( this, l_paramList ) ; 
+        }
+        catch ( Throwable t )
+        {
+            if ( m_monitor != null )
+            {
+                m_monitor.failedOnInform( this, a_event, t ) ;
+            }
+        }
+    }
+}

Added: incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/event/SubscriberMonitor.java
==============================================================================
--- (empty file)
+++ incubator/directory/eve/trunk/eve/frontend/common/api/src/java/org/apache/eve/event/SubscriberMonitor.java	Thu Feb 19 00:00:08 2004
@@ -0,0 +1,41 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.eve.event ;
+
+
+import java.util.EventObject ;
+
+
+/**
+ * Monitors noteworthy Subscriber activities.
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">
+ * Apache Directory Project</a>
+ * @version $Rev$
+ */
+public interface SubscriberMonitor
+{
+    /**
+     * Monitors failures occuring while handling events.
+     * 
+     * @param a_subscriber the Subscriber that failed on inform
+     * @param a_eventObject the EventObject fired
+     * @param a_throwable the resulting failure exception if any
+     */
+    void failedOnInform( Subscriber a_subscriber, EventObject a_eventObject, 
+                         Throwable a_throwable ) ;
+}

Modified: incubator/directory/eve/trunk/eve/frontend/event/pojo-impl/src/java/org/apache/eve/event/DefaultEventRouter.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/event/pojo-impl/src/java/org/apache/eve/event/DefaultEventRouter.java	(original)
+++ incubator/directory/eve/trunk/eve/frontend/event/pojo-impl/src/java/org/apache/eve/event/DefaultEventRouter.java	Thu Feb 19 00:00:08 2004
@@ -195,8 +195,11 @@
                 continue ;
             }
             
-            if ( l_subscriptions[ii].getFilter() != null && 
-                 l_subscriptions[ii].getFilter().apply( a_event ) )  
+            if ( l_subscriptions[ii].getFilter() == null )
+            {
+                l_subscriptions[ii].getSubscriber().inform( a_event ) ;
+            }
+            else if ( l_subscriptions[ii].getFilter().apply( a_event ) )  
             {
                 l_subscriptions[ii].getSubscriber().inform( a_event ) ;
             }

Modified: incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/AvalonInputManagerMonitor.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/AvalonInputManagerMonitor.java	(original)
+++ incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/AvalonInputManagerMonitor.java	Thu Feb 19 00:00:08 2004
@@ -227,8 +227,13 @@
     }
     
     
+    /*
+     * Generates a hex string for a buffer.
+     */
     public String toHexString( ByteBuffer a_buf )
     {
-        return new String ( a_buf.array() ) ;
+        byte[] l_bites = new byte[a_buf.remaining()] ;
+        a_buf.get( l_bites ) ;
+        return new String ( l_bites ) ;
     }
 }

Modified: incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/MerlinInputManager.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/MerlinInputManager.java	(original)
+++ incubator/directory/eve/trunk/eve/frontend/input/merlin-impl/src/java/org/apache/eve/input/MerlinInputManager.java	Thu Feb 19 00:00:08 2004
@@ -17,23 +17,19 @@
 package org.apache.eve.input ;
 
 
-import java.util.EventObject ;
-
 import java.nio.channels.Selector ;
 
+import org.apache.avalon.framework.logger.Logger ;
 import org.apache.avalon.framework.activity.Startable ;
 import org.apache.avalon.framework.service.Serviceable ;
 import org.apache.avalon.framework.activity.Initializable ;
 import org.apache.avalon.framework.service.ServiceManager ;
 import org.apache.avalon.framework.service.ServiceException ;
 import org.apache.avalon.framework.logger.AbstractLogEnabled ;
-import org.apache.avalon.framework.logger.Logger;
 import org.apache.avalon.cornerstone.services.threads.ThreadManager ;
 
 import org.apache.eve.buffer.BufferPool ;
 import org.apache.eve.event.EventRouter ;
-import org.apache.eve.event.ConnectEvent ;
-import org.apache.eve.event.DisconnectEvent ;
 
 
 /**
@@ -67,40 +63,6 @@
     private AvalonInputManagerMonitor m_monitor = null ;
     
     
-    // ------------------------------------------------------------------------
-    // Listener Interfaces
-    // ------------------------------------------------------------------------
-    
-    
-    /*
-     * @see org.apache.eve.event.ConnectListener#
-     * connectPerformed(org.apache.eve.event.ConnectEvent)
-     */
-    public void inform( ConnectEvent an_event )
-    {
-        m_delegate.inform( an_event ) ;
-    }
-
-    
-    /*
-     * @see org.apache.eve.event.DisconnectListener#
-     * inform(org.apache.eve.event.DisconnectEvent)
-     */
-    public void inform( DisconnectEvent an_event )
-    {
-        m_delegate.inform( an_event ) ;
-    }
-    
-    
-    /*
-     * @see org.apache.eve.event.Subscriber#inform(java.util.EventObject)
-     */
-    public void inform( EventObject an_event )
-    {
-        m_delegate.inform( an_event ) ;
-    }
-    
-
     // ------------------------------------------------------------------------
     // Life Cycle Methods
     // ------------------------------------------------------------------------

Modified: incubator/directory/eve/trunk/eve/frontend/input/pojo-impl/src/java/org/apache/eve/input/DefaultInputManager.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/input/pojo-impl/src/java/org/apache/eve/input/DefaultInputManager.java	(original)
+++ incubator/directory/eve/trunk/eve/frontend/input/pojo-impl/src/java/org/apache/eve/input/DefaultInputManager.java	Thu Feb 19 00:00:08 2004
@@ -18,7 +18,7 @@
 
 
 import java.util.Iterator ;
-import java.util.EventObject ;
+import java.util.ArrayList ;
 
 import java.io.IOException ;
 import java.nio.ByteBuffer ;
@@ -33,6 +33,9 @@
 import org.apache.eve.listener.ClientKey ;
 import org.apache.eve.event.ConnectEvent ;
 import org.apache.eve.event.DisconnectEvent ;
+import org.apache.eve.event.ConnectSubscriber ;
+import org.apache.eve.event.AbstractSubscriber ;
+import org.apache.eve.event.DisconnectSubscriber ;
 import org.apache.eve.listener.KeyExpiryException ;
 
 
@@ -43,7 +46,8 @@
  * Apache Directory Project</a>
  * @version $Rev: 1452 $
  */
-public class DefaultInputManager implements InputManager
+public class DefaultInputManager extends AbstractSubscriber
+    implements InputManager, ConnectSubscriber, DisconnectSubscriber
 {
     /** the thread driving this Runnable */ 
     private Thread m_thread = null ;
@@ -55,6 +59,10 @@
     private EventRouter m_router = null ;
     /** selector used to select a ready socket channel */
     private Selector m_selector = null ;
+    /** contains the batch of new connect events and channels to register */
+    private final ArrayList m_connectEvents = new ArrayList() ;
+    /** contains the batch of disconnect events & selection keys to cancel */
+    private final ArrayList m_disconnectEvents = new ArrayList() ;
     /** the input manager's monitor */
     private InputManagerMonitor m_monitor = new InputManagerMonitorAdapter() ;
 
@@ -74,9 +82,12 @@
         throws IOException
     {
         m_bp = a_bp ;
-        m_router = a_router ;
         m_hasStarted = new Boolean( false ) ;
         m_selector = Selector.open() ;
+
+        m_router = a_router ;
+        m_router.subscribe( ConnectEvent.class, null, this ) ;
+        m_router.subscribe( DisconnectEvent.class, null, this ) ;
     }
     
 
@@ -107,6 +118,8 @@
                 try
                 {
                     m_monitor.enteringSelect( m_selector ) ;
+                    maintainConnections() ;
+                    
                     if ( 0 == ( l_count = m_selector.select() ) )
                     {
                         m_monitor.selectTimedOut( m_selector ) ;
@@ -173,33 +186,12 @@
      */
     public void inform( ConnectEvent an_event )
     {
-        ClientKey l_key = null ;
-        SocketChannel l_channel = null ;
-        
-        try
-        {
-            l_key = an_event.getClientKey() ;
-            l_channel = l_key.getSocket().getChannel() ;
-            
-            // hands-off blocking sockets!
-            if ( null == l_channel )
-            {
-                return ;
-            }
-            
-            l_channel.configureBlocking( false ) ;
-            l_channel.register( m_selector, SelectionKey.OP_READ, l_key ) ;
-            m_monitor.registeredChannel( l_key, m_selector ) ;
-        }
-        catch ( KeyExpiryException e )
+        synchronized ( m_connectEvents )
         {
-            m_monitor.keyExpiryFailure( l_key, e ) ;
-        }
-        catch ( IOException e )
-        {
-            m_monitor.channelRegistrationFailure( m_selector, l_channel, 
-                    SelectionKey.OP_READ, e ) ;
+            m_connectEvents.add( an_event ) ;
         }
+        
+        m_selector.wakeup() ;
     }
 
     
@@ -209,65 +201,139 @@
      */
     public void inform( DisconnectEvent an_event )
     {
-        SelectionKey l_key = null ;
-        Iterator l_keys = m_selector.keys().iterator() ;
-        
-        while ( l_keys.hasNext() )
-        {
-            l_key = ( SelectionKey ) l_keys.next() ;
-            if ( l_key.attachment().equals( an_event.getClientKey() ) )
-            {
-                break ;
-            }
-        }
-
-        if ( null == l_key )
+        synchronized ( m_disconnectEvents )
         {
-            return ;
+            m_connectEvents.add( an_event ) ;
         }
         
-        try
-        {
-            l_key.channel().close() ;
-        }
-        catch ( IOException e )
-        {
-            m_monitor.channelCloseFailure( 
-                    ( SocketChannel ) l_key.channel(), e ) ;
-        }
-        
-        l_key.cancel() ;
-        m_monitor.disconnectedClient( an_event.getClientKey() ) ;
+        m_selector.wakeup() ;
     }
     
 
+    // ------------------------------------------------------------------------
+    // private utilities
+    // ------------------------------------------------------------------------
+    
+    
     /**
-     * @see org.apache.eve.event.Subscriber#inform(java.util.EventObject)
+     * Maintains connections by registering newly established connections within
+     * ConnectEvents and cancelling the selection keys of dropped connections.
+     * 
+     * @see created in response to a <a href=
+     * "http://nagoya.apache.org/jira/secure/ViewIssue.jspa?id=13574">JIRA Issue
+     * </a>
      */
-    public void inform( EventObject an_event )
+    private void maintainConnections()
     {
-        Class l_clazz = an_event.getClass() ;
+        /* Register New Connections 
+         * ========================
+         * 
+         * Here we perform a synchronized transfer of newly arrived events 
+         * which are batched in the list of ConnectEvents.  This is done to
+         * minimize the chances of contention.  Next we cycle through each
+         * event registering the new connection's channel with the selector.
+         */
         
-        if ( l_clazz.isAssignableFrom( ConnectEvent.class ) )
-        {
-            inform( ( ConnectEvent ) an_event ) ;
+        // copy all events into a separate list first and clear
+        ConnectEvent[] l_connectEvents = null ;
+        synchronized( m_connectEvents ) 
+        {
+            l_connectEvents = new ConnectEvent[m_connectEvents.size()] ;
+            l_connectEvents = ( ConnectEvent[] ) 
+                m_connectEvents.toArray( l_connectEvents ) ;
+            m_connectEvents.clear() ;
+        }
+
+        // cycle through connections and register them with the selector
+        for ( int ii = 0; ii < l_connectEvents.length ; ii++ )
+        {    
+            ClientKey l_key = null ;
+            SocketChannel l_channel = null ;
+            
+            try
+            {
+                l_key = l_connectEvents[ii].getClientKey() ;
+                l_channel = l_key.getSocket().getChannel() ;
+                
+                // hands-off blocking sockets!
+                if ( null == l_channel )
+                {
+                    continue ;
+                }
+                
+                l_channel.configureBlocking( false ) ;
+                l_channel.register( m_selector, SelectionKey.OP_READ, l_key ) ;
+                m_monitor.registeredChannel( l_key, m_selector ) ;
+            }
+            catch ( KeyExpiryException e )
+            {
+                m_monitor.keyExpiryFailure( l_key, e ) ;
+            }
+            catch ( IOException e )
+            {
+                m_monitor.channelRegistrationFailure( m_selector, l_channel, 
+                        SelectionKey.OP_READ, e ) ;
+            }
+        }
+        
+        
+        /* Cancel/Unregister Dropped Connections 
+         * =====================================
+         *
+         * To do this we simply cancel the selection key for the client the
+         * disconnect event is associated with.  
+         */
+        
+        // copy all events into a separate list first and clear
+        DisconnectEvent[] l_disconnectEvents = null ;
+        synchronized( m_disconnectEvents ) 
+        {
+            l_disconnectEvents = new DisconnectEvent[m_disconnectEvents.size()] ;
+            l_disconnectEvents = ( DisconnectEvent[] ) 
+                m_disconnectEvents.toArray( l_disconnectEvents ) ;
+            m_disconnectEvents.clear() ;
         }
-        else if ( l_clazz.isAssignableFrom( DisconnectEvent.class ) ) ;
+
+        SelectionKey l_key = null ;
+        for ( int ii = 0; ii < l_disconnectEvents.length; ii++ )
         {
-            inform( ( DisconnectEvent ) an_event ) ;
+            Iterator l_keys = m_selector.keys().iterator() ;
+            ClientKey l_clientKey = l_disconnectEvents[ii].getClientKey() ;
+
+            while ( l_keys.hasNext() )
+            {
+                l_key = ( SelectionKey ) l_keys.next() ;
+                if ( l_key.attachment().equals( l_clientKey ) )
+                {
+                    break ;
+                }
+            }
+
+            if ( null == l_key )
+            {
+                return ;
+            }
+            
+            try
+            {
+                l_key.channel().close() ;
+            }
+            catch ( IOException e )
+            {
+                m_monitor.channelCloseFailure( 
+                        ( SocketChannel ) l_key.channel(), e ) ;
+            }
+        
+            l_key.cancel() ;
+            m_monitor.disconnectedClient( l_clientKey ) ;
         }
     }
     
     
-    // ------------------------------------------------------------------------
-    // private utilities
-    // ------------------------------------------------------------------------
-    
-    
     /**
      * Processes input on channels of the read ready selected keys.
      */
-    void processInput()
+    private void processInput()
     {
         /*
          * Process the selectors that are ready.  For each selector that

Modified: incubator/directory/eve/trunk/eve/frontend/input/spi/src/java/org/apache/eve/input/InputManager.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/input/spi/src/java/org/apache/eve/input/InputManager.java	(original)
+++ incubator/directory/eve/trunk/eve/frontend/input/spi/src/java/org/apache/eve/input/InputManager.java	Thu Feb 19 00:00:08 2004
@@ -50,10 +50,6 @@
 package org.apache.eve.input ;
 
 
-import org.apache.eve.event.ConnectSubscriber ;
-import org.apache.eve.event.DisconnectSubscriber ;
-
-
 /**
  * Service interface for server modules that monitor incomming PDU requests on
  * a client's inputs.
@@ -63,7 +59,6 @@
  * @version $Rev$
  */
 public interface InputManager
-    extends ConnectSubscriber, DisconnectSubscriber
 {
     /** Role played by this service as specified by Avalon */
     public static final String ROLE = InputManager.class.getName() ;

Modified: incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java
==============================================================================
--- incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java	(original)
+++ incubator/directory/eve/trunk/eve/frontend/listener/pojo-impl/src/java/org/apache/eve/listener/DefaultListenerManager.java	Thu Feb 19 00:00:08 2004
@@ -106,10 +106,11 @@
      */
     public DefaultListenerManager( EventRouter a_router ) throws IOException
     {
-        m_router = a_router ;
         m_clients = new HashSet() ;
         m_selector = Selector.open() ;
         m_hasStarted = new Boolean( false ) ;
+
+        m_router = a_router ;
         m_router.subscribe( DisconnectEvent.class, null, this ) ;
     }
     
@@ -150,6 +151,7 @@
                     a_listener.getPort() ) ;
             l_channel.socket().bind( l_address, a_listener.getBacklog() ) ;
             l_channel.configureBlocking( false ) ;
+            m_selector.wakeup() ;
             l_channel.register( m_selector, SelectionKey.OP_ACCEPT, 
                     a_listener ) ;
         }