You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2004/10/23 02:27:54 UTC
svn commit: rev 55337 - in incubator/directory/seda/trunk/src/java/org/apache/seda: decoder encoder input listener output protocol
Author: trustin
Date: Fri Oct 22 17:27:52 2004
New Revision: 55337
Added:
incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPClientKey.java (contents, props changed)
incubator/directory/seda/trunk/src/java/org/apache/seda/listener/UDPClientKey.java (contents, props changed)
incubator/directory/seda/trunk/src/java/org/apache/seda/listener/UDPListenerManager.java (contents, props changed)
Modified:
incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/input/TCPInputManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/listener/ClientKey.java
incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPListenerManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/output/DefaultOutputManager.java
incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
Log:
* Splitted ClientKey to TCPClientKey and UDPClientKey
* ClientKey is an abstract class now.
* ClientKey.getClientAddress() is changed to ClientKey.getRemoteAddress().
* Added ClientKey.getLocalAddress()
* Implemented UDPListenerManager
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/decoder/DefaultDecoderManager.java Fri Oct 22 17:27:52 2004
@@ -166,7 +166,7 @@
private StatefulDecoder createDecoder( ClientKey key )
throws KeyExpiryException
{
- String proto = inetdb.getProtoByPort( key.getSocket().getLocalPort() );
+ String proto = inetdb.getProtoByPort( key.getLocalAddress().getPort() );
DecoderFactory factory = ( DecoderFactory ) factories.get( proto );
return factory.createDecoder();
}
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/encoder/DefaultEncoderManager.java Fri Oct 22 17:27:52 2004
@@ -140,7 +140,7 @@
private StatefulEncoder createEncoder( ClientKey key )
throws KeyExpiryException
{
- String proto = inetdb.getProtoByPort( key.getSocket().getLocalPort() );
+ String proto = inetdb.getProtoByPort( key.getLocalAddress().getPort() );
EncoderFactory factory = ( EncoderFactory ) factories.get( proto );
return factory.createEncoder();
}
@@ -193,7 +193,7 @@
try
{
- port = key.getSocket().getLocalPort();
+ port = key.getLocalAddress().getPort();
}
catch ( KeyExpiryException e )
{
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/input/TCPInputManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/input/TCPInputManager.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/input/TCPInputManager.java Fri Oct 22 17:27:52 2004
@@ -17,26 +17,26 @@
package org.apache.seda.input;
-import java.util.Iterator;
-import java.util.ArrayList;
-
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
-import org.apache.seda.event.InputEvent;
import org.apache.seda.ResourceException;
import org.apache.seda.buffer.BufferPool;
-import org.apache.seda.event.EventRouter;
-import org.apache.seda.listener.ClientKey;
+import org.apache.seda.event.AbstractSubscriber;
import org.apache.seda.event.ConnectEvent;
-import org.apache.seda.event.DisconnectEvent;
import org.apache.seda.event.ConnectSubscriber;
-import org.apache.seda.event.AbstractSubscriber;
+import org.apache.seda.event.DisconnectEvent;
import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.InputEvent;
+import org.apache.seda.listener.ClientKey;
import org.apache.seda.listener.KeyExpiryException;
+import org.apache.seda.listener.TCPClientKey;
/**
@@ -257,12 +257,12 @@
// cycle through connections and register them with the selector
for ( int ii = 0; ii < l_events.length; ii++ )
{
- ClientKey l_key = null;
+ TCPClientKey l_key = null;
SocketChannel l_channel = null;
try
{
- l_key = l_events[ii].getClientKey();
+ l_key = (TCPClientKey) l_events[ii].getClientKey();
l_channel = l_key.getSocket().getChannel();
// hands-off blocking sockets!
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/listener/ClientKey.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/listener/ClientKey.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/listener/ClientKey.java Fri Oct 22 17:27:52 2004
@@ -17,9 +17,8 @@
package org.apache.seda.listener;
-import java.net.Socket;
-
import java.io.IOException;
+import java.net.InetSocketAddress;
/**
@@ -41,7 +40,7 @@
* @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
* @version $Rev$
*/
-public final class ClientKey
+public abstract class ClientKey
{
// ----------------------------------------------
// Private members.
@@ -53,11 +52,6 @@
private final Object outputLock = new Object();
/** Unique key or client id */
private final String clientId;
- /** Socket connection to client */
- private final Socket socket;
-
- /** Whether or not this key has expired: the client has disconnected. */
- private boolean hasExpired = false;
// ----------------------------------------------
@@ -76,23 +70,11 @@
*
* This makes the key unique at any single point in time.
*
- * @param a_socket newly established client socket connection to the
- * server.
+ * @param clientId the id of this client key
*/
- ClientKey( final Socket a_socket )
+ ClientKey( String clientId )
{
- // build the key ...
- StringBuffer l_buf = new StringBuffer();
- l_buf.append( a_socket.getLocalAddress().getHostAddress() );
- l_buf.append( ':' );
- l_buf.append( a_socket.getLocalPort() ).append( "<-" );
- l_buf.append( a_socket.getInetAddress().getHostAddress() );
- l_buf.append( ':' );
- l_buf.append( a_socket.getPort() );
-
- // set finals ...
- clientId = l_buf.toString();
- socket = a_socket;
+ this.clientId = clientId;
}
@@ -110,49 +92,28 @@
* than depending on developers to maintain a convention of checking for
* key expiration before use in other modules.
*/
- public String getClientId() throws KeyExpiryException
+ public final String getClientId() throws KeyExpiryException
{
checkExpiry();
return clientId;
}
-
/**
- * Gets the clients socket connection.
+ * Gets the local socket address.
*
- * @return the client's socket connection
- */
- public Socket getSocket() throws KeyExpiryException
- {
- checkExpiry();
- return socket;
- }
-
-
- /**
- * Gets the client's IP address.
- *
- * @return the client's ip address.
+ * @return the local socket address
* @throws KeyExpiryException to force the handling of expired keys
*/
- public String getClientAddress() throws KeyExpiryException
- {
- checkExpiry();
- return socket.getInetAddress().getHostAddress();
- }
-
-
+ public abstract InetSocketAddress getLocalAddress() throws KeyExpiryException;
+
+
/**
- * Gets the client's hostname.
+ * Gets the client's socket address.
*
- * @return the client's hostname.
+ * @return the client's socket address.
* @throws KeyExpiryException to force the handling of expired keys
*/
- public String getClientHost() throws KeyExpiryException
- {
- checkExpiry();
- return socket.getInetAddress().getHostName();
- }
+ public abstract InetSocketAddress getRemoteAddress() throws KeyExpiryException;
// ----------------------------------------------
@@ -166,7 +127,7 @@
* @return ouput lock object.
* @throws KeyExpiryException to force the handling of expired keys
*/
- public Object getOutputLock() throws KeyExpiryException
+ public final Object getOutputLock() throws KeyExpiryException
{
checkExpiry();
return outputLock;
@@ -179,7 +140,7 @@
* @return input lock object.
* @throws KeyExpiryException to force the handling of expired keys
*/
- public Object getInputLock() throws KeyExpiryException
+ public final Object getInputLock() throws KeyExpiryException
{
checkExpiry();
return inputLock;
@@ -199,10 +160,7 @@
* @return true if the client is no longer connected to the server, false
* if the client is connected.
*/
- public boolean hasExpired()
- {
- return hasExpired;
- }
+ public abstract boolean hasExpired();
/**
@@ -211,15 +169,7 @@
* only allow access by the ClientModule. Tries to close socket if it is
* still open.
*/
- void expire() throws IOException
- {
- hasExpired = true;
-
- if ( null != socket )
- {
- socket.close();
- }
- }
+ protected abstract void expire() throws IOException;
/**
@@ -234,13 +184,7 @@
* than depending on developers to maintain a convention of checking for
* key expiration before use in other modules.
*/
- void checkExpiry() throws KeyExpiryException
- {
- if( hasExpired )
- {
- throw new KeyExpiryException( this );
- }
- }
+ protected abstract void checkExpiry() throws KeyExpiryException;
// ----------------------------------------------
@@ -253,7 +197,7 @@
*
* @return the client id string.
*/
- public String toString()
+ public final String toString()
{
return clientId;
}
@@ -265,7 +209,7 @@
*
* @return the clientId hashCode value.
*/
- public int hashCode()
+ public final int hashCode()
{
return clientId.hashCode();
}
@@ -281,7 +225,7 @@
*
* @return true if an_obj equals this ClientKey, false otherwise.
*/
- public boolean equals( Object an_obj )
+ public final boolean equals( Object an_obj )
{
if( this == an_obj )
{
Added: incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPClientKey.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPClientKey.java Fri Oct 22 17:27:52 2004
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.seda.listener;
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+
+/**
+ * Every client that successfully binds anonymously or with a valid identity
+ * has a unique client key represented by this class. The key uniquely
+ * identifies the client based on the connection parameters: interface and port
+ * used on the server as well as the interface and port used by the client.
+ * <p>
+ * The ClientKey plays a central role in coordinating activities with the
+ * server across various threads. Threads within the same stage or across
+ * stages are synchronized on client resources using lock objects held by a
+ * ClientKey instance. Socket IO is managed using a pair of lock objects
+ * specificially for this purpose.
+ * </p>
+ *
+ * @todo do we really need these lock objects?
+ * @todo why are we carrying around the damn socket?
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
+ * @version $Rev$
+ */
+public final class TCPClientKey extends ClientKey
+{
+ // ----------------------------------------------
+ // Private members.
+ // ----------------------------------------------
+
+ /** Socket connection to client */
+ private final Socket socket;
+
+ /** Whether or not this key has expired: the client has disconnected. */
+ private boolean hasExpired = false;
+
+
+ // ----------------------------------------------
+ // Constructors
+ // ----------------------------------------------
+
+
+ /**
+ * Generates a unique connection/client identifier String for a client
+ * socket connection. The key is composed of the local server address
+ * and port attached to the remote client address and port. If the
+ * server ip and port are 192.168.1.1:1389 and the client's ip and port are
+ * 34.23.12.1:5678 then the key string would be:
+ *
+ * 192.168.1.1:1389<-34.23.12.1:5678
+ *
+ * This makes the key unique at any single point in time.
+ *
+ * @param a_socket newly established client socket connection to the
+ * server.
+ */
+ TCPClientKey( final Socket a_socket )
+ {
+ super(getClientId(a_socket));
+ socket = a_socket;
+ }
+
+ private static String getClientId(Socket a_socket) {
+ // build the key ...
+ StringBuffer l_buf = new StringBuffer();
+ l_buf.append( a_socket.getLocalAddress().getHostAddress() );
+ l_buf.append( ':' );
+ l_buf.append( a_socket.getLocalPort() ).append( "<-" );
+ l_buf.append( a_socket.getInetAddress().getHostAddress() );
+ l_buf.append( ':' );
+ l_buf.append( a_socket.getPort() );
+ return l_buf.toString();
+ }
+
+
+ // ----------------------------------------------
+ // Accessors of conn. parameters to client id
+ // ----------------------------------------------
+
+
+ /**
+ * Gets the clients socket connection.
+ *
+ * @return the client's socket connection
+ */
+ public Socket getSocket() throws KeyExpiryException
+ {
+ checkExpiry();
+ return socket;
+ }
+
+ public InetSocketAddress getLocalAddress() throws KeyExpiryException {
+ checkExpiry();
+ return new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
+ }
+
+ public InetSocketAddress getRemoteAddress() throws KeyExpiryException
+ {
+ checkExpiry();
+ return new InetSocketAddress(socket.getInetAddress(), socket.getPort());
+ }
+
+
+ // ----------------------------------------------
+ // Key expiration methods.
+ // ----------------------------------------------
+
+
+ /**
+ * Determines if the client represented by this ClientKey is still
+ * connected to the server. Once disconnected the ClientKey is expired
+ * by the server so processing on behalf of the client does not continue.
+ *
+ * @return true if the client is no longer connected to the server, false
+ * if the client is connected.
+ */
+ public boolean hasExpired()
+ {
+ return hasExpired;
+ }
+
+
+ /**
+ * Expires this key to indicate the disconnection of the client represented
+ * by this key from the server. It is intentionally package friendly to
+ * only allow access by the ClientModule. Tries to close socket if it is
+ * still open.
+ */
+ protected void expire() throws IOException
+ {
+ hasExpired = true;
+
+ if ( null != socket )
+ {
+ socket.close();
+ }
+ }
+
+
+ /**
+ * Utility method to throw key expiration exception if this ClientKey has
+ * expired. This method is called by most accessor methods within this
+ * class with <code>hasExpired()</code> being the only exception. The
+ * purpose for this is to force ClientKey using modules to check for
+ * expiration rather rely upon them to check to see if the key is valid
+ * before use everytime.
+ *
+ * @throws KeyExpiryException to force the handling of expired keys rather
+ * than depending on developers to maintain a convention of checking for
+ * key expiration before use in other modules.
+ */
+ protected void checkExpiry() throws KeyExpiryException
+ {
+ if( hasExpired )
+ {
+ throw new KeyExpiryException( this );
+ }
+ }
+}
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPListenerManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPListenerManager.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/listener/TCPListenerManager.java Fri Oct 22 17:27:52 2004
@@ -17,8 +17,6 @@
package org.apache.seda.listener;
-import org.apache.seda.event.*;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -26,7 +24,19 @@
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.util.*;
+import java.util.EventObject;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.seda.event.ConnectEvent;
+import org.apache.seda.event.DisconnectEvent;
+import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.ProtocolEvent;
+import org.apache.seda.event.ProtocolSubscriber;
/**
@@ -337,7 +347,7 @@
continue;
}
- ClientKey clientKey = new ClientKey( channel.socket() );
+ ClientKey clientKey = new TCPClientKey( channel.socket() );
ConnectEvent event = new ConnectEvent( this, clientKey );
router.publish( event );
}
@@ -420,7 +430,7 @@
Iterator list = clients.iterator();
while ( list.hasNext() )
{
- ClientKey key = ( ClientKey ) list.next();
+ TCPClientKey key = ( TCPClientKey ) list.next();
try
{
Added: incubator/directory/seda/trunk/src/java/org/apache/seda/listener/UDPClientKey.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/listener/UDPClientKey.java Fri Oct 22 17:27:52 2004
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.seda.listener;
+
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+
+
+/**
+ * Every client that successfully binds anonymously or with a valid identity
+ * has a unique client key represented by this class. The key uniquely
+ * identifies the client based on the connection parameters: interface and port
+ * used on the server as well as the interface and port used by the client.
+ * <p>
+ * The ClientKey plays a central role in coordinating activities with the
+ * server across various threads. Threads within the same stage or across
+ * stages are synchronized on client resources using lock objects held by a
+ * ClientKey instance. Socket IO is managed using a pair of lock objects
+ * specificially for this purpose.
+ * </p>
+ *
+ * @todo do we really need these lock objects?
+ * @todo why are we carrying around the damn socket?
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
+ * @version $Rev$
+ */
+public final class UDPClientKey extends ClientKey
+{
+ // ----------------------------------------------
+ // Private members.
+ // ----------------------------------------------
+
+ /** Socket connection to client */
+ private final DatagramSocket socket;
+ /** Client address */
+ private final InetSocketAddress clientAddress;
+
+ /** Whether or not this key has expired: the client has disconnected. */
+ private boolean hasExpired = false;
+
+
+ // ----------------------------------------------
+ // Constructors
+ // ----------------------------------------------
+
+
+ /**
+ * Generates a unique connection/client identifier String for a client
+ * socket connection. The key is composed of the local server address
+ * and port attached to the remote client address and port. If the
+ * server ip and port are 192.168.1.1:1389 and the client's ip and port are
+ * 34.23.12.1:5678 then the key string would be:
+ *
+ * 192.168.1.1:1389<-34.23.12.1:5678
+ *
+ * This makes the key unique at any single point in time.
+ *
+ * @param a_socket newly established client socket connection to the
+ * server.
+ */
+ UDPClientKey( final DatagramSocket a_socket, final InetSocketAddress clientAddress )
+ {
+ super(getClientId(a_socket, clientAddress));
+
+ socket = a_socket;
+ this.clientAddress = clientAddress;
+ }
+
+ private static String getClientId(DatagramSocket a_socket, InetSocketAddress clientAddress) {
+ // build the key ...
+ StringBuffer l_buf = new StringBuffer();
+ l_buf.append( a_socket.getLocalAddress().getHostAddress() );
+ l_buf.append( ':' );
+ l_buf.append( a_socket.getLocalPort() ).append( "<-" );
+ l_buf.append( clientAddress.getAddress().getHostAddress() );
+ l_buf.append( ':' );
+ l_buf.append( clientAddress.getPort() );
+ return l_buf.toString();
+ }
+
+
+ // ----------------------------------------------
+ // Accessors of conn. parameters to client id
+ // ----------------------------------------------
+
+ public DatagramSocket getSocket() throws KeyExpiryException
+ {
+ checkExpiry();
+ return socket;
+ }
+
+ public InetSocketAddress getLocalAddress() throws KeyExpiryException
+ {
+ checkExpiry();
+ return new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
+ }
+
+ public InetSocketAddress getRemoteAddress() throws KeyExpiryException
+ {
+ checkExpiry();
+ return clientAddress;
+ }
+
+ // ----------------------------------------------
+ // Key expiration methods.
+ // ----------------------------------------------
+
+
+ /**
+ * Determines if the client represented by this ClientKey is still
+ * connected to the server. Once disconnected the ClientKey is expired
+ * by the server so processing on behalf of the client does not continue.
+ *
+ * @return true if the client is no longer connected to the server, false
+ * if the client is connected.
+ */
+ public boolean hasExpired()
+ {
+ return hasExpired;
+ }
+
+
+ /**
+ * Expires this key to indicate the disconnection of the client represented
+ * by this key from the server. It is intentionally package friendly to
+ * only allow access by the ClientModule. Tries to close socket if it is
+ * still open.
+ */
+ protected void expire() throws IOException
+ {
+ hasExpired = true;
+
+ if ( null != socket )
+ {
+ socket.close();
+ }
+ }
+
+
+ /**
+ * Utility method to throw key expiration exception if this ClientKey has
+ * expired. This method is called by most accessor methods within this
+ * class with <code>hasExpired()</code> being the only exception. The
+ * purpose for this is to force ClientKey using modules to check for
+ * expiration rather rely upon them to check to see if the key is valid
+ * before use everytime.
+ *
+ * @throws KeyExpiryException to force the handling of expired keys rather
+ * than depending on developers to maintain a convention of checking for
+ * key expiration before use in other modules.
+ */
+ protected void checkExpiry() throws KeyExpiryException
+ {
+ if( hasExpired )
+ {
+ throw new KeyExpiryException( this );
+ }
+ }
+}
Added: incubator/directory/seda/trunk/src/java/org/apache/seda/listener/UDPListenerManager.java
==============================================================================
--- (empty file)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/listener/UDPListenerManager.java Fri Oct 22 17:27:52 2004
@@ -0,0 +1,511 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.seda.listener;
+
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.EventObject;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.seda.ResourceException;
+import org.apache.seda.buffer.BufferPool;
+import org.apache.seda.event.DisconnectEvent;
+import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.InputEvent;
+import org.apache.seda.event.ProtocolEvent;
+import org.apache.seda.event.ProtocolSubscriber;
+
+
+/**
+ * A listener manager that uses non-blocking NIO based constructs to detect
+ * client connections on server socket listeners.
+ *
+ * @author <a href="mailto:directory-dev@incubator.apache.org">Apache Directory Project</a>
+ * @version $Rev$
+ */
+public class UDPListenerManager
+ implements
+ DisconnectSubscriber,
+ ProtocolSubscriber,
+ ListenerManager,
+ Runnable
+{
+ /** event manager used to decouple source to sink relationships */
+ private final EventRouter router;
+ /** selector used to select a acceptable socket channel */
+ private final Selector selector;
+ /** the buffer pool we get direct buffers from */
+ private BufferPool bp = null;
+ /** a map of auth service names to their protocol providers */
+ private final Map protocols;
+ /** the client keys for accepted connections */
+ private final Set clients;
+ /** the set of listeners managed */
+ private final Set listeners;
+ /** new listeners waiting to be bound */
+ private final Set bindListeners;
+ /** old listeners waiting to be unbound */
+ private final Set unbindListeners;
+
+ /** the thread driving this Runnable */
+ private Thread thread = null;
+ /** parameter used to politely stop running thread */
+ private Boolean hasStarted = null;
+ /** the listner manager's monitor */
+ private ListenerManagerMonitor monitor = null;
+
+
+ /**
+ * Creates a default listener manager using an event router.
+ *
+ * @param router the router to publish events to
+ * @throws IOException
+ */
+ public UDPListenerManager( EventRouter router, BufferPool bp ) throws IOException
+ {
+ this.router = router;
+ this.clients = new HashSet();
+ this.selector = Selector.open();
+ this.protocols = new HashMap();
+ this.listeners = new HashSet();
+ this.hasStarted = new Boolean( false );
+ this.bindListeners = new HashSet();
+ this.unbindListeners = new HashSet();
+ this.bp = bp;
+
+ this.router.subscribe( DisconnectEvent.class, null, this );
+ this.monitor = new ListenerManagerMonitorAdapter();
+ }
+
+
+ /**
+ * Gets the monitor.
+ *
+ * @return Returns the monitor.
+ */
+ public ListenerManagerMonitor getMonitor()
+ {
+ return monitor;
+ }
+
+
+ /**
+ * Sets the monitor.
+ *
+ * @param monitor The monitor to set.
+ */
+ public void setMonitor( ListenerManagerMonitor monitor )
+ {
+ this.monitor = monitor;
+ }
+
+
+ /**
+ * @see org.apache.seda.listener.ListenerManager#bind(ListenerConfig)
+ */
+ public void bind( ListenerConfig listener ) throws IOException
+ {
+ ensureListenerConfigType(listener);
+
+ synchronized ( bindListeners )
+ {
+ bindListeners.add( listener );
+ }
+
+ selector.wakeup();
+ }
+
+
+ /**
+ * @see org.apache.seda.listener.ListenerManager#unbind(ListenerConfig)
+ */
+ public void unbind( ListenerConfig listener ) throws IOException
+ {
+ ensureListenerConfigType(listener);
+
+ synchronized ( unbindListeners )
+ {
+ unbindListeners.add( listener );
+ }
+
+ selector.wakeup();
+ }
+
+ private void ensureListenerConfigType(ListenerConfig listener) {
+ if (listener == null)
+ throw new NullPointerException();
+ if (!(listener instanceof UDPListenerConfig))
+ throw new IllegalArgumentException();
+ }
+
+ /**
+ * Binds all the listeners that have been collecting up waiting to be bound.
+ * This is not fail fast - meaning it will try all the connections in the
+ * ready to bind set even if one fails.
+ */
+ private void bind()
+ {
+ synchronized ( bindListeners )
+ {
+ Iterator list = bindListeners.iterator();
+ while ( list.hasNext() )
+ {
+ UDPListenerConfig listener =
+ ( UDPListenerConfig ) list.next();
+
+ try
+ {
+ DatagramChannel channel = DatagramChannel.open();
+ InetSocketAddress address =
+ new InetSocketAddress(
+ listener.getInetAddress(),
+ listener.getInetServiceEntry().getPort() );
+ channel.socket().bind(address);
+ channel.configureBlocking( false );
+ channel.register( selector, SelectionKey.OP_READ,
+ listener );
+
+ synchronized ( listeners )
+ {
+ listeners.add( listener );
+ }
+
+ bindListeners.remove( listener );
+ }
+ catch ( IOException e )
+ {
+ monitor.failedToBind( listener, e );
+ }
+
+ monitor.bindOccured( listener );
+ }
+ }
+ }
+
+
+ /**
+ * Unbinds listeners that have been collecting up waiting to be unbound.
+ * This is not fail fast - meaning it will try all the connections in the
+ * ready to unbind set even if one fails.
+ */
+ private void unbind()
+ {
+ SelectionKey key = null;
+
+ synchronized ( unbindListeners )
+ {
+ Iterator keys = selector.keys().iterator();
+ while ( keys.hasNext() )
+ {
+ key = ( SelectionKey ) keys.next();
+ ListenerConfig listener =
+ ( ListenerConfig ) key.attachment();
+
+ if ( unbindListeners.contains( listener ) )
+ {
+ try
+ {
+ key.channel().close();
+ }
+ catch ( IOException e )
+ {
+ monitor.failedToUnbind( listener, e );
+ }
+
+ key.cancel();
+
+ synchronized ( listeners )
+ {
+ listeners.remove( listener );
+ }
+
+ unbindListeners.remove( listener );
+ monitor.unbindOccured( listener );
+ }
+ }
+ }
+ }
+
+
+ // ------------------------------------------------------------------------
+ // DisconnectSubscriber Implementation
+ // ------------------------------------------------------------------------
+
+
+ /**
+ * Disconnects a client by removing the clientKey from the listener.
+ *
+ * @param event the disconnect event
+ */
+ public void inform( DisconnectEvent event )
+ {
+ clients.remove( event.getClientKey() );
+
+ try
+ {
+ event.getClientKey().expire();
+ }
+ catch ( IOException e )
+ {
+ monitor.failedToExpire( event.getClientKey(), e );
+ }
+ }
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.seda.event.Subscriber#inform(java.util.EventObject)
+ */
+ public void inform( EventObject event )
+ {
+ inform( ( DisconnectEvent ) event );
+ }
+
+
+ /**
+ * Informs this subscriber of a protocol event.
+ *
+ * @param event the protocol event to inform of
+ */
+ public void inform( ProtocolEvent event )
+ {
+ }
+
+
+ // ------------------------------------------------------------------------
+ // Runnable implementation and start/stop controls
+ // ------------------------------------------------------------------------
+
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run()
+ {
+ while ( hasStarted.booleanValue() )
+ {
+ try
+ {
+ monitor.enteringSelect( selector );
+
+ bind();
+ unbind();
+
+ if ( 0 == selector.select() )
+ {
+ monitor.selectTimedOut( selector );
+ continue;
+ }
+ }
+ catch( IOException e )
+ {
+ monitor.failedToSelect( selector, e );
+ continue;
+ }
+
+
+ Iterator list = selector.selectedKeys().iterator();
+ while ( list.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) list.next();
+ list.remove();
+
+ if ( key.isReadable() )
+ {
+ DatagramChannel channel = (DatagramChannel)
+ key.channel();
+
+ ByteBuffer buf = null;
+ InetSocketAddress clientAddress;
+ boolean read = false;
+ try
+ {
+ buf = bp.getBuffer(this);
+ clientAddress = (InetSocketAddress) channel.receive(buf);
+ if (clientAddress != null) {
+ read = true;
+ }
+ }
+ catch ( IOException e )
+ {
+ // FIXME InputManagerMonitor.failedToRead?
+ monitor.failedToAccept( key, e );
+ continue;
+ }
+ catch ( ResourceException e ) {
+ // FIXME InputManagerMonitor.bufferUnavailable?
+ //monitor.bufferUnavailable( bp, e );
+ continue;
+ } finally {
+ if (!read && buf != null) {
+ bp.releaseClaim(buf, this);
+ }
+ }
+
+ if (read) {
+ ClientKey clientKey = new UDPClientKey( channel.socket(), clientAddress );
+ InputEvent event = new ConcreteInputEvent(clientKey, buf);
+ router.publish( event );
+ }
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Starts up this ListnerManager service.
+ *
+ * @throws IllegalStateException if this service has already started
+ */
+ public void start()
+ {
+ if ( hasStarted.booleanValue() )
+ {
+ throw new IllegalStateException( "Already started!" );
+ }
+
+ hasStarted = new Boolean( true );
+ thread = new Thread( this );
+ thread.start();
+ monitor.started();
+ }
+
+
+ /**
+ * Gracefully stops this ListenerManager service. Blocks calling thread
+ * until the service has fully stopped.
+ *
+ * @throws InterruptedException if this service's driver thread cannot start
+ */
+ public void stop() throws InterruptedException
+ {
+ hasStarted = new Boolean( false );
+ selector.wakeup();
+
+ /*
+ * First lets shutdown the listeners so we're not open to having new
+ * connections created while we are trying to shutdown. Plus we want
+ * to make the thread for this component do the work to prevent locking
+ * issues with the selector.
+ */
+ if ( ! listeners.isEmpty() )
+ {
+ Iterator list = listeners.iterator();
+ while( list.hasNext() )
+ {
+ ListenerConfig listener =
+ ( ListenerConfig ) list.next();
+
+ try
+ {
+ /*
+ * put the listening in the set ready to be unbound by
+ * the runnable's thread of execution
+ */
+ unbind( listener );
+ }
+ catch( IOException e )
+ {
+ // monitor.doSomthing( e );
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /*
+ * Now we gracefully disconnect the clients that are already connected
+ * so they can complete their current requests and recieve a
+ * notification of disconnect. At this point we don't know how we're
+ * going to do that so we just do it abruptly for the time being. This
+ * will need to be changed in the future.
+ */
+ if ( ! clients.isEmpty() )
+ {
+ synchronized( clients )
+ {
+ Iterator list = clients.iterator();
+ while ( list.hasNext() )
+ {
+ UDPClientKey key = ( UDPClientKey ) list.next();
+
+ try
+ {
+ DatagramSocket socket = key.getSocket();
+ socket.close();
+ list.remove();
+ }
+ catch( IOException e )
+ {
+ // monitor.doSomthing( e );
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ /*
+ * now wait until the thread of execution for this runnable dies
+ */
+ if ( this.thread.isAlive() )
+ {
+ Thread.sleep( 100 );
+ selector.wakeup();
+ }
+
+ monitor.stopped();
+ }
+
+
+ /**
+ * A concrete InputEvent that uses the buffer pool to properly implement
+ * the interest claim and release methods.
+ *
+ * @author <a href="mailto:aok123@bellsouth.net">Alex Karasulu</a>
+ * @author $Author$
+ * @version $Revision$
+ */
+ class ConcreteInputEvent extends InputEvent
+ {
+ ConcreteInputEvent( ClientKey key, ByteBuffer buffer )
+ {
+ super( UDPListenerManager.this, key, buffer );
+ }
+
+ public ByteBuffer claimInterest( Object party )
+ {
+ bp.claimInterest( getBuffer(), party );
+ return getBuffer().asReadOnlyBuffer();
+ }
+
+ public void releaseInterest( Object party )
+ {
+ bp.releaseClaim( getBuffer(), party );
+ }
+ }
+}
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/output/DefaultOutputManager.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/output/DefaultOutputManager.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/output/DefaultOutputManager.java Fri Oct 22 17:27:52 2004
@@ -18,28 +18,27 @@
import java.io.IOException;
-
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-
-import java.util.Map;
-import java.util.HashMap;
import java.util.EventObject;
+import java.util.HashMap;
+import java.util.Map;
-import org.apache.seda.event.EventRouter;
-import org.apache.seda.event.OutputEvent;
-import org.apache.seda.stage.DefaultStage;
-import org.apache.seda.stage.StageHandler;
-import org.apache.seda.listener.ClientKey;
+import org.apache.seda.event.AbstractSubscriber;
import org.apache.seda.event.ConnectEvent;
+import org.apache.seda.event.ConnectSubscriber;
import org.apache.seda.event.DisconnectEvent;
+import org.apache.seda.event.DisconnectSubscriber;
+import org.apache.seda.event.EventRouter;
+import org.apache.seda.event.OutputEvent;
import org.apache.seda.event.OutputSubscriber;
-import org.apache.seda.event.ConnectSubscriber;
+import org.apache.seda.listener.ClientKey;
+import org.apache.seda.listener.KeyExpiryException;
+import org.apache.seda.listener.TCPClientKey;
+import org.apache.seda.stage.DefaultStage;
import org.apache.seda.stage.DefaultStageConfig;
-import org.apache.seda.event.AbstractSubscriber;
import org.apache.seda.stage.LoggingStageMonitor;
-import org.apache.seda.event.DisconnectSubscriber;
-import org.apache.seda.listener.KeyExpiryException;
+import org.apache.seda.stage.StageHandler;
/**
@@ -124,7 +123,7 @@
*/
public void inform( ConnectEvent event )
{
- ClientKey key = event.getClientKey();
+ TCPClientKey key = (TCPClientKey) event.getClientKey();
try
{
Modified: incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java
==============================================================================
--- incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java (original)
+++ incubator/directory/seda/trunk/src/java/org/apache/seda/protocol/DefaultRequestProcessor.java Fri Oct 22 17:27:52 2004
@@ -151,7 +151,7 @@
try
{
- name = inetDb.getProtoByPort( key.getSocket().getLocalPort() );
+ name = inetDb.getProtoByPort( key.getLocalAddress().getPort() );
}
catch ( KeyExpiryException e )
{