You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/10/19 10:56:38 UTC
svn commit: r326489 [1/2] - in
/directory/network/trunk/src/java/org/apache/mina/transport: socket/nio/
socket/nio/support/ vmpipe/ vmpipe/support/
Author: trustin
Date: Wed Oct 19 01:56:19 2005
New Revision: 326489
URL: http://svn.apache.org/viewcvs?rev=326489&view=rev
Log:
Moved the classes used internally to <original package name>.support package.
Added:
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorImpl.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorImpl.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSession.java
- copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionConfigImpl.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java
- copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java
- copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
- copied, changed from r326410, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSession.java
- copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketSession.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionConfigImpl.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionManagerFilterChain.java
- copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketSessionManagerFilterChain.java
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipe.java
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeFilter.java
- copied, changed from r326410, directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeFilter.java
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java
- copied, changed from r326410, directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeIdleStatusChecker.java
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSession.java
- copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeSession.java
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionManagerFilterChain.java
- copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeSessionManagerFilterChain.java
Removed:
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionConfig.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketSession.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketSessionConfig.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketSessionManagerFilterChain.java
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeFilter.java
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeIdleStatusChecker.java
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeSession.java
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeSessionManagerFilterChain.java
Modified:
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?rev=326489&r1=326488&r2=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Wed Oct 19 01:56:19 2005
@@ -19,27 +19,15 @@
package org.apache.mina.transport.socket.nio;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionManagerConfig;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.BaseIoAcceptorConfig;
-import org.apache.mina.common.support.IoSessionManagerFilterChain;
-import org.apache.mina.util.ExceptionUtil;
-import org.apache.mina.util.Queue;
+import org.apache.mina.transport.socket.nio.support.DatagramAcceptorImpl;
/**
* {@link IoAcceptor} for datagram transport (UDP/IP).
@@ -47,569 +35,47 @@
* @author Trustin Lee (trustin@apache.org)
* @version $Rev$, $Date$
*/
-public class DatagramAcceptor extends DatagramSessionManager implements IoAcceptor
+public class DatagramAcceptor implements IoAcceptor
{
- private static volatile int nextId = 0;
-
- private final IoSessionManagerFilterChain filters =
- new DatagramSessionManagerFilterChain( this );
-
- private final int id = nextId ++ ;
-
- private Selector selector;
-
- private final Map channels = new HashMap();
-
- private final Queue registerQueue = new Queue();
-
- private final Queue cancelQueue = new Queue();
-
- private final Queue flushingSessions = new Queue();
-
- private Worker worker;
-
- /**
- * Creates a new instance.
- */
- public DatagramAcceptor()
- {
- }
-
- public void bind( SocketAddress address, IoHandler handler )
- throws IOException
+ private final IoAcceptor stub = new DatagramAcceptorImpl();
+
+ public void bind( SocketAddress address, IoHandler handler ) throws IOException
{
- if( address == null )
- throw new NullPointerException( "address" );
- if( handler == null )
- throw new NullPointerException( "handler" );
-
- if( !( address instanceof InetSocketAddress ) )
- throw new IllegalArgumentException( "Unexpected address type: "
- + address.getClass() );
- if( ( ( InetSocketAddress ) address ).getPort() == 0 )
- throw new IllegalArgumentException( "Unsupported port number: 0" );
-
- RegistrationRequest request = new RegistrationRequest( address, handler );
- synchronized( this )
- {
- synchronized( registerQueue )
- {
- registerQueue.push( request );
- }
- startupWorker();
- }
- selector.wakeup();
-
- synchronized( request )
- {
- while( !request.done )
- {
- try
- {
- request.wait();
- }
- catch( InterruptedException e )
- {
- }
- }
- }
-
- if( request.exception != null )
- {
- ExceptionUtil.throwException( request.exception );
- }
+ stub.bind( address, handler );
}
public void unbind( SocketAddress address )
{
- unbind( address, true );
+ stub.unbind( address );
}
public void unbind( SocketAddress address, boolean disconnectClients )
{
- // FIXME: Make this work properly when disconnectClients is true.
- if( address == null )
- throw new NullPointerException( "address" );
-
- CancellationRequest request = new CancellationRequest( address );
- synchronized( this )
- {
- try
- {
- startupWorker();
- }
- catch( IOException e )
- {
- // IOException is thrown only when Worker thread is not
- // running and failed to open a selector. We simply throw
- // IllegalArgumentException here because we can simply
- // conclude that nothing is bound to the selector.
- throw new IllegalArgumentException( "Address not bound: " + address );
- }
-
- synchronized( cancelQueue )
- {
- cancelQueue.push( request );
- }
- }
- selector.wakeup();
-
- synchronized( request )
- {
- while( !request.done )
- {
- try
- {
- request.wait();
- }
- catch( InterruptedException e )
- {
- }
- }
- }
-
- if( request.exception != null )
- {
- request.exception.fillInStackTrace();
- throw request.exception;
- }
- }
-
- public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
- {
- if( remoteAddress == null )
- {
- throw new NullPointerException( "remoteAddress" );
- }
- if( localAddress == null )
- {
- throw new NullPointerException( "localAddress" );
- }
-
- Selector selector = this.selector;
- DatagramChannel ch = ( DatagramChannel ) channels.get( localAddress );
- if( selector == null || ch == null )
- {
- throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
- }
-
- SelectionKey key = ch.keyFor( selector );
- if( key == null )
- {
- throw new IllegalArgumentException( "Unknown lodalAddress: " + localAddress );
- }
-
- RegistrationRequest req = ( RegistrationRequest ) key.attachment();
- DatagramSession s = new DatagramSession( filters, ch, req.handler );
- s.setRemoteAddress( remoteAddress );
- s.setSelectionKey( key );
-
- try
- {
- req.handler.sessionCreated( s );
- }
- catch( Throwable t )
- {
- exceptionMonitor.exceptionCaught( this, t );
- }
-
- return s;
- }
-
- private synchronized void startupWorker() throws IOException
- {
- if( worker == null )
- {
- selector = Selector.open();
- worker = new Worker();
- worker.start();
- }
- }
-
- void flushSession( DatagramSession session )
- {
- scheduleFlush( session );
- selector.wakeup();
- }
-
- void closeSession( DatagramSession session )
- {
- }
-
- private void scheduleFlush( DatagramSession session )
- {
- synchronized( flushingSessions )
- {
- flushingSessions.push( session );
- }
- }
-
- private class Worker extends Thread
- {
- public Worker()
- {
- super( "DatagramAcceptor-" + id );
- }
-
- public void run()
- {
- for( ;; )
- {
- try
- {
- int nKeys = selector.select();
-
- registerNew();
-
- if( nKeys > 0 )
- {
- processReadySessions( selector.selectedKeys() );
- }
-
- flushSessions();
- cancelKeys();
-
- if( selector.keys().isEmpty() )
- {
- synchronized( DatagramAcceptor.this )
- {
- if( selector.keys().isEmpty() &&
- registerQueue.isEmpty() &&
- cancelQueue.isEmpty() )
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch( IOException e )
- {
- exceptionMonitor.exceptionCaught( DatagramAcceptor.this, e );
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch( IOException e )
- {
- exceptionMonitor.exceptionCaught( DatagramAcceptor.this,
- e );
-
- try
- {
- Thread.sleep( 1000 );
- }
- catch( InterruptedException e1 )
- {
- }
- }
- }
- }
- }
-
- private void processReadySessions( Set keys )
- {
- Iterator it = keys.iterator();
- while( it.hasNext() )
- {
- SelectionKey key = ( SelectionKey ) it.next();
- it.remove();
-
- DatagramChannel ch = ( DatagramChannel ) key.channel();
-
- RegistrationRequest req = ( RegistrationRequest ) key.attachment();
- DatagramSession session = new DatagramSession(
- filters, ch, req.handler );
- session.setSelectionKey( key );
-
- try
- {
- req.handler.sessionCreated( session );
-
- if( key.isReadable() )
- {
- readSession( session );
- }
-
- if( key.isWritable() )
- {
- scheduleFlush( session );
- }
- }
- catch( Throwable t )
- {
- exceptionMonitor.exceptionCaught( this, t );
- }
- }
- }
-
- private void readSession( DatagramSession session )
- {
-
- ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
- try
- {
- SocketAddress remoteAddress = session.getChannel().receive(
- readBuf.buf() );
- if( remoteAddress != null )
- {
- readBuf.flip();
- session.setRemoteAddress( remoteAddress );
-
- ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
- newBuf.put( readBuf );
- newBuf.flip();
-
- session.increaseReadBytes( newBuf.remaining() );
- filters.messageReceived( session, newBuf );
- }
- }
- catch( IOException e )
- {
- filters.exceptionCaught( session, e );
- }
- finally
- {
- readBuf.release();
- }
- }
-
- private void flushSessions()
- {
- if( flushingSessions.size() == 0 )
- return;
-
- for( ;; )
- {
- DatagramSession session;
-
- synchronized( flushingSessions )
- {
- session = ( DatagramSession ) flushingSessions.pop();
- }
-
- if( session == null )
- break;
-
- try
- {
- flush( session );
- }
- catch( IOException e )
- {
- session.getManagerFilterChain().exceptionCaught( session, e );
- }
- }
- }
-
- private void flush( DatagramSession session ) throws IOException
- {
- DatagramChannel ch = session.getChannel();
-
- Queue writeRequestQueue = session.getWriteRequestQueue();
-
- WriteRequest req;
- for( ;; )
- {
- synchronized( writeRequestQueue )
- {
- req = ( WriteRequest ) writeRequestQueue.first();
- }
-
- if( req == null )
- break;
-
- ByteBuffer buf = ( ByteBuffer ) req.getMessage();
- if( buf.remaining() == 0 )
- {
- // pop and fire event
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.pop();
- }
-
- req.getFuture().setWritten( true );
- session.increaseWrittenWriteRequests();
- session.getManagerFilterChain().messageSent( session, buf );
- continue;
- }
-
- int pos = buf.position();
- int writtenBytes = ch
- .send( buf.buf(), session.getRemoteAddress() );
-
- SelectionKey key = session.getSelectionKey();
- if( writtenBytes == 0 )
- {
- // Kernel buffer is full
- key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
- }
- else if( writtenBytes > 0 )
- {
- key.interestOps( key.interestOps()
- & ( ~SelectionKey.OP_WRITE ) );
-
- // pop and fire event
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.pop();
- }
-
- session.increaseWrittenBytes( writtenBytes );
- req.getFuture().setWritten( true );
- session.increaseWrittenWriteRequests();
- session.getManagerFilterChain().messageSent( session, buf.position( pos ) );
- }
- }
+ stub.unbind( address, disconnectClients );
}
- private void registerNew()
+ public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
{
- if( registerQueue.isEmpty() )
- return;
-
- for( ;; )
- {
- RegistrationRequest req;
- synchronized( registerQueue )
- {
- req = ( RegistrationRequest ) registerQueue.pop();
- }
-
- if( req == null )
- break;
-
- DatagramChannel ch = null;
- try
- {
- ch = DatagramChannel.open();
- ch.configureBlocking( false );
- ch.socket().bind( req.address );
- ch.register( selector, SelectionKey.OP_READ, req );
- channels.put( req.address, ch );
- }
- catch( Throwable t )
- {
- req.exception = t;
- }
- finally
- {
- synchronized( req )
- {
- req.done = true;
- req.notify();
- }
-
- if( ch != null && req.exception != null )
- {
- try
- {
- ch.close();
- }
- catch( Throwable e )
- {
- exceptionMonitor.exceptionCaught( this, e );
- }
- }
- }
- }
+ return stub.newSession( remoteAddress, localAddress );
}
- private void cancelKeys()
- {
- if( cancelQueue.isEmpty() )
- return;
-
- for( ;; )
- {
- CancellationRequest request;
- synchronized( cancelQueue )
- {
- request = ( CancellationRequest ) cancelQueue.pop();
- }
-
- if( request == null )
- {
- break;
- }
-
- DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
- // close the channel
- try
- {
- if( ch == null )
- {
- request.exception = new IllegalArgumentException(
- "Address not bound: " + request.address );
- }
- else
- {
- SelectionKey key = ch.keyFor( selector );
- key.cancel();
- selector.wakeup(); // wake up again to trigger thread death
- ch.close();
- }
- }
- catch( Throwable t )
- {
- exceptionMonitor.exceptionCaught( this, t );
- }
- finally
- {
- synchronized( request )
- {
- request.done = true;
- request.notify();
- }
- }
- }
- }
-
- /**
- * This method just returns dummy configuration instance because datagram
- * transport type (UDP/IP) doesn't provide any configuration.
- */
public IoSessionManagerConfig getConfig()
{
- return new BaseIoAcceptorConfig();
+ return stub.getConfig();
}
public IoFilterChain getFilterChain()
{
- return filters;
+ return stub.getFilterChain();
}
- private static class RegistrationRequest
+ public ExceptionMonitor getExceptionMonitor()
{
- private final SocketAddress address;
-
- private final IoHandler handler;
-
- private Throwable exception;
-
- private boolean done;
-
- private RegistrationRequest( SocketAddress address, IoHandler handler )
- {
- this.address = address;
- this.handler = handler;
- }
+ return stub.getExceptionMonitor();
}
- private static class CancellationRequest
+ public void setExceptionMonitor( ExceptionMonitor monitor )
{
- private final SocketAddress address;
- private boolean done;
- private RuntimeException exception;
-
- private CancellationRequest( SocketAddress address )
- {
- this.address = address;
- }
+ stub.setExceptionMonitor( monitor );
}
}
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=326489&r1=326488&r2=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Wed Oct 19 01:56:19 2005
@@ -19,24 +19,15 @@
package org.apache.mina.transport.socket.nio;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Iterator;
-import java.util.Set;
-import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoSessionManagerConfig;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.BaseIoConnectorConfig;
-import org.apache.mina.common.support.IoSessionManagerFilterChain;
-import org.apache.mina.util.Queue;
+import org.apache.mina.transport.socket.nio.support.DatagramConnectorImpl;
/**
* {@link IoConnector} for datagram transport (UDP/IP).
@@ -44,24 +35,9 @@
* @author Trustin Lee (trustin@apache.org)
* @version $Rev$, $Date$
*/
-public class DatagramConnector extends DatagramSessionManager implements IoConnector
+public class DatagramConnector implements IoConnector
{
- private static volatile int nextId = 0;
-
- private final IoSessionManagerFilterChain filters =
- new DatagramSessionManagerFilterChain( this );
-
- private final int id = nextId ++ ;
-
- private Selector selector;
-
- private final Queue registerQueue = new Queue();
-
- private final Queue cancelQueue = new Queue();
-
- private final Queue flushingSessions = new Queue();
-
- private Worker worker;
+ private final IoConnector stub = new DatagramConnectorImpl();
/**
* Creates a new instance.
@@ -72,423 +48,18 @@
public ConnectFuture connect( SocketAddress address, IoHandler handler ) throws IOException
{
- return connect( address, null, handler);
+ return stub.connect( address, handler );
}
public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
IoHandler handler ) throws IOException
{
- if( address == null )
- throw new NullPointerException( "address" );
- if( handler == null )
- throw new NullPointerException( "handler" );
-
- if( !( address instanceof InetSocketAddress ) )
- throw new IllegalArgumentException( "Unexpected address type: "
- + address.getClass() );
-
- if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
- {
- throw new IllegalArgumentException( "Unexpected local address type: "
- + localAddress.getClass() );
- }
-
- DatagramChannel ch = DatagramChannel.open();
- boolean initialized = false;
- try
- {
- ch.socket().setReuseAddress( true );
- if( localAddress != null )
- {
- ch.socket().bind( localAddress );
- }
- ch.connect( address );
- ch.configureBlocking( false );
- initialized = true;
- }
- finally
- {
- if( !initialized )
- {
- ch.close();
- }
- }
-
- RegistrationRequest request = new RegistrationRequest( ch, handler );
- synchronized( this )
- {
- synchronized( registerQueue )
- {
- registerQueue.push( request );
- }
- startupWorker();
- }
-
- selector.wakeup();
- return request;
- }
-
- private synchronized void startupWorker() throws IOException
- {
- if( worker == null )
- {
- selector = Selector.open();
- worker = new Worker();
- worker.start();
- }
- }
-
- void closeSession( DatagramSession session )
- {
- synchronized( this )
- {
- try
- {
- startupWorker();
- }
- catch( IOException e )
- {
- // IOException is thrown only when Worker thread is not
- // running and failed to open a selector. We simply return
- // silently here because it we can simply conclude that
- // this session is not managed by this connector or
- // already closed.
- return;
- }
-
- synchronized( cancelQueue )
- {
- cancelQueue.push( session );
- }
- }
-
- selector.wakeup();
- }
-
- void flushSession( DatagramSession session )
- {
- scheduleFlush( session );
- selector.wakeup();
- }
-
- private void scheduleFlush( DatagramSession session )
- {
- synchronized( flushingSessions )
- {
- flushingSessions.push( session );
- }
- }
-
- private class Worker extends Thread
- {
- public Worker()
- {
- super( "DatagramAcceptor-" + id );
- }
-
- public void run()
- {
- for( ;; )
- {
- try
- {
- int nKeys = selector.select();
-
- registerNew();
-
- if( nKeys > 0 )
- {
- processReadySessions( selector.selectedKeys() );
- }
-
- flushSessions();
- cancelKeys();
-
- if( selector.keys().isEmpty() )
- {
- synchronized( DatagramConnector.this )
- {
- if( selector.keys().isEmpty() &&
- registerQueue.isEmpty() &&
- cancelQueue.isEmpty() )
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch( IOException e )
- {
- exceptionMonitor.exceptionCaught( DatagramConnector.this, e );
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch( IOException e )
- {
- exceptionMonitor.exceptionCaught( DatagramConnector.this,
- e );
-
- try
- {
- Thread.sleep( 1000 );
- }
- catch( InterruptedException e1 )
- {
- }
- }
- }
- }
- }
-
- private void processReadySessions( Set keys )
- {
- Iterator it = keys.iterator();
- while( it.hasNext() )
- {
- SelectionKey key = ( SelectionKey ) it.next();
- it.remove();
-
- DatagramSession session = ( DatagramSession ) key.attachment();
-
- if( key.isReadable() )
- {
- readSession( session );
- }
-
- if( key.isWritable() )
- {
- scheduleFlush( session );
- }
- }
- }
-
- private void readSession( DatagramSession session )
- {
-
- ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
- try
- {
- int readBytes = session.getChannel().read( readBuf.buf() );
- if( readBytes > 0 )
- {
- readBuf.flip();
- ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
- newBuf.put( readBuf );
- newBuf.flip();
-
- session.increaseReadBytes( readBytes );
- filters.messageReceived( session, newBuf );
- }
- }
- catch( IOException e )
- {
- filters.exceptionCaught( session, e );
- }
- finally
- {
- readBuf.release();
- }
- }
-
- private void flushSessions()
- {
- if( flushingSessions.size() == 0 )
- return;
-
- for( ;; )
- {
- DatagramSession session;
-
- synchronized( flushingSessions )
- {
- session = ( DatagramSession ) flushingSessions.pop();
- }
-
- if( session == null )
- break;
-
- try
- {
- flush( session );
- }
- catch( IOException e )
- {
- session.getManagerFilterChain().exceptionCaught( session, e );
- }
- }
- }
-
- private void flush( DatagramSession session ) throws IOException
- {
- DatagramChannel ch = session.getChannel();
-
- Queue writeRequestQueue = session.getWriteRequestQueue();
-
- WriteRequest req;
- for( ;; )
- {
- synchronized( writeRequestQueue )
- {
- req = ( WriteRequest ) writeRequestQueue.first();
- }
-
- if( req == null )
- break;
-
- ByteBuffer buf = ( ByteBuffer ) req.getMessage();
- if( buf.remaining() == 0 )
- {
- // pop and fire event
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.pop();
- }
-
- req.getFuture().setWritten( true );
- session.increaseWrittenWriteRequests();
- session.getManagerFilterChain().messageSent( session, buf );
- continue;
- }
-
- int pos = buf.position();
- int writtenBytes = ch.write( buf.buf() );
-
- SelectionKey key = session.getSelectionKey();
- if( writtenBytes == 0 )
- {
- // Kernel buffer is full
- key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
- }
- else if( writtenBytes > 0 )
- {
- key.interestOps( key.interestOps()
- & ( ~SelectionKey.OP_WRITE ) );
-
- // pop and fire event
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.pop();
- }
-
- session.increaseWrittenBytes( writtenBytes );
- req.getFuture().setWritten( true );
- session.increaseWrittenWriteRequests();
- session.getManagerFilterChain().messageSent( session, buf.position( pos ) );
- }
- }
- }
-
- private void registerNew()
- {
- if( registerQueue.isEmpty() )
- return;
-
- for( ;; )
- {
- RegistrationRequest req;
- synchronized( registerQueue )
- {
- req = ( RegistrationRequest ) registerQueue.pop();
- }
-
- if( req == null )
- break;
-
- DatagramSession session = new DatagramSession(
- filters, req.channel, req.handler );
-
- boolean success = false;
- try
- {
- req.handler.sessionCreated( session );
-
- SelectionKey key = req.channel.register( selector,
- SelectionKey.OP_READ, session );
-
- session.setSelectionKey( key );
-
- req.setSession( session );
- success = true;
- }
- catch( Throwable t )
- {
- req.setException( t );
- }
- finally
- {
- if( !success )
- {
- try
- {
- req.channel.close();
- }
- catch (IOException e)
- {
- exceptionMonitor.exceptionCaught( this, e );
- }
- }
- }
- }
- }
-
- private void cancelKeys()
- {
- if( cancelQueue.isEmpty() )
- return;
-
- for( ;; )
- {
- DatagramSession session;
- synchronized( cancelQueue )
- {
- session = ( DatagramSession ) cancelQueue.pop();
- }
-
- if( session == null )
- break;
- else
- {
- SelectionKey key = session.getSelectionKey();
- DatagramChannel ch = ( DatagramChannel ) key.channel();
- try
- {
- ch.close();
- }
- catch( IOException e )
- {
- exceptionMonitor.exceptionCaught( this, e );
- }
- session.notifyClose();
- key.cancel();
- selector.wakeup(); // wake up again to trigger thread death
- }
- }
+ return stub.connect( address, localAddress, handler );
}
public IoFilterChain getFilterChain()
{
- return filters;
- }
-
- private static class RegistrationRequest extends ConnectFuture
- {
- private final DatagramChannel channel;
-
- private final IoHandler handler;
-
- private RegistrationRequest( DatagramChannel channel,
- IoHandler handler )
- {
- this.channel = channel;
- this.handler = handler;
- }
+ return stub.getFilterChain();
}
/**
@@ -497,6 +68,16 @@
*/
public IoSessionManagerConfig getConfig()
{
- return new BaseIoConnectorConfig();
+ return stub.getConfig();
+ }
+
+ public ExceptionMonitor getExceptionMonitor()
+ {
+ return stub.getExceptionMonitor();
+ }
+
+ public void setExceptionMonitor( ExceptionMonitor monitor )
+ {
+ stub.setExceptionMonitor( monitor );
}
}
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=326489&r1=326488&r2=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Wed Oct 19 01:56:19 2005
@@ -36,6 +36,9 @@
import org.apache.mina.common.IoSessionManagerConfig;
import org.apache.mina.common.support.BaseIoAcceptor;
import org.apache.mina.common.support.IoSessionManagerFilterChain;
+import org.apache.mina.transport.socket.nio.support.SocketIoProcessor;
+import org.apache.mina.transport.socket.nio.support.SocketSession;
+import org.apache.mina.transport.socket.nio.support.SocketSessionManagerFilterChain;
import org.apache.mina.util.Queue;
/**
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java?rev=326489&r1=326488&r2=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java Wed Oct 19 01:56:19 2005
@@ -37,6 +37,9 @@
import org.apache.mina.common.support.BaseIoConnectorConfig;
import org.apache.mina.common.support.BaseIoSessionManager;
import org.apache.mina.common.support.IoSessionManagerFilterChain;
+import org.apache.mina.transport.socket.nio.support.SocketIoProcessor;
+import org.apache.mina.transport.socket.nio.support.SocketSession;
+import org.apache.mina.transport.socket.nio.support.SocketSessionManagerFilterChain;
import org.apache.mina.util.ExceptionUtil;
import org.apache.mina.util.Queue;
Added: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorImpl.java?rev=326489&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorImpl.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorImpl.java Wed Oct 19 01:56:19 2005
@@ -0,0 +1,616 @@
+/*
+ * @(#) $Id: DatagramAcceptor.java 326451 2005-10-19 08:07:09Z trustin $
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionManagerConfig;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.common.support.BaseIoAcceptorConfig;
+import org.apache.mina.common.support.IoSessionManagerFilterChain;
+import org.apache.mina.util.ExceptionUtil;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoAcceptor} for datagram transport (UDP/IP).
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev: 326451 $, $Date: 2005-10-19 17:07:09 +0900 (ì, 19 10 2005) $
+ */
+public class DatagramAcceptorImpl extends BaseIoAcceptor implements IoAcceptor, DatagramSessionManager
+{
+ private static volatile int nextId = 0;
+
+ private final IoSessionManagerFilterChain filters =
+ new DatagramSessionManagerFilterChain( this );
+
+ private final int id = nextId ++ ;
+
+ private Selector selector;
+
+ private final Map channels = new HashMap();
+
+ private final Queue registerQueue = new Queue();
+
+ private final Queue cancelQueue = new Queue();
+
+ private final Queue flushingSessions = new Queue();
+
+ private Worker worker;
+
+ /**
+ * Creates a new instance.
+ */
+ public DatagramAcceptorImpl()
+ {
+ }
+
+ public void bind( SocketAddress address, IoHandler handler )
+ throws IOException
+ {
+ if( address == null )
+ throw new NullPointerException( "address" );
+ if( handler == null )
+ throw new NullPointerException( "handler" );
+
+ if( !( address instanceof InetSocketAddress ) )
+ throw new IllegalArgumentException( "Unexpected address type: "
+ + address.getClass() );
+ if( ( ( InetSocketAddress ) address ).getPort() == 0 )
+ throw new IllegalArgumentException( "Unsupported port number: 0" );
+
+ RegistrationRequest request = new RegistrationRequest( address, handler );
+ synchronized( this )
+ {
+ synchronized( registerQueue )
+ {
+ registerQueue.push( request );
+ }
+ startupWorker();
+ }
+ selector.wakeup();
+
+ synchronized( request )
+ {
+ while( !request.done )
+ {
+ try
+ {
+ request.wait();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+ }
+
+ if( request.exception != null )
+ {
+ ExceptionUtil.throwException( request.exception );
+ }
+ }
+
+ public void unbind( SocketAddress address )
+ {
+ unbind( address, true );
+ }
+
+ public void unbind( SocketAddress address, boolean disconnectClients )
+ {
+ // FIXME: Make this work properly when disconnectClients is true.
+ if( address == null )
+ throw new NullPointerException( "address" );
+
+ CancellationRequest request = new CancellationRequest( address );
+ synchronized( this )
+ {
+ try
+ {
+ startupWorker();
+ }
+ catch( IOException e )
+ {
+ // IOException is thrown only when Worker thread is not
+ // running and failed to open a selector. We simply throw
+ // IllegalArgumentException here because we can simply
+ // conclude that nothing is bound to the selector.
+ throw new IllegalArgumentException( "Address not bound: " + address );
+ }
+
+ synchronized( cancelQueue )
+ {
+ cancelQueue.push( request );
+ }
+ }
+ selector.wakeup();
+
+ synchronized( request )
+ {
+ while( !request.done )
+ {
+ try
+ {
+ request.wait();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+ }
+
+ if( request.exception != null )
+ {
+ request.exception.fillInStackTrace();
+ throw request.exception;
+ }
+ }
+
+ public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
+ {
+ if( remoteAddress == null )
+ {
+ throw new NullPointerException( "remoteAddress" );
+ }
+ if( localAddress == null )
+ {
+ throw new NullPointerException( "localAddress" );
+ }
+
+ Selector selector = this.selector;
+ DatagramChannel ch = ( DatagramChannel ) channels.get( localAddress );
+ if( selector == null || ch == null )
+ {
+ throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
+ }
+
+ SelectionKey key = ch.keyFor( selector );
+ if( key == null )
+ {
+ throw new IllegalArgumentException( "Unknown lodalAddress: " + localAddress );
+ }
+
+ RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+ DatagramSession s = new DatagramSession( filters, ch, req.handler );
+ s.setRemoteAddress( remoteAddress );
+ s.setSelectionKey( key );
+
+ try
+ {
+ req.handler.sessionCreated( s );
+ }
+ catch( Throwable t )
+ {
+ exceptionMonitor.exceptionCaught( this, t );
+ }
+
+ return s;
+ }
+
+ private synchronized void startupWorker() throws IOException
+ {
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ worker.start();
+ }
+ }
+
+ public void flushSession( DatagramSession session )
+ {
+ scheduleFlush( session );
+ selector.wakeup();
+ }
+
+ public void closeSession( DatagramSession session )
+ {
+ }
+
+ private void scheduleFlush( DatagramSession session )
+ {
+ synchronized( flushingSessions )
+ {
+ flushingSessions.push( session );
+ }
+ }
+
+ private class Worker extends Thread
+ {
+ public Worker()
+ {
+ super( "DatagramAcceptor-" + id );
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ try
+ {
+ int nKeys = selector.select();
+
+ registerNew();
+
+ if( nKeys > 0 )
+ {
+ processReadySessions( selector.selectedKeys() );
+ }
+
+ flushSessions();
+ cancelKeys();
+
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( DatagramAcceptorImpl.this )
+ {
+ if( selector.keys().isEmpty() &&
+ registerQueue.isEmpty() &&
+ cancelQueue.isEmpty() )
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( DatagramAcceptorImpl.this, e );
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( DatagramAcceptorImpl.this,
+ e );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ }
+ }
+ }
+ }
+ }
+
+ private void processReadySessions( Set keys )
+ {
+ Iterator it = keys.iterator();
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+ it.remove();
+
+ DatagramChannel ch = ( DatagramChannel ) key.channel();
+
+ RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+ DatagramSession session = new DatagramSession(
+ filters, ch, req.handler );
+ session.setSelectionKey( key );
+
+ try
+ {
+ req.handler.sessionCreated( session );
+
+ if( key.isReadable() )
+ {
+ readSession( session );
+ }
+
+ if( key.isWritable() )
+ {
+ scheduleFlush( session );
+ }
+ }
+ catch( Throwable t )
+ {
+ exceptionMonitor.exceptionCaught( this, t );
+ }
+ }
+ }
+
+ private void readSession( DatagramSession session )
+ {
+
+ ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
+ try
+ {
+ SocketAddress remoteAddress = session.getChannel().receive(
+ readBuf.buf() );
+ if( remoteAddress != null )
+ {
+ readBuf.flip();
+ session.setRemoteAddress( remoteAddress );
+
+ ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
+ newBuf.put( readBuf );
+ newBuf.flip();
+
+ session.increaseReadBytes( newBuf.remaining() );
+ filters.messageReceived( session, newBuf );
+ }
+ }
+ catch( IOException e )
+ {
+ filters.exceptionCaught( session, e );
+ }
+ finally
+ {
+ readBuf.release();
+ }
+ }
+
+ private void flushSessions()
+ {
+ if( flushingSessions.size() == 0 )
+ return;
+
+ for( ;; )
+ {
+ DatagramSession session;
+
+ synchronized( flushingSessions )
+ {
+ session = ( DatagramSession ) flushingSessions.pop();
+ }
+
+ if( session == null )
+ break;
+
+ try
+ {
+ flush( session );
+ }
+ catch( IOException e )
+ {
+ session.getManagerFilterChain().exceptionCaught( session, e );
+ }
+ }
+ }
+
+ private void flush( DatagramSession session ) throws IOException
+ {
+ DatagramChannel ch = session.getChannel();
+
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+
+ WriteRequest req;
+ for( ;; )
+ {
+ synchronized( writeRequestQueue )
+ {
+ req = ( WriteRequest ) writeRequestQueue.first();
+ }
+
+ if( req == null )
+ break;
+
+ ByteBuffer buf = ( ByteBuffer ) req.getMessage();
+ if( buf.remaining() == 0 )
+ {
+ // pop and fire event
+ synchronized( writeRequestQueue )
+ {
+ writeRequestQueue.pop();
+ }
+
+ req.getFuture().setWritten( true );
+ session.increaseWrittenWriteRequests();
+ session.getManagerFilterChain().messageSent( session, buf );
+ continue;
+ }
+
+ int pos = buf.position();
+ int writtenBytes = ch
+ .send( buf.buf(), session.getRemoteAddress() );
+
+ SelectionKey key = session.getSelectionKey();
+ if( writtenBytes == 0 )
+ {
+ // Kernel buffer is full
+ key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+ }
+ else if( writtenBytes > 0 )
+ {
+ key.interestOps( key.interestOps()
+ & ( ~SelectionKey.OP_WRITE ) );
+
+ // pop and fire event
+ synchronized( writeRequestQueue )
+ {
+ writeRequestQueue.pop();
+ }
+
+ session.increaseWrittenBytes( writtenBytes );
+ req.getFuture().setWritten( true );
+ session.increaseWrittenWriteRequests();
+ session.getManagerFilterChain().messageSent( session, buf.position( pos ) );
+ }
+ }
+ }
+
+ private void registerNew()
+ {
+ if( registerQueue.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ RegistrationRequest req;
+ synchronized( registerQueue )
+ {
+ req = ( RegistrationRequest ) registerQueue.pop();
+ }
+
+ if( req == null )
+ break;
+
+ DatagramChannel ch = null;
+ try
+ {
+ ch = DatagramChannel.open();
+ ch.configureBlocking( false );
+ ch.socket().bind( req.address );
+ ch.register( selector, SelectionKey.OP_READ, req );
+ channels.put( req.address, ch );
+ }
+ catch( Throwable t )
+ {
+ req.exception = t;
+ }
+ finally
+ {
+ synchronized( req )
+ {
+ req.done = true;
+ req.notify();
+ }
+
+ if( ch != null && req.exception != null )
+ {
+ try
+ {
+ ch.close();
+ }
+ catch( Throwable e )
+ {
+ exceptionMonitor.exceptionCaught( this, e );
+ }
+ }
+ }
+ }
+ }
+
+ private void cancelKeys()
+ {
+ if( cancelQueue.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ CancellationRequest request;
+ synchronized( cancelQueue )
+ {
+ request = ( CancellationRequest ) cancelQueue.pop();
+ }
+
+ if( request == null )
+ {
+ break;
+ }
+
+ DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
+ // close the channel
+ try
+ {
+ if( ch == null )
+ {
+ request.exception = new IllegalArgumentException(
+ "Address not bound: " + request.address );
+ }
+ else
+ {
+ SelectionKey key = ch.keyFor( selector );
+ key.cancel();
+ selector.wakeup(); // wake up again to trigger thread death
+ ch.close();
+ }
+ }
+ catch( Throwable t )
+ {
+ exceptionMonitor.exceptionCaught( this, t );
+ }
+ finally
+ {
+ synchronized( request )
+ {
+ request.done = true;
+ request.notify();
+ }
+ }
+ }
+ }
+
+ /**
+ * This method just returns dummy configuration instance because datagram
+ * transport type (UDP/IP) doesn't provide any configuration.
+ */
+ public IoSessionManagerConfig getConfig()
+ {
+ return new BaseIoAcceptorConfig();
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return filters;
+ }
+
+ private static class RegistrationRequest
+ {
+ private final SocketAddress address;
+
+ private final IoHandler handler;
+
+ private Throwable exception;
+
+ private boolean done;
+
+ private RegistrationRequest( SocketAddress address, IoHandler handler )
+ {
+ this.address = address;
+ this.handler = handler;
+ }
+ }
+
+ private static class CancellationRequest
+ {
+ private final SocketAddress address;
+ private boolean done;
+ private RuntimeException exception;
+
+ private CancellationRequest( SocketAddress address )
+ {
+ this.address = address;
+ }
+ }
+}
Added: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorImpl.java?rev=326489&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorImpl.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorImpl.java Wed Oct 19 01:56:19 2005
@@ -0,0 +1,503 @@
+/*
+ * @(#) $Id: DatagramConnector.java 326451 2005-10-19 08:07:09Z trustin $
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSessionManagerConfig;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.BaseIoConnectorConfig;
+import org.apache.mina.common.support.BaseIoSessionManager;
+import org.apache.mina.common.support.IoSessionManagerFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoConnector} for datagram transport (UDP/IP).
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev: 326451 $, $Date: 2005-10-19 17:07:09 +0900 (ì, 19 10 2005) $
+ */
+public class DatagramConnectorImpl extends BaseIoSessionManager implements IoConnector, DatagramSessionManager
+{
+ private static volatile int nextId = 0;
+
+ private final IoSessionManagerFilterChain filters =
+ new DatagramSessionManagerFilterChain( this );
+
+ private final int id = nextId ++ ;
+
+ private Selector selector;
+
+ private final Queue registerQueue = new Queue();
+
+ private final Queue cancelQueue = new Queue();
+
+ private final Queue flushingSessions = new Queue();
+
+ private Worker worker;
+
+ /**
+ * Creates a new instance.
+ */
+ public DatagramConnectorImpl()
+ {
+ }
+
+ public ConnectFuture connect( SocketAddress address, IoHandler handler ) throws IOException
+ {
+ return connect( address, null, handler);
+ }
+
+ public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
+ IoHandler handler ) throws IOException
+ {
+ if( address == null )
+ throw new NullPointerException( "address" );
+ if( handler == null )
+ throw new NullPointerException( "handler" );
+
+ if( !( address instanceof InetSocketAddress ) )
+ throw new IllegalArgumentException( "Unexpected address type: "
+ + address.getClass() );
+
+ if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+ {
+ throw new IllegalArgumentException( "Unexpected local address type: "
+ + localAddress.getClass() );
+ }
+
+ DatagramChannel ch = DatagramChannel.open();
+ boolean initialized = false;
+ try
+ {
+ ch.socket().setReuseAddress( true );
+ if( localAddress != null )
+ {
+ ch.socket().bind( localAddress );
+ }
+ ch.connect( address );
+ ch.configureBlocking( false );
+ initialized = true;
+ }
+ finally
+ {
+ if( !initialized )
+ {
+ ch.close();
+ }
+ }
+
+ RegistrationRequest request = new RegistrationRequest( ch, handler );
+ synchronized( this )
+ {
+ synchronized( registerQueue )
+ {
+ registerQueue.push( request );
+ }
+ startupWorker();
+ }
+
+ selector.wakeup();
+ return request;
+ }
+
+ private synchronized void startupWorker() throws IOException
+ {
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ worker.start();
+ }
+ }
+
+ public void closeSession( DatagramSession session )
+ {
+ synchronized( this )
+ {
+ try
+ {
+ startupWorker();
+ }
+ catch( IOException e )
+ {
+ // IOException is thrown only when Worker thread is not
+ // running and failed to open a selector. We simply return
+ // silently here because it we can simply conclude that
+ // this session is not managed by this connector or
+ // already closed.
+ return;
+ }
+
+ synchronized( cancelQueue )
+ {
+ cancelQueue.push( session );
+ }
+ }
+
+ selector.wakeup();
+ }
+
+ public void flushSession( DatagramSession session )
+ {
+ scheduleFlush( session );
+ selector.wakeup();
+ }
+
+ private void scheduleFlush( DatagramSession session )
+ {
+ synchronized( flushingSessions )
+ {
+ flushingSessions.push( session );
+ }
+ }
+
+ private class Worker extends Thread
+ {
+ public Worker()
+ {
+ super( "DatagramConnector-" + id );
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ try
+ {
+ int nKeys = selector.select();
+
+ registerNew();
+
+ if( nKeys > 0 )
+ {
+ processReadySessions( selector.selectedKeys() );
+ }
+
+ flushSessions();
+ cancelKeys();
+
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( DatagramConnectorImpl.this )
+ {
+ if( selector.keys().isEmpty() &&
+ registerQueue.isEmpty() &&
+ cancelQueue.isEmpty() )
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( DatagramConnectorImpl.this, e );
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( DatagramConnectorImpl.this,
+ e );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ }
+ }
+ }
+ }
+ }
+
+ private void processReadySessions( Set keys )
+ {
+ Iterator it = keys.iterator();
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+ it.remove();
+
+ DatagramSession session = ( DatagramSession ) key.attachment();
+
+ if( key.isReadable() )
+ {
+ readSession( session );
+ }
+
+ if( key.isWritable() )
+ {
+ scheduleFlush( session );
+ }
+ }
+ }
+
+ private void readSession( DatagramSession session )
+ {
+
+ ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
+ try
+ {
+ int readBytes = session.getChannel().read( readBuf.buf() );
+ if( readBytes > 0 )
+ {
+ readBuf.flip();
+ ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
+ newBuf.put( readBuf );
+ newBuf.flip();
+
+ session.increaseReadBytes( readBytes );
+ filters.messageReceived( session, newBuf );
+ }
+ }
+ catch( IOException e )
+ {
+ filters.exceptionCaught( session, e );
+ }
+ finally
+ {
+ readBuf.release();
+ }
+ }
+
+ private void flushSessions()
+ {
+ if( flushingSessions.size() == 0 )
+ return;
+
+ for( ;; )
+ {
+ DatagramSession session;
+
+ synchronized( flushingSessions )
+ {
+ session = ( DatagramSession ) flushingSessions.pop();
+ }
+
+ if( session == null )
+ break;
+
+ try
+ {
+ flush( session );
+ }
+ catch( IOException e )
+ {
+ session.getManagerFilterChain().exceptionCaught( session, e );
+ }
+ }
+ }
+
+ private void flush( DatagramSession session ) throws IOException
+ {
+ DatagramChannel ch = session.getChannel();
+
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+
+ WriteRequest req;
+ for( ;; )
+ {
+ synchronized( writeRequestQueue )
+ {
+ req = ( WriteRequest ) writeRequestQueue.first();
+ }
+
+ if( req == null )
+ break;
+
+ ByteBuffer buf = ( ByteBuffer ) req.getMessage();
+ if( buf.remaining() == 0 )
+ {
+ // pop and fire event
+ synchronized( writeRequestQueue )
+ {
+ writeRequestQueue.pop();
+ }
+
+ req.getFuture().setWritten( true );
+ session.increaseWrittenWriteRequests();
+ session.getManagerFilterChain().messageSent( session, buf );
+ continue;
+ }
+
+ int pos = buf.position();
+ int writtenBytes = ch.write( buf.buf() );
+
+ SelectionKey key = session.getSelectionKey();
+ if( writtenBytes == 0 )
+ {
+ // Kernel buffer is full
+ key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+ }
+ else if( writtenBytes > 0 )
+ {
+ key.interestOps( key.interestOps()
+ & ( ~SelectionKey.OP_WRITE ) );
+
+ // pop and fire event
+ synchronized( writeRequestQueue )
+ {
+ writeRequestQueue.pop();
+ }
+
+ session.increaseWrittenBytes( writtenBytes );
+ req.getFuture().setWritten( true );
+ session.increaseWrittenWriteRequests();
+ session.getManagerFilterChain().messageSent( session, buf.position( pos ) );
+ }
+ }
+ }
+
+ private void registerNew()
+ {
+ if( registerQueue.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ RegistrationRequest req;
+ synchronized( registerQueue )
+ {
+ req = ( RegistrationRequest ) registerQueue.pop();
+ }
+
+ if( req == null )
+ break;
+
+ DatagramSession session = new DatagramSession(
+ filters, req.channel, req.handler );
+
+ boolean success = false;
+ try
+ {
+ req.handler.sessionCreated( session );
+
+ SelectionKey key = req.channel.register( selector,
+ SelectionKey.OP_READ, session );
+
+ session.setSelectionKey( key );
+
+ req.setSession( session );
+ success = true;
+ }
+ catch( Throwable t )
+ {
+ req.setException( t );
+ }
+ finally
+ {
+ if( !success )
+ {
+ try
+ {
+ req.channel.close();
+ }
+ catch (IOException e)
+ {
+ exceptionMonitor.exceptionCaught( this, e );
+ }
+ }
+ }
+ }
+ }
+
+ private void cancelKeys()
+ {
+ if( cancelQueue.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ DatagramSession session;
+ synchronized( cancelQueue )
+ {
+ session = ( DatagramSession ) cancelQueue.pop();
+ }
+
+ if( session == null )
+ break;
+ else
+ {
+ SelectionKey key = session.getSelectionKey();
+ DatagramChannel ch = ( DatagramChannel ) key.channel();
+ try
+ {
+ ch.close();
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( this, e );
+ }
+ session.notifyClose();
+ key.cancel();
+ selector.wakeup(); // wake up again to trigger thread death
+ }
+ }
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return filters;
+ }
+
+ private static class RegistrationRequest extends ConnectFuture
+ {
+ private final DatagramChannel channel;
+
+ private final IoHandler handler;
+
+ private RegistrationRequest( DatagramChannel channel,
+ IoHandler handler )
+ {
+ this.channel = channel;
+ this.handler = handler;
+ }
+ }
+
+ /**
+ * This method just returns dummy configuration instance because datagram
+ * transport type (UDP/IP) doesn't provide any configuration.
+ */
+ public IoSessionManagerConfig getConfig()
+ {
+ return new BaseIoConnectorConfig();
+ }
+}
Copied: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSession.java (from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java)
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSession.java?p2=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSession.java&p1=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java&r1=326451&r2=326489&rev=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSession.java Wed Oct 19 01:56:19 2005
@@ -16,7 +16,7 @@
* limitations under the License.
*
*/
-package org.apache.mina.transport.socket.nio;
+package org.apache.mina.transport.socket.nio.support;
import java.net.SocketAddress;
import java.nio.channels.DatagramChannel;
@@ -50,7 +50,7 @@
private final DatagramChannel ch;
- private final DatagramSessionConfig config;
+ private final DatagramSessionConfigImpl config;
private final Queue writeRequestQueue;
@@ -73,7 +73,7 @@
this.managerFilterChain = managerFilterChain;
this.filterChain = new IoSessionFilterChain( this, managerFilterChain );
this.ch = ch;
- this.config = new DatagramSessionConfig( ch );
+ this.config = new DatagramSessionConfigImpl( ch );
this.writeRequestQueue = new Queue();
this.handler = defaultHandler;
this.remoteAddress = ch.socket().getRemoteSocketAddress();
@@ -110,7 +110,7 @@
return handler;
}
- public void notifyClose()
+ void notifyClose()
{
closeFuture.setClosed( true );
}
@@ -120,9 +120,9 @@
if( !closeFuture.isReady() )
{
IoSessionManager manager = managerFilterChain.getManager();
- if( manager instanceof DatagramConnector )
+ if( manager instanceof DatagramConnectorImpl )
{
- ( ( DatagramConnector ) manager ).closeSession( this );
+ ( ( DatagramConnectorImpl ) manager ).closeSession( this );
}
else
{
Added: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionConfigImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionConfigImpl.java?rev=326489&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionConfigImpl.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionConfigImpl.java Wed Oct 19 01:56:19 2005
@@ -0,0 +1,68 @@
+/*
+ * @(#) $Id: DatagramSessionConfig.java 326451 2005-10-19 08:07:09Z trustin $
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.net.SocketException;
+import java.nio.channels.DatagramChannel;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.transport.socket.nio.DatagramAcceptor;
+import org.apache.mina.transport.socket.nio.DatagramConnector;
+import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
+
+/**
+ * A {@link IoSessionConfig} for datagram transport (UDP/IP).
+ * You can downcast {@link IoSessionConfig} instance returned by
+ * {@link IoSession#getConfig()} if you've created datagram session using
+ * {@link DatagramAcceptor} or {@link DatagramConnector}.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev: 326451 $, $Date: 2005-10-19 17:07:09 +0900 (ì, 19 10 2005) $,
+ */
+class DatagramSessionConfigImpl extends BaseIoSessionConfig implements DatagramSessionConfig
+{
+ private final DatagramChannel ch;
+
+ DatagramSessionConfigImpl( DatagramChannel ch )
+ {
+ this.ch = ch;
+ }
+
+ public boolean getReuseAddress() throws SocketException
+ {
+ return ch.socket().getReuseAddress();
+ }
+
+ public void setReuseAddress( boolean on ) throws SocketException
+ {
+ ch.socket().setReuseAddress( on );
+ }
+
+ public int getTrafficClass() throws SocketException
+ {
+ return ch.socket().getTrafficClass();
+ }
+
+ public void setTrafficClass( int tc ) throws SocketException
+ {
+ ch.socket().setTrafficClass( tc );
+ }
+}
\ No newline at end of file
Copied: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java (from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java)
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java?p2=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java&p1=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java&r1=326451&r2=326489&rev=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java Wed Oct 19 01:56:19 2005
@@ -16,29 +16,27 @@
* limitations under the License.
*
*/
-package org.apache.mina.transport.socket.nio;
+package org.apache.mina.transport.socket.nio.support;
-import org.apache.mina.common.support.BaseIoSessionManager;
+import org.apache.mina.common.IoSessionManager;
/**
- * A base class for {@link DatagramAcceptor} and {@link DatagramConnector}.
- * Session interacts with this abstract class instead of those two concrete
- * classes.
+ * A base interface for {@link DatagramAcceptorImpl} and {@link DatagramConnectorImpl}.
*
* @author Trustin Lee (trustin@apache.org)
* @version $Rev$, $Date$
*/
-abstract class DatagramSessionManager extends BaseIoSessionManager
+interface DatagramSessionManager extends IoSessionManager
{
/**
* Requests this processor to flush the write buffer of the specified
* session. This method is invoked by MINA internally.
*/
- abstract void flushSession( DatagramSession session );
+ void flushSession( DatagramSession session );
/**
* Requests this processor to close the specified session.
* This method is invoked by MINA internally.
*/
- abstract void closeSession( DatagramSession session );
+ void closeSession( DatagramSession session );
}
Copied: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java (from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java)
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java?p2=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java&p1=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java&r1=326451&r2=326489&rev=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java Wed Oct 19 01:56:19 2005
@@ -1,4 +1,4 @@
-package org.apache.mina.transport.socket.nio;
+package org.apache.mina.transport.socket.nio.support;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
Copied: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (from r326410, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java)
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java?p2=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java&p1=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java&r1=326410&r2=326489&rev=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java Wed Oct 19 01:56:19 2005
@@ -16,7 +16,7 @@
* limitations under the License.
*
*/
-package org.apache.mina.transport.socket.nio;
+package org.apache.mina.transport.socket.nio.support;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
@@ -31,6 +31,7 @@
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.WriteTimeoutException;
import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.Queue;
/**
@@ -40,7 +41,7 @@
* @author Trustin Lee (trustin@apache.org)
* @version $Rev$, $Date$,
*/
-class SocketIoProcessor
+public class SocketIoProcessor
{
private static final SocketIoProcessor instance;
@@ -82,12 +83,12 @@
selector = Selector.open();
}
- static SocketIoProcessor getInstance()
+ public static SocketIoProcessor getInstance()
{
return instance;
}
- void addSession( SocketSession session )
+ public void addSession( SocketSession session )
{
synchronized( this )
{