You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/10/26 07:58:31 UTC
svn commit: r328570 - in /directory/network/trunk/src:
java/org/apache/mina/common/ java/org/apache/mina/common/support/
java/org/apache/mina/filter/ java/org/apache/mina/transport/socket/nio/
java/org/apache/mina/transport/socket/nio/support/ java/org...
Author: trustin
Date: Tue Oct 25 22:58:10 2005
New Revision: 328570
URL: http://svn.apache.org/viewcvs?rev=328570&view=rev
Log:
* Added IoFilter.sessionCreated()
* Added DelegatedIoAcceptor and DelegatedIoConnector
* Implementation of SocketAcceptor and SocketConnector moved to SocketAcceptorDelegate and SocketConnectorDelegate
** Changed the access modifier of some classes that doesn't need to expose.
Added:
directory/network/trunk/src/java/org/apache/mina/common/support/DelegatedIoAcceptor.java (with props)
directory/network/trunk/src/java/org/apache/mina/common/support/DelegatedIoConnector.java (with props)
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java (with props)
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java (with props)
Modified:
directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java
directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java
directory/network/trunk/src/java/org/apache/mina/common/support/AbstractIoFilterChain.java
directory/network/trunk/src/java/org/apache/mina/common/support/IoSessionManagerFilterChain.java
directory/network/trunk/src/java/org/apache/mina/filter/BlacklistFilter.java
directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java
directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionManagerFilterChain.java
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java
directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java
Modified: directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/IoFilter.java Tue Oct 25 22:58:10 2005
@@ -60,6 +60,11 @@
void filterRemoved( NextFilter nextFilter, Object parent ) throws Exception;
/**
+ * Filters {@link IoHandler#sessionCreated(IoSession)} event.
+ */
+ void sessionCreated( NextFilter nextFilter, IoSession session ) throws Exception;
+
+ /**
* Filters {@link IoHandler#sessionOpened(IoSession)} event.
*/
void sessionOpened( NextFilter nextFilter, IoSession session ) throws Exception;
@@ -112,6 +117,11 @@
*/
public interface NextFilter
{
+ /**
+ * Forwards <tt>sessionCreated</tt> event to next filter.
+ */
+ void sessionCreated( IoSession session );
+
/**
* Forwards <tt>sessionOpened</tt> event to next filter.
*/
Modified: directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/IoFilterAdapter.java Tue Oct 25 22:58:10 2005
@@ -35,6 +35,11 @@
public void filterRemoved( NextFilter nextFilter, Object parent ) throws Exception
{
}
+
+ public void sessionCreated( NextFilter nextFilter, IoSession session ) throws Exception
+ {
+ nextFilter.sessionCreated( session );
+ }
public void sessionOpened( NextFilter nextFilter, IoSession session ) throws Exception
{
Modified: directory/network/trunk/src/java/org/apache/mina/common/support/AbstractIoFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/support/AbstractIoFilterChain.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/support/AbstractIoFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/support/AbstractIoFilterChain.java Tue Oct 25 22:58:10 2005
@@ -91,6 +91,11 @@
public void filterRemoved( NextFilter nextFilter, Object parent )
{
}
+
+ public void sessionCreated( NextFilter nextFilter, IoSession session )
+ {
+ nextFilter.sessionCreated( session );
+ }
public void sessionOpened( NextFilter nextFilter, IoSession session )
{
@@ -164,6 +169,10 @@
{
}
+ public void sessionCreated( NextFilter nextFilter, IoSession session ) throws Exception
+ {
+ session.getHandler().sessionCreated( session );
+ }
public void sessionOpened( NextFilter nextFilter, IoSession session ) throws Exception
{
session.getHandler().sessionOpened( session );
@@ -370,6 +379,24 @@
}
}
+ public void sessionCreated( IoSession session )
+ {
+ Entry head = this.head;
+ callNextSessionCreated(head, session);
+ }
+
+ private void callNextSessionCreated( Entry entry, IoSession session )
+ {
+ try
+ {
+ entry.filter.sessionCreated( entry.nextFilter, session );
+ }
+ catch( Throwable e )
+ {
+ exceptionCaught( session, e );
+ }
+ }
+
public void sessionOpened( IoSession session )
{
Entry head = this.head;
@@ -600,6 +627,11 @@
this.filter = filter;
this.nextFilter = new NextFilter()
{
+ public void sessionCreated( IoSession session )
+ {
+ Entry nextEntry = Entry.this.nextEntry;
+ callNextSessionCreated( nextEntry, session );
+ }
public void sessionOpened( IoSession session )
{
Added: directory/network/trunk/src/java/org/apache/mina/common/support/DelegatedIoAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/support/DelegatedIoAcceptor.java?rev=328570&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/support/DelegatedIoAcceptor.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/common/support/DelegatedIoAcceptor.java Tue Oct 25 22:58:10 2005
@@ -0,0 +1,89 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.common.support;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+
+/**
+ * A delegated {@link IoAcceptor} that wraps the other {@link IoAcceptor}.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class DelegatedIoAcceptor implements IoAcceptor
+{
+ protected final IoAcceptor delegate;
+
+ /**
+ * Creates a new instance.
+ */
+ protected DelegatedIoAcceptor( IoAcceptor delegate )
+ {
+ this.delegate = delegate;
+ }
+
+ public void bind( SocketAddress address, IoHandler handler ) throws IOException
+ {
+ delegate.bind( address, handler );
+ }
+
+ public void unbind( SocketAddress address )
+ {
+ delegate.unbind( address );
+ }
+
+ public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
+ {
+ return delegate.newSession( remoteAddress, localAddress );
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return delegate.getFilterChain();
+ }
+
+ public ExceptionMonitor getExceptionMonitor()
+ {
+ return delegate.getExceptionMonitor();
+ }
+
+ public void setExceptionMonitor( ExceptionMonitor monitor )
+ {
+ delegate.setExceptionMonitor( monitor );
+ }
+
+ /* TODO: DIRMINA-93
+ public boolean isDisconnectClientsOnUnbind()
+ {
+ return delegate.isDisconnectClientsOnUnbind();
+ }
+
+ public void setDisconnectClientsOnUnbind( boolean disconnectClientsOnUnbind )
+ {
+ delegate.setDisconnectClientsOnUnbind( disconnectClientsOnUnbind );
+ }
+ */
+}
Propchange: directory/network/trunk/src/java/org/apache/mina/common/support/DelegatedIoAcceptor.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/network/trunk/src/java/org/apache/mina/common/support/DelegatedIoConnector.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/support/DelegatedIoConnector.java?rev=328570&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/support/DelegatedIoConnector.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/common/support/DelegatedIoConnector.java Tue Oct 25 22:58:10 2005
@@ -0,0 +1,88 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.common.support;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+
+/**
+ * A delegated {@link IoConnector} that wraps the other {@link IoConnector}.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class DelegatedIoConnector implements IoConnector
+{
+ protected final IoConnector delegate;
+
+ /**
+ * Creates a new instance.
+ */
+ protected DelegatedIoConnector( IoConnector delegate )
+ {
+ this.delegate = delegate;
+ }
+
+ public ConnectFuture connect( SocketAddress address, IoHandler handler ) throws IOException
+ {
+ return delegate.connect( address, handler );
+ }
+
+ public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
+ IoHandler handler ) throws IOException
+ {
+ return delegate.connect( address, localAddress, handler );
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return delegate.getFilterChain();
+ }
+
+ public ExceptionMonitor getExceptionMonitor()
+ {
+ return delegate.getExceptionMonitor();
+ }
+
+ public void setExceptionMonitor( ExceptionMonitor monitor )
+ {
+ delegate.setExceptionMonitor( monitor );
+ }
+
+ public int getConnectTimeout()
+ {
+ return delegate.getConnectTimeout();
+ }
+
+ public long getConnectTimeoutMillis()
+ {
+ return delegate.getConnectTimeoutMillis();
+ }
+
+ public void setConnectTimeout( int connectTimeout )
+ {
+ delegate.setConnectTimeout( connectTimeout );
+ }
+}
Propchange: directory/network/trunk/src/java/org/apache/mina/common/support/DelegatedIoConnector.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Modified: directory/network/trunk/src/java/org/apache/mina/common/support/IoSessionManagerFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/common/support/IoSessionManagerFilterChain.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/common/support/IoSessionManagerFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/common/support/IoSessionManagerFilterChain.java Tue Oct 25 22:58:10 2005
@@ -58,6 +58,11 @@
public void filterRemoved( NextFilter nextFilter, Object parent )
{
}
+
+ public void sessionCreated( NextFilter nextFilter, IoSession session )
+ {
+ ( ( IoSessionFilterChain ) session.getFilterChain() ).sessionCreated( session );
+ }
public void sessionOpened( NextFilter nextFilter, IoSession session )
{
Modified: directory/network/trunk/src/java/org/apache/mina/filter/BlacklistFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/BlacklistFilter.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/BlacklistFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/BlacklistFilter.java Tue Oct 25 22:58:10 2005
@@ -54,6 +54,19 @@
{
blacklist.remove( address );
}
+
+ public void sessionCreated( NextFilter nextFilter, IoSession session )
+ {
+ if( !isBlocked( session ) )
+ {
+ // forward if not blocked
+ nextFilter.sessionCreated( session );
+ }
+ else
+ {
+ session.close();
+ }
+ }
/**
* Forwards event if and if only the remote address of session is not
Modified: directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/LoggingFilter.java Tue Oct 25 22:58:10 2005
@@ -60,6 +60,11 @@
{
}
+ public void sessionCreated( NextFilter nextFilter, IoSession session )
+ {
+ nextFilter.sessionCreated( session );
+ }
+
public void sessionOpened( NextFilter nextFilter, IoSession session )
{
SessionLog.info( session, "OPENED" );
Modified: directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/filter/ThreadPoolFilter.java Tue Oct 25 22:58:10 2005
@@ -569,6 +569,11 @@
}
}
+ public void sessionCreated( NextFilter nextFilter, IoSession session )
+ {
+ nextFilter.sessionCreated( session );
+ }
+
public void sessionOpened( NextFilter nextFilter,
IoSession session )
{
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Tue Oct 25 22:58:10 2005
@@ -18,14 +18,8 @@
*/
package org.apache.mina.transport.socket.nio;
-import java.io.IOException;
-import java.net.SocketAddress;
-
-import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoSession;
+import org.apache.mina.common.support.DelegatedIoAcceptor;
import org.apache.mina.transport.socket.nio.support.DatagramAcceptorDelegate;
/**
@@ -34,49 +28,13 @@
* @author The Apache Directory Project (dev@directory.apache.org)
* @version $Rev$, $Date$
*/
-public class DatagramAcceptor implements IoAcceptor
+public class DatagramAcceptor extends DelegatedIoAcceptor
{
- private final IoAcceptor delegate = new DatagramAcceptorDelegate();
-
- public void bind( SocketAddress address, IoHandler handler ) throws IOException
- {
- delegate.bind( address, handler );
- }
-
- public void unbind( SocketAddress address )
- {
- delegate.unbind( address );
- }
-
- public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
- {
- return delegate.newSession( remoteAddress, localAddress );
- }
-
- public IoFilterChain getFilterChain()
- {
- return delegate.getFilterChain();
- }
-
- public ExceptionMonitor getExceptionMonitor()
- {
- return delegate.getExceptionMonitor();
- }
-
- public void setExceptionMonitor( ExceptionMonitor monitor )
- {
- delegate.setExceptionMonitor( monitor );
- }
-
- /* TODO: DIRMINA-93
- public boolean isDisconnectClientsOnUnbind()
- {
- return delegate.isDisconnectClientsOnUnbind();
- }
-
- public void setDisconnectClientsOnUnbind( boolean disconnectClientsOnUnbind )
+ /**
+ * Creates a new instance.
+ */
+ public DatagramAcceptor()
{
- delegate.setDisconnectClientsOnUnbind( disconnectClientsOnUnbind );
+ super( new DatagramAcceptorDelegate() );
}
- */
}
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Tue Oct 25 22:58:10 2005
@@ -18,14 +18,8 @@
*/
package org.apache.mina.transport.socket.nio;
-import java.io.IOException;
-import java.net.SocketAddress;
-
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.support.DelegatedIoConnector;
import org.apache.mina.transport.socket.nio.support.DatagramConnectorDelegate;
/**
@@ -34,55 +28,13 @@
* @author The Apache Directory Project (dev@directory.apache.org)
* @version $Rev$, $Date$
*/
-public class DatagramConnector implements IoConnector
+public class DatagramConnector extends DelegatedIoConnector
{
- private final IoConnector delegate = new DatagramConnectorDelegate();
-
/**
* Creates a new instance.
*/
public DatagramConnector()
{
- }
-
- public ConnectFuture connect( SocketAddress address, IoHandler handler ) throws IOException
- {
- return delegate.connect( address, handler );
- }
-
- public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
- IoHandler handler ) throws IOException
- {
- return delegate.connect( address, localAddress, handler );
- }
-
- public IoFilterChain getFilterChain()
- {
- return delegate.getFilterChain();
- }
-
- public ExceptionMonitor getExceptionMonitor()
- {
- return delegate.getExceptionMonitor();
- }
-
- public void setExceptionMonitor( ExceptionMonitor monitor )
- {
- delegate.setExceptionMonitor( monitor );
- }
-
- public int getConnectTimeout()
- {
- return delegate.getConnectTimeout();
- }
-
- public long getConnectTimeoutMillis()
- {
- return delegate.getConnectTimeoutMillis();
- }
-
- public void setConnectTimeout( int connectTimeout )
- {
- delegate.setConnectTimeout( connectTimeout );
+ super( new DatagramConnectorDelegate() );
}
}
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Tue Oct 25 22:58:10 2005
@@ -18,27 +18,9 @@
*/
package org.apache.mina.transport.socket.nio;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.support.BaseIoAcceptor;
-import org.apache.mina.common.support.IoSessionManagerFilterChain;
-import org.apache.mina.transport.socket.nio.support.SocketIoProcessor;
-import org.apache.mina.transport.socket.nio.support.SocketSessionImpl;
-import org.apache.mina.transport.socket.nio.support.SocketSessionManagerFilterChain;
-import org.apache.mina.util.Queue;
+import org.apache.mina.common.support.DelegatedIoAcceptor;
+import org.apache.mina.transport.socket.nio.support.SocketAcceptorDelegate;
/**
* {@link IoAcceptor} for socket transport (TCP/IP).
@@ -46,409 +28,19 @@
* @author The Apache Directory Project (dev@directory.apache.org)
* @version $Rev$, $Date$
*/
-public class SocketAcceptor extends BaseIoAcceptor
+public class SocketAcceptor extends DelegatedIoAcceptor
{
- private static volatile int nextId = 0;
-
- private final int id = nextId ++ ;
-
- private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
-
- private boolean reuseAddress = false;
- private int backlog = 50;
- private int receiveBufferSize = -1;
-
- private Selector selector;
- private final Map channels = new HashMap();
-
- private final Queue registerQueue = new Queue();
- private final Queue cancelQueue = new Queue();
-
- private Worker worker;
-
-
/**
* Creates a new instance.
*/
public SocketAcceptor()
{
- }
-
- /**
- * Binds to the specified <code>address</code> and handles incoming
- * connections with the specified <code>handler</code>. Backlog value
- * is configured to the value of <code>backlog</code> property.
- *
- * @throws IOException if failed to bind
- */
- public void bind( SocketAddress address, IoHandler handler ) throws IOException
- {
- if( address == null )
- {
- throw new NullPointerException( "address" );
- }
-
- if( handler == null )
- {
- throw new NullPointerException( "handler" );
- }
-
- if( !( address instanceof InetSocketAddress ) )
- {
- throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
- }
-
- if( ( ( InetSocketAddress ) address ).getPort() == 0 )
- {
- throw new IllegalArgumentException( "Unsupported port number: 0" );
- }
-
- RegistrationRequest request = new RegistrationRequest( address, handler );
-
- synchronized( this )
- {
- synchronized( registerQueue )
- {
- registerQueue.push( request );
- }
- startupWorker();
- }
-
- selector.wakeup();
-
- synchronized( request )
- {
- while( !request.done )
- {
- try
- {
- request.wait();
- }
- catch( InterruptedException e )
- {
- }
- }
- }
-
- if( request.exception != null )
- {
- throw request.exception;
- }
- }
-
-
- private synchronized void startupWorker() throws IOException
- {
- if( worker == null )
- {
- selector = Selector.open();
- worker = new Worker();
-
- worker.start();
- }
- }
-
- public void unbind( SocketAddress address )
- {
- // FIXME: Make this work properly when disconnectClients is true.
- if( address == null )
- {
- throw new NullPointerException( "address" );
- }
-
- CancellationRequest request = new CancellationRequest( address );
- synchronized( this )
- {
- try
- {
- startupWorker();
- }
- catch( IOException e )
- {
- // IOException is thrown only when Worker thread is not
- // running and failed to open a selector. We simply throw
- // IllegalArgumentException here because we can simply
- // conclude that nothing is bound to the selector.
- throw new IllegalArgumentException( "Address not bound: " + address );
- }
-
- synchronized( cancelQueue )
- {
- cancelQueue.push( request );
- }
- }
-
- selector.wakeup();
-
- synchronized( request )
- {
- while( !request.done )
- {
- try
- {
- request.wait();
- }
- catch( InterruptedException e )
- {
- }
- }
- }
-
- if( request.exception != null )
- {
- request.exception.fillInStackTrace();
-
- throw request.exception;
- }
- }
-
- private class Worker extends Thread
- {
- public Worker()
- {
- super( "SocketAcceptor-" + id );
- }
-
- public void run()
- {
- for( ;; )
- {
- try
- {
- int nKeys = selector.select();
-
- registerNew();
- cancelKeys();
-
- if( nKeys > 0 )
- {
- processSessions( selector.selectedKeys() );
- }
-
- if( selector.keys().isEmpty() )
- {
- synchronized( SocketAcceptor.this )
- {
- if( selector.keys().isEmpty() &&
- registerQueue.isEmpty() &&
- cancelQueue.isEmpty() )
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch( IOException e )
- {
- exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch( IOException e )
- {
- exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
-
- try
- {
- Thread.sleep( 1000 );
- }
- catch( InterruptedException e1 )
- {
- }
- }
- }
- }
-
- private void processSessions( Set keys ) throws IOException
- {
- Iterator it = keys.iterator();
- while( it.hasNext() )
- {
- SelectionKey key = ( SelectionKey ) it.next();
-
- it.remove();
-
- if( !key.isAcceptable() )
- {
- continue;
- }
-
- ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
-
- SocketChannel ch = ssc.accept();
-
- if( ch == null )
- {
- continue;
- }
-
- boolean success = false;
- try
- {
- RegistrationRequest req = ( RegistrationRequest ) key.attachment();
- SocketSessionImpl session = new SocketSessionImpl( filters, ch, req.handler );
- req.handler.sessionCreated( session );
- SocketIoProcessor.getInstance().addSession( session );
- success = true;
- }
- catch( Throwable t )
- {
- exceptionMonitor.exceptionCaught( SocketAcceptor.this, t );
- }
- finally
- {
- if( !success )
- {
- ch.close();
- }
- }
- }
- }
- }
-
-
- private void registerNew()
- {
- if( registerQueue.isEmpty() )
- {
- return;
- }
-
- for( ;; )
- {
- RegistrationRequest req;
-
- synchronized( registerQueue )
- {
- req = ( RegistrationRequest ) registerQueue.pop();
- }
-
- if( req == null )
- {
- break;
- }
-
- ServerSocketChannel ssc = null;
-
- try
- {
- ssc = ServerSocketChannel.open();
- ssc.configureBlocking( false );
-
- // Configure the server socket,
- ssc.socket().setReuseAddress( isReuseAddress() );
- if( getReceiveBufferSize() > 0 )
- {
- ssc.socket().setReceiveBufferSize( getReceiveBufferSize() );
- }
-
- // and bind.
- ssc.socket().bind( req.address, getBacklog() );
- ssc.register( selector, SelectionKey.OP_ACCEPT, req );
-
- channels.put( req.address, ssc );
- }
- catch( IOException e )
- {
- req.exception = e;
- }
- finally
- {
- synchronized( req )
- {
- req.done = true;
-
- req.notify();
- }
-
- if( ssc != null && req.exception != null )
- {
- try
- {
- ssc.close();
- }
- catch( IOException e )
- {
- exceptionMonitor.exceptionCaught( this, e );
- }
- }
- }
- }
- }
-
-
- private void cancelKeys()
- {
- if( cancelQueue.isEmpty() )
- {
- return;
- }
-
- for( ;; )
- {
- CancellationRequest request;
-
- synchronized( cancelQueue )
- {
- request = ( CancellationRequest ) cancelQueue.pop();
- }
-
- if( request == null )
- {
- break;
- }
-
- ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address );
-
- // close the channel
- try
- {
- if( ssc == null )
- {
- request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
- }
- else
- {
- SelectionKey key = ssc.keyFor( selector );
-
- key.cancel();
-
- selector.wakeup(); // wake up again to trigger thread death
-
- ssc.close();
- }
- }
- catch( IOException e )
- {
- exceptionMonitor.exceptionCaught( this, e );
- }
- finally
- {
- synchronized( request )
- {
- request.done = true;
-
- request.notify();
- }
- }
- }
- }
-
- public IoFilterChain getFilterChain()
- {
- return filters;
+ super( new SocketAcceptorDelegate() );
}
public int getReceiveBufferSize()
{
- return receiveBufferSize;
+ return ( ( SocketAcceptorDelegate ) delegate ).getReceiveBufferSize();
}
/**
@@ -456,59 +48,26 @@
*/
public void setReceiveBufferSize( int receiveBufferSize )
{
- this.receiveBufferSize = receiveBufferSize;
+ ( ( SocketAcceptorDelegate ) delegate ).setReceiveBufferSize( receiveBufferSize );
}
public boolean isReuseAddress()
{
- return reuseAddress;
+ return ( ( SocketAcceptorDelegate ) delegate ).isReuseAddress();
}
public void setReuseAddress( boolean reuseAddress )
{
- this.reuseAddress = reuseAddress;
+ ( ( SocketAcceptorDelegate ) delegate ).setReuseAddress( reuseAddress );
}
public int getBacklog()
{
- return backlog;
+ return ( ( SocketAcceptorDelegate ) delegate ).getBacklog();
}
public void setBacklog( int backlog )
{
- if( backlog <= 0 )
- {
- throw new IllegalArgumentException( "backlog: " + backlog );
- }
- this.backlog = backlog;
- }
-
- private static class RegistrationRequest
- {
- private final SocketAddress address;
- private final IoHandler handler;
- private IOException exception;
- private boolean done;
-
- private RegistrationRequest( SocketAddress address, IoHandler handler )
- {
- this.address = address;
- this.handler = handler;
- }
- }
-
-
- private static class CancellationRequest
- {
- private final SocketAddress address;
-
- private boolean done;
-
- private RuntimeException exception;
-
- private CancellationRequest( SocketAddress address )
- {
- this.address = address;
- }
+ ( ( SocketAcceptorDelegate ) delegate ).setBacklog( backlog );
}
}
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java Tue Oct 25 22:58:10 2005
@@ -18,27 +18,9 @@
*/
package org.apache.mina.transport.socket.nio;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.support.BaseIoConnector;
-import org.apache.mina.common.support.IoSessionManagerFilterChain;
-import org.apache.mina.transport.socket.nio.support.SocketIoProcessor;
-import org.apache.mina.transport.socket.nio.support.SocketSessionImpl;
-import org.apache.mina.transport.socket.nio.support.SocketSessionManagerFilterChain;
-import org.apache.mina.util.ExceptionUtil;
-import org.apache.mina.util.Queue;
+import org.apache.mina.common.support.DelegatedIoConnector;
+import org.apache.mina.transport.socket.nio.support.SocketConnectorDelegate;
/**
* {@link IoConnector} for socket transport (TCP/IP).
@@ -46,297 +28,13 @@
* @author The Apache Directory Project (dev@directory.apache.org)
* @version $Rev$, $Date$
*/
-public class SocketConnector extends BaseIoConnector
+public class SocketConnector extends DelegatedIoConnector
{
- private static volatile int nextId = 0;
-
- private final int id = nextId++;
-
- private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
-
- private Selector selector;
-
- private final Queue connectQueue = new Queue();
-
- private Worker worker;
-
/**
* Creates a new instance.
*/
public SocketConnector()
{
- }
-
- public ConnectFuture connect( SocketAddress address, IoHandler handler ) throws IOException
- {
- return connect( address, null, handler);
- }
-
- public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
- IoHandler handler ) throws IOException
- {
- if( address == null )
- throw new NullPointerException( "address" );
- if( handler == null )
- throw new NullPointerException( "handler" );
-
- if( ! ( address instanceof InetSocketAddress ) )
- throw new IllegalArgumentException( "Unexpected address type: "
- + address.getClass() );
-
- if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
- throw new IllegalArgumentException( "Unexpected local address type: "
- + localAddress.getClass() );
-
- SocketChannel ch = SocketChannel.open();
- boolean success = false;
- try
- {
- ch.socket().setReuseAddress( true );
- if( localAddress != null )
- {
- ch.socket().bind( localAddress );
- }
-
- ch.configureBlocking( false );
-
- if( ch.connect( address ) )
- {
- SocketSessionImpl session = newSession( ch, handler );
- success = true;
- ConnectFuture future = new ConnectFuture();
- future.setSession( session );
- return future;
- }
-
- success = true;
- }
- finally
- {
- if( !success )
- {
- ch.close();
- }
- }
-
- ConnectionRequest request = new ConnectionRequest( ch, getConnectTimeout(), handler );
- synchronized( this )
- {
- synchronized( connectQueue )
- {
- connectQueue.push( request );
- }
- startupWorker();
- selector.wakeup();
- }
-
- return request;
- }
-
- private synchronized void startupWorker() throws IOException
- {
- if( worker == null )
- {
- selector = Selector.open();
- worker = new Worker();
- worker.start();
- }
- }
-
- private void registerNew()
- {
- if( connectQueue.isEmpty() )
- return;
-
- for( ;; )
- {
- ConnectionRequest req;
- synchronized( connectQueue )
- {
- req = ( ConnectionRequest ) connectQueue.pop();
- }
-
- if( req == null )
- break;
-
- SocketChannel ch = req.channel;
- try
- {
- ch.register( selector, SelectionKey.OP_CONNECT, req );
- }
- catch( IOException e )
- {
- req.setException( e );
- }
- }
- }
-
- private void processSessions( Set keys )
- {
- Iterator it = keys.iterator();
-
- while( it.hasNext() )
- {
- SelectionKey key = ( SelectionKey ) it.next();
-
- if( !key.isConnectable() )
- continue;
-
- SocketChannel ch = ( SocketChannel ) key.channel();
- ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
-
- boolean success = false;
- try
- {
- ch.finishConnect();
- SocketSessionImpl session = newSession( ch, entry.handler );
- entry.setSession( session );
- success = true;
- }
- catch( Throwable e )
- {
- entry.setException( e );
- }
- finally
- {
- key.cancel();
- if( !success )
- {
- try
- {
- ch.close();
- }
- catch( IOException e )
- {
- exceptionMonitor.exceptionCaught( this, e );
- }
- }
- }
- }
-
- keys.clear();
- }
-
- private void processTimedOutSessions( Set keys )
- {
- long currentTime = System.currentTimeMillis();
- Iterator it = keys.iterator();
-
- while( it.hasNext() )
- {
- SelectionKey key = ( SelectionKey ) it.next();
-
- if( !key.isValid() )
- continue;
-
- ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
-
- if( currentTime >= entry.deadline )
- {
- entry.setException( new ConnectException() );
- key.cancel();
- }
- }
- }
-
- private SocketSessionImpl newSession( SocketChannel ch, IoHandler handler ) throws IOException
- {
- SocketSessionImpl session = new SocketSessionImpl( filters, ch, handler );
- try
- {
- handler.sessionCreated( session );
- }
- catch( Throwable e )
- {
- ExceptionUtil.throwException( e );
- }
- SocketIoProcessor.getInstance().addSession( session );
- return session;
- }
-
- private class Worker extends Thread
- {
- public Worker()
- {
- super( "SocketConnector-" + id );
- }
-
- public void run()
- {
- for( ;; )
- {
- try
- {
- int nKeys = selector.select( 1000 );
-
- registerNew();
-
- if( nKeys > 0 )
- {
- processSessions( selector.selectedKeys() );
- }
-
- processTimedOutSessions( selector.keys() );
-
- if( selector.keys().isEmpty() )
- {
- synchronized( SocketConnector.this )
- {
- if( selector.keys().isEmpty() &&
- connectQueue.isEmpty() )
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch( IOException e )
- {
- exceptionMonitor.exceptionCaught( SocketConnector.this, e );
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch( IOException e )
- {
- exceptionMonitor.exceptionCaught( SocketConnector.this, e );
-
- try
- {
- Thread.sleep( 1000 );
- }
- catch( InterruptedException e1 )
- {
- }
- }
- }
- }
- }
-
- private static class ConnectionRequest extends ConnectFuture
- {
- private final SocketChannel channel;
-
- private final long deadline;
-
- private final IoHandler handler;
-
- private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler )
- {
- this.channel = channel;
- this.deadline = System.currentTimeMillis() + timeout * 1000L;
- this.handler = handler;
- }
- }
-
- public IoFilterChain getFilterChain()
- {
- return filters;
+ super( new SocketConnectorDelegate() );
}
}
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Tue Oct 25 22:58:10 2005
@@ -200,7 +200,7 @@
try
{
- req.handler.sessionCreated( s );
+ s.getManagerFilterChain().sessionCreated( s );
}
catch( Throwable t )
{
@@ -323,7 +323,7 @@
try
{
- req.handler.sessionCreated( session );
+ session.getManagerFilterChain().sessionCreated( session );
if( key.isReadable() )
{
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Tue Oct 25 22:58:10 2005
@@ -406,7 +406,7 @@
boolean success = false;
try
{
- req.handler.sessionCreated( session );
+ session.getManagerFilterChain().sessionCreated( session );
SelectionKey key = req.channel.register( selector,
SelectionKey.OP_READ, session );
Added: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java?rev=328570&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java Tue Oct 25 22:58:10 2005
@@ -0,0 +1,513 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.common.support.IoSessionManagerFilterChain;
+import org.apache.mina.transport.socket.nio.support.SocketIoProcessor;
+import org.apache.mina.transport.socket.nio.support.SocketSessionImpl;
+import org.apache.mina.transport.socket.nio.support.SocketSessionManagerFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoAcceptor} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class SocketAcceptorDelegate extends BaseIoAcceptor
+{
+ private static volatile int nextId = 0;
+
+ private final int id = nextId ++ ;
+
+ private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
+
+ private boolean reuseAddress = false;
+ private int backlog = 50;
+ private int receiveBufferSize = -1;
+
+ private Selector selector;
+ private final Map channels = new HashMap();
+
+ private final Queue registerQueue = new Queue();
+ private final Queue cancelQueue = new Queue();
+
+ private Worker worker;
+
+
+ /**
+ * Creates a new instance.
+ */
+ public SocketAcceptorDelegate()
+ {
+ }
+
+ /**
+ * Binds to the specified <code>address</code> and handles incoming
+ * connections with the specified <code>handler</code>. Backlog value
+ * is configured to the value of <code>backlog</code> property.
+ *
+ * @throws IOException if failed to bind
+ */
+ public void bind( SocketAddress address, IoHandler handler ) throws IOException
+ {
+ if( address == null )
+ {
+ throw new NullPointerException( "address" );
+ }
+
+ if( handler == null )
+ {
+ throw new NullPointerException( "handler" );
+ }
+
+ if( !( address instanceof InetSocketAddress ) )
+ {
+ throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+ }
+
+ if( ( ( InetSocketAddress ) address ).getPort() == 0 )
+ {
+ throw new IllegalArgumentException( "Unsupported port number: 0" );
+ }
+
+ RegistrationRequest request = new RegistrationRequest( address, handler );
+
+ synchronized( this )
+ {
+ synchronized( registerQueue )
+ {
+ registerQueue.push( request );
+ }
+ startupWorker();
+ }
+
+ selector.wakeup();
+
+ synchronized( request )
+ {
+ while( !request.done )
+ {
+ try
+ {
+ request.wait();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+ }
+
+ if( request.exception != null )
+ {
+ throw request.exception;
+ }
+ }
+
+
+ private synchronized void startupWorker() throws IOException
+ {
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+
+ worker.start();
+ }
+ }
+
+ public void unbind( SocketAddress address )
+ {
+ // FIXME: Make this work properly when disconnectClients is true.
+ if( address == null )
+ {
+ throw new NullPointerException( "address" );
+ }
+
+ CancellationRequest request = new CancellationRequest( address );
+ synchronized( this )
+ {
+ try
+ {
+ startupWorker();
+ }
+ catch( IOException e )
+ {
+ // IOException is thrown only when Worker thread is not
+ // running and failed to open a selector. We simply throw
+ // IllegalArgumentException here because we can simply
+ // conclude that nothing is bound to the selector.
+ throw new IllegalArgumentException( "Address not bound: " + address );
+ }
+
+ synchronized( cancelQueue )
+ {
+ cancelQueue.push( request );
+ }
+ }
+
+ selector.wakeup();
+
+ synchronized( request )
+ {
+ while( !request.done )
+ {
+ try
+ {
+ request.wait();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+ }
+
+ if( request.exception != null )
+ {
+ request.exception.fillInStackTrace();
+
+ throw request.exception;
+ }
+ }
+
+ private class Worker extends Thread
+ {
+ public Worker()
+ {
+ super( "SocketAcceptor-" + id );
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ try
+ {
+ int nKeys = selector.select();
+
+ registerNew();
+ cancelKeys();
+
+ if( nKeys > 0 )
+ {
+ processSessions( selector.selectedKeys() );
+ }
+
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( SocketAcceptorDelegate.this )
+ {
+ if( selector.keys().isEmpty() &&
+ registerQueue.isEmpty() &&
+ cancelQueue.isEmpty() )
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( SocketAcceptorDelegate.this, e );
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( SocketAcceptorDelegate.this, e );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ }
+ }
+ }
+ }
+
+ private void processSessions( Set keys ) throws IOException
+ {
+ Iterator it = keys.iterator();
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+
+ it.remove();
+
+ if( !key.isAcceptable() )
+ {
+ continue;
+ }
+
+ ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
+
+ SocketChannel ch = ssc.accept();
+
+ if( ch == null )
+ {
+ continue;
+ }
+
+ boolean success = false;
+ try
+ {
+ RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+ SocketSessionImpl session = new SocketSessionImpl( filters, ch, req.handler );
+ session.getManagerFilterChain().sessionCreated( session );
+ SocketIoProcessor.getInstance().addSession( session );
+ }
+ catch( Throwable t )
+ {
+ exceptionMonitor.exceptionCaught( SocketAcceptorDelegate.this, t );
+ }
+ finally
+ {
+ if( !success )
+ {
+ ch.close();
+ }
+ }
+ }
+ }
+ }
+
+
+ private void registerNew()
+ {
+ if( registerQueue.isEmpty() )
+ {
+ return;
+ }
+
+ for( ;; )
+ {
+ RegistrationRequest req;
+
+ synchronized( registerQueue )
+ {
+ req = ( RegistrationRequest ) registerQueue.pop();
+ }
+
+ if( req == null )
+ {
+ break;
+ }
+
+ ServerSocketChannel ssc = null;
+
+ try
+ {
+ ssc = ServerSocketChannel.open();
+ ssc.configureBlocking( false );
+
+ // Configure the server socket,
+ ssc.socket().setReuseAddress( isReuseAddress() );
+ if( getReceiveBufferSize() > 0 )
+ {
+ ssc.socket().setReceiveBufferSize( getReceiveBufferSize() );
+ }
+
+ // and bind.
+ ssc.socket().bind( req.address, getBacklog() );
+ ssc.register( selector, SelectionKey.OP_ACCEPT, req );
+
+ channels.put( req.address, ssc );
+ }
+ catch( IOException e )
+ {
+ req.exception = e;
+ }
+ finally
+ {
+ synchronized( req )
+ {
+ req.done = true;
+
+ req.notify();
+ }
+
+ if( ssc != null && req.exception != null )
+ {
+ try
+ {
+ ssc.close();
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( this, e );
+ }
+ }
+ }
+ }
+ }
+
+
+ private void cancelKeys()
+ {
+ if( cancelQueue.isEmpty() )
+ {
+ return;
+ }
+
+ for( ;; )
+ {
+ CancellationRequest request;
+
+ synchronized( cancelQueue )
+ {
+ request = ( CancellationRequest ) cancelQueue.pop();
+ }
+
+ if( request == null )
+ {
+ break;
+ }
+
+ ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address );
+
+ // close the channel
+ try
+ {
+ if( ssc == null )
+ {
+ request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
+ }
+ else
+ {
+ SelectionKey key = ssc.keyFor( selector );
+
+ key.cancel();
+
+ selector.wakeup(); // wake up again to trigger thread death
+
+ ssc.close();
+ }
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( this, e );
+ }
+ finally
+ {
+ synchronized( request )
+ {
+ request.done = true;
+
+ request.notify();
+ }
+ }
+ }
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return filters;
+ }
+
+ public int getReceiveBufferSize()
+ {
+ return receiveBufferSize;
+ }
+
+ /**
+ * @param receiveBufferSize <tt>-1</tt> to use the default value.
+ */
+ public void setReceiveBufferSize( int receiveBufferSize )
+ {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ public boolean isReuseAddress()
+ {
+ return reuseAddress;
+ }
+
+ public void setReuseAddress( boolean reuseAddress )
+ {
+ this.reuseAddress = reuseAddress;
+ }
+
+ public int getBacklog()
+ {
+ return backlog;
+ }
+
+ public void setBacklog( int backlog )
+ {
+ if( backlog <= 0 )
+ {
+ throw new IllegalArgumentException( "backlog: " + backlog );
+ }
+ this.backlog = backlog;
+ }
+
+ private static class RegistrationRequest
+ {
+ private final SocketAddress address;
+ private final IoHandler handler;
+ private IOException exception;
+ private boolean done;
+
+ private RegistrationRequest( SocketAddress address, IoHandler handler )
+ {
+ this.address = address;
+ this.handler = handler;
+ }
+ }
+
+
+ private static class CancellationRequest
+ {
+ private final SocketAddress address;
+
+ private boolean done;
+
+ private RuntimeException exception;
+
+ private CancellationRequest( SocketAddress address )
+ {
+ this.address = address;
+ }
+ }
+}
Propchange: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java?rev=328570&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java Tue Oct 25 22:58:10 2005
@@ -0,0 +1,342 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.IoSessionManagerFilterChain;
+import org.apache.mina.transport.socket.nio.support.SocketIoProcessor;
+import org.apache.mina.transport.socket.nio.support.SocketSessionImpl;
+import org.apache.mina.transport.socket.nio.support.SocketSessionManagerFilterChain;
+import org.apache.mina.util.ExceptionUtil;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class SocketConnectorDelegate extends BaseIoConnector
+{
+ private static volatile int nextId = 0;
+
+ private final int id = nextId++;
+
+ private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
+
+ private Selector selector;
+
+ private final Queue connectQueue = new Queue();
+
+ private Worker worker;
+
+ /**
+ * Creates a new instance.
+ */
+ public SocketConnectorDelegate()
+ {
+ }
+
+ public ConnectFuture connect( SocketAddress address, IoHandler handler ) throws IOException
+ {
+ return connect( address, null, handler);
+ }
+
+ public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
+ IoHandler handler ) throws IOException
+ {
+ if( address == null )
+ throw new NullPointerException( "address" );
+ if( handler == null )
+ throw new NullPointerException( "handler" );
+
+ if( ! ( address instanceof InetSocketAddress ) )
+ throw new IllegalArgumentException( "Unexpected address type: "
+ + address.getClass() );
+
+ if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+ throw new IllegalArgumentException( "Unexpected local address type: "
+ + localAddress.getClass() );
+
+ SocketChannel ch = SocketChannel.open();
+ boolean success = false;
+ try
+ {
+ ch.socket().setReuseAddress( true );
+ if( localAddress != null )
+ {
+ ch.socket().bind( localAddress );
+ }
+
+ ch.configureBlocking( false );
+
+ if( ch.connect( address ) )
+ {
+ SocketSessionImpl session = newSession( ch, handler );
+ success = true;
+ ConnectFuture future = new ConnectFuture();
+ future.setSession( session );
+ return future;
+ }
+
+ success = true;
+ }
+ finally
+ {
+ if( !success )
+ {
+ ch.close();
+ }
+ }
+
+ ConnectionRequest request = new ConnectionRequest( ch, getConnectTimeout(), handler );
+ synchronized( this )
+ {
+ synchronized( connectQueue )
+ {
+ connectQueue.push( request );
+ }
+ startupWorker();
+ selector.wakeup();
+ }
+
+ return request;
+ }
+
+ private synchronized void startupWorker() throws IOException
+ {
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ worker.start();
+ }
+ }
+
+ private void registerNew()
+ {
+ if( connectQueue.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ ConnectionRequest req;
+ synchronized( connectQueue )
+ {
+ req = ( ConnectionRequest ) connectQueue.pop();
+ }
+
+ if( req == null )
+ break;
+
+ SocketChannel ch = req.channel;
+ try
+ {
+ ch.register( selector, SelectionKey.OP_CONNECT, req );
+ }
+ catch( IOException e )
+ {
+ req.setException( e );
+ }
+ }
+ }
+
+ private void processSessions( Set keys )
+ {
+ Iterator it = keys.iterator();
+
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+
+ if( !key.isConnectable() )
+ continue;
+
+ SocketChannel ch = ( SocketChannel ) key.channel();
+ ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
+
+ boolean success = false;
+ try
+ {
+ ch.finishConnect();
+ SocketSessionImpl session = newSession( ch, entry.handler );
+ entry.setSession( session );
+ success = true;
+ }
+ catch( Throwable e )
+ {
+ entry.setException( e );
+ }
+ finally
+ {
+ key.cancel();
+ if( !success )
+ {
+ try
+ {
+ ch.close();
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( this, e );
+ }
+ }
+ }
+ }
+
+ keys.clear();
+ }
+
+ private void processTimedOutSessions( Set keys )
+ {
+ long currentTime = System.currentTimeMillis();
+ Iterator it = keys.iterator();
+
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+
+ if( !key.isValid() )
+ continue;
+
+ ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
+
+ if( currentTime >= entry.deadline )
+ {
+ entry.setException( new ConnectException() );
+ key.cancel();
+ }
+ }
+ }
+
+ private SocketSessionImpl newSession( SocketChannel ch, IoHandler handler ) throws IOException
+ {
+ SocketSessionImpl session = new SocketSessionImpl( filters, ch, handler );
+ try
+ {
+ session.getManagerFilterChain().sessionCreated( session );
+ }
+ catch( Throwable e )
+ {
+ ExceptionUtil.throwException( e );
+ }
+ SocketIoProcessor.getInstance().addSession( session );
+ return session;
+ }
+
+ private class Worker extends Thread
+ {
+ public Worker()
+ {
+ super( "SocketConnector-" + id );
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ try
+ {
+ int nKeys = selector.select( 1000 );
+
+ registerNew();
+
+ if( nKeys > 0 )
+ {
+ processSessions( selector.selectedKeys() );
+ }
+
+ processTimedOutSessions( selector.keys() );
+
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( SocketConnectorDelegate.this )
+ {
+ if( selector.keys().isEmpty() &&
+ connectQueue.isEmpty() )
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( SocketConnectorDelegate.this, e );
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( SocketConnectorDelegate.this, e );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ }
+ }
+ }
+ }
+ }
+
+ private static class ConnectionRequest extends ConnectFuture
+ {
+ private final SocketChannel channel;
+
+ private final long deadline;
+
+ private final IoHandler handler;
+
+ private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler )
+ {
+ this.channel = channel;
+ this.deadline = System.currentTimeMillis() + timeout * 1000L;
+ this.handler = handler;
+ }
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return filters;
+ }
+}
\ No newline at end of file
Propchange: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java Tue Oct 25 22:58:10 2005
@@ -39,7 +39,7 @@
* @author The Apache Directory Project (dev@directory.apache.org)
* @version $Rev$, $Date$,
*/
-public class SocketIoProcessor
+class SocketIoProcessor
{
private static final SocketIoProcessor instance;
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java Tue Oct 25 22:58:10 2005
@@ -42,7 +42,7 @@
* @author The Apache Directory Project (dev@directory.apache.org)
* @version $Rev$, $Date$
*/
-public class SocketSessionImpl extends BaseIoSession implements SocketSession
+class SocketSessionImpl extends BaseIoSession implements SocketSession
{
private static final int DEFAULT_READ_BUFFER_SIZE = 1024;
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionManagerFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionManagerFilterChain.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionManagerFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionManagerFilterChain.java Tue Oct 25 22:58:10 2005
@@ -14,7 +14,7 @@
*
* @author The Apache Directory Project
*/
-public class SocketSessionManagerFilterChain extends IoSessionManagerFilterChain {
+class SocketSessionManagerFilterChain extends IoSessionManagerFilterChain {
public SocketSessionManagerFilterChain( IoSessionManager manager )
{
Modified: directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Tue Oct 25 22:58:10 2005
@@ -64,7 +64,7 @@
// initialize remote session
try
{
- remoteEntry.getHandler().sessionCreated( remoteSession );
+ remoteEntry.getManagerFilterChain().sessionCreated( remoteSession );
}
catch( Throwable t )
{
Modified: directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java (original)
+++ directory/network/trunk/src/test/org/apache/mina/common/IoFilterChainTest.java Tue Oct 25 22:58:10 2005
@@ -52,14 +52,15 @@
public void testDefault()
{
- run( "HSO HMR HMS HSI HEC HSC" );
+ run( "HS0 HSO HMR HMS HSI HEC HSC" );
}
public void testChained() throws Exception
{
chain.addLast( "A", new EventOrderTestFilter( 'A' ) );
chain.addLast( "B", new EventOrderTestFilter( 'B' ) );
- run( "ASO BSO HSO" +
+ run( "AS0 BS0 HS0" +
+ "ASO BSO HSO" +
"AMR BMR HMR" +
"BFW AFW AMS BMS HMS" +
"ASI BSI HSI" +
@@ -80,6 +81,7 @@
private void run( String expectedResult )
{
+ chain.sessionCreated( session );
chain.sessionOpened( session );
chain.messageReceived( session, new Object() );
chain.filterWrite( session, new WriteRequest( new Object() ) );
@@ -115,6 +117,10 @@
{
private IoHandler handler = new IoHandlerAdapter()
{
+ public void sessionCreated(IoSession session) {
+ result += "HS0";
+ }
+
public void sessionOpened(IoSession session) {
result += "HSO";
}
@@ -192,7 +198,14 @@
this.id = id;
}
- public void sessionOpened(NextFilter nextFilter, IoSession session) {
+ public void sessionCreated( NextFilter nextFilter, IoSession session )
+ {
+ result += id + "S0";
+ nextFilter.sessionCreated( session );
+ }
+
+ public void sessionOpened( NextFilter nextFilter, IoSession session )
+ {
result += id + "SO";
nextFilter.sessionOpened( session );
}
Modified: directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java?rev=328570&r1=328569&r2=328570&view=diff
==============================================================================
--- directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java (original)
+++ directory/network/trunk/src/test/org/apache/mina/filter/ThreadPoolFilterRegressionTest.java Tue Oct 25 22:58:10 2005
@@ -185,6 +185,10 @@
public void filterClose( IoSession session, CloseFuture closeFuture )
{
}
+
+ public void sessionCreated( IoSession session )
+ {
+ }
}
public static void main( String[] args )