You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/12/28 03:49:51 UTC
svn commit: r359355 [2/4] - in /directory/network/trunk: ./
src/java/org/apache/mina/common/
src/java/org/apache/mina/transport/socket/nio/support/
src/java/org/apache/mina/transport/vmpipe/support/
src/test/org/apache/mina/common/ xdocs/
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?rev=359355&r1=359354&r2=359355&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Tue Dec 27 18:49:31 2005
@@ -1,675 +1,675 @@
-/*
- * @(#) $Id$
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.transport.socket.nio.support;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoFilterChainBuilder;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.BaseIoConnector;
-import org.apache.mina.util.Queue;
-
-/**
- * {@link IoConnector} for datagram transport (UDP/IP).
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public class DatagramConnectorDelegate extends BaseIoConnector implements DatagramSessionManager
-{
- private static volatile int nextId = 0;
-
- private final IoConnector wrapper;
- private final int id = nextId ++ ;
- private Selector selector;
- private boolean broadcast;
- private boolean reuseAddress;
- private int receiveBufferSize = -1;
- private int sendBufferSize = -1;
- private int trafficClass = -1;
- private final Queue registerQueue = new Queue();
- private final Queue cancelQueue = new Queue();
- private final Queue flushingSessions = new Queue();
- private final Queue trafficControllingSessions = new Queue();
- private Worker worker;
-
- /**
- * Creates a new instance.
- */
- public DatagramConnectorDelegate( IoConnector wrapper )
- {
- this.wrapper = wrapper;
- }
-
- public ConnectFuture connect( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
- {
- return connect( address, null, handler, filterChainBuilder );
- }
-
- public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
- IoHandler handler, IoFilterChainBuilder filterChainBuilder )
- {
- if( address == null )
- throw new NullPointerException( "address" );
- if( handler == null )
- throw new NullPointerException( "handler" );
-
- if( !( address instanceof InetSocketAddress ) )
- throw new IllegalArgumentException( "Unexpected address type: "
- + address.getClass() );
-
- if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
- {
- throw new IllegalArgumentException( "Unexpected local address type: "
- + localAddress.getClass() );
- }
-
- if( filterChainBuilder == null )
- {
- filterChainBuilder = IoFilterChainBuilder.NOOP;
- }
-
- DatagramChannel ch = null;
- boolean initialized = false;
- try
- {
- ch = DatagramChannel.open();
- ch.socket().setReuseAddress( reuseAddress );
- ch.socket().setBroadcast( broadcast );
- if( receiveBufferSize > 0 )
- {
- ch.socket().setReceiveBufferSize( receiveBufferSize );
- }
- if( sendBufferSize > 0 )
- {
- ch.socket().setSendBufferSize( sendBufferSize );
- }
- if( trafficClass > 0 )
- {
- ch.socket().setTrafficClass( trafficClass );
- }
- if( localAddress != null )
- {
- ch.socket().bind( localAddress );
- }
- ch.connect( address );
- ch.configureBlocking( false );
- initialized = true;
- }
- catch( IOException e )
- {
- return ConnectFuture.newFailedFuture( e );
- }
- finally
- {
- if( !initialized && ch != null )
- {
- try
- {
- ch.disconnect();
- ch.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- }
- }
-
- RegistrationRequest request = new RegistrationRequest( ch, handler, filterChainBuilder );
- synchronized( this )
- {
- try
- {
- startupWorker();
- }
- catch( IOException e )
- {
- try
- {
- ch.disconnect();
- ch.close();
- }
- catch( IOException e2 )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e2 );
- }
-
- return ConnectFuture.newFailedFuture( e );
- }
-
- synchronized( registerQueue )
- {
- registerQueue.push( request );
- }
- }
-
- selector.wakeup();
- return request;
- }
-
- public boolean getBroadcast()
- {
- return broadcast;
- }
-
- public void setBroadcast( boolean broadcast )
- {
- this.broadcast = broadcast;
- }
-
- public boolean getReuseAddress()
- {
- return reuseAddress;
- }
-
- public void setReuseAddress( boolean reuseAddress )
- {
- this.reuseAddress = reuseAddress;
- }
-
- public int getReceiveBufferSize()
- {
- return receiveBufferSize;
- }
-
- public void setReceiveBufferSize( int receiveBufferSize )
- {
- this.receiveBufferSize = receiveBufferSize;
- }
-
- public int getSendBufferSize()
- {
- return sendBufferSize;
- }
-
- public void setSendBufferSize( int sendBufferSize )
- {
- this.sendBufferSize = sendBufferSize;
- }
-
- public int getTrafficClass()
- {
- return trafficClass;
- }
-
- public void setTrafficClass( int trafficClass )
- {
- this.trafficClass = trafficClass;
- }
-
- private synchronized void startupWorker() throws IOException
- {
- if( worker == null )
- {
- selector = Selector.open();
- worker = new Worker();
- worker.start();
- }
- }
-
- public void closeSession( DatagramSessionImpl session )
- {
- synchronized( this )
- {
- try
- {
- startupWorker();
- }
- catch( IOException e )
- {
- // IOException is thrown only when Worker thread is not
- // running and failed to open a selector. We simply return
- // silently here because it we can simply conclude that
- // this session is not managed by this connector or
- // already closed.
- return;
- }
-
- synchronized( cancelQueue )
- {
- cancelQueue.push( session );
- }
- }
-
- selector.wakeup();
- }
-
- public void flushSession( DatagramSessionImpl session )
- {
- scheduleFlush( session );
- Selector selector = this.selector;
- if( selector != null )
- {
- selector.wakeup();
- }
- }
-
- private void scheduleFlush( DatagramSessionImpl session )
- {
- synchronized( flushingSessions )
- {
- flushingSessions.push( session );
- }
- }
-
- public void updateTrafficMask( DatagramSessionImpl session )
- {
- scheduleTrafficControl( session );
- Selector selector = this.selector;
- if( selector != null )
- {
- selector.wakeup();
- }
- selector.wakeup();
- }
-
- private void scheduleTrafficControl( DatagramSessionImpl session )
- {
- synchronized( trafficControllingSessions )
- {
- trafficControllingSessions.push( session );
- }
- }
-
- private void doUpdateTrafficMask()
- {
- if( trafficControllingSessions.isEmpty() )
- return;
-
- for( ;; )
- {
- DatagramSessionImpl session;
-
- synchronized( trafficControllingSessions )
- {
- session = ( DatagramSessionImpl ) trafficControllingSessions.pop();
- }
-
- if( session == null )
- break;
-
- SelectionKey key = session.getSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.suspend??() or session.resume??() is
- // called before addSession() is processed)
- if( key == null )
- {
- scheduleTrafficControl( session );
- break;
- }
- // skip if channel is already closed
- if( !key.isValid() )
- {
- continue;
- }
-
- // The normal is OP_READ and, if there are write requests in the
- // session's write queue, set OP_WRITE to trigger flushing.
- int ops = SelectionKey.OP_READ;
- Queue writeRequestQueue = session.getWriteRequestQueue();
- synchronized( writeRequestQueue )
- {
- if( !writeRequestQueue.isEmpty() )
- {
- ops |= SelectionKey.OP_WRITE;
- }
- }
-
- // Now mask the preferred ops with the mask of the current session
- int mask = session.getTrafficMask().getInterestOps();
- key.interestOps( ops & mask );
- }
- }
-
- private class Worker extends Thread
- {
- public Worker()
- {
- super( "DatagramConnector-" + id );
- }
-
- public void run()
- {
- for( ;; )
- {
- try
- {
- int nKeys = selector.select();
-
- registerNew();
- doUpdateTrafficMask();
-
- if( nKeys > 0 )
- {
- processReadySessions( selector.selectedKeys() );
- }
-
- flushSessions();
- cancelKeys();
-
- if( selector.keys().isEmpty() )
- {
- synchronized( DatagramConnectorDelegate.this )
- {
- if( selector.keys().isEmpty() &&
- registerQueue.isEmpty() &&
- cancelQueue.isEmpty() )
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
-
- try
- {
- Thread.sleep( 1000 );
- }
- catch( InterruptedException e1 )
- {
- }
- }
- }
- }
- }
-
- private void processReadySessions( Set keys )
- {
- Iterator it = keys.iterator();
- while( it.hasNext() )
- {
- SelectionKey key = ( SelectionKey ) it.next();
- it.remove();
-
- DatagramSessionImpl session = ( DatagramSessionImpl ) key.attachment();
-
- if( key.isReadable() && session.getTrafficMask().isReadable() )
- {
- readSession( session );
- }
-
- if( key.isWritable() && session.getTrafficMask().isWritable() )
- {
- scheduleFlush( session );
- }
- }
- }
-
- private void readSession( DatagramSessionImpl session )
- {
-
- ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
- try
- {
- int readBytes = session.getChannel().read( readBuf.buf() );
- if( readBytes > 0 )
- {
- readBuf.flip();
- ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
- newBuf.put( readBuf );
- newBuf.flip();
-
- session.increaseReadBytes( readBytes );
- ( ( DatagramFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf );
- }
- }
- catch( IOException e )
- {
- ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
- }
- finally
- {
- readBuf.release();
- }
- }
-
- private void flushSessions()
- {
- if( flushingSessions.size() == 0 )
- return;
-
- for( ;; )
- {
- DatagramSessionImpl session;
-
- synchronized( flushingSessions )
- {
- session = ( DatagramSessionImpl ) flushingSessions.pop();
- }
-
- if( session == null )
- break;
-
- try
- {
- flush( session );
- }
- catch( IOException e )
- {
- ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
- }
- }
- }
-
- private void flush( DatagramSessionImpl session ) throws IOException
- {
- DatagramChannel ch = session.getChannel();
-
- Queue writeRequestQueue = session.getWriteRequestQueue();
-
- WriteRequest req;
- for( ;; )
- {
- synchronized( writeRequestQueue )
- {
- req = ( WriteRequest ) writeRequestQueue.first();
- }
-
- if( req == null )
- break;
-
- ByteBuffer buf = ( ByteBuffer ) req.getMessage();
- if( buf.remaining() == 0 )
- {
- // pop and fire event
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.pop();
- }
-
- req.getFuture().setWritten( true );
- session.increaseWrittenWriteRequests();
- ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf );
- continue;
- }
-
- SelectionKey key = session.getSelectionKey();
- if( key == null )
- {
- scheduleFlush( session );
- break;
- }
- if( !key.isValid() )
- {
- continue;
- }
-
- int pos = buf.position();
- int writtenBytes = ch.write( buf.buf() );
-
- if( writtenBytes == 0 )
- {
- // Kernel buffer is full
- key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
- }
- else if( writtenBytes > 0 )
- {
- key.interestOps( key.interestOps()
- & ( ~SelectionKey.OP_WRITE ) );
-
- // pop and fire event
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.pop();
- }
-
- session.increaseWrittenBytes( writtenBytes );
- req.getFuture().setWritten( true );
- session.increaseWrittenWriteRequests();
- ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf.position( pos ) );
- }
- }
- }
-
- private void registerNew()
- {
- if( registerQueue.isEmpty() )
- return;
-
- for( ;; )
- {
- RegistrationRequest req;
- synchronized( registerQueue )
- {
- req = ( RegistrationRequest ) registerQueue.pop();
- }
-
- if( req == null )
- break;
-
- DatagramSessionImpl session =
- new DatagramSessionImpl( wrapper, this, req.channel, req.handler );
-
- boolean success = false;
- try
- {
- this.filterChainBuilder.buildFilterChain( session.getFilterChain() );
- req.filterChainBuilder.buildFilterChain( session.getFilterChain() );
- ( ( DatagramFilterChain ) session.getFilterChain() ).sessionCreated( session );
-
- SelectionKey key = req.channel.register( selector,
- SelectionKey.OP_READ, session );
-
- session.setSelectionKey( key );
-
- req.setSession( session );
- success = true;
- }
- catch( Throwable t )
- {
- req.setException( t );
- }
- finally
- {
- if( !success )
- {
- try
- {
- req.channel.disconnect();
- req.channel.close();
- }
- catch (IOException e)
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- }
- }
- }
- }
-
- private void cancelKeys()
- {
- if( cancelQueue.isEmpty() )
- return;
-
- for( ;; )
- {
- DatagramSessionImpl session;
- synchronized( cancelQueue )
- {
- session = ( DatagramSessionImpl ) cancelQueue.pop();
- }
-
- if( session == null )
- break;
- else
- {
- SelectionKey key = session.getSelectionKey();
- DatagramChannel ch = ( DatagramChannel ) key.channel();
- try
- {
- ch.disconnect();
- ch.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- session.getCloseFuture().setClosed();
- key.cancel();
- selector.wakeup(); // wake up again to trigger thread death
- }
- }
- }
-
- private static class RegistrationRequest extends ConnectFuture
- {
- private final DatagramChannel channel;
- private final IoHandler handler;
- private final IoFilterChainBuilder filterChainBuilder;
-
- private RegistrationRequest( DatagramChannel channel,
- IoHandler handler,
- IoFilterChainBuilder filterChainBuilder )
- {
- this.channel = channel;
- this.handler = handler;
- this.filterChainBuilder = filterChainBuilder;
- }
- }
-}
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChainBuilder;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoConnector} for datagram transport (UDP/IP).
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class DatagramConnectorDelegate extends BaseIoConnector implements DatagramSessionManager
+{
+ private static volatile int nextId = 0;
+
+ private final IoConnector wrapper;
+ private final int id = nextId ++ ;
+ private Selector selector;
+ private boolean broadcast;
+ private boolean reuseAddress;
+ private int receiveBufferSize = -1;
+ private int sendBufferSize = -1;
+ private int trafficClass = -1;
+ private final Queue registerQueue = new Queue();
+ private final Queue cancelQueue = new Queue();
+ private final Queue flushingSessions = new Queue();
+ private final Queue trafficControllingSessions = new Queue();
+ private Worker worker;
+
+ /**
+ * Creates a new instance.
+ */
+ public DatagramConnectorDelegate( IoConnector wrapper )
+ {
+ this.wrapper = wrapper;
+ }
+
+ public ConnectFuture connect( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
+ {
+ return connect( address, null, handler, filterChainBuilder );
+ }
+
+ public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
+ IoHandler handler, IoFilterChainBuilder filterChainBuilder )
+ {
+ if( address == null )
+ throw new NullPointerException( "address" );
+ if( handler == null )
+ throw new NullPointerException( "handler" );
+
+ if( !( address instanceof InetSocketAddress ) )
+ throw new IllegalArgumentException( "Unexpected address type: "
+ + address.getClass() );
+
+ if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+ {
+ throw new IllegalArgumentException( "Unexpected local address type: "
+ + localAddress.getClass() );
+ }
+
+ if( filterChainBuilder == null )
+ {
+ filterChainBuilder = IoFilterChainBuilder.NOOP;
+ }
+
+ DatagramChannel ch = null;
+ boolean initialized = false;
+ try
+ {
+ ch = DatagramChannel.open();
+ ch.socket().setReuseAddress( reuseAddress );
+ ch.socket().setBroadcast( broadcast );
+ if( receiveBufferSize > 0 )
+ {
+ ch.socket().setReceiveBufferSize( receiveBufferSize );
+ }
+ if( sendBufferSize > 0 )
+ {
+ ch.socket().setSendBufferSize( sendBufferSize );
+ }
+ if( trafficClass > 0 )
+ {
+ ch.socket().setTrafficClass( trafficClass );
+ }
+ if( localAddress != null )
+ {
+ ch.socket().bind( localAddress );
+ }
+ ch.connect( address );
+ ch.configureBlocking( false );
+ initialized = true;
+ }
+ catch( IOException e )
+ {
+ return ConnectFuture.newFailedFuture( e );
+ }
+ finally
+ {
+ if( !initialized && ch != null )
+ {
+ try
+ {
+ ch.disconnect();
+ ch.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+
+ RegistrationRequest request = new RegistrationRequest( ch, handler, filterChainBuilder );
+ synchronized( this )
+ {
+ try
+ {
+ startupWorker();
+ }
+ catch( IOException e )
+ {
+ try
+ {
+ ch.disconnect();
+ ch.close();
+ }
+ catch( IOException e2 )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e2 );
+ }
+
+ return ConnectFuture.newFailedFuture( e );
+ }
+
+ synchronized( registerQueue )
+ {
+ registerQueue.push( request );
+ }
+ }
+
+ selector.wakeup();
+ return request;
+ }
+
+ public boolean getBroadcast()
+ {
+ return broadcast;
+ }
+
+ public void setBroadcast( boolean broadcast )
+ {
+ this.broadcast = broadcast;
+ }
+
+ public boolean getReuseAddress()
+ {
+ return reuseAddress;
+ }
+
+ public void setReuseAddress( boolean reuseAddress )
+ {
+ this.reuseAddress = reuseAddress;
+ }
+
+ public int getReceiveBufferSize()
+ {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize( int receiveBufferSize )
+ {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ public int getSendBufferSize()
+ {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize( int sendBufferSize )
+ {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public int getTrafficClass()
+ {
+ return trafficClass;
+ }
+
+ public void setTrafficClass( int trafficClass )
+ {
+ this.trafficClass = trafficClass;
+ }
+
+ private synchronized void startupWorker() throws IOException
+ {
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ worker.start();
+ }
+ }
+
+ public void closeSession( DatagramSessionImpl session )
+ {
+ synchronized( this )
+ {
+ try
+ {
+ startupWorker();
+ }
+ catch( IOException e )
+ {
+ // IOException is thrown only when Worker thread is not
+ // running and failed to open a selector. We simply return
+ // silently here because it we can simply conclude that
+ // this session is not managed by this connector or
+ // already closed.
+ return;
+ }
+
+ synchronized( cancelQueue )
+ {
+ cancelQueue.push( session );
+ }
+ }
+
+ selector.wakeup();
+ }
+
+ public void flushSession( DatagramSessionImpl session )
+ {
+ scheduleFlush( session );
+ Selector selector = this.selector;
+ if( selector != null )
+ {
+ selector.wakeup();
+ }
+ }
+
+ private void scheduleFlush( DatagramSessionImpl session )
+ {
+ synchronized( flushingSessions )
+ {
+ flushingSessions.push( session );
+ }
+ }
+
+ public void updateTrafficMask( DatagramSessionImpl session )
+ {
+ scheduleTrafficControl( session );
+ Selector selector = this.selector;
+ if( selector != null )
+ {
+ selector.wakeup();
+ }
+ selector.wakeup();
+ }
+
+ private void scheduleTrafficControl( DatagramSessionImpl session )
+ {
+ synchronized( trafficControllingSessions )
+ {
+ trafficControllingSessions.push( session );
+ }
+ }
+
+ private void doUpdateTrafficMask()
+ {
+ if( trafficControllingSessions.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ DatagramSessionImpl session;
+
+ synchronized( trafficControllingSessions )
+ {
+ session = ( DatagramSessionImpl ) trafficControllingSessions.pop();
+ }
+
+ if( session == null )
+ break;
+
+ SelectionKey key = session.getSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.suspend??() or session.resume??() is
+ // called before addSession() is processed)
+ if( key == null )
+ {
+ scheduleTrafficControl( session );
+ break;
+ }
+ // skip if channel is already closed
+ if( !key.isValid() )
+ {
+ continue;
+ }
+
+ // The normal is OP_READ and, if there are write requests in the
+ // session's write queue, set OP_WRITE to trigger flushing.
+ int ops = SelectionKey.OP_READ;
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+ synchronized( writeRequestQueue )
+ {
+ if( !writeRequestQueue.isEmpty() )
+ {
+ ops |= SelectionKey.OP_WRITE;
+ }
+ }
+
+ // Now mask the preferred ops with the mask of the current session
+ int mask = session.getTrafficMask().getInterestOps();
+ key.interestOps( ops & mask );
+ }
+ }
+
+ private class Worker extends Thread
+ {
+ public Worker()
+ {
+ super( "DatagramConnector-" + id );
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ try
+ {
+ int nKeys = selector.select();
+
+ registerNew();
+ doUpdateTrafficMask();
+
+ if( nKeys > 0 )
+ {
+ processReadySessions( selector.selectedKeys() );
+ }
+
+ flushSessions();
+ cancelKeys();
+
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( DatagramConnectorDelegate.this )
+ {
+ if( selector.keys().isEmpty() &&
+ registerQueue.isEmpty() &&
+ cancelQueue.isEmpty() )
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ }
+ }
+ }
+ }
+ }
+
+ private void processReadySessions( Set keys )
+ {
+ Iterator it = keys.iterator();
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+ it.remove();
+
+ DatagramSessionImpl session = ( DatagramSessionImpl ) key.attachment();
+
+ if( key.isReadable() && session.getTrafficMask().isReadable() )
+ {
+ readSession( session );
+ }
+
+ if( key.isWritable() && session.getTrafficMask().isWritable() )
+ {
+ scheduleFlush( session );
+ }
+ }
+ }
+
+ private void readSession( DatagramSessionImpl session )
+ {
+
+ ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
+ try
+ {
+ int readBytes = session.getChannel().read( readBuf.buf() );
+ if( readBytes > 0 )
+ {
+ readBuf.flip();
+ ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
+ newBuf.put( readBuf );
+ newBuf.flip();
+
+ session.increaseReadBytes( readBytes );
+ ( ( DatagramFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf );
+ }
+ }
+ catch( IOException e )
+ {
+ ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+ }
+ finally
+ {
+ readBuf.release();
+ }
+ }
+
+ private void flushSessions()
+ {
+ if( flushingSessions.size() == 0 )
+ return;
+
+ for( ;; )
+ {
+ DatagramSessionImpl session;
+
+ synchronized( flushingSessions )
+ {
+ session = ( DatagramSessionImpl ) flushingSessions.pop();
+ }
+
+ if( session == null )
+ break;
+
+ try
+ {
+ flush( session );
+ }
+ catch( IOException e )
+ {
+ ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+ }
+ }
+ }
+
+ private void flush( DatagramSessionImpl session ) throws IOException
+ {
+ DatagramChannel ch = session.getChannel();
+
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+
+ WriteRequest req;
+ for( ;; )
+ {
+ synchronized( writeRequestQueue )
+ {
+ req = ( WriteRequest ) writeRequestQueue.first();
+ }
+
+ if( req == null )
+ break;
+
+ ByteBuffer buf = ( ByteBuffer ) req.getMessage();
+ if( buf.remaining() == 0 )
+ {
+ // pop and fire event
+ synchronized( writeRequestQueue )
+ {
+ writeRequestQueue.pop();
+ }
+
+ req.getFuture().setWritten( true );
+ session.increaseWrittenWriteRequests();
+ ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf );
+ continue;
+ }
+
+ SelectionKey key = session.getSelectionKey();
+ if( key == null )
+ {
+ scheduleFlush( session );
+ break;
+ }
+ if( !key.isValid() )
+ {
+ continue;
+ }
+
+ int pos = buf.position();
+ int writtenBytes = ch.write( buf.buf() );
+
+ if( writtenBytes == 0 )
+ {
+ // Kernel buffer is full
+ key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+ }
+ else if( writtenBytes > 0 )
+ {
+ key.interestOps( key.interestOps()
+ & ( ~SelectionKey.OP_WRITE ) );
+
+ // pop and fire event
+ synchronized( writeRequestQueue )
+ {
+ writeRequestQueue.pop();
+ }
+
+ session.increaseWrittenBytes( writtenBytes );
+ req.getFuture().setWritten( true );
+ session.increaseWrittenWriteRequests();
+ ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf.position( pos ) );
+ }
+ }
+ }
+
+ private void registerNew()
+ {
+ if( registerQueue.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ RegistrationRequest req;
+ synchronized( registerQueue )
+ {
+ req = ( RegistrationRequest ) registerQueue.pop();
+ }
+
+ if( req == null )
+ break;
+
+ DatagramSessionImpl session =
+ new DatagramSessionImpl( wrapper, this, req.channel, req.handler );
+
+ boolean success = false;
+ try
+ {
+ this.filterChainBuilder.buildFilterChain( session.getFilterChain() );
+ req.filterChainBuilder.buildFilterChain( session.getFilterChain() );
+ ( ( DatagramFilterChain ) session.getFilterChain() ).sessionCreated( session );
+
+ SelectionKey key = req.channel.register( selector,
+ SelectionKey.OP_READ, session );
+
+ session.setSelectionKey( key );
+
+ req.setSession( session );
+ success = true;
+ }
+ catch( Throwable t )
+ {
+ req.setException( t );
+ }
+ finally
+ {
+ if( !success )
+ {
+ try
+ {
+ req.channel.disconnect();
+ req.channel.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+ }
+ }
+
+ private void cancelKeys()
+ {
+ if( cancelQueue.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ DatagramSessionImpl session;
+ synchronized( cancelQueue )
+ {
+ session = ( DatagramSessionImpl ) cancelQueue.pop();
+ }
+
+ if( session == null )
+ break;
+ else
+ {
+ SelectionKey key = session.getSelectionKey();
+ DatagramChannel ch = ( DatagramChannel ) key.channel();
+ try
+ {
+ ch.disconnect();
+ ch.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ session.getCloseFuture().setClosed();
+ key.cancel();
+ selector.wakeup(); // wake up again to trigger thread death
+ }
+ }
+ }
+
+ private static class RegistrationRequest extends ConnectFuture
+ {
+ private final DatagramChannel channel;
+ private final IoHandler handler;
+ private final IoFilterChainBuilder filterChainBuilder;
+
+ private RegistrationRequest( DatagramChannel channel,
+ IoHandler handler,
+ IoFilterChainBuilder filterChainBuilder )
+ {
+ this.channel = channel;
+ this.handler = handler;
+ this.filterChainBuilder = filterChainBuilder;
+ }
+ }
+}
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java?rev=359355&r1=359354&r2=359355&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java Tue Dec 27 18:49:31 2005
@@ -1,69 +1,69 @@
-/*
- * @(#) $Id: DatagramConnectorDelegate.java 351888 2005-12-03 04:39:53Z trustin $
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.transport.socket.nio.support;
-
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.mina.util.Queue;
-
-/**
- * An {@link IoFilterChain} for datagram transport (UDP/IP).
- *
- * @author The Apache Directory Project
- */
-class DatagramFilterChain extends AbstractIoFilterChain {
-
- DatagramFilterChain( IoSession parent )
- {
- super( parent );
- }
-
- protected void doWrite( IoSession session, WriteRequest writeRequest )
- {
- DatagramSessionImpl s = ( DatagramSessionImpl ) session;
- Queue writeRequestQueue = s.getWriteRequestQueue();
-
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.push( writeRequest );
- if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
- {
- // Notify DatagramSessionManager only when writeRequestQueue was empty.
- s.getManagerDelegate().flushSession( s );
- }
- }
- }
-
- protected void doClose( IoSession session, CloseFuture closeFuture )
- {
- DatagramSessionImpl s = ( DatagramSessionImpl ) session;
- DatagramSessionManager manager = s.getManagerDelegate();
- if( manager instanceof DatagramConnectorDelegate )
- {
- ( ( DatagramConnectorDelegate ) manager ).closeSession( s );
- }
- else
- {
- closeFuture.setClosed();
- }
- }
-}
+/*
+ * @(#) $Id: DatagramConnectorDelegate.java 351888 2005-12-03 04:39:53Z trustin $
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoFilterChain} for datagram transport (UDP/IP).
+ * <p>
+ * Writes are appended to the session's write request queue; closing is
+ * delegated to the owning {@link DatagramConnectorDelegate} when there is
+ * one.
+ *
+ * @author The Apache Directory Project
+ */
+class DatagramFilterChain extends AbstractIoFilterChain
+{
+
+    DatagramFilterChain( IoSession parent )
+    {
+        super( parent );
+    }
+
+    /**
+     * Queues <code>writeRequest</code> on the session and asks the session's
+     * manager to flush it when the queue has just become non-empty and the
+     * session's traffic mask allows writing.
+     */
+    protected void doWrite( IoSession session, WriteRequest writeRequest )
+    {
+        final DatagramSessionImpl datagramSession = ( DatagramSessionImpl ) session;
+        final Queue pending = datagramSession.getWriteRequestQueue();
+
+        synchronized( pending )
+        {
+            pending.push( writeRequest );
+
+            // Only the push that makes the queue non-empty triggers a flush.
+            if( pending.size() != 1 )
+            {
+                return;
+            }
+
+            if( session.getTrafficMask().isWritable() )
+            {
+                datagramSession.getManagerDelegate().flushSession( datagramSession );
+            }
+        }
+    }
+
+    /**
+     * Closes the session: a session managed by a
+     * {@link DatagramConnectorDelegate} is handed to its
+     * <code>closeSession</code>; for any other manager the close future is
+     * marked as closed right away.
+     */
+    protected void doClose( IoSession session, CloseFuture closeFuture )
+    {
+        DatagramSessionImpl datagramSession = ( DatagramSessionImpl ) session;
+        DatagramSessionManager owner = datagramSession.getManagerDelegate();
+
+        if( !( owner instanceof DatagramConnectorDelegate ) )
+        {
+            closeFuture.setClosed();
+            return;
+        }
+
+        ( ( DatagramConnectorDelegate ) owner ).closeSession( datagramSession );
+    }
+}
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java?rev=359355&r1=359354&r2=359355&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java Tue Dec 27 18:49:31 2005
@@ -1,514 +1,514 @@
-/*
- * @(#) $Id$
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.transport.socket.nio.support;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoFilterChainBuilder;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.support.BaseIoAcceptor;
-import org.apache.mina.transport.socket.nio.SocketSessionManager;
-import org.apache.mina.util.Queue;
-
-/**
- * {@link IoAcceptor} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public class SocketAcceptorDelegate extends BaseIoAcceptor implements SocketSessionManager
-{
- private static volatile int nextId = 0;
-
- private final IoAcceptor wrapper;
- private final int id = nextId ++ ;
- private final String threadName = "SocketAcceptor-" + id;
- private boolean reuseAddress = false;
- private int backlog = 50;
- private int receiveBufferSize = -1;
- private Selector selector;
- private final Map channels = new HashMap();
-
- private final Queue registerQueue = new Queue();
- private final Queue cancelQueue = new Queue();
-
- private Worker worker;
-
- /**
- * Creates a new instance.
- */
- public SocketAcceptorDelegate( IoAcceptor wrapper )
- {
- this.wrapper = wrapper;
- }
-
- /**
- * Binds to the specified <code>address</code> and handles incoming
- * connections with the specified <code>handler</code>. Backlog value
- * is configured to the value of <code>backlog</code> property.
- *
- * @throws IOException if failed to bind
- */
- public void bind( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) throws IOException
- {
- if( address == null )
- {
- throw new NullPointerException( "address" );
- }
-
- if( handler == null )
- {
- throw new NullPointerException( "handler" );
- }
-
- if( !( address instanceof InetSocketAddress ) )
- {
- throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
- }
-
- if( ( ( InetSocketAddress ) address ).getPort() == 0 )
- {
- throw new IllegalArgumentException( "Unsupported port number: 0" );
- }
-
- if( filterChainBuilder == null )
- {
- filterChainBuilder = IoFilterChainBuilder.NOOP;
- }
-
- RegistrationRequest request = new RegistrationRequest( address, handler, filterChainBuilder );
-
- synchronized( this )
- {
- synchronized( registerQueue )
- {
- registerQueue.push( request );
- }
- startupWorker();
- }
-
- selector.wakeup();
-
- synchronized( request )
- {
- while( !request.done )
- {
- try
- {
- request.wait();
- }
- catch( InterruptedException e )
- {
- }
- }
- }
-
- if( request.exception != null )
- {
- throw request.exception;
- }
- }
-
-
- private synchronized void startupWorker() throws IOException
- {
- if( worker == null )
- {
- selector = Selector.open();
- worker = new Worker();
-
- worker.start();
- }
- }
-
- public void unbind( SocketAddress address )
- {
- // TODO: DIRMINA-93
- if( address == null )
- {
- throw new NullPointerException( "address" );
- }
-
- CancellationRequest request = new CancellationRequest( address );
- synchronized( this )
- {
- try
- {
- startupWorker();
- }
- catch( IOException e )
- {
- // IOException is thrown only when Worker thread is not
- // running and failed to open a selector. We simply throw
- // IllegalArgumentException here because we can simply
- // conclude that nothing is bound to the selector.
- throw new IllegalArgumentException( "Address not bound: " + address );
- }
-
- synchronized( cancelQueue )
- {
- cancelQueue.push( request );
- }
- }
-
- selector.wakeup();
-
- synchronized( request )
- {
- while( !request.done )
- {
- try
- {
- request.wait();
- }
- catch( InterruptedException e )
- {
- }
- }
- }
-
- if( request.exception != null )
- {
- request.exception.fillInStackTrace();
-
- throw request.exception;
- }
- }
-
- private class Worker extends Thread
- {
- public Worker()
- {
- super( SocketAcceptorDelegate.this.threadName );
- }
-
- public void run()
- {
- for( ;; )
- {
- try
- {
- int nKeys = selector.select();
-
- registerNew();
- cancelKeys();
-
- if( nKeys > 0 )
- {
- processSessions( selector.selectedKeys() );
- }
-
- if( selector.keys().isEmpty() )
- {
- synchronized( SocketAcceptorDelegate.this )
- {
- if( selector.keys().isEmpty() &&
- registerQueue.isEmpty() &&
- cancelQueue.isEmpty() )
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
-
- try
- {
- Thread.sleep( 1000 );
- }
- catch( InterruptedException e1 )
- {
- }
- }
- }
- }
-
- private void processSessions( Set keys ) throws IOException
- {
- Iterator it = keys.iterator();
- while( it.hasNext() )
- {
- SelectionKey key = ( SelectionKey ) it.next();
-
- it.remove();
-
- if( !key.isAcceptable() )
- {
- continue;
- }
-
- ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
-
- SocketChannel ch = ssc.accept();
-
- if( ch == null )
- {
- continue;
- }
-
- boolean success = false;
- try
- {
- RegistrationRequest req = ( RegistrationRequest ) key.attachment();
- SocketSessionImpl session = new SocketSessionImpl( SocketAcceptorDelegate.this.wrapper, ch, req.handler );
- SocketAcceptorDelegate.this.filterChainBuilder.buildFilterChain( session.getFilterChain() );
- req.filterChainBuilder.buildFilterChain( session.getFilterChain() );
- ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session );
- session.getIoProcessor().addNew( session );
- success = true;
- }
- catch( Throwable t )
- {
- ExceptionMonitor.getInstance().exceptionCaught( t );
- }
- finally
- {
- if( !success )
- {
- ch.close();
- }
- }
- }
- }
- }
-
-
- private void registerNew()
- {
- if( registerQueue.isEmpty() )
- {
- return;
- }
-
- for( ;; )
- {
- RegistrationRequest req;
-
- synchronized( registerQueue )
- {
- req = ( RegistrationRequest ) registerQueue.pop();
- }
-
- if( req == null )
- {
- break;
- }
-
- ServerSocketChannel ssc = null;
-
- try
- {
- ssc = ServerSocketChannel.open();
- ssc.configureBlocking( false );
-
- // Configure the server socket,
- ssc.socket().setReuseAddress( isReuseAddress() );
- if( getReceiveBufferSize() > 0 )
- {
- ssc.socket().setReceiveBufferSize( getReceiveBufferSize() );
- }
-
- // and bind.
- ssc.socket().bind( req.address, getBacklog() );
- ssc.register( selector, SelectionKey.OP_ACCEPT, req );
-
- channels.put( req.address, ssc );
- }
- catch( IOException e )
- {
- req.exception = e;
- }
- finally
- {
- synchronized( req )
- {
- req.done = true;
-
- req.notify();
- }
-
- if( ssc != null && req.exception != null )
- {
- try
- {
- ssc.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- }
- }
- }
- }
-
-
- private void cancelKeys()
- {
- if( cancelQueue.isEmpty() )
- {
- return;
- }
-
- for( ;; )
- {
- CancellationRequest request;
-
- synchronized( cancelQueue )
- {
- request = ( CancellationRequest ) cancelQueue.pop();
- }
-
- if( request == null )
- {
- break;
- }
-
- ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address );
-
- // close the channel
- try
- {
- if( ssc == null )
- {
- request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
- }
- else
- {
- SelectionKey key = ssc.keyFor( selector );
-
- key.cancel();
-
- selector.wakeup(); // wake up again to trigger thread death
-
- ssc.close();
- }
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- finally
- {
- synchronized( request )
- {
- request.done = true;
-
- request.notify();
- }
- }
- }
- }
-
- public int getReceiveBufferSize()
- {
- return receiveBufferSize;
- }
-
- /**
- * @param receiveBufferSize <tt>-1</tt> to use the default value.
- */
- public void setReceiveBufferSize( int receiveBufferSize )
- {
- this.receiveBufferSize = receiveBufferSize;
- }
-
- public boolean isReuseAddress()
- {
- return reuseAddress;
- }
-
- public void setReuseAddress( boolean reuseAddress )
- {
- this.reuseAddress = reuseAddress;
- }
-
- public int getBacklog()
- {
- return backlog;
- }
-
- public void setBacklog( int backlog )
- {
- if( backlog <= 0 )
- {
- throw new IllegalArgumentException( "backlog: " + backlog );
- }
- this.backlog = backlog;
- }
-
- private static class RegistrationRequest
- {
- private final SocketAddress address;
- private final IoHandler handler;
- private final IoFilterChainBuilder filterChainBuilder;
- private IOException exception;
- private boolean done;
-
- private RegistrationRequest( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
- {
- this.address = address;
- this.handler = handler;
- this.filterChainBuilder = filterChainBuilder;
- }
- }
-
-
- private static class CancellationRequest
- {
- private final SocketAddress address;
-
- private boolean done;
-
- private RuntimeException exception;
-
- private CancellationRequest( SocketAddress address )
- {
- this.address = address;
- }
- }
-}
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChainBuilder;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.transport.socket.nio.SocketSessionManager;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoAcceptor} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class SocketAcceptorDelegate extends BaseIoAcceptor implements SocketSessionManager
+{
+ // Source of the per-instance id that ends up in the worker thread name.
+ // NOTE(review): "nextId ++" on a volatile int is a non-atomic
+ // read-modify-write, so instances constructed concurrently could
+ // presumably receive the same id - confirm whether that matters.
+ private static volatile int nextId = 0;
+
+ // Handed to every SocketSessionImpl created for an accepted connection.
+ private final IoAcceptor wrapper;
+ private final int id = nextId ++ ;
+ // Name given to the Worker thread.
+ private final String threadName = "SocketAcceptor-" + id;
+ private boolean reuseAddress = false;
+ private int backlog = 50;
+ // Applied to new server sockets only when greater than zero.
+ private int receiveBufferSize = -1;
+ // Opened by startupWorker(); closed and reset to null by the worker
+ // when it shuts itself down.
+ private Selector selector;
+ // Bound address -> ServerSocketChannel; filled by registerNew() and
+ // emptied by cancelKeys().
+ private final Map channels = new HashMap();
+
+ // Requests pushed by bind() / unbind() and popped by the worker.
+ private final Queue registerQueue = new Queue();
+ private final Queue cancelQueue = new Queue();
+
+ // Non-null while a worker thread is running.
+ private Worker worker;
+
+ /**
+ * Creates a new instance.
+ *
+ * @param wrapper the acceptor passed to the sessions created for
+ * accepted connections
+ */
+ public SocketAcceptorDelegate( IoAcceptor wrapper )
+ {
+ this.wrapper = wrapper;
+ }
+
+ /**
+  * Binds to the specified <code>address</code> and handles incoming
+  * connections with the specified <code>handler</code>.  Backlog value
+  * is configured to the value of <code>backlog</code> property.
+  * <p>
+  * The bind itself is carried out by the worker thread; this method
+  * blocks until the worker has processed the request.
+  *
+  * @param address an {@link InetSocketAddress} with a non-zero port
+  * @param handler the handler passed to every session accepted on
+  *        <code>address</code>
+  * @param filterChainBuilder applied to the filter chain of every accepted
+  *        session; <code>null</code> is replaced by
+  *        {@link IoFilterChainBuilder#NOOP}
+  * @throws NullPointerException if <code>address</code> or
+  *         <code>handler</code> is <code>null</code>
+  * @throws IllegalArgumentException if <code>address</code> is not an
+  *         {@link InetSocketAddress} or its port is 0
+  * @throws IOException if failed to bind
+  */
+ public void bind( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) throws IOException
+ {
+     if( address == null )
+     {
+         throw new NullPointerException( "address" );
+     }
+
+     if( handler == null )
+     {
+         throw new NullPointerException( "handler" );
+     }
+
+     if( !( address instanceof InetSocketAddress ) )
+     {
+         throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+     }
+
+     if( ( ( InetSocketAddress ) address ).getPort() == 0 )
+     {
+         throw new IllegalArgumentException( "Unsupported port number: 0" );
+     }
+
+     if( filterChainBuilder == null )
+     {
+         filterChainBuilder = IoFilterChainBuilder.NOOP;
+     }
+
+     RegistrationRequest request = new RegistrationRequest( address, handler, filterChainBuilder );
+
+     synchronized( this )
+     {
+         // Start the worker before queueing (as unbind() does): if the
+         // selector cannot be opened, no orphaned request is left behind
+         // for a later bind() to execute.
+         startupWorker();
+
+         synchronized( registerQueue )
+         {
+             registerQueue.push( request );
+         }
+
+         // Wake up while still holding the lock.  The worker resets the
+         // selector to null only under this lock, so it is non-null here.
+         selector.wakeup();
+     }
+
+     boolean interrupted = false;
+     synchronized( request )
+     {
+         while( !request.done )
+         {
+             try
+             {
+                 request.wait();
+             }
+             catch( InterruptedException e )
+             {
+                 // Keep waiting for the worker; the interrupt is
+                 // re-asserted once the request has completed.
+                 interrupted = true;
+             }
+         }
+     }
+
+     if( interrupted )
+     {
+         Thread.currentThread().interrupt();
+     }
+
+     if( request.exception != null )
+     {
+         throw request.exception;
+     }
+ }
+
+
+ /**
+  * Opens the selector and starts a worker thread unless a worker is
+  * already running.
+  *
+  * @throws IOException if the selector cannot be opened
+  */
+ private synchronized void startupWorker() throws IOException
+ {
+     if( worker != null )
+     {
+         return;
+     }
+
+     selector = Selector.open();
+
+     Worker newWorker = new Worker();
+     worker = newWorker;
+     newWorker.start();
+ }
+
+ /**
+  * Unbinds from the specified <code>address</code>.
+  * <p>
+  * The request is carried out by the worker thread; this method blocks
+  * until the worker has processed it.
+  *
+  * @param address the address to unbind from
+  * @throws NullPointerException if <code>address</code> is <code>null</code>
+  * @throws IllegalArgumentException if nothing is bound to
+  *         <code>address</code>
+  */
+ public void unbind( SocketAddress address )
+ {
+     // TODO: DIRMINA-93
+     if( address == null )
+     {
+         throw new NullPointerException( "address" );
+     }
+
+     CancellationRequest request = new CancellationRequest( address );
+     synchronized( this )
+     {
+         try
+         {
+             startupWorker();
+         }
+         catch( IOException e )
+         {
+             // IOException is thrown only when Worker thread is not
+             // running and failed to open a selector.  We simply throw
+             // IllegalArgumentException here because we can simply
+             // conclude that nothing is bound to the selector.
+             throw new IllegalArgumentException( "Address not bound: " + address );
+         }
+
+         synchronized( cancelQueue )
+         {
+             cancelQueue.push( request );
+         }
+
+         // Wake up while still holding the lock.  The worker resets the
+         // selector to null only under this lock, so it is non-null here.
+         selector.wakeup();
+     }
+
+     boolean interrupted = false;
+     synchronized( request )
+     {
+         while( !request.done )
+         {
+             try
+             {
+                 request.wait();
+             }
+             catch( InterruptedException e )
+             {
+                 // Keep waiting for the worker; the interrupt is
+                 // re-asserted once the request has completed.
+                 interrupted = true;
+             }
+         }
+     }
+
+     if( interrupted )
+     {
+         Thread.currentThread().interrupt();
+     }
+
+     if( request.exception != null )
+     {
+         // The exception was created on the worker thread; refresh the
+         // stack trace so that it points at the caller of unbind().
+         request.exception.fillInStackTrace();
+
+         throw request.exception;
+     }
+ }
+
+ /**
+  * The thread that drives the selector: it executes queued bind and
+  * unbind requests, accepts incoming connections, and terminates itself
+  * once no selection key and no pending request is left.
+  */
+ private class Worker extends Thread
+ {
+     public Worker()
+     {
+         super( SocketAcceptorDelegate.this.threadName );
+     }
+
+     public void run()
+     {
+         for( ;; )
+         {
+             try
+             {
+                 int nKeys = selector.select();
+
+                 registerNew();
+                 cancelKeys();
+
+                 if( nKeys > 0 )
+                 {
+                     processSessions( selector.selectedKeys() );
+                 }
+
+                 if( selector.keys().isEmpty() )
+                 {
+                     // Re-check under the delegate's lock: bind() and
+                     // unbind() queue their requests while holding it.
+                     synchronized( SocketAcceptorDelegate.this )
+                     {
+                         if( selector.keys().isEmpty() &&
+                             registerQueue.isEmpty() &&
+                             cancelQueue.isEmpty() )
+                         {
+                             worker = null;
+                             try
+                             {
+                                 selector.close();
+                             }
+                             catch( IOException e )
+                             {
+                                 ExceptionMonitor.getInstance().exceptionCaught( e );
+                             }
+                             finally
+                             {
+                                 selector = null;
+                             }
+                             break;
+                         }
+                     }
+                 }
+             }
+             catch( IOException e )
+             {
+                 ExceptionMonitor.getInstance().exceptionCaught( e );
+
+                 // Back off for a second before selecting again.
+                 try
+                 {
+                     Thread.sleep( 1000 );
+                 }
+                 catch( InterruptedException e1 )
+                 {
+                 }
+             }
+         }
+     }
+
+     /**
+      * Accepts one connection for each acceptable key in <code>keys</code>
+      * and turns it into a session that is handed to its I/O processor.
+      * A connection whose session cannot be set up is closed and the
+      * failure is reported to the {@link ExceptionMonitor}.
+      *
+      * @param keys the selector's selected-key set; processed keys are
+      *        removed from it
+      * @throws IOException if accepting a connection fails
+      */
+     private void processSessions( Set keys ) throws IOException
+     {
+         Iterator it = keys.iterator();
+         while( it.hasNext() )
+         {
+             SelectionKey key = ( SelectionKey ) it.next();
+
+             it.remove();
+
+             if( !key.isAcceptable() )
+             {
+                 continue;
+             }
+
+             ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
+
+             SocketChannel ch = ssc.accept();
+
+             if( ch == null )
+             {
+                 continue;
+             }
+
+             boolean success = false;
+             try
+             {
+                 RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+                 SocketSessionImpl session = new SocketSessionImpl( SocketAcceptorDelegate.this.wrapper, ch, req.handler );
+                 SocketAcceptorDelegate.this.filterChainBuilder.buildFilterChain( session.getFilterChain() );
+                 req.filterChainBuilder.buildFilterChain( session.getFilterChain() );
+                 ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session );
+                 session.getIoProcessor().addNew( session );
+                 success = true;
+             }
+             catch( Throwable t )
+             {
+                 ExceptionMonitor.getInstance().exceptionCaught( t );
+             }
+             finally
+             {
+                 if( !success )
+                 {
+                     // A failure to close one rejected connection must
+                     // not abort the remaining selected keys.
+                     try
+                     {
+                         ch.close();
+                     }
+                     catch( IOException e )
+                     {
+                         ExceptionMonitor.getInstance().exceptionCaught( e );
+                     }
+                 }
+             }
+         }
+     }
+ }
+
+
+ /**
+  * Executes every queued bind request: opens a non-blocking server
+  * socket channel, configures it, binds it, registers it with the
+  * selector for accept events and records it in <code>channels</code>.
+  * <p>
+  * The outcome is stored in the request and the thread waiting in
+  * <code>bind()</code> is notified.  A channel that could not be bound is
+  * closed before the waiter is notified.
+  */
+ private void registerNew()
+ {
+     if( registerQueue.isEmpty() )
+     {
+         return;
+     }
+
+     for( ;; )
+     {
+         RegistrationRequest req;
+
+         synchronized( registerQueue )
+         {
+             req = ( RegistrationRequest ) registerQueue.pop();
+         }
+
+         // pop() yields null once the queue has been drained.
+         if( req == null )
+         {
+             break;
+         }
+
+         ServerSocketChannel ssc = null;
+         boolean success = false;
+
+         try
+         {
+             ssc = ServerSocketChannel.open();
+             ssc.configureBlocking( false );
+
+             // Configure the server socket,
+             ssc.socket().setReuseAddress( isReuseAddress() );
+             if( getReceiveBufferSize() > 0 )
+             {
+                 ssc.socket().setReceiveBufferSize( getReceiveBufferSize() );
+             }
+
+             // and bind.
+             ssc.socket().bind( req.address, getBacklog() );
+             ssc.register( selector, SelectionKey.OP_ACCEPT, req );
+
+             channels.put( req.address, ssc );
+             success = true;
+         }
+         catch( IOException e )
+         {
+             req.exception = e;
+         }
+         catch( RuntimeException e )
+         {
+             // e.g. a SecurityException from bind().  Report it to the
+             // caller instead of signalling success and killing this
+             // thread.
+             IOException wrapped = new IOException( "Failed to bind: " + req.address );
+             wrapped.initCause( e );
+             req.exception = wrapped;
+         }
+         finally
+         {
+             // Release the failed channel first so that the address is
+             // free again by the time the caller is told about the failure.
+             if( !success && ssc != null )
+             {
+                 try
+                 {
+                     ssc.close();
+                 }
+                 catch( IOException e )
+                 {
+                     ExceptionMonitor.getInstance().exceptionCaught( e );
+                 }
+             }
+
+             synchronized( req )
+             {
+                 req.done = true;
+
+                 req.notify();
+             }
+         }
+     }
+ }
+
+
+ /**
+  * Executes every queued unbind request.  The channel bound to the
+  * requested address is removed from <code>channels</code>, its selection
+  * key is cancelled and the channel is closed; an address without a
+  * channel is recorded in the request as an
+  * {@link IllegalArgumentException}.  In every case the thread waiting on
+  * the request is notified.
+  */
+ private void cancelKeys()
+ {
+     if( cancelQueue.isEmpty() )
+     {
+         return;
+     }
+
+     while( true )
+     {
+         CancellationRequest pending;
+
+         synchronized( cancelQueue )
+         {
+             pending = ( CancellationRequest ) cancelQueue.pop();
+         }
+
+         // Nothing left to cancel.
+         if( pending == null )
+         {
+             return;
+         }
+
+         ServerSocketChannel boundChannel = ( ServerSocketChannel ) channels.remove( pending.address );
+
+         try
+         {
+             if( boundChannel != null )
+             {
+                 boundChannel.keyFor( selector ).cancel();
+
+                 selector.wakeup(); // wake up again to trigger thread death
+
+                 boundChannel.close();
+             }
+             else
+             {
+                 pending.exception = new IllegalArgumentException( "Address not bound: " + pending.address );
+             }
+         }
+         catch( IOException e )
+         {
+             ExceptionMonitor.getInstance().exceptionCaught( e );
+         }
+         finally
+         {
+             // Always release the thread blocked on this request.
+             synchronized( pending )
+             {
+                 pending.done = true;
+
+                 pending.notify();
+             }
+         }
+     }
+ }
+
+ /**
+ * Returns the receive buffer size configured for new server sockets.
+ * A value that is not greater than zero is not applied to the socket.
+ */
+ public int getReceiveBufferSize()
+ {
+ return receiveBufferSize;
+ }
+
+ /**
+ * @param receiveBufferSize <tt>-1</tt> to use the default value.
+ */
+ public void setReceiveBufferSize( int receiveBufferSize )
+ {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ /**
+ * Returns the reuse-address flag that is set on a server socket
+ * before it is bound.
+ */
+ public boolean isReuseAddress()
+ {
+ return reuseAddress;
+ }
+
+ /**
+ * Sets the reuse-address flag; it is read each time a server socket is
+ * about to be bound.
+ */
+ public void setReuseAddress( boolean reuseAddress )
+ {
+ this.reuseAddress = reuseAddress;
+ }
+
+ /**
+ * Returns the backlog value passed to the server socket when binding.
+ */
+ public int getBacklog()
+ {
+ return backlog;
+ }
+
+ /**
+ * Sets the backlog value passed to the server socket when binding.
+ *
+ * @throws IllegalArgumentException if <code>backlog</code> is not positive
+ */
+ public void setBacklog( int backlog )
+ {
+ if( backlog <= 0 )
+ {
+ throw new IllegalArgumentException( "backlog: " + backlog );
+ }
+ this.backlog = backlog;
+ }
+
+ /**
+  * A bind request queued for the worker thread.  The worker stores the
+  * outcome in <code>exception</code> and <code>done</code> and notifies
+  * the thread waiting on this object; the request also serves as the
+  * attachment of the selection key created for the bound channel.
+  */
+ private static class RegistrationRequest
+ {
+     /** Address to bind to. */
+     private final SocketAddress address;
+     /** Handler passed to the sessions accepted on the address. */
+     private final IoHandler handler;
+     /** Builder applied to the filter chain of each accepted session. */
+     private final IoFilterChainBuilder filterChainBuilder;
+     /** Failure of the bind, or <code>null</code>. */
+     private IOException exception;
+     /** Set once the worker has processed the request. */
+     private boolean done;
+
+     private RegistrationRequest( SocketAddress bindAddress, IoHandler ioHandler, IoFilterChainBuilder chainBuilder )
+     {
+         address = bindAddress;
+         handler = ioHandler;
+         filterChainBuilder = chainBuilder;
+     }
+ }
+
+
+ /**
+  * An unbind request queued for the worker thread.  The worker stores the
+  * outcome in <code>exception</code> and <code>done</code> and notifies
+  * the thread waiting on this object.
+  */
+ private static class CancellationRequest
+ {
+     /** Address to unbind from. */
+     private final SocketAddress address;
+
+     /** Set once the worker has processed the request. */
+     private boolean done;
+
+     /** Failure of the unbind, or <code>null</code>. */
+     private RuntimeException exception;
+
+     private CancellationRequest( SocketAddress boundAddress )
+     {
+         address = boundAddress;
+     }
+ }
+}