You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2004/12/24 09:57:34 UTC
svn commit: r123285 - incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram
Author: trustin
Date: Fri Dec 24 00:57:32 2004
New Revision: 123285
URL: http://svn.apache.org/viewcvs?view=rev&rev=123285
Log:
* Added DatagramConnector
Added:
incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java (contents, props changed)
Modified:
incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java
incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java
incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java
Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java?view=diff&rev=123285&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java&r1=123284&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java&r2=123285
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java (original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramAcceptor.java Fri Dec 24 00:57:32 2004
@@ -144,6 +144,10 @@
scheduleFlush( session );
selector.wakeup();
}
+
+ public void closeSession( DatagramSession session )
+ {
+ }
private void scheduleFlush( DatagramSession session )
{
@@ -182,12 +186,12 @@
}
}
- cancelKeys();
-
if( nKeys > 0 )
{
processReadySessions( selector.selectedKeys() );
}
+
+ cancelKeys();
flushSessions();
}
catch( IOException e )
@@ -221,9 +225,9 @@
DatagramAcceptor.this,
filterManager,
ch,
- key,
( IoHandler ) key
.attachment() );
+ session.setSelectionKey(key);
if( key.isReadable() )
{
Added: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java?view=auto&rev=123285
==============================================================================
--- (empty file)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramConnector.java Fri Dec 24 00:57:32 2004
@@ -0,0 +1,447 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.io.datagram;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.io.Connector;
+import org.apache.mina.io.ExceptionMonitor;
+import org.apache.mina.io.IoHandler;
+import org.apache.mina.io.IoHandlerFilter;
+import org.apache.mina.io.IoSession;
+import org.apache.mina.util.DefaultExceptionMonitor;
+import org.apache.mina.util.IoHandlerFilterManager;
+import org.apache.mina.util.Queue;
+
+/**
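+ * A {@link Connector} for datagram channels. Each call to
+ * <code>connect</code> opens a non-blocking {@link DatagramChannel}; a single
+ * worker thread, started on demand, registers the channels with one shared
+ * {@link Selector} and performs all reads, writes and closes. The worker
+ * thread exits once the selector has no registered keys left.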
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class DatagramConnector implements Connector, DatagramProcessor
+{
+ // Source of per-instance ids (see the id field below).
+ // NOTE(review): nextId++ is a read-modify-write and is not atomic even on a
+ // volatile field, so connectors constructed concurrently could get the same id.
+ private static volatile int nextId = 0;
+
+ // Filter manager handed to every session created by registerNew() and used
+ // by readSession() to fire read/exception events.
+ private final IoHandlerFilterManager filterManager = new IoHandlerFilterManager();
+
+ // Identifier of this connector; used only to name the worker thread.
+ private final int id = nextId++;
+
+ // Selector opened in the constructor; every connected channel is registered with it.
+ private final Selector selector;
+
+ // RegistrationRequests pushed by connect() and drained by registerNew().
+ private final Queue registerQueue = new Queue();
+
+ // SelectionKeys pushed by closeSession() and drained by cancelKeys().
+ private final Queue cancelQueue = new Queue();
+
+ // Sessions with pending writes, pushed by scheduleFlush() and drained by flushSessions().
+ private final Queue flushingSessions = new Queue();
+
+ // Receives IOExceptions raised in the worker loop and while closing channels.
+ private ExceptionMonitor exceptionMonitor = new DefaultExceptionMonitor();
+
+ // The running worker thread, or null when none is running. Assigned while
+ // holding this connector's monitor (see connect() and Worker.run()).
+ private Worker worker;
+
+ /**
+ * Creates a new connector and opens the selector that the worker thread
+ * will poll. No thread is started here; the worker is created by the first
+ * call to <code>connect</code>.
+ *
+ * @throws IOException if the selector cannot be opened
+ */
+ public DatagramConnector() throws IOException
+ {
+ selector = Selector.open();
+ }
+
+ /**
+  * Opens a non-blocking datagram channel connected to <code>address</code>,
+  * queues it for registration on the worker thread (starting that thread if
+  * necessary) and blocks until the worker has created the session.
+  * <p>
+  * If the calling thread is interrupted while waiting, the wait continues
+  * until the session is available and the interrupt status is then restored,
+  * so the caller can still observe the interruption.
+  * <p>
+  * NOTE(review): if registration fails on the worker thread, the request is
+  * dropped there and this call appears to wait forever; confirm and handle
+  * together with <code>registerNew()</code>.
+  *
+  * @param address        remote peer; must be an {@link InetSocketAddress}
+  * @param defaultHandler handler attached to the new session
+  * @return the session created for the connected channel
+  * @throws NullPointerException     if either argument is <code>null</code>
+  * @throws IllegalArgumentException if <code>address</code> is not an
+  *                                  {@link InetSocketAddress}
+  * @throws IOException              if the channel cannot be opened,
+  *                                  configured or connected
+  */
+ public IoSession connect( SocketAddress address, IoHandler defaultHandler )
+         throws IOException
+ {
+     if( address == null )
+         throw new NullPointerException( "address" );
+     if( defaultHandler == null )
+         throw new NullPointerException( "defaultHandler" );
+
+     if( ! ( address instanceof InetSocketAddress ) )
+         throw new IllegalArgumentException( "Unexpected address type: "
+                                             + address.getClass() );
+
+     DatagramChannel ch = DatagramChannel.open();
+     boolean connected = false;
+     try
+     {
+         ch.configureBlocking( false );
+         ch.socket().connect( address );
+         connected = true;
+     }
+     finally
+     {
+         if( !connected )
+         {
+             // Do not leak the descriptor when setup fails; the original
+             // exception keeps propagating to the caller.
+             try
+             {
+                 ch.close();
+             }
+             catch( IOException e )
+             {
+                 exceptionMonitor.exceptionCaught( this, e );
+             }
+         }
+     }
+
+     RegistrationRequest request = new RegistrationRequest( ch,
+                                                            defaultHandler );
+     synchronized( this )
+     {
+         synchronized( registerQueue )
+         {
+             registerQueue.push( request );
+         }
+
+         if( worker == null )
+         {
+             worker = new Worker();
+             worker.start();
+         }
+     }
+
+     // Make the worker leave select() so it picks up the request.
+     selector.wakeup();
+
+     boolean interrupted = false;
+     synchronized( request )
+     {
+         while( request.session == null )
+         {
+             try
+             {
+                 request.wait();
+             }
+             catch( InterruptedException e )
+             {
+                 // Keep waiting (the channel is already queued), but remember
+                 // the interrupt. Re-asserting it here would make wait()
+                 // throw again immediately.
+                 interrupted = true;
+             }
+         }
+     }
+
+     if( interrupted )
+         Thread.currentThread().interrupt();
+
+     return request.session;
+ }
+
+ /**
+ * Connects to <code>address</code> exactly like
+ * <code>connect( address, defaultHandler )</code>.
+ * The <code>timeout</code> argument is currently not used by this
+ * implementation.
+ *
+ * @param address remote peer to connect to
+ * @param timeout ignored
+ * @param defaultHandler handler attached to the new session
+ * @return the session created for the connected channel
+ * @throws IOException if the two-argument <code>connect</code> throws it
+ */
+ public IoSession connect( SocketAddress address, int timeout,
+ IoHandler defaultHandler ) throws IOException
+ {
+ // timeout is deliberately not forwarded; see the method comment.
+ return connect( address, defaultHandler );
+ }
+
+ /**
+  * Requests that the given session be closed. The session's selection key
+  * is queued and the selector is woken up; the channel itself is closed
+  * later on the worker thread by <code>cancelKeys()</code>.
+  *
+  * @param session the session to close
+  */
+ public void closeSession( DatagramSession session )
+ {
+     synchronized( this )
+     {
+         final SelectionKey sessionKey = session.getSelectionKey();
+         final Queue pendingCancels = cancelQueue;
+         synchronized( pendingCancels )
+         {
+             pendingCancels.push( sessionKey );
+         }
+     }
+
+     // Kick the worker out of select() so the cancellation is processed.
+     selector.wakeup();
+ }
+
+ public void flushSession( DatagramSession session )
+ {
+ scheduleFlush( session );
+ selector.wakeup();
+ }
+
+ private void scheduleFlush( DatagramSession session )
+ {
+ synchronized( flushingSessions )
+ {
+ flushingSessions.push( session );
+ }
+ }
+
+ /**
+  * The selector loop of this connector. Each iteration waits on the
+  * selector, registers newly connected channels, handles ready keys,
+  * flushes pending writes and finally closes sessions queued for
+  * cancellation. The thread ends itself, and clears the enclosing
+  * connector's <code>worker</code> field, once no keys are registered.
+  */
+ private class Worker extends Thread
+ {
+     public Worker()
+     {
+         // Named after the owning class. The previous "DatagramAcceptor-"
+         // prefix made connector threads look like acceptor threads in
+         // thread dumps.
+         super( "DatagramConnector-" + id );
+     }
+
+     public void run()
+     {
+         for( ;; )
+         {
+             try
+             {
+                 int nKeys = selector.select();
+
+                 registerNew();
+
+                 if( selector.keys().isEmpty() )
+                 {
+                     // Re-check under the connector's monitor: connect()
+                     // holds the same monitor while it queues a request and
+                     // decides whether a new worker must be started.
+                     synchronized( DatagramConnector.this )
+                     {
+                         if( selector.keys().isEmpty() )
+                         {
+                             worker = null;
+                             break;
+                         }
+                     }
+                 }
+
+                 if( nKeys > 0 )
+                 {
+                     processReadySessions( selector.selectedKeys() );
+                 }
+
+                 flushSessions();
+                 cancelKeys();
+             }
+             catch( IOException e )
+             {
+                 exceptionMonitor.exceptionCaught( DatagramConnector.this,
+                                                   e );
+
+                 // Back off for a second so a persistent failure does not
+                 // turn this loop into a busy spin.
+                 try
+                 {
+                     Thread.sleep( 1000 );
+                 }
+                 catch( InterruptedException e1 )
+                 {
+                     // Ignored: the loop simply retries earlier.
+                 }
+             }
+         }
+     }
+ }
+
+ /**
+  * Handles every key in the given selected-key set: the key is removed from
+  * the set, a readable session is read immediately, and a writable session
+  * is queued for flushing.
+  *
+  * @param keys the selector's selected-key set
+  */
+ private void processReadySessions( Set keys )
+ {
+     for( Iterator ready = keys.iterator(); ready.hasNext(); )
+     {
+         SelectionKey selected = ( SelectionKey ) ready.next();
+         // Remove the key so it is not reported again by the next select().
+         ready.remove();
+
+         DatagramSession owner = ( DatagramSession ) selected.attachment();
+
+         if( selected.isReadable() )
+         {
+             readSession( owner );
+         }
+
+         if( selected.isWritable() )
+         {
+             scheduleFlush( owner );
+         }
+     }
+ }
+
+ /**
+  * Reads one datagram from the session's channel and, if any bytes were
+  * received, passes them to the filter manager as a data-read event. An
+  * {@link IOException} is reported as an exception-caught event instead of
+  * being thrown.
+  */
+ private void readSession( DatagramSession session )
+ {
+     // Fixed 1500-byte receive buffer; DatagramChannel.read() silently
+     // discards whatever part of a datagram does not fit.
+     ByteBuffer datagram = ByteBuffer.allocate( 1500 );
+     try
+     {
+         int received = session.getChannel().read( datagram );
+         if( received <= 0 )
+         {
+             return;
+         }
+
+         datagram.flip();
+         filterManager.fireDataRead( session, datagram );
+     }
+     catch( IOException failure )
+     {
+         filterManager.fireExceptionCaught( session, failure );
+     }
+ }
+
+ /**
+  * Drains the queue of sessions awaiting a flush, writing each session's
+  * pending buffers. An {@link IOException} from a flush is reported through
+  * that session's filter manager and does not stop the remaining sessions
+  * from being processed.
+  */
+ private void flushSessions()
+ {
+     // Quick exit without taking the lock when nothing is queued.
+     if( flushingSessions.size() == 0 )
+         return;
+
+     while( true )
+     {
+         DatagramSession pending;
+         synchronized( flushingSessions )
+         {
+             pending = ( DatagramSession ) flushingSessions.pop();
+         }
+
+         // The loop treats a null result from pop() as "queue drained".
+         if( pending == null )
+             return;
+
+         try
+         {
+             flush( pending );
+         }
+         catch( IOException failure )
+         {
+             pending.getFilterManager().fireExceptionCaught( pending, failure );
+         }
+     }
+ }
+
+ /**
+  * Writes the session's queued buffers to its channel, in order, firing a
+  * data-written event with the matching marker for each buffer that has
+  * been sent (or that was already empty).
+  * <p>
+  * When the channel accepts no bytes, <code>OP_WRITE</code> is added to the
+  * key's interest set and the method returns, leaving the buffer queued.
+  * The selector then reports the key as writable, which schedules another
+  * flush. Previously the loop retried the write immediately, spinning on
+  * the worker thread until the kernel buffer drained.
+  *
+  * @param session the session to flush
+  * @throws IOException if writing to the channel fails
+  */
+ private void flush( DatagramSession session ) throws IOException
+ {
+     DatagramChannel ch = session.getChannel();
+
+     Queue writeBufferQueue = session.getWriteBufferQueue();
+     Queue writeMarkerQueue = session.getWriteMarkerQueue();
+
+     ByteBuffer buf;
+     Object marker;
+     for( ;; )
+     {
+         // Both queues are guarded by the buffer queue's monitor so that a
+         // buffer and its marker are always read as a pair.
+         synchronized( writeBufferQueue )
+         {
+             buf = ( ByteBuffer ) writeBufferQueue.first();
+             marker = writeMarkerQueue.first();
+         }
+
+         if( buf == null )
+             break;
+
+         if( buf.remaining() == 0 )
+         {
+             // Nothing left to send for this buffer: pop and fire event.
+             synchronized( writeBufferQueue )
+             {
+                 writeBufferQueue.pop();
+                 writeMarkerQueue.pop();
+             }
+             session.getFilterManager().fireDataWritten( session, marker );
+             continue;
+         }
+
+         int writtenBytes = ch.write( buf );
+
+         SelectionKey key = session.getSelectionKey();
+         if( writtenBytes == 0 )
+         {
+             // Kernel buffer is full: ask the selector to tell us when the
+             // channel is writable again and stop instead of busy-waiting.
+             key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+             break;
+         }
+         else
+         {
+             key.interestOps( key.interestOps()
+                              & ( ~SelectionKey.OP_WRITE ) );
+
+             // pop and fire event
+             synchronized( writeBufferQueue )
+             {
+                 writeBufferQueue.pop();
+                 writeMarkerQueue.pop();
+             }
+             session.getFilterManager().fireDataWritten( session, marker );
+         }
+     }
+ }
+
+ /**
+  * Processes all queued registration requests. For each one a session is
+  * created, its channel is registered with the selector for reads with the
+  * session as attachment, and the thread waiting on the request is
+  * notified.
+  * <p>
+  * NOTE(review): if <code>register</code> throws, the request has already
+  * been removed from the queue and its waiter is never notified; confirm
+  * whether that can happen in practice.
+  *
+  * @throws ClosedChannelException if a queued channel is already closed
+  */
+ private void registerNew() throws ClosedChannelException
+ {
+     if( registerQueue.isEmpty() )
+         return;
+
+     while( true )
+     {
+         RegistrationRequest pending;
+         synchronized( registerQueue )
+         {
+             pending = ( RegistrationRequest ) registerQueue.pop();
+         }
+
+         if( pending == null )
+             return;
+
+         DatagramChannel channel = pending.channel;
+         DatagramSession created = new DatagramSession( this,
+                                                        filterManager,
+                                                        channel,
+                                                        pending.handler );
+
+         SelectionKey registration = channel.register( selector,
+                                                       SelectionKey.OP_READ,
+                                                       created );
+         created.setSelectionKey( registration );
+
+         // Publish the session and release the thread blocked in connect().
+         synchronized( pending )
+         {
+             pending.session = created;
+             pending.notify();
+         }
+     }
+ }
+
+ /**
+  * Closes the channel of, and cancels, every selection key queued by
+  * <code>closeSession()</code>. A failure to close a channel is reported to
+  * the exception monitor and the key is cancelled regardless.
+  */
+ private void cancelKeys()
+ {
+     if( cancelQueue.isEmpty() )
+         return;
+
+     while( true )
+     {
+         SelectionKey doomed;
+         synchronized( cancelQueue )
+         {
+             doomed = ( SelectionKey ) cancelQueue.pop();
+         }
+
+         if( doomed == null )
+             return;
+
+         try
+         {
+             doomed.channel().close();
+         }
+         catch( IOException failure )
+         {
+             exceptionMonitor.exceptionCaught( this, failure );
+         }
+
+         doomed.cancel();
+         selector.wakeup(); // wake up again to trigger thread death
+     }
+ }
+
+ /**
+ * Adds a filter to this connector's filter manager, which is the one
+ * passed to every session this connector creates.
+ *
+ * @param priority priority passed through to the filter manager
+ * @param filter the filter to add
+ */
+ public void addFilter( int priority, IoHandlerFilter filter )
+ {
+ filterManager.addFilter( priority, filter );
+ }
+
+ /**
+ * Removes a filter from this connector's filter manager.
+ *
+ * @param filter the filter to remove
+ */
+ public void removeFilter( IoHandlerFilter filter )
+ {
+ filterManager.removeFilter( filter );
+ }
+
+ /**
+ * A pending request to register a connected channel with the selector.
+ * Created by <code>connect()</code> and completed by
+ * <code>registerNew()</code>; the request object itself is the monitor the
+ * two threads use to wait and notify.
+ */
+ private static class RegistrationRequest
+ {
+ // The connected, non-blocking channel to register.
+ private final DatagramChannel channel;
+
+ // Handler passed to the session created for the channel.
+ private final IoHandler handler;
+
+ // Null until registerNew() has created the session; connect() waits
+ // for it to become non-null.
+ private DatagramSession session;
+
+ private RegistrationRequest( DatagramChannel channel, IoHandler handler )
+ {
+ this.channel = channel;
+ this.handler = handler;
+ }
+ }
+
+ /**
+ * Returns the monitor that is notified of exceptions raised by the worker
+ * loop and while closing channels.
+ *
+ * @return the current exception monitor
+ */
+ public ExceptionMonitor getExceptionMonitor()
+ {
+ return exceptionMonitor;
+ }
+
+ /**
+  * Replaces the exception monitor. Passing <code>null</code> installs a new
+  * {@link DefaultExceptionMonitor}.
+  *
+  * @param monitor the new monitor, or <code>null</code> for the default
+  */
+ public void setExceptionMonitor( ExceptionMonitor monitor )
+ {
+     this.exceptionMonitor = ( monitor == null )
+                             ? new DefaultExceptionMonitor()
+                             : monitor;
+ }
+}
\ No newline at end of file
Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java?view=diff&rev=123285&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java&r1=123284&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java&r2=123285
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java (original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramProcessor.java Fri Dec 24 00:57:32 2004
@@ -27,4 +27,6 @@
public interface DatagramProcessor
{
void flushSession( DatagramSession session );
+
+ void closeSession( DatagramSession session );
}
Modified: incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java
Url: http://svn.apache.org/viewcvs/incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java?view=diff&rev=123285&p1=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java&r1=123284&p2=incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java&r2=123285
==============================================================================
--- incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java (original)
+++ incubator/directory/network/trunk/mina/src/java/org/apache/mina/io/datagram/DatagramSession.java Fri Dec 24 00:57:32 2004
@@ -54,10 +54,10 @@
private final SocketAddress localAddress;
- private final SelectionKey key;
-
private SocketAddress remoteAddress;
+ private SelectionKey key;
+
private Object attachment;
private long readBytes;
@@ -77,8 +77,9 @@
/**
* Creates a new instance.
*/
- DatagramSession( DatagramProcessor parent, IoHandlerFilterManager filterManager, DatagramChannel ch,
- SelectionKey key, IoHandler defaultHandler )
+ DatagramSession( DatagramProcessor parent,
+ IoHandlerFilterManager filterManager, DatagramChannel ch,
+ IoHandler defaultHandler )
{
this.parent = parent;
this.filterManager = filterManager;
@@ -89,7 +90,6 @@
this.handler = defaultHandler;
this.remoteAddress = ch.socket().getRemoteSocketAddress();
this.localAddress = ch.socket().getLocalSocketAddress();
- this.key = key;
}
IoHandlerFilterManager getFilterManager()
@@ -107,6 +107,11 @@
return key;
}
+ void setSelectionKey( SelectionKey key )
+ {
+ this.key = key;
+ }
+
public IoHandler getHandler()
{
return handler;
@@ -176,8 +181,8 @@
{
return remoteAddress;
}
-
- void setRemoteAddress(SocketAddress remoteAddress)
+
+ void setRemoteAddress( SocketAddress remoteAddress )
{
this.remoteAddress = remoteAddress;
}