You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2004/10/23 02:27:54 UTC

svn commit: rev 55337 - in incubator/directory/seda/trunk/src/java/org/apache/seda: decoder encoder input listener output protocol

Author: trustin
Date: Fri Oct 22 17:27:52 2004
New Revision: 55337

Added:
   incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPClientKey.java   (contents, props changed)
   incubator/directory/seda/trunk/src/java/org/apache/seda/listener/UDPClientKey.java   (contents, props changed)
   incubator/directory/seda/trunk/src/java/org/apache/seda/listener/UDPListenerManager.java   (contents, props changed)
Modified:
   incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/input/TCPInputManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/listener/ClientKey.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPListenerManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/output/DefaultOutputManager.java
   incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
Log:
* Splitted ClientKey to TCPClientKey and UDPClientKey
* ClientKey is an abstract class now.
* ClientKey.getClientAddress() is changed to ClientKey.getRemoteAddress().
* Added ClientKey.getLocalAddress()
* Implemented UDPListenerManager

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java	Fri Oct 22 17:27:52 2004
@@ -166,7 +166,7 @@
     private StatefulDecoder createDecoder( ClientKey key )
         throws KeyExpiryException
     {
-        String proto = inetdb.getProtoByPort( key.getSocket().getLocalPort() );
+        String proto = inetdb.getProtoByPort( key.getLocalAddress().getPort() );
         DecoderFactory factory = ( DecoderFactory ) factories.get( proto );
         return factory.createDecoder();
     }

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java	Fri Oct 22 17:27:52 2004
@@ -140,7 +140,7 @@
     private StatefulEncoder createEncoder( ClientKey key )
         throws KeyExpiryException
     {
-        String proto = inetdb.getProtoByPort( key.getSocket().getLocalPort() );
+        String proto = inetdb.getProtoByPort( key.getLocalAddress().getPort() );
         EncoderFactory factory = ( EncoderFactory ) factories.get( proto );
         return factory.createEncoder();
     }
@@ -193,7 +193,7 @@
 
         try
         {
-            port = key.getSocket().getLocalPort();
+            port = key.getLocalAddress().getPort();
         }
         catch ( KeyExpiryException e )
         {

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/input/TCPInputManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/input/TCPInputManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/input/TCPInputManager.java	Fri Oct 22 17:27:52 2004
@@ -17,26 +17,26 @@
 package org.apache.seda.input;
 
 
-import java.util.Iterator;
-import java.util.ArrayList;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.Selector;
 import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
 
-import org.apache.seda.event.InputEvent;
 import org.apache.seda.ResourceException;
 import org.apache.seda.buffer.BufferPool;
-import org.apache.seda.event.EventRouter;
-import org.apache.seda.listener.ClientKey;
+import org.apache.seda.event.AbstractSubscriber;
 import org.apache.seda.event.ConnectEvent;
-import org.apache.seda.event.DisconnectEvent;
 import org.apache.seda.event.ConnectSubscriber;
-import org.apache.seda.event.AbstractSubscriber;
+import org.apache.seda.event.DisconnectEvent;
 import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.InputEvent;
+import org.apache.seda.listener.ClientKey;
 import org.apache.seda.listener.KeyExpiryException;
+import org.apache.seda.listener.TCPClientKey;
 
 
 /**
@@ -257,12 +257,12 @@
         // cycle through connections and register them with the selector
         for ( int ii = 0; ii < l_events.length; ii++ )
         {    
-            ClientKey l_key = null;
+            TCPClientKey l_key = null;
             SocketChannel l_channel = null;
             
             try
             {
-                l_key = l_events[ii].getClientKey();
+                l_key = (TCPClientKey) l_events[ii].getClientKey();
                 l_channel = l_key.getSocket().getChannel();
                 
                 // hands-off blocking sockets!

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/listener/ClientKey.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/listener/ClientKey.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/listener/ClientKey.java	Fri Oct 22 17:27:52 2004
@@ -17,9 +17,8 @@
 package org.apache.seda.listener;
 
 
-import java.net.Socket;
-
 import java.io.IOException;
+import java.net.InetSocketAddress;
 
 
 /**
@@ -41,7 +40,7 @@
  * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
  * @version $Rev$
  */
-public final class ClientKey
+public abstract class ClientKey
 {
     // ----------------------------------------------
     // Private members.
@@ -53,11 +52,6 @@
     private final Object outputLock = new Object();
     /** Unique key or client id */
     private final String clientId;
-    /** Socket connection to client */
-    private final Socket socket;
-    
-    /** Whether or not this key has expired: the client has disconnected. */
-    private boolean hasExpired = false;
 
 
     // ----------------------------------------------
@@ -76,23 +70,11 @@
       *
       * This makes the key unique at any single point in time.
       *
-      * @param a_socket newly established client socket connection to the
-      * server.
+      * @param clientId the id of this client key
       */
-    ClientKey( final Socket a_socket )
+    ClientKey( String clientId )
     {
-        // build the key ...
-        StringBuffer l_buf = new StringBuffer();
-        l_buf.append( a_socket.getLocalAddress().getHostAddress() );
-        l_buf.append( ':' );
-        l_buf.append( a_socket.getLocalPort() ).append( "<-" );
-        l_buf.append( a_socket.getInetAddress().getHostAddress() );
-        l_buf.append( ':' );
-        l_buf.append( a_socket.getPort() );
-        
-        // set finals ...
-        clientId = l_buf.toString();
-        socket = a_socket;
+        this.clientId = clientId;
     }
 
 
@@ -110,49 +92,28 @@
      * than depending on developers to maintain a convention of checking for
      * key expiration before use in other modules.
      */
-    public String getClientId() throws KeyExpiryException
+    public final String getClientId() throws KeyExpiryException
     {
         checkExpiry();
         return clientId;
     }
     
-    
     /**
-     * Gets the clients socket connection.
+     * Gets the local socket address.
      * 
-     * @return the client's socket connection
-     */
-    public Socket getSocket() throws KeyExpiryException
-    {
-        checkExpiry();
-        return socket;
-    }
-
-
-    /**
-     * Gets the client's IP address.
-     *
-     * @return the client's ip address.
+     * @return the local socket address
      * @throws KeyExpiryException to force the handling of expired keys
      */
-    public String getClientAddress() throws KeyExpiryException
-    {
-        checkExpiry();
-        return socket.getInetAddress().getHostAddress();
-    }
-
-
+    public abstract InetSocketAddress getLocalAddress() throws KeyExpiryException;
+    
+    
     /**
-     * Gets the client's hostname.
+     * Gets the client's socket address.
      *
-     * @return the client's hostname.
+     * @return the client's socket address.
      * @throws KeyExpiryException to force the handling of expired keys
      */
-    public String getClientHost() throws KeyExpiryException
-    {
-        checkExpiry();
-        return socket.getInetAddress().getHostName();
-    }
+    public abstract InetSocketAddress getRemoteAddress() throws KeyExpiryException;
 
 
     // ----------------------------------------------
@@ -166,7 +127,7 @@
      * @return ouput lock object.
      * @throws KeyExpiryException to force the handling of expired keys
      */
-    public Object getOutputLock() throws KeyExpiryException
+    public final Object getOutputLock() throws KeyExpiryException
     {
         checkExpiry();
         return outputLock;
@@ -179,7 +140,7 @@
      * @return input lock object.
      * @throws KeyExpiryException to force the handling of expired keys
      */
-    public Object getInputLock() throws KeyExpiryException
+    public final Object getInputLock() throws KeyExpiryException
     {
         checkExpiry();
         return inputLock;
@@ -199,10 +160,7 @@
      * @return true if the client is no longer connected to the server, false
      * if the client is connected.
      */
-    public boolean hasExpired()
-    {
-        return hasExpired;
-    }
+    public abstract boolean hasExpired();
 
 
     /**
@@ -211,15 +169,7 @@
      * only allow access by the ClientModule.  Tries to close socket if it is
      * still open.
      */
-    void expire() throws IOException
-    {
-        hasExpired = true;
-        
-        if ( null != socket )
-        {
-            socket.close();
-        }
-    }
+    protected abstract void expire() throws IOException;
 
 
     /**
@@ -234,13 +184,7 @@
      * than depending on developers to maintain a convention of checking for
      * key expiration before use in other modules.
      */
-    void checkExpiry() throws KeyExpiryException
-    {
-        if( hasExpired )
-        {
-            throw new KeyExpiryException( this );
-        }
-    }
+    protected abstract void checkExpiry() throws KeyExpiryException;
 
 
     // ----------------------------------------------
@@ -253,7 +197,7 @@
      *
      * @return the client id string.
      */
-    public String toString()
+    public final String toString()
     {
         return clientId;
     }
@@ -265,7 +209,7 @@
      *
      * @return the clientId hashCode value.
      */
-    public int hashCode()
+    public final int hashCode()
     {
         return clientId.hashCode();
     }
@@ -281,7 +225,7 @@
      *
      * @return true if an_obj equals this ClientKey, false otherwise.
      */
-    public boolean equals( Object an_obj )
+    public final boolean equals( Object an_obj )
     {
         if( this == an_obj ) 
         {

Added: incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPClientKey.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPClientKey.java	Fri Oct 22 17:27:52 2004
@@ -0,0 +1,178 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.seda.listener;
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+
+/**
+ * Every client that successfully binds anonymously or with a valid identity
+ * has a unique client key represented by this class.  The key uniquely 
+ * identifies the client based on the connection parameters: interface and port 
+ * used on the server as well as the interface and port used by the client.
+ * <p>
+ * The ClientKey plays a central role in coordinating activities with the
+ * server across various threads.  Threads within the same stage or across
+ * stages are synchronized on client resources using lock objects held by a
+ * ClientKey instance.  Socket IO is managed using a pair of lock objects
+ * specificially for this purpose.
+ * </p>
+ * 
+ * @todo do we really need these lock objects?
+ * @todo why are we carrying around the damn socket?
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
+ * @version $Rev$
+ */
+public final class TCPClientKey extends ClientKey
+{
+    // ----------------------------------------------
+    // Private members.
+    // ----------------------------------------------
+
+    /** Socket connection to client */
+    private final Socket socket;
+    
+    /** Whether or not this key has expired: the client has disconnected. */
+    private boolean hasExpired = false;
+
+
+    // ----------------------------------------------
+    // Constructors
+    // ----------------------------------------------
+
+
+    /**
+      * Generates a unique connection/client identifier String for a client
+      * socket connection.  The key is composed of the local server address
+      * and port attached to the remote client address and port.  If the
+      * server ip and port are 192.168.1.1:1389 and the client's ip and port are
+      * 34.23.12.1:5678 then the key string would be:
+      *
+      * 192.168.1.1:1389<-34.23.12.1:5678
+      *
+      * This makes the key unique at any single point in time.
+      *
+      * @param a_socket newly established client socket connection to the
+      * server.
+      */
+    TCPClientKey( final Socket a_socket )
+    {
+        super(getClientId(a_socket));
+        socket = a_socket;
+    }
+    
+    private static String getClientId(Socket a_socket) {
+        // build the key ...
+        StringBuffer l_buf = new StringBuffer();
+        l_buf.append( a_socket.getLocalAddress().getHostAddress() );
+        l_buf.append( ':' );
+        l_buf.append( a_socket.getLocalPort() ).append( "<-" );
+        l_buf.append( a_socket.getInetAddress().getHostAddress() );
+        l_buf.append( ':' );
+        l_buf.append( a_socket.getPort() );
+        return l_buf.toString();
+    }
+
+
+    // ----------------------------------------------
+    // Accessors of conn. parameters to client id
+    // ----------------------------------------------
+
+    
+    /**
+     * Gets the clients socket connection.
+     * 
+     * @return the client's socket connection
+     */
+    public Socket getSocket() throws KeyExpiryException
+    {
+        checkExpiry();
+        return socket;
+    }
+    
+    public InetSocketAddress getLocalAddress() throws KeyExpiryException {
+        checkExpiry();
+        return new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
+    }
+
+    public InetSocketAddress getRemoteAddress() throws KeyExpiryException
+    {
+        checkExpiry();
+        return new InetSocketAddress(socket.getInetAddress(), socket.getPort());
+    }
+
+
+    // ----------------------------------------------
+    // Key expiration methods.
+    // ----------------------------------------------
+
+
+    /**
+     * Determines if the client represented by this ClientKey is still
+     * connected to the server.  Once disconnected the ClientKey is expired
+     * by the server so processing on behalf of the client does not continue.
+     *
+     * @return true if the client is no longer connected to the server, false
+     * if the client is connected.
+     */
+    public boolean hasExpired()
+    {
+        return hasExpired;
+    }
+
+
+    /**
+     * Expires this key to indicate the disconnection of the client represented
+     * by this key from the server.  It is intentionally package friendly to
+     * only allow access by the ClientModule.  Tries to close socket if it is
+     * still open.
+     */
+    protected void expire() throws IOException
+    {
+        hasExpired = true;
+        
+        if ( null != socket )
+        {
+            socket.close();
+        }
+    }
+
+
+    /**
+     * Utility method to throw key expiration exception if this ClientKey has
+     * expired.  This method is called by most accessor methods within this
+     * class with <code>hasExpired()</code> being the only exception.  The
+     * purpose for this is to force ClientKey using modules to check for
+     * expiration rather rely upon them to check to see if the key is valid
+     * before use everytime.
+     * 
+     * @throws KeyExpiryException to force the handling of expired keys rather
+     * than depending on developers to maintain a convention of checking for
+     * key expiration before use in other modules.
+     */
+    protected void checkExpiry() throws KeyExpiryException
+    {
+        if( hasExpired )
+        {
+            throw new KeyExpiryException( this );
+        }
+    }
+}

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPListenerManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPListenerManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPListenerManager.java	Fri Oct 22 17:27:52 2004
@@ -17,8 +17,6 @@
 package org.apache.seda.listener;
 
 
-import org.apache.seda.event.*;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -26,7 +24,19 @@
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.*;
+import java.util.EventObject;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.seda.event.ConnectEvent;
+import org.apache.seda.event.DisconnectEvent;
+import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.ProtocolEvent;
+import org.apache.seda.event.ProtocolSubscriber;
 
 
 /**
@@ -337,7 +347,7 @@
                         continue;
                     }
                     
-                    ClientKey clientKey = new ClientKey( channel.socket() );
+                    ClientKey clientKey = new TCPClientKey( channel.socket() );
                     ConnectEvent event = new ConnectEvent( this, clientKey );
                     router.publish( event );
                 }
@@ -420,7 +430,7 @@
                 Iterator list = clients.iterator();
                 while ( list.hasNext() )
                 {
-                    ClientKey key = ( ClientKey ) list.next();
+                    TCPClientKey key = ( TCPClientKey ) list.next();
                         
                     try
                     {

Added: incubator/directory/seda/trunk/src/java/org/apache/seda/listener/UDPClientKey.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/listener/UDPClientKey.java	Fri Oct 22 17:27:52 2004
@@ -0,0 +1,176 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.seda.listener;
+
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+
+
+/**
+ * Every client that successfully binds anonymously or with a valid identity
+ * has a unique client key represented by this class.  The key uniquely 
+ * identifies the client based on the connection parameters: interface and port 
+ * used on the server as well as the interface and port used by the client.
+ * <p>
+ * The ClientKey plays a central role in coordinating activities with the
+ * server across various threads.  Threads within the same stage or across
+ * stages are synchronized on client resources using lock objects held by a
+ * ClientKey instance.  Socket IO is managed using a pair of lock objects
+ * specificially for this purpose.
+ * </p>
+ * 
+ * @todo do we really need these lock objects?
+ * @todo why are we carrying around the damn socket?
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
+ * @version $Rev$
+ */
+public final class UDPClientKey extends ClientKey
+{
+    // ----------------------------------------------
+    // Private members.
+    // ----------------------------------------------
+
+    /** Socket connection to client */
+    private final DatagramSocket socket;
+    /** Client address */
+    private final InetSocketAddress clientAddress;
+    
+    /** Whether or not this key has expired: the client has disconnected. */
+    private boolean hasExpired = false;
+
+
+    // ----------------------------------------------
+    // Constructors
+    // ----------------------------------------------
+
+
+    /**
+      * Generates a unique connection/client identifier String for a client
+      * socket connection.  The key is composed of the local server address
+      * and port attached to the remote client address and port.  If the
+      * server ip and port are 192.168.1.1:1389 and the client's ip and port are
+      * 34.23.12.1:5678 then the key string would be:
+      *
+      * 192.168.1.1:1389<-34.23.12.1:5678
+      *
+      * This makes the key unique at any single point in time.
+      *
+      * @param a_socket newly established client socket connection to the
+      * server.
+      */
+    UDPClientKey( final DatagramSocket a_socket, final InetSocketAddress clientAddress )
+    {
+        super(getClientId(a_socket, clientAddress));
+        
+        socket = a_socket;
+        this.clientAddress = clientAddress;
+    }
+    
+    private static String getClientId(DatagramSocket a_socket, InetSocketAddress clientAddress) {
+        // build the key ...
+        StringBuffer l_buf = new StringBuffer();
+        l_buf.append( a_socket.getLocalAddress().getHostAddress() );
+        l_buf.append( ':' );
+        l_buf.append( a_socket.getLocalPort() ).append( "<-" );
+        l_buf.append( clientAddress.getAddress().getHostAddress() );
+        l_buf.append( ':' );
+        l_buf.append( clientAddress.getPort() );
+        return l_buf.toString();
+    }
+
+
+    // ----------------------------------------------
+    // Accessors of conn. parameters to client id
+    // ----------------------------------------------
+
+    public DatagramSocket getSocket() throws KeyExpiryException
+    {
+        checkExpiry();
+        return socket;
+    }
+
+    public InetSocketAddress getLocalAddress() throws KeyExpiryException
+    {
+        checkExpiry();
+        return new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
+    }
+
+    public InetSocketAddress getRemoteAddress() throws KeyExpiryException
+    {
+        checkExpiry();
+        return clientAddress;
+    }
+
+    // ----------------------------------------------
+    // Key expiration methods.
+    // ----------------------------------------------
+
+
+    /**
+     * Determines if the client represented by this ClientKey is still
+     * connected to the server.  Once disconnected the ClientKey is expired
+     * by the server so processing on behalf of the client does not continue.
+     *
+     * @return true if the client is no longer connected to the server, false
+     * if the client is connected.
+     */
+    public boolean hasExpired()
+    {
+        return hasExpired;
+    }
+
+
+    /**
+     * Expires this key to indicate the disconnection of the client represented
+     * by this key from the server.  It is intentionally package friendly to
+     * only allow access by the ClientModule.  Tries to close socket if it is
+     * still open.
+     */
+    protected void expire() throws IOException
+    {
+        hasExpired = true;
+        
+        if ( null != socket )
+        {
+            socket.close();
+        }
+    }
+
+
+    /**
+     * Utility method to throw key expiration exception if this ClientKey has
+     * expired.  This method is called by most accessor methods within this
+     * class with <code>hasExpired()</code> being the only exception.  The
+     * purpose for this is to force ClientKey using modules to check for
+     * expiration rather rely upon them to check to see if the key is valid
+     * before use everytime.
+     * 
+     * @throws KeyExpiryException to force the handling of expired keys rather
+     * than depending on developers to maintain a convention of checking for
+     * key expiration before use in other modules.
+     */
+    protected void checkExpiry() throws KeyExpiryException
+    {
+        if( hasExpired )
+        {
+            throw new KeyExpiryException( this );
+        }
+    }
+}

Added: incubator/directory/seda/trunk/src/java/org/apache/seda/listener/UDPListenerManager.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/listener/UDPListenerManager.java	Fri Oct 22 17:27:52 2004
@@ -0,0 +1,511 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.seda.listener;
+
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.EventObject;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.seda.ResourceException;
+import org.apache.seda.buffer.BufferPool;
+import org.apache.seda.event.DisconnectEvent;
+import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.InputEvent;
+import org.apache.seda.event.ProtocolEvent;
+import org.apache.seda.event.ProtocolSubscriber;
+
+
+/**
+ * A listener manager that uses non-blocking NIO based constructs to detect
+ * client connections on server socket listeners.
+ * 
+ * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
+ * @version $Rev$
+ */
+public class UDPListenerManager 
+    implements
+    DisconnectSubscriber,
+    ProtocolSubscriber,
+    ListenerManager,
+    Runnable
+{
+    /** event manager used to decouple source to sink relationships */
+    private final EventRouter router;
+    /** selector used to select a acceptable socket channel */
+    private final Selector selector;
+    /** the buffer pool we get direct buffers from */
+    private BufferPool bp = null;
+    /** a map of auth service names to their protocol providers */
+    private final Map protocols;
+    /** the client keys for accepted connections */
+    private final Set clients;
+    /** the set of listeners managed */
+    private final Set listeners;
+    /** new listeners waiting to be bound */
+    private final Set bindListeners;
+    /** old listeners waiting to be unbound */
+    private final Set unbindListeners;
+    
+    /** the thread driving this Runnable */ 
+    private Thread thread = null;
+    /** parameter used to politely stop running thread */
+    private Boolean hasStarted = null;
+    /** the listner manager's monitor */
+    private ListenerManagerMonitor monitor = null;
+    
+    
+    /**
+     * Creates a default listener manager using an event router.
+     * 
+     * @param router the router to publish events to
+     * @throws IOException
+     */
+    public UDPListenerManager( EventRouter router, BufferPool bp ) throws IOException
+    {
+        this.router = router;
+        this.clients = new HashSet();
+        this.selector = Selector.open();
+        this.protocols = new HashMap();
+        this.listeners = new HashSet();
+        this.hasStarted = new Boolean( false );
+        this.bindListeners = new HashSet();
+        this.unbindListeners = new HashSet();
+        this.bp = bp;
+        
+        this.router.subscribe( DisconnectEvent.class, null, this );
+        this.monitor = new ListenerManagerMonitorAdapter();
+    }
+    
+    
+    /**
+     * Gets the monitor.
+     * 
+     * @return Returns the monitor.
+     */
+    public ListenerManagerMonitor getMonitor()
+    {
+        return monitor;
+    }
+
+    
+    /**
+     * Sets the monitor.
+     * 
+     * @param monitor The monitor to set.
+     */
+    public void setMonitor( ListenerManagerMonitor monitor )
+    {
+        this.monitor = monitor;
+    }
+
+
+    /**
+     * @see org.apache.seda.listener.ListenerManager#bind(ListenerConfig)
+     */
+    public void bind( ListenerConfig listener ) throws IOException
+    {
+        ensureListenerConfigType(listener);
+
+        synchronized ( bindListeners )
+        {
+            bindListeners.add( listener );
+        }
+        
+        selector.wakeup();
+    }
+    
+    
+    /**
+     * @see org.apache.seda.listener.ListenerManager#unbind(ListenerConfig)
+     */
+    public void unbind( ListenerConfig listener ) throws IOException
+    {
+        ensureListenerConfigType(listener);
+
+        synchronized ( unbindListeners )
+        {
+            unbindListeners.add( listener );
+        }
+
+        selector.wakeup();
+    }
+
+    private void ensureListenerConfigType(ListenerConfig listener) {
+        if (listener == null)
+            throw new NullPointerException();
+        if (!(listener instanceof UDPListenerConfig))
+            throw new IllegalArgumentException();
+    }
+
+    /**
+     * Binds all the listeners that have been collecting up waiting to be bound.
+     * This is not fail fast - meaning it will try all the connections in the
+     * ready to bind set even if one fails.
+     */
+    private void bind()
+    {
+        synchronized ( bindListeners )
+        {
+            Iterator list = bindListeners.iterator();
+            while ( list.hasNext() )
+            {
+                UDPListenerConfig listener =
+                        ( UDPListenerConfig ) list.next();
+                    
+                try
+                {
+                    DatagramChannel channel = DatagramChannel.open();
+                    InetSocketAddress address =
+                    new InetSocketAddress(
+                            listener.getInetAddress(),
+                            listener.getInetServiceEntry().getPort() );
+                    channel.socket().bind(address);
+                    channel.configureBlocking( false );
+                    channel.register( selector, SelectionKey.OP_READ, 
+                            listener );
+                    
+                    synchronized ( listeners )
+                    {
+                        listeners.add( listener );
+                    }
+                    
+                    bindListeners.remove( listener );
+                }
+                catch ( IOException e )
+                {
+                    monitor.failedToBind( listener, e );
+                }
+            
+                monitor.bindOccured( listener );
+            }
+        }
+    }
+    
+    
+    /**
+     * Unbinds listeners that have been collecting up waiting to be unbound.
+     * This is not fail fast - meaning it will try all the connections in the
+     * ready to unbind set even if one fails.
+     */
+    private void unbind()
+    {
+        SelectionKey key = null;
+        
+        synchronized ( unbindListeners ) 
+        {
+            Iterator keys = selector.keys().iterator();
+            while ( keys.hasNext() )
+            {
+                key = ( SelectionKey ) keys.next();
+                ListenerConfig listener =
+                        ( ListenerConfig ) key.attachment();
+    
+                if ( unbindListeners.contains( listener ) )
+                {    
+                    try
+                    {
+                        key.channel().close();
+                    }
+                    catch ( IOException e )
+                    {
+                        monitor.failedToUnbind( listener, e );
+                    }
+                
+                    key.cancel();
+                    
+                    synchronized ( listeners )
+                    {
+                        listeners.remove( listener );
+                    }
+                    
+                    unbindListeners.remove( listener );
+                    monitor.unbindOccured( listener );
+                }
+            }
+        }
+    }
+
+
+    // ------------------------------------------------------------------------
+    // DisconnectSubscriber Implementation
+    // ------------------------------------------------------------------------
+    
+    
+    /**
+     * Disconnects a client by removing the clientKey from the listener.
+     * 
+     * @param event the disconnect event
+     */
+    public void inform( DisconnectEvent event )
+    {
+        clients.remove( event.getClientKey() );
+        
+        try
+        {
+            event.getClientKey().expire();
+        }
+        catch ( IOException e ) 
+        {
+            monitor.failedToExpire( event.getClientKey(), e );
+        }
+    }
+    
+    
+    /*
+     *  (non-Javadoc)
+     * @see org.apache.seda.event.Subscriber#inform(java.util.EventObject)
+     */
+    public void inform( EventObject event )
+    {
+        inform( ( DisconnectEvent ) event );
+    }
+    
+
+    /**
+     * Informs this subscriber of a protocol event.
+     *
+     * @param event the protocol event to inform of
+     */
+    public void inform( ProtocolEvent event )
+    {
+    }
+
+
+    // ------------------------------------------------------------------------
+    // Runnable implementation and start/stop controls
+    // ------------------------------------------------------------------------
+    
+    
+    /**
+     * @see java.lang.Runnable#run()
+     */
+    public void run()
+    {
+        while ( hasStarted.booleanValue() ) 
+        {
+            try
+            {
+                monitor.enteringSelect( selector );
+                
+                bind();
+                unbind();
+                
+                if ( 0 == selector.select() )
+                {
+                    monitor.selectTimedOut( selector );
+                    continue;
+                }
+            } 
+            catch( IOException e )
+            {
+                monitor.failedToSelect( selector, e );
+                continue;
+            }
+            
+            
+            Iterator list = selector.selectedKeys().iterator();
+            while ( list.hasNext() )
+            {
+                SelectionKey key = ( SelectionKey ) list.next();
+                list.remove();
+                
+                if ( key.isReadable() )
+                {
+                    DatagramChannel channel =  (DatagramChannel)
+                        key.channel();
+                    
+                    ByteBuffer buf = null;
+                    InetSocketAddress clientAddress;
+                    boolean read = false;
+                    try
+                    {
+                        buf = bp.getBuffer(this);
+                        clientAddress = (InetSocketAddress) channel.receive(buf);
+                        if (clientAddress != null) {
+                            read = true;
+                        }
+                    }
+                    catch ( IOException e )
+                    {
+                        // FIXME InputManagerMonitor.failedToRead?
+                        monitor.failedToAccept( key, e );
+                        continue;
+                    }
+                    catch ( ResourceException e ) {
+                        // FIXME InputManagerMonitor.bufferUnavailable?
+                        //monitor.bufferUnavailable( bp, e );
+                        continue;
+                    } finally {
+                        if (!read && buf != null) {
+                            bp.releaseClaim(buf, this);
+                        }
+                    }
+                    
+                    if (read) {
+                        ClientKey clientKey = new UDPClientKey( channel.socket(), clientAddress );
+                        InputEvent event = new ConcreteInputEvent(clientKey, buf);
+                        router.publish( event );
+                    }
+                }
+            }
+        }
+    }
+    
+    
+    /**
+     * Starts up this ListnerManager service.
+     * 
+     * @throws IllegalStateException if this service has already started
+     */
+    public void start()
+    {
+        if ( hasStarted.booleanValue() )
+        {
+            throw new IllegalStateException( "Already started!" );
+        }
+        
+        hasStarted = new Boolean( true );
+        thread = new Thread( this );
+        thread.start();
+        monitor.started();
+    }
+    
+    
+    /**
+     * Gracefully stops this ListenerManager service.  Blocks calling thread 
+     * until the service has fully stopped.
+     * 
+     * @throws InterruptedException if this service's driver thread cannot start
+     */
+    public void stop() throws InterruptedException
+    {
+        hasStarted = new Boolean( false );
+        selector.wakeup();
+
+        /*
+         * First lets shutdown the listeners so we're not open to having new
+         * connections created while we are trying to shutdown.  Plus we want 
+         * to make the thread for this component do the work to prevent locking
+         * issues with the selector.
+         */
+        if ( ! listeners.isEmpty() )
+        {
+            Iterator list = listeners.iterator();
+            while( list.hasNext() )
+            {
+                ListenerConfig listener =
+                        ( ListenerConfig ) list.next();
+                    
+                try
+                {
+                    /*
+                     * put the listening in the set ready to be unbound by
+                     * the runnable's thread of execution
+                     */
+                    unbind( listener );
+                }
+                catch( IOException e )
+                {
+                    // monitor.doSomthing( e );
+                    e.printStackTrace();
+                }
+            }
+        }
+        
+        /*
+         * Now we gracefully disconnect the clients that are already connected 
+         * so they can complete their current requests and recieve a 
+         * notification of disconnect.  At this point we don't know how we're 
+         * going to do that so we just do it abruptly for the time being.  This
+         * will need to be changed in the future. 
+         */
+        if ( ! clients.isEmpty() )
+        {
+            synchronized( clients )
+            {
+                Iterator list = clients.iterator();
+                while ( list.hasNext() )
+                {
+                    UDPClientKey key = ( UDPClientKey ) list.next();
+                        
+                    try
+                    {
+                        DatagramSocket socket = key.getSocket();
+                        socket.close();
+                        list.remove();
+                    }
+                    catch( IOException e )
+                    {
+                        // monitor.doSomthing( e );
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+
+        /*
+         * now wait until the thread of execution for this runnable dies
+         */
+        if ( this.thread.isAlive() )
+        {
+            Thread.sleep( 100 );
+            selector.wakeup();
+        }
+
+        monitor.stopped();
+    }
+
+
+    /**
+     * A concrete InputEvent that uses the buffer pool to properly implement
+     * the interest claim and release methods.
+     *
+     * @author <a href="mailto:aok123@bellsouth.net">Alex Karasulu</a>
+     * @author $Author$
+     * @version $Revision$
+     */
+    class ConcreteInputEvent extends InputEvent
+    {
+        ConcreteInputEvent( ClientKey key, ByteBuffer buffer )
+        {
+            super( UDPListenerManager.this, key, buffer );
+        }
+        
+        public ByteBuffer claimInterest( Object party )
+        {
+            bp.claimInterest( getBuffer(), party );
+            return getBuffer().asReadOnlyBuffer();
+        }
+        
+        public void releaseInterest( Object party )
+        {
+            bp.releaseClaim( getBuffer(), party );
+        }
+    }
+}

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/output/DefaultOutputManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/output/DefaultOutputManager.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/output/DefaultOutputManager.java	Fri Oct 22 17:27:52 2004
@@ -18,28 +18,27 @@
 
 
 import java.io.IOException;
-
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
-
-import java.util.Map;
-import java.util.HashMap;
 import java.util.EventObject;
+import java.util.HashMap;
+import java.util.Map;
 
-import org.apache.seda.event.EventRouter;
-import org.apache.seda.event.OutputEvent;
-import org.apache.seda.stage.DefaultStage;
-import org.apache.seda.stage.StageHandler;
-import org.apache.seda.listener.ClientKey;
+import org.apache.seda.event.AbstractSubscriber;
 import org.apache.seda.event.ConnectEvent;
+import org.apache.seda.event.ConnectSubscriber;
 import org.apache.seda.event.DisconnectEvent;
+import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.OutputEvent;
 import org.apache.seda.event.OutputSubscriber;
-import org.apache.seda.event.ConnectSubscriber;
+import org.apache.seda.listener.ClientKey;
+import org.apache.seda.listener.KeyExpiryException;
+import org.apache.seda.listener.TCPClientKey;
+import org.apache.seda.stage.DefaultStage;
 import org.apache.seda.stage.DefaultStageConfig;
-import org.apache.seda.event.AbstractSubscriber;
 import org.apache.seda.stage.LoggingStageMonitor;
-import org.apache.seda.event.DisconnectSubscriber;
-import org.apache.seda.listener.KeyExpiryException;
+import org.apache.seda.stage.StageHandler;
 
 
 /**
@@ -124,7 +123,7 @@
      */
     public void inform( ConnectEvent event )
     {
-        ClientKey key = event.getClientKey();
+        TCPClientKey key = (TCPClientKey) event.getClientKey();
         
         try
         {

Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java	(original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java	Fri Oct 22 17:27:52 2004
@@ -151,7 +151,7 @@
 
         try
         {
-            name = inetDb.getProtoByPort( key.getSocket().getLocalPort() );
+            name = inetDb.getProtoByPort( key.getLocalAddress().getPort() );
         }
         catch ( KeyExpiryException e )
         {