You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/05/09 11:21:54 UTC
svn commit: r169259 [4/5] - in
/directory/network/branches/api_integration/src:
examples/org/apache/mina/examples/echoserver/
examples/org/apache/mina/examples/httpserver/
examples/org/apache/mina/examples/netcat/
examples/org/apache/mina/examples/reverser/
examples/org/apache/mina/examples/tennis/ java/org/apache/mina/common/
java/org/apache/mina/filter/ java/org/apache/mina/filter/codec/
java/org/apache/mina/handler/ java/org/apache/mina/io/
java/org/apache/mina/io/datagram/ java/org/apache/mina/io/filter/
java/org/apache/mina/io/handler/ java/org/apache/mina/io/socket/
java/org/apache/mina/protocol/ java/org/apache/mina/protocol/codec/
java/org/apache/mina/protocol/filter/
java/org/apache/mina/protocol/handler/ java/org/apache/mina/protocol/io/
java/org/apache/mina/protocol/vmpipe/ java/org/apache/mina/registry/
java/org/apache/mina/transport/ java/org/apache/mina/transport/socket/
java/org/apache/mina/transport/socket/bio/
java/org/apache/mina/transport/socket/nio/
java/org/apache/mina/transport/vmpipe/ java/org/apache/mina/util/
test/org/apache/mina/examples/echoserver/ test/org/apache/mina/io/
test/org/apache/mina/io/datagram/ test/org/apache/mina/io/socket/
test/org/apache/mina/protocol/ test/org/apache/mina/protocol/codec/
test/org/apache/mina/util/
Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Mon May 9 02:21:50 2005
@@ -0,0 +1,528 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSessionManagerFilterChain;
+import org.apache.mina.util.ExceptionUtil;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoAcceptor} for datagram transport (UDP/IP).
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class DatagramAcceptor extends DatagramSessionManager implements IoAcceptor
+{
+ private static volatile int nextId = 0;
+
+ private final IoSessionManagerFilterChain filters =
+ new DatagramSessionManagerFilterChain( this );
+
+ private final int id = nextId ++ ;
+
+ private final Selector selector;
+
+ private final Map channels = new HashMap();
+
+ private final Queue registerQueue = new Queue();
+
+ private final Queue cancelQueue = new Queue();
+
+ private final Queue flushingSessions = new Queue();
+
+ private Worker worker;
+
+ /**
+ * Creates a new instance.
+ *
+ * @throws IOException if failed to open a selector.
+ */
+ public DatagramAcceptor() throws IOException
+ {
+ selector = Selector.open();
+ }
+
+ /**
+  * Requests the worker thread to bind a datagram channel to the specified
+  * local address and blocks until the worker has processed the request.
+  * <p>
+  * If the calling thread is interrupted while waiting, the wait continues
+  * until the request is done and the interrupt status is restored before
+  * this method returns.
+  *
+  * @param address the local address to bind to; must be an
+  *                {@link InetSocketAddress} with a non-zero port
+  * @param handler the handler stored with the registration request
+  * @throws NullPointerException if <tt>address</tt> or <tt>handler</tt> is <tt>null</tt>
+  * @throws IllegalArgumentException if <tt>address</tt> is not an
+  *         {@link InetSocketAddress} or its port is 0
+  * @throws IOException declared by the interface; any failure recorded by
+  *         the worker is rethrown through <tt>ExceptionUtil.throwException</tt>
+  */
+ public void bind( SocketAddress address, IoHandler handler )
+         throws IOException
+ {
+     if( address == null )
+         throw new NullPointerException( "address" );
+     if( handler == null )
+         throw new NullPointerException( "handler" );
+
+     if( !( address instanceof InetSocketAddress ) )
+         throw new IllegalArgumentException( "Unexpected address type: "
+                                             + address.getClass() );
+     if( ( ( InetSocketAddress ) address ).getPort() == 0 )
+         throw new IllegalArgumentException( "Unsupported port number: 0" );
+
+     RegistrationRequest request = new RegistrationRequest( address, handler );
+     synchronized( this )
+     {
+         synchronized( registerQueue )
+         {
+             registerQueue.push( request );
+         }
+         startupWorker();
+     }
+     // Make the worker leave select() so that it sees the new request.
+     selector.wakeup();
+
+     boolean interrupted = false;
+     synchronized( request )
+     {
+         while( !request.done )
+         {
+             try
+             {
+                 request.wait();
+             }
+             catch( InterruptedException e )
+             {
+                 // The request is already queued, so keep waiting for its
+                 // outcome, but remember the interrupt for the caller.
+                 interrupted = true;
+             }
+         }
+     }
+
+     if( interrupted )
+     {
+         Thread.currentThread().interrupt();
+     }
+
+     if( request.exception != null )
+     {
+         ExceptionUtil.throwException( request.exception );
+     }
+ }
+
+ /**
+  * Requests the worker thread to close the channel bound to the specified
+  * local address and blocks until the worker has processed the request.
+  * <p>
+  * If the calling thread is interrupted while waiting, the wait continues
+  * until the request is done and the interrupt status is restored before
+  * this method returns.
+  *
+  * @param address the local address that was previously bound
+  * @throws NullPointerException if <tt>address</tt> is <tt>null</tt>
+  * @throws RuntimeException the exception recorded on the request by the
+  *         worker (an {@link IllegalArgumentException} when the address is
+  *         not bound)
+  */
+ public void unbind( SocketAddress address )
+ {
+     if( address == null )
+         throw new NullPointerException( "address" );
+
+     CancellationRequest request = new CancellationRequest( address );
+     synchronized( this )
+     {
+         synchronized( cancelQueue )
+         {
+             cancelQueue.push( request );
+         }
+         startupWorker();
+     }
+     // Make the worker leave select() so that it sees the new request.
+     selector.wakeup();
+
+     boolean interrupted = false;
+     synchronized( request )
+     {
+         while( !request.done )
+         {
+             try
+             {
+                 request.wait();
+             }
+             catch( InterruptedException e )
+             {
+                 // The request is already queued, so keep waiting for its
+                 // outcome, but remember the interrupt for the caller.
+                 interrupted = true;
+             }
+         }
+     }
+
+     if( interrupted )
+     {
+         Thread.currentThread().interrupt();
+     }
+
+     if( request.exception != null )
+     {
+         // The exception was created on the worker thread; re-stamp it
+         // with the caller's stack trace before throwing.
+         request.exception.fillInStackTrace();
+         throw request.exception;
+     }
+ }
+
+ /**
+  * Creates and starts the worker thread if no worker is currently set.
+  * Synchronized on this acceptor, the same monitor the worker takes
+  * before clearing the <tt>worker</tt> field.
+  */
+ private synchronized void startupWorker()
+ {
+ if( worker == null )
+ {
+ worker = new Worker();
+ worker.start();
+ }
+ }
+
+ /**
+  * Queues the session for flushing and wakes up the selector so that the
+  * worker thread processes the queue.
+  */
+ void flushSession( DatagramSession session )
+ {
+ scheduleFlush( session );
+ selector.wakeup();
+ }
+
+ /**
+  * Does nothing: this acceptor performs no per-session close work.
+  */
+ void closeSession( DatagramSession session )
+ {
+ }
+
+ /**
+  * Appends the session to the queue of sessions waiting to be flushed.
+  * Does not wake up the selector.
+  */
+ private void scheduleFlush( DatagramSession session )
+ {
+ synchronized( flushingSessions )
+ {
+ flushingSessions.push( session );
+ }
+ }
+
+ /**
+  * The thread that runs the select loop: it processes pending bind
+  * requests, ready keys, scheduled flushes and pending unbind requests,
+  * and terminates itself once nothing is registered or queued any more.
+  */
+ private class Worker extends Thread
+ {
+ public Worker()
+ {
+ super( "DatagramAcceptor-" + id );
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ try
+ {
+ int nKeys = selector.select();
+
+ registerNew();
+
+ if( nKeys > 0 )
+ {
+ processReadySessions( selector.selectedKeys() );
+ }
+
+ flushSessions();
+ cancelKeys();
+
+ // Unlocked pre-check; the decision to stop is re-checked below
+ // while holding the acceptor monitor, which bind()/unbind()
+ // also hold while queueing a request and starting a worker.
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( DatagramAcceptor.this )
+ {
+ if( selector.keys().isEmpty() &&
+ registerQueue.isEmpty() &&
+ cancelQueue.isEmpty() )
+ {
+ // Clearing the field lets startupWorker() create a new thread later.
+ worker = null;
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( DatagramAcceptor.this,
+ e );
+
+ // Pause before retrying the loop after an I/O failure.
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * Handles every key the selector reported as ready.  For each key a new
+  * {@link DatagramSession} is created from the key's channel and the
+  * handler of the attached registration request, announced through
+  * <tt>sessionCreated</tt>, and then read from and/or scheduled for
+  * flushing depending on the key's readiness.  Anything thrown while
+  * handling a key is passed to the exception monitor.
+  *
+  * @param keys the selector's selected-key set; each key is removed as it is handled
+  */
+ private void processReadySessions( Set keys )
+ {
+     for( Iterator i = keys.iterator(); i.hasNext(); )
+     {
+         SelectionKey readyKey = ( SelectionKey ) i.next();
+         i.remove();
+
+         DatagramChannel channel = ( DatagramChannel ) readyKey.channel();
+         RegistrationRequest binding = ( RegistrationRequest ) readyKey.attachment();
+
+         DatagramSession session =
+             new DatagramSession( filters, channel, binding.handler );
+         session.setSelectionKey( readyKey );
+
+         try
+         {
+             binding.handler.sessionCreated( session );
+
+             if( readyKey.isReadable() )
+             {
+                 readSession( session );
+             }
+
+             if( readyKey.isWritable() )
+             {
+                 scheduleFlush( session );
+             }
+         }
+         catch( Throwable cause )
+         {
+             exceptionMonitor.exceptionCaught( this, cause );
+         }
+     }
+ }
+
+ /**
+  * Receives one datagram from the session's channel.  When a datagram was
+  * received, its sender becomes the session's remote address and a copy
+  * of the payload is fired as a <tt>messageReceived</tt> event.  An
+  * {@link IOException} is fired as an <tt>exceptionCaught</tt> event.
+  * The 2048-byte receive buffer is always released before returning.
+  */
+ private void readSession( DatagramSession session )
+ {
+     ByteBuffer scratch = ByteBuffer.allocate( 2048 );
+     try
+     {
+         SocketAddress sender = session.getChannel().receive( scratch.buf() );
+         if( sender == null )
+         {
+             // No datagram was received; the finally clause still
+             // releases the scratch buffer.
+             return;
+         }
+
+         scratch.flip();
+         session.setRemoteAddress( sender );
+
+         // Hand the filter chain a right-sized copy so the scratch
+         // buffer can be released below.
+         ByteBuffer message = ByteBuffer.allocate( scratch.limit() );
+         message.put( scratch );
+         message.flip();
+
+         session.increaseReadBytes( message.remaining() );
+         filters.messageReceived( session, message );
+     }
+     catch( IOException e )
+     {
+         filters.exceptionCaught( session, e );
+     }
+     finally
+     {
+         scratch.release();
+     }
+ }
+
+ /**
+  * Drains the queue of sessions scheduled for flushing and flushes each
+  * of them.  An {@link IOException} raised while flushing a session is
+  * fired as an <tt>exceptionCaught</tt> event on that session's manager
+  * filter chain, and draining continues with the next session.
+  */
+ private void flushSessions()
+ {
+     if( flushingSessions.size() == 0 )
+     {
+         return;
+     }
+
+     while( true )
+     {
+         DatagramSession pending;
+         synchronized( flushingSessions )
+         {
+             pending = ( DatagramSession ) flushingSessions.pop();
+         }
+
+         if( pending == null )
+         {
+             // Queue drained.
+             return;
+         }
+
+         try
+         {
+             flush( pending );
+         }
+         catch( IOException e )
+         {
+             pending.getManagerFilterChain().exceptionCaught( pending, e );
+         }
+     }
+ }
+
+ /**
+  * Sends the buffers queued on the session to the session's current
+  * remote address, in queue order, firing <tt>messageSent</tt> for each
+  * buffer that has been sent (or was already empty).
+  *
+  * NOTE(review): when <tt>send</tt> returns 0 the loop does not exit, so
+  * the same buffer is retried immediately until the send succeeds (a busy
+  * spin).  A <tt>break</tt> after setting OP_WRITE looks intended, but
+  * processReadySessions() builds a new DatagramSession (with a new write
+  * queue) for every ready key, so a writable event alone would not resume
+  * this session's queue - confirm before changing.
+  *
+  * @throws IOException if sending on the channel fails
+  */
+ private void flush( DatagramSession session ) throws IOException
+ {
+ DatagramChannel ch = session.getChannel();
+
+ Queue writeBufferQueue = session.getWriteBufferQueue();
+
+ ByteBuffer buf;
+ for( ;; )
+ {
+ // Peek only; the buffer is popped once it is known to be done.
+ synchronized( writeBufferQueue )
+ {
+ buf = ( ByteBuffer ) writeBufferQueue.first();
+ }
+
+ if( buf == null )
+ break;
+
+ if( buf.remaining() == 0 )
+ {
+ // pop and fire event
+ synchronized( writeBufferQueue )
+ {
+ writeBufferQueue.pop();
+ }
+
+ // FIXME buffer must be released after messageSent is fired.
+ session.getManagerFilterChain().messageSent( session, buf );
+ continue;
+ }
+
+ int writtenBytes = ch
+ .send( buf.buf(), session.getRemoteAddress() );
+
+ SelectionKey key = session.getSelectionKey();
+ if( writtenBytes == 0 )
+ {
+ // Kernel buffer is full
+ key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+ }
+ else if( writtenBytes > 0 )
+ {
+ // The datagram went out: stop asking for write readiness.
+ key.interestOps( key.interestOps()
+ & ( ~SelectionKey.OP_WRITE ) );
+
+ // pop and fire event
+ synchronized( writeBufferQueue )
+ {
+ writeBufferQueue.pop();
+ }
+
+ session.increaseWrittenBytes( writtenBytes );
+ session.getManagerFilterChain().messageSent( session, buf );
+ }
+ }
+ }
+
+ /**
+  * Processes all pending bind requests: for each one, opens a
+  * non-blocking datagram channel, binds it to the requested address,
+  * registers it with the selector for reads (with the request as the key
+  * attachment) and records it in <tt>channels</tt>.  Any failure is
+  * stored on the request.  The thread waiting on the request is notified
+  * in either case.
+  */
+ private void registerNew()
+ {
+ if( registerQueue.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ RegistrationRequest req;
+ synchronized( registerQueue )
+ {
+ req = ( RegistrationRequest ) registerQueue.pop();
+ }
+
+ if( req == null )
+ break;
+
+ DatagramChannel ch = null;
+ try
+ {
+ ch = DatagramChannel.open();
+ ch.configureBlocking( false );
+ ch.socket().bind( req.address );
+ ch.register( selector, SelectionKey.OP_READ, req );
+ channels.put( req.address, ch );
+ }
+ catch( Throwable t )
+ {
+ // Handed back to the thread blocked in bind().
+ req.exception = t;
+ }
+ finally
+ {
+ synchronized( req )
+ {
+ req.done = true;
+ req.notify();
+ }
+
+ // A channel that was opened but could not be fully set up is closed again.
+ if( ch != null && req.exception != null )
+ {
+ try
+ {
+ ch.close();
+ }
+ catch( Throwable e )
+ {
+ exceptionMonitor.exceptionCaught( this, e );
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * Processes all pending unbind requests: for each one, removes the
+  * channel recorded for the address, cancels its selection key and closes
+  * it.  An unknown address is recorded on the request as an
+  * {@link IllegalArgumentException}; any other failure is passed to the
+  * exception monitor and is not reported on the request.  The thread
+  * waiting on the request is notified in either case.
+  */
+ private void cancelKeys()
+ {
+ if( cancelQueue.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ CancellationRequest request;
+ synchronized( cancelQueue )
+ {
+ request = ( CancellationRequest ) cancelQueue.pop();
+ }
+
+ if( request == null )
+ {
+ break;
+ }
+
+ DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
+ // close the channel
+ try
+ {
+ if( ch == null )
+ {
+ // Rethrown by unbind() on the caller's thread.
+ request.exception = new IllegalArgumentException(
+ "Address not bound: " + request.address );
+ }
+ else
+ {
+ SelectionKey key = ch.keyFor( selector );
+ key.cancel();
+ selector.wakeup(); // wake up again to trigger thread death
+ ch.close();
+ }
+ }
+ catch( Throwable t )
+ {
+ exceptionMonitor.exceptionCaught( this, t );
+ }
+ finally
+ {
+ synchronized( request )
+ {
+ request.done = true;
+ request.notify();
+ }
+ }
+ }
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return filters;
+ }
+
+ /**
+  * A bind request passed from {@link #bind(SocketAddress, IoHandler)} to
+  * the worker thread.  It also serves as the attachment of the selection
+  * key registered for the bound channel.
+  */
+ private static class RegistrationRequest
+ {
+ private final SocketAddress address;
+
+ private final IoHandler handler;
+
+ // Failure recorded by the worker, or null if binding succeeded.
+ private Throwable exception;
+
+ // Set by the worker, under this request's monitor, once the request has been processed.
+ private boolean done;
+
+ private RegistrationRequest( SocketAddress address, IoHandler handler )
+ {
+ this.address = address;
+ this.handler = handler;
+ }
+ }
+
+ /**
+  * An unbind request passed from {@link #unbind(SocketAddress)} to the
+  * worker thread.
+  */
+ private static class CancellationRequest
+ {
+ private final SocketAddress address;
+ // Set by the worker, under this request's monitor, once the request has been processed.
+ private boolean done;
+ // Failure recorded by the worker, or null if none was recorded.
+ private RuntimeException exception;
+
+ private CancellationRequest( SocketAddress address )
+ {
+ this.address = address;
+ }
+ }
+}
Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Mon May 9 02:21:50 2005
@@ -0,0 +1,501 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionManagerFilterChain;
+import org.apache.mina.util.ExceptionUtil;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoConnector} for datagram transport (UDP/IP).
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class DatagramConnector extends DatagramSessionManager implements IoConnector
+{
+ private static volatile int nextId = 0;
+
+ private final IoSessionManagerFilterChain filters =
+ new DatagramSessionManagerFilterChain( this );
+
+ private final int id = nextId ++ ;
+
+ private final Selector selector;
+
+ private final Queue registerQueue = new Queue();
+
+ private final Queue cancelQueue = new Queue();
+
+ private final Queue flushingSessions = new Queue();
+
+ private Worker worker;
+
+ /**
+ * Creates a new instance.
+ *
+ * @throws IOException if failed to open a selector
+ */
+ public DatagramConnector() throws IOException
+ {
+ selector = Selector.open();
+ }
+
+ /**
+  * Connects to the specified remote address without binding to a
+  * particular local address.  Delegates to
+  * {@link #connect(SocketAddress, SocketAddress, IoHandler)} with a
+  * <tt>null</tt> local address.
+  */
+ public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException
+ {
+ return connect( address, null, handler);
+ }
+
+ /**
+  * Connects to the specified remote address without binding to a
+  * particular local address.  The <tt>timeout</tt> argument is ignored.
+  */
+ public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException
+ {
+ return connect( address, null, handler );
+ }
+
+ /**
+  * Connects to the specified remote address from the specified local
+  * address.  The <tt>timeout</tt> argument is ignored.
+  */
+ public IoSession connect( SocketAddress address, SocketAddress localAddress, int timeout, IoHandler handler ) throws IOException
+ {
+ return connect( address, localAddress, handler );
+ }
+
+ /**
+  * Opens a datagram channel, optionally binds it to the specified local
+  * address, connects it to the specified remote address, and asks the
+  * worker thread to register it.  Blocks until the worker has processed
+  * the registration.
+  * <p>
+  * If the calling thread is interrupted while waiting, the wait continues
+  * until the request is done and the interrupt status is restored before
+  * this method returns.
+  *
+  * @param address      the remote address; must be an {@link InetSocketAddress}
+  * @param localAddress the local address to bind to, or <tt>null</tt> to
+  *                     skip binding; must be an {@link InetSocketAddress}
+  *                     when given
+  * @param handler      the handler of the new session
+  * @return the session created by the worker for the new channel
+  * @throws NullPointerException if <tt>address</tt> or <tt>handler</tt> is <tt>null</tt>
+  * @throws IllegalArgumentException if an address has an unexpected type
+  * @throws IOException if the channel cannot be opened, bound or
+  *         connected; any failure recorded by the worker is rethrown
+  *         through <tt>ExceptionUtil.throwException</tt>
+  */
+ public IoSession connect( SocketAddress address, SocketAddress localAddress,
+                           IoHandler handler ) throws IOException
+ {
+     if( address == null )
+         throw new NullPointerException( "address" );
+     if( handler == null )
+         throw new NullPointerException( "handler" );
+
+     if( !( address instanceof InetSocketAddress ) )
+         throw new IllegalArgumentException( "Unexpected address type: "
+                                             + address.getClass() );
+
+     if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+     {
+         throw new IllegalArgumentException( "Unexpected local address type: "
+                                             + localAddress.getClass() );
+     }
+
+     DatagramChannel ch = DatagramChannel.open();
+     boolean initialized = false;
+     try
+     {
+         ch.socket().setReuseAddress( true );
+         if( localAddress != null )
+         {
+             ch.socket().bind( localAddress );
+         }
+         ch.connect( address );
+         ch.configureBlocking( false );
+         initialized = true;
+     }
+     finally
+     {
+         // Do not leak the channel when any setup step failed.
+         if( !initialized )
+         {
+             ch.close();
+         }
+     }
+
+     RegistrationRequest request = new RegistrationRequest( ch, handler );
+     synchronized( this )
+     {
+         synchronized( registerQueue )
+         {
+             registerQueue.push( request );
+         }
+         startupWorker();
+     }
+
+     // Make the worker leave select() so that it sees the new request.
+     selector.wakeup();
+
+     boolean interrupted = false;
+     synchronized( request )
+     {
+         while( !request.done )
+         {
+             try
+             {
+                 request.wait();
+             }
+             catch( InterruptedException e )
+             {
+                 // The request is already queued, so keep waiting for its
+                 // outcome, but remember the interrupt for the caller.
+                 interrupted = true;
+             }
+         }
+     }
+
+     if( interrupted )
+     {
+         Thread.currentThread().interrupt();
+     }
+
+     if( request.exception != null )
+     {
+         ExceptionUtil.throwException( request.exception );
+     }
+
+     return request.session;
+ }
+
+ /**
+  * Creates and starts the worker thread if no worker is currently set.
+  * Synchronized on this connector, the same monitor the worker takes
+  * before clearing the <tt>worker</tt> field.
+  */
+ private synchronized void startupWorker()
+ {
+ if( worker == null )
+ {
+ worker = new Worker();
+ worker.start();
+ }
+ }
+
+ /**
+  * Queues the session's selection key for cancellation by the worker
+  * thread, starting the worker if necessary, and wakes up the selector.
+  * Returns without waiting for the worker to process the key.
+  */
+ void closeSession( DatagramSession session )
+ {
+ synchronized( this )
+ {
+ SelectionKey key = session.getSelectionKey();
+ synchronized( cancelQueue )
+ {
+ cancelQueue.push( key );
+ }
+ startupWorker();
+ }
+
+ selector.wakeup();
+ }
+
+ /**
+  * Queues the session for flushing and wakes up the selector so that the
+  * worker thread processes the queue.
+  */
+ void flushSession( DatagramSession session )
+ {
+ scheduleFlush( session );
+ selector.wakeup();
+ }
+
+ /**
+  * Appends the session to the queue of sessions waiting to be flushed.
+  * Does not wake up the selector.
+  */
+ private void scheduleFlush( DatagramSession session )
+ {
+ synchronized( flushingSessions )
+ {
+ flushingSessions.push( session );
+ }
+ }
+
+ /**
+  * The thread that runs the select loop: it processes pending
+  * registrations, ready keys, scheduled flushes and pending
+  * cancellations, and terminates itself once nothing is registered or
+  * queued any more.
+  */
+ private class Worker extends Thread
+ {
+     public Worker()
+     {
+         // Named after the connector so thread dumps do not confuse this
+         // thread with a DatagramAcceptor worker.
+         super( "DatagramConnector-" + id );
+     }
+
+     public void run()
+     {
+         for( ;; )
+         {
+             try
+             {
+                 int nKeys = selector.select();
+
+                 registerNew();
+
+                 if( nKeys > 0 )
+                 {
+                     processReadySessions( selector.selectedKeys() );
+                 }
+
+                 flushSessions();
+                 cancelKeys();
+
+                 // Unlocked pre-check; the decision to stop is re-checked
+                 // below while holding the connector monitor, which
+                 // connect()/closeSession() also hold while queueing a
+                 // request and starting a worker.
+                 if( selector.keys().isEmpty() )
+                 {
+                     synchronized( DatagramConnector.this )
+                     {
+                         if( selector.keys().isEmpty() &&
+                             registerQueue.isEmpty() &&
+                             cancelQueue.isEmpty() )
+                         {
+                             // Clearing the field lets startupWorker()
+                             // create a new thread later.
+                             worker = null;
+                             break;
+                         }
+                     }
+                 }
+             }
+             catch( IOException e )
+             {
+                 exceptionMonitor.exceptionCaught( DatagramConnector.this,
+                                                   e );
+
+                 // Pause before retrying the loop after an I/O failure.
+                 try
+                 {
+                     Thread.sleep( 1000 );
+                 }
+                 catch( InterruptedException e1 )
+                 {
+                 }
+             }
+         }
+     }
+ }
+
+ /**
+  * Handles every key the selector reported as ready.  The session
+  * attached to each key is read from when the key is readable and
+  * scheduled for flushing when the key is writable.
+  *
+  * @param keys the selector's selected-key set; each key is removed as it is handled
+  */
+ private void processReadySessions( Set keys )
+ {
+     for( Iterator i = keys.iterator(); i.hasNext(); )
+     {
+         SelectionKey readyKey = ( SelectionKey ) i.next();
+         i.remove();
+
+         DatagramSession owner = ( DatagramSession ) readyKey.attachment();
+
+         if( readyKey.isReadable() )
+         {
+             readSession( owner );
+         }
+
+         if( readyKey.isWritable() )
+         {
+             scheduleFlush( owner );
+         }
+     }
+ }
+
+ /**
+  * Reads one datagram from the session's channel.  When at least one byte
+  * was read, a copy of the payload is fired as a <tt>messageReceived</tt>
+  * event.  An {@link IOException} is fired as an <tt>exceptionCaught</tt>
+  * event.  The 2048-byte read buffer is always released before returning.
+  */
+ private void readSession( DatagramSession session )
+ {
+     ByteBuffer scratch = ByteBuffer.allocate( 2048 );
+     try
+     {
+         int readBytes = session.getChannel().read( scratch.buf() );
+         if( readBytes <= 0 )
+         {
+             // Nothing was read; the finally clause still releases the
+             // scratch buffer.
+             return;
+         }
+
+         scratch.flip();
+
+         // Hand the filter chain a right-sized copy so the scratch
+         // buffer can be released below.
+         ByteBuffer message = ByteBuffer.allocate( scratch.limit() );
+         message.put( scratch );
+         message.flip();
+
+         session.increaseReadBytes( readBytes );
+         filters.messageReceived( session, message );
+     }
+     catch( IOException e )
+     {
+         filters.exceptionCaught( session, e );
+     }
+     finally
+     {
+         scratch.release();
+     }
+ }
+
+ /**
+  * Drains the queue of sessions scheduled for flushing and flushes each
+  * of them.  An {@link IOException} raised while flushing a session is
+  * fired as an <tt>exceptionCaught</tt> event on that session's manager
+  * filter chain, and draining continues with the next session.
+  */
+ private void flushSessions()
+ {
+     if( flushingSessions.size() == 0 )
+     {
+         return;
+     }
+
+     while( true )
+     {
+         DatagramSession pending;
+         synchronized( flushingSessions )
+         {
+             pending = ( DatagramSession ) flushingSessions.pop();
+         }
+
+         if( pending == null )
+         {
+             // Queue drained.
+             return;
+         }
+
+         try
+         {
+             flush( pending );
+         }
+         catch( IOException e )
+         {
+             pending.getManagerFilterChain().exceptionCaught( pending, e );
+         }
+     }
+ }
+
+ /**
+  * Writes the buffers queued on the session to its connected channel, in
+  * queue order, firing <tt>messageSent</tt> for each buffer that has been
+  * written (or was already empty).
+  * <p>
+  * When the channel accepts no bytes, write interest is registered on the
+  * session's selection key and flushing stops.  The session is scheduled
+  * for flushing again by processReadySessions() once the key is reported
+  * writable.
+  *
+  * @throws IOException if writing to the channel fails
+  */
+ private void flush( DatagramSession session ) throws IOException
+ {
+     DatagramChannel ch = session.getChannel();
+     Queue writeBufferQueue = session.getWriteBufferQueue();
+
+     for( ;; )
+     {
+         ByteBuffer buf;
+         // Peek only; the buffer is popped once it is known to be done.
+         synchronized( writeBufferQueue )
+         {
+             buf = ( ByteBuffer ) writeBufferQueue.first();
+         }
+
+         if( buf == null )
+             break;
+
+         if( buf.remaining() == 0 )
+         {
+             // pop and fire event
+             synchronized( writeBufferQueue )
+             {
+                 writeBufferQueue.pop();
+             }
+
+             // FIXME buffer must be released after messageSent is fired.
+             session.getManagerFilterChain().messageSent( session, buf );
+             continue;
+         }
+
+         int writtenBytes = ch.write( buf.buf() );
+
+         SelectionKey key = session.getSelectionKey();
+         if( writtenBytes == 0 )
+         {
+             // Kernel buffer is full: ask the selector to report when the
+             // channel becomes writable and stop here instead of retrying
+             // the same buffer in a tight loop.
+             key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+             break;
+         }
+         else if( writtenBytes > 0 )
+         {
+             // The datagram went out: stop asking for write readiness.
+             key.interestOps( key.interestOps()
+                              & ( ~SelectionKey.OP_WRITE ) );
+
+             // pop and fire event
+             synchronized( writeBufferQueue )
+             {
+                 writeBufferQueue.pop();
+             }
+
+             session.increaseWrittenBytes( writtenBytes );
+             session.getManagerFilterChain().messageSent( session, buf );
+         }
+     }
+ }
+
+ /**
+  * Processes all pending registration requests: for each one, creates a
+  * session for the request's channel, announces it to the handler via
+  * <tt>sessionCreated</tt>, and registers the channel with the selector
+  * for reads with the session as the key attachment.  Any failure is
+  * stored on the request and the channel is closed.  The thread waiting
+  * on the request is notified in either case.
+  */
+ private void registerNew()
+ {
+ if( registerQueue.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ RegistrationRequest req;
+ synchronized( registerQueue )
+ {
+ req = ( RegistrationRequest ) registerQueue.pop();
+ }
+
+ if( req == null )
+ break;
+
+ DatagramSession session = new DatagramSession(
+ filters, req.channel, req.handler );
+
+ try
+ {
+ req.handler.sessionCreated( session );
+
+ SelectionKey key = req.channel.register( selector,
+ SelectionKey.OP_READ, session );
+
+ session.setSelectionKey( key );
+ }
+ catch( Throwable t )
+ {
+ // Handed back to the thread blocked in connect().
+ req.exception = t;
+ }
+ finally
+ {
+ // The session is published even on failure; connect() rethrows
+ // the recorded exception before it would return the session.
+ synchronized( req )
+ {
+ req.done = true;
+ req.session = session;
+ req.notify();
+ }
+
+ if( req.exception != null )
+ {
+ try
+ {
+ req.channel.close();
+ }
+ catch (IOException e)
+ {
+ exceptionMonitor.exceptionCaught( this, e );
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * Processes all selection keys queued for cancellation: for each one,
+  * closes its channel, cancels the key, and tells the attached session
+  * that it has been closed so that {@link DatagramSession#notifyClose()}
+  * marks it disposed and wakes up any thread waiting on it.  A failure to
+  * close the channel is passed to the exception monitor.
+  */
+ private void cancelKeys()
+ {
+     if( cancelQueue.isEmpty() )
+         return;
+
+     for( ;; )
+     {
+         SelectionKey key;
+         synchronized( cancelQueue )
+         {
+             key = ( SelectionKey ) cancelQueue.pop();
+         }
+
+         if( key == null )
+             break;
+
+         DatagramChannel ch = ( DatagramChannel ) key.channel();
+         // registerNew() attaches the session to the key on registration.
+         DatagramSession session = ( DatagramSession ) key.attachment();
+         try
+         {
+             ch.close();
+         }
+         catch( IOException e )
+         {
+             exceptionMonitor.exceptionCaught( this, e );
+         }
+         key.cancel();
+
+         if( session != null )
+         {
+             session.notifyClose();
+         }
+
+         selector.wakeup(); // wake up again to trigger thread death
+     }
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return filters;
+ }
+
+ /**
+  * A registration request passed from
+  * {@link #connect(SocketAddress, SocketAddress, IoHandler)} to the
+  * worker thread, which fills in the outcome.
+  */
+ private static class RegistrationRequest
+ {
+ private final DatagramChannel channel;
+
+ private final IoHandler handler;
+
+ // Set by the worker, under this request's monitor, once the request has been processed.
+ private boolean done;
+
+ // The session created by the worker for the channel.
+ private DatagramSession session;
+
+ // Failure recorded by the worker, or null if registration succeeded.
+ private Throwable exception;
+
+ private RegistrationRequest( DatagramChannel channel,
+ IoHandler handler )
+ {
+ this.channel = channel;
+ this.handler = handler;
+ }
+ }
+}
Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java Mon May 9 02:21:50 2005
@@ -0,0 +1,185 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.mina.common.BaseIoSession;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.IoSessionFilterChain;
+import org.apache.mina.common.IoSessionManager;
+import org.apache.mina.common.IoSessionManagerFilterChain;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoSession} for datagram transport (UDP/IP).
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+class DatagramSession extends BaseIoSession implements IoSession
+{
+ // Filter chain of the manager (acceptor or connector) that owns this session.
+ private final IoSessionManagerFilterChain managerFilterChain;
+
+ // Per-session filter chain, created on top of the manager's chain.
+ private final IoSessionFilterChain filterChain;
+
+ private final DatagramChannel ch;
+
+ private final DatagramSessionConfig config;
+
+ // Buffers waiting to be written by the manager's flush(); guarded by its own monitor.
+ private final Queue writeBufferQueue;
+
+ private final IoHandler handler;
+
+ // Captured from the channel's socket at construction time.
+ private final SocketAddress localAddress;
+
+ // Initially the socket's remote address; can be replaced via setRemoteAddress().
+ private SocketAddress remoteAddress;
+
+ private SelectionKey key;
+
+ // Set to true by notifyClose(); guarded by this session's monitor.
+ private boolean disposed;
+
+ /**
+ * Creates a new instance.
+ */
+ DatagramSession( IoSessionManagerFilterChain managerFilterChain,
+ DatagramChannel ch, IoHandler defaultHandler )
+ {
+ this.managerFilterChain = managerFilterChain;
+ this.filterChain = new IoSessionFilterChain( managerFilterChain );
+ this.ch = ch;
+ this.config = new DatagramSessionConfig( ch );
+ this.writeBufferQueue = new Queue();
+ this.handler = defaultHandler;
+ this.remoteAddress = ch.socket().getRemoteSocketAddress();
+ this.localAddress = ch.socket().getLocalSocketAddress();
+ }
+
+ /** Returns the filter chain of the manager that owns this session. */
+ IoSessionManagerFilterChain getManagerFilterChain()
+ {
+ return managerFilterChain;
+ }
+
+ /** Returns this session's own filter chain. */
+ public IoFilterChain getFilterChain()
+ {
+ return filterChain;
+ }
+
+ /** Returns the datagram channel backing this session. */
+ DatagramChannel getChannel()
+ {
+ return ch;
+ }
+
+ /** Returns the selection key set via {@link #setSelectionKey(SelectionKey)}, or <tt>null</tt> if none was set. */
+ SelectionKey getSelectionKey()
+ {
+ return key;
+ }
+
+ /** Associates this session with the selection key of its channel. */
+ void setSelectionKey( SelectionKey key )
+ {
+ this.key = key;
+ }
+
+ /** Returns the handler passed to the constructor. */
+ public IoHandler getHandler()
+ {
+ return handler;
+ }
+
+ /**
+ * Marks this session as disposed and wakes up a thread waiting on this
+ * session's monitor.  Has no effect when already disposed.
+ */
+ synchronized void notifyClose()
+ {
+ if( !disposed )
+ {
+ disposed = true;
+ notify();
+ }
+ }
+
+ /**
+ * Asks the owning manager to close this session.  Only sessions owned by
+ * a {@link DatagramConnector} are closed; for any other manager this
+ * method does nothing.  Returns immediately when already disposed.
+ *
+ * NOTE(review): the wait loop below tests <tt>disposed</tt>, which is
+ * known to be false at this point (see the guard at the top), so the
+ * loop body never runs and <tt>wait == true</tt> does not block.  The
+ * condition looks inverted (<tt>!disposed</tt> was presumably intended),
+ * but correcting it here alone would block forever unless the manager
+ * calls {@link #notifyClose()} when it closes the channel - confirm
+ * before changing.
+ *
+ * @param wait whether the caller wants to block until the session is closed
+ */
+ public synchronized void close( boolean wait )
+ {
+ if( disposed )
+ {
+ return;
+ }
+
+ IoSessionManager manager = managerFilterChain.getManager();
+ if( manager instanceof DatagramConnector )
+ {
+ ( ( DatagramConnector ) manager ).closeSession( this );
+ if( wait )
+ {
+ while( disposed )
+ {
+ try
+ {
+ wait();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+ }
+ }
+ }
+
+ /** Returns the queue of buffers waiting to be written. */
+ Queue getWriteBufferQueue()
+ {
+ return writeBufferQueue;
+ }
+
+ /** Passes the message to this session's filter chain as a write request. */
+ public void write( Object message )
+ {
+ filterChain.filterWrite( this, message );
+ }
+
+ public TransportType getTransportType()
+ {
+ return TransportType.DATAGRAM;
+ }
+
+ /** Returns whether the underlying datagram channel is connected. */
+ public boolean isConnected()
+ {
+ return ch.isConnected();
+ }
+
+ /** Returns the {@link DatagramSessionConfig} of this session. */
+ public IoSessionConfig getConfig()
+ {
+ return config;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return remoteAddress;
+ }
+
+ /** Replaces the remote address of this session. */
+ void setRemoteAddress( SocketAddress remoteAddress )
+ {
+ this.remoteAddress = remoteAddress;
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return localAddress;
+ }
+}
\ No newline at end of file
Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionConfig.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionConfig.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionConfig.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionConfig.java Mon May 9 02:21:50 2005
@@ -0,0 +1,65 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.SocketException;
+import java.nio.channels.DatagramChannel;
+
+import org.apache.mina.common.BaseIoSessionConfig;
+import org.apache.mina.common.IoSession;
+
+/**
+ * An {@link org.apache.mina.common.IoSessionConfig} for datagram transport (UDP/IP).
+ * You can downcast the {@link org.apache.mina.common.IoSessionConfig} instance
+ * returned by {@link IoSession#getConfig()} to this type
+ * if you've created datagram session using {@link DatagramAcceptor} or
+ * {@link DatagramConnector}.
+ * <p>
+ * Every accessor delegates to the socket of the datagram channel passed to
+ * the constructor.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class DatagramSessionConfig extends BaseIoSessionConfig
+{
+ private final DatagramChannel ch;
+
+ DatagramSessionConfig( DatagramChannel ch )
+ {
+ this.ch = ch;
+ }
+
+ /** Returns the socket's SO_REUSEADDR setting. */
+ public boolean getReuseAddress() throws SocketException
+ {
+ return ch.socket().getReuseAddress();
+ }
+
+ /** Sets the socket's SO_REUSEADDR setting. */
+ public void setReuseAddress( boolean on ) throws SocketException
+ {
+ ch.socket().setReuseAddress( on );
+ }
+
+ /** Returns the socket's traffic class. */
+ public int getTrafficClass() throws SocketException
+ {
+ return ch.socket().getTrafficClass();
+ }
+
+ /** Sets the socket's traffic class. */
+ public void setTrafficClass( int tc ) throws SocketException
+ {
+ ch.socket().setTrafficClass( tc );
+ }
+}
\ No newline at end of file
Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionConfig.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java Mon May 9 02:21:50 2005
@@ -0,0 +1,45 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.BaseIoSessionManager;
+import org.apache.mina.common.IoSessionManager;
+
+/**
+ * A base class for {@link DatagramAcceptor} and {@link DatagramConnector}.
+ * Sessions interact with this abstract class instead of those two concrete
+ * classes.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+abstract class DatagramSessionManager extends BaseIoSessionManager implements IoSessionManager
+{
+ /**
+ * Requests this processor to flush the write buffer of the specified
+ * session. This method is invoked by MINA internally.
+ *
+ * @param session the datagram session whose queued writes should be flushed
+ */
+ abstract void flushSession( DatagramSession session );
+
+ /**
+ * Requests this processor to close the specified session.
+ * This method is invoked by MINA internally.
+ *
+ * @param session the datagram session to close
+ */
+ abstract void closeSession( DatagramSession session );
+}
\ No newline at end of file
Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java Mon May 9 02:21:50 2005
@@ -0,0 +1,32 @@
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionManagerFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoFilterChain} for datagram transport (UDP/IP).  Writes that
+ * reach the end of the chain are queued on the session and the owning
+ * {@link DatagramSessionManager} is asked to flush them.
+ *
+ * @author The Apache Directory Project
+ */
+class DatagramSessionManagerFilterChain extends IoSessionManagerFilterChain {
+
+    DatagramSessionManagerFilterChain( DatagramSessionManager processor )
+    {
+        super( processor );
+    }
+
+    /**
+     * Appends <code>message</code> to the write buffer queue of the given
+     * session (under the queue's monitor) and then requests a flush of that
+     * session from the manager this chain belongs to.
+     */
+    protected void doWrite( IoSession session, Object message )
+    {
+        final DatagramSession datagramSession = ( DatagramSession ) session;
+        final Queue pendingWrites = datagramSession.getWriteBufferQueue();
+
+        synchronized( pendingWrites )
+        {
+            pendingWrites.push( message );
+        }
+
+        final DatagramSessionManager owner = ( DatagramSessionManager ) getManager();
+        owner.flushSession( datagramSession );
+    }
+}
Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Mon May 9 02:21:50 2005
@@ -0,0 +1,471 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.BaseIoSessionManager;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSessionManagerFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoAcceptor} for socket transport (TCP/IP).
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class SocketAcceptor extends BaseIoSessionManager implements IoAcceptor
+{
+ private static volatile int nextId = 0;
+
+ private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
+
+ private final int id = nextId ++ ;
+
+ private final Selector selector;
+
+ private final Map channels = new HashMap();
+
+ private final Queue registerQueue = new Queue();
+
+ private final Queue cancelQueue = new Queue();
+
+ private int backlog = 50;
+
+ private Worker worker;
+
+
+ /**
+ * Creates a new instance and opens the {@link Selector} that its worker
+ * thread selects on.
+ *
+ * @throws IOException if {@link Selector#open()} fails
+ */
+ public SocketAcceptor() throws IOException
+ {
+ selector = Selector.open();
+ }
+
+ /**
+  * Binds to the specified <code>address</code> and handles incoming
+  * connections with the specified <code>handler</code>.  Backlog value
+  * is configured to the value of <code>backlog</code> property.
+  * <p>
+  * The request is queued for the worker thread and this method blocks until
+  * the worker has processed it.  If the calling thread is interrupted while
+  * waiting, the wait continues and the interrupt status is restored before
+  * this method returns or throws.
+  *
+  * @param address the local address to bind to; must be an
+  *                {@link InetSocketAddress} with a non-zero port
+  * @param handler the handler for sessions accepted on that address
+  * @throws NullPointerException if <code>address</code> or
+  *                              <code>handler</code> is <code>null</code>
+  * @throws IllegalArgumentException if the address type or port is not supported
+  * @throws IOException if failed to bind
+  */
+ public void bind( SocketAddress address, IoHandler handler ) throws IOException
+ {
+     if( address == null )
+     {
+         throw new NullPointerException( "address" );
+     }
+
+     if( handler == null )
+     {
+         throw new NullPointerException( "handler" );
+     }
+
+     if( !( address instanceof InetSocketAddress ) )
+     {
+         throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+     }
+
+     if( ( ( InetSocketAddress ) address ).getPort() == 0 )
+     {
+         throw new IllegalArgumentException( "Unsupported port number: 0" );
+     }
+
+     RegistrationRequest request = new RegistrationRequest( address, backlog, handler );
+
+     // Queue the request and make sure a worker exists while holding the
+     // acceptor lock, so the worker cannot decide to exit in between.
+     synchronized( this )
+     {
+         synchronized( registerQueue )
+         {
+             registerQueue.push( request );
+         }
+         startupWorker();
+     }
+
+     selector.wakeup();
+
+     // Wait until the worker marks the request as done.  An interrupt must
+     // not abort the wait (the bind is already in flight), but it must not
+     // be lost either, so it is remembered and re-asserted afterwards.
+     boolean interrupted = false;
+     synchronized( request )
+     {
+         while( !request.done )
+         {
+             try
+             {
+                 request.wait();
+             }
+             catch( InterruptedException e )
+             {
+                 interrupted = true;
+             }
+         }
+     }
+
+     if( interrupted )
+     {
+         Thread.currentThread().interrupt();
+     }
+
+     if( request.exception != null )
+     {
+         throw request.exception;
+     }
+ }
+
+
+ /**
+  * Starts the worker thread unless one is already registered in the
+  * <code>worker</code> field.
+  */
+ private synchronized void startupWorker()
+ {
+     if( worker != null )
+     {
+         return;
+     }
+
+     Worker created = new Worker();
+     worker = created;
+     created.start();
+ }
+
+
+ /**
+  * Unbinds from the specified <code>address</code>.
+  * <p>
+  * The request is queued for the worker thread and this method blocks until
+  * the worker has processed it.  If the calling thread is interrupted while
+  * waiting, the wait continues and the interrupt status is restored before
+  * this method returns or throws.
+  *
+  * @param address the local address that was previously bound
+  * @throws NullPointerException if <code>address</code> is <code>null</code>
+  * @throws IllegalArgumentException if the address is not bound by this acceptor
+  */
+ public void unbind( SocketAddress address )
+ {
+     if( address == null )
+     {
+         throw new NullPointerException( "address" );
+     }
+
+     CancellationRequest request = new CancellationRequest( address );
+
+     // Queue the request and make sure a worker exists while holding the
+     // acceptor lock, so the worker cannot decide to exit in between.
+     synchronized( this )
+     {
+         synchronized( cancelQueue )
+         {
+             cancelQueue.push( request );
+         }
+         startupWorker();
+     }
+
+     selector.wakeup();
+
+     // Keep waiting across interrupts, but remember them so the interrupt
+     // status can be re-asserted once the request has completed.
+     boolean interrupted = false;
+     synchronized( request )
+     {
+         while( !request.done )
+         {
+             try
+             {
+                 request.wait();
+             }
+             catch( InterruptedException e )
+             {
+                 interrupted = true;
+             }
+         }
+     }
+
+     if( interrupted )
+     {
+         Thread.currentThread().interrupt();
+     }
+
+     if( request.exception != null )
+     {
+         // The exception was created on the worker thread; refresh the
+         // stack trace so it points at the caller of unbind().
+         request.exception.fillInStackTrace();
+
+         throw request.exception;
+     }
+ }
+
+ /**
+ * Returns the default backlog value which is used when user binds.
+ * The value is captured by each bind request at the time
+ * {@link #bind(SocketAddress, IoHandler)} is called; the initial value
+ * is 50.
+ */
+ public int getBacklog()
+ {
+ return backlog;
+ }
+
+ /**
+  * Sets the default backlog value which is used when user binds.
+  *
+  * @param defaultBacklog the new backlog; must be greater than zero
+  * @throws IllegalArgumentException if <code>defaultBacklog</code> is zero
+  *                                  or negative
+  */
+ public void setBacklog( int defaultBacklog )
+ {
+     if( defaultBacklog > 0 )
+     {
+         this.backlog = defaultBacklog;
+         return;
+     }
+
+     throw new IllegalArgumentException( "defaultBacklog: " + defaultBacklog );
+ }
+
+
+ private class Worker extends Thread
+ {
+ public Worker()
+ {
+ super( "SocketAcceptor-" + id );
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ try
+ {
+ int nKeys = selector.select();
+
+ registerNew();
+ cancelKeys();
+
+ if( nKeys > 0 )
+ {
+ processSessions( selector.selectedKeys() );
+ }
+
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( SocketAcceptor.this )
+ {
+ if( selector.keys().isEmpty() &&
+ registerQueue.isEmpty() &&
+ cancelQueue.isEmpty() )
+ {
+ worker = null;
+
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ }
+ }
+ }
+ }
+
+ private void processSessions( Set keys ) throws IOException
+ {
+ Iterator it = keys.iterator();
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+
+ it.remove();
+
+ if( !key.isAcceptable() )
+ {
+ continue;
+ }
+
+ ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
+
+ SocketChannel ch = ssc.accept();
+
+ if( ch == null )
+ {
+ continue;
+ }
+
+ boolean success = false;
+ try
+ {
+ RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+ SocketSession session = new SocketSession( filters, ch, req.handler );
+ req.handler.sessionCreated( session );
+ SocketIoProcessor.getInstance().addSession( session );
+ success = true;
+ }
+ catch( Throwable t )
+ {
+ exceptionMonitor.exceptionCaught( SocketAcceptor.this, t );
+ }
+ finally
+ {
+ if( !success )
+ {
+ ch.close();
+ }
+ }
+ }
+ }
+ }
+
+
+ /**
+  * Drains the registration queue.  For every request a non-blocking server
+  * channel is opened, bound and registered for accept events; the waiting
+  * caller of <code>bind()</code> is notified in either case, and a channel
+  * whose setup failed with an {@link IOException} is closed again.
+  */
+ private void registerNew()
+ {
+     if( registerQueue.isEmpty() )
+     {
+         return;
+     }
+
+     while( true )
+     {
+         RegistrationRequest pending;
+
+         synchronized( registerQueue )
+         {
+             pending = ( RegistrationRequest ) registerQueue.pop();
+         }
+
+         if( pending == null )
+         {
+             return;
+         }
+
+         ServerSocketChannel serverChannel = null;
+
+         try
+         {
+             serverChannel = ServerSocketChannel.open();
+             serverChannel.configureBlocking( false );
+             serverChannel.socket().bind( pending.address, pending.backlog );
+             serverChannel.register( selector, SelectionKey.OP_ACCEPT, pending );
+
+             channels.put( pending.address, serverChannel );
+         }
+         catch( IOException e )
+         {
+             pending.exception = e;
+         }
+         finally
+         {
+             // Release the thread blocked in bind() first ...
+             synchronized( pending )
+             {
+                 pending.done = true;
+
+                 pending.notify();
+             }
+
+             // ... then discard a channel that was opened but not set up.
+             boolean openedButFailed = serverChannel != null && pending.exception != null;
+             if( openedButFailed )
+             {
+                 try
+                 {
+                     serverChannel.close();
+                 }
+                 catch( IOException e )
+                 {
+                     exceptionMonitor.exceptionCaught( this, e );
+                 }
+             }
+         }
+     }
+ }
+
+
+ /**
+  * Drains the cancellation queue.  The server channel bound to each
+  * requested address is deregistered and closed; a request for an address
+  * that is not bound is answered with an {@link IllegalArgumentException}.
+  * The waiting caller of <code>unbind()</code> is notified in either case.
+  */
+ private void cancelKeys()
+ {
+     if( cancelQueue.isEmpty() )
+     {
+         return;
+     }
+
+     while( true )
+     {
+         CancellationRequest pending;
+
+         synchronized( cancelQueue )
+         {
+             pending = ( CancellationRequest ) cancelQueue.pop();
+         }
+
+         if( pending == null )
+         {
+             return;
+         }
+
+         ServerSocketChannel bound = ( ServerSocketChannel ) channels.remove( pending.address );
+
+         // close the channel
+         try
+         {
+             if( bound != null )
+             {
+                 SelectionKey registration = bound.keyFor( selector );
+
+                 registration.cancel();
+
+                 selector.wakeup(); // wake up again to trigger thread death
+
+                 bound.close();
+             }
+             else
+             {
+                 pending.exception = new IllegalArgumentException( "Address not bound: " + pending.address );
+             }
+         }
+         catch( IOException e )
+         {
+             exceptionMonitor.exceptionCaught( this, e );
+         }
+         finally
+         {
+             synchronized( pending )
+             {
+                 pending.done = true;
+
+                 pending.notify();
+             }
+         }
+     }
+ }
+
+ public IoFilterChain getFilterChain() // returns the chain that is also passed to every SocketSession created by this acceptor
+ {
+ return filters;
+ }
+
+ private static class RegistrationRequest // one pending bind(), handed from the calling thread to the worker via registerQueue
+ {
+ private final SocketAddress address;
+ // backlog handed to ServerSocket.bind() by registerNew()
+ private final int backlog;
+ // handler given to sessions accepted on this address (the request is the selection key attachment)
+ private final IoHandler handler;
+ // set by registerNew() when opening/binding/registering fails; rethrown by bind()
+ private IOException exception;
+ // set to true under this object's monitor once the worker has processed the request; bind() waits for it
+ private boolean done;
+
+ private RegistrationRequest( SocketAddress address, int backlog, IoHandler handler )
+ {
+ this.address = address;
+ this.backlog = backlog;
+ this.handler = handler;
+ }
+ }
+
+
+ private static class CancellationRequest // one pending unbind(), handed from the calling thread to the worker via cancelQueue
+ {
+ private final SocketAddress address;
+ // set to true under this object's monitor once the worker has processed the request; unbind() waits for it
+ private boolean done;
+ // set by cancelKeys() when the address is not bound; rethrown by unbind()
+ private RuntimeException exception;
+
+ private CancellationRequest( SocketAddress address )
+ {
+ this.address = address;
+ }
+ }
+}
Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java Mon May 9 02:21:50 2005
@@ -0,0 +1,381 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.BaseIoSessionManager;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionManagerFilterChain;
+import org.apache.mina.util.ExceptionUtil;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class SocketConnector extends BaseIoSessionManager implements IoConnector
+{
+ private static volatile int nextId = 0;
+
+ private final int id = nextId++;
+
+ private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
+
+ private final Selector selector;
+
+ private final Queue connectQueue = new Queue();
+
+ private Worker worker;
+
+ /**
+ * Creates a new instance and opens the {@link Selector} that its worker
+ * thread uses to wait for pending connection attempts.
+ *
+ * @throws IOException if {@link Selector#open()} fails
+ */
+ public SocketConnector() throws IOException
+ {
+ selector = Selector.open();
+ }
+
+ public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException // no local address, timeout of Integer.MAX_VALUE seconds
+ {
+ return connect( address, null, Integer.MAX_VALUE, handler);
+ }
+
+ public IoSession connect( SocketAddress address, SocketAddress localAddress, IoHandler handler ) throws IOException // binds to localAddress when non-null, timeout of Integer.MAX_VALUE seconds
+ {
+ return connect( address, localAddress, Integer.MAX_VALUE, handler);
+ }
+
+ public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException // no local address; timeout is in seconds and must be positive
+ {
+ return connect( address, null, timeout, handler);
+ }
+
+ /**
+  * Connects to the specified <code>address</code>, optionally binding the
+  * socket to <code>localAddress</code> first.
+  * <p>
+  * If the non-blocking connect completes immediately, the session is
+  * created on the calling thread.  Otherwise the attempt is handed to the
+  * worker thread and this method blocks until the worker reports success,
+  * failure or a timeout.  If the calling thread is interrupted while
+  * waiting, the wait continues and the interrupt status is restored before
+  * this method returns or throws.
+  *
+  * @param address      the remote address; must be an {@link InetSocketAddress}
+  * @param localAddress the local address to bind to, or <code>null</code>;
+  *                     must be an {@link InetSocketAddress} when given
+  * @param timeout      connect timeout in seconds; must be positive
+  * @param handler      the handler for the new session
+  * @return the connected session
+  * @throws NullPointerException if <code>address</code> or
+  *                              <code>handler</code> is <code>null</code>
+  * @throws IllegalArgumentException if the timeout or an address type is not
+  *                                  supported
+  * @throws IOException if the socket cannot be opened, bound or connected
+  */
+ public IoSession connect( SocketAddress address, SocketAddress localAddress,
+                           int timeout, IoHandler handler ) throws IOException
+ {
+     if( address == null )
+     {
+         throw new NullPointerException( "address" );
+     }
+     if( handler == null )
+     {
+         throw new NullPointerException( "handler" );
+     }
+
+     if( timeout <= 0 )
+     {
+         throw new IllegalArgumentException( "Illegal timeout: " + timeout );
+     }
+
+     if( !( address instanceof InetSocketAddress ) )
+     {
+         throw new IllegalArgumentException( "Unexpected address type: "
+                                             + address.getClass() );
+     }
+
+     if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+     {
+         throw new IllegalArgumentException( "Unexpected local address type: "
+                                             + localAddress.getClass() );
+     }
+
+     SocketChannel ch = SocketChannel.open();
+     boolean success = false;
+     try
+     {
+         ch.socket().setReuseAddress( true );
+         if( localAddress != null )
+         {
+             ch.socket().bind( localAddress );
+         }
+
+         ch.configureBlocking( false );
+
+         if( ch.connect( address ) )
+         {
+             // Connected immediately; no need to involve the worker.
+             SocketSession session = newSession( ch, handler );
+             success = true;
+             return session;
+         }
+
+         success = true;
+     }
+     finally
+     {
+         if( !success )
+         {
+             ch.close();
+         }
+     }
+
+     ConnectionRequest request = new ConnectionRequest( ch, timeout, handler );
+
+     // Queue the request and make sure a worker exists while holding the
+     // connector lock, so the worker cannot decide to exit in between.
+     synchronized( this )
+     {
+         synchronized( connectQueue )
+         {
+             connectQueue.push( request );
+         }
+         startupWorker();
+     }
+     selector.wakeup();
+
+     // Keep waiting across interrupts (the worker owns the channel now), but
+     // remember them so the interrupt status can be re-asserted afterwards.
+     boolean interrupted = false;
+     synchronized( request )
+     {
+         while( !request.done )
+         {
+             try
+             {
+                 request.wait();
+             }
+             catch( InterruptedException e )
+             {
+                 interrupted = true;
+             }
+         }
+     }
+
+     if( interrupted )
+     {
+         Thread.currentThread().interrupt();
+     }
+
+     if( request.exception != null )
+     {
+         ExceptionUtil.throwException( request.exception );
+     }
+
+     return request.session;
+ }
+
+ /**
+  * Starts the worker thread unless one is already registered in the
+  * <code>worker</code> field.
+  */
+ private synchronized void startupWorker()
+ {
+     if( worker != null )
+     {
+         return;
+     }
+
+     Worker created = new Worker();
+     worker = created;
+     created.start();
+ }
+
+ /**
+  * Drains the connect queue and registers every pending channel with the
+  * selector for connect events.  When registration fails, the channel is
+  * closed, the failure is stored in the request and the thread waiting in
+  * <code>connect()</code> is released.
+  */
+ private void registerNew()
+ {
+     if( connectQueue.isEmpty() )
+     {
+         return;
+     }
+
+     for( ;; )
+     {
+         ConnectionRequest req;
+         synchronized( connectQueue )
+         {
+             req = ( ConnectionRequest ) connectQueue.pop();
+         }
+
+         if( req == null )
+         {
+             break;
+         }
+
+         SocketChannel ch = req.channel;
+         try
+         {
+             ch.register( selector, SelectionKey.OP_CONNECT, req );
+         }
+         catch( IOException e )
+         {
+             req.exception = e;
+
+             // Nobody else holds a reference to the channel once the caller
+             // gets the exception, so it has to be closed here.
+             try
+             {
+                 ch.close();
+             }
+             catch( IOException closeFailure )
+             {
+                 exceptionMonitor.exceptionCaught( this, closeFailure );
+             }
+
+             synchronized( req )
+             {
+                 req.done = true;
+                 req.notify();
+             }
+         }
+     }
+ }
+
+ /**
+  * Completes the connection attempt of every connectable key in
+  * <code>keys</code>.  The outcome (session or exception) is stored in the
+  * attached request, the key is cancelled, a channel without a session is
+  * closed and the thread waiting in <code>connect()</code> is released.
+  * The key set is cleared at the end.
+  */
+ private void processSessions( Set keys )
+ {
+     for( Iterator i = keys.iterator(); i.hasNext(); )
+     {
+         SelectionKey selected = ( SelectionKey ) i.next();
+
+         if( selected.isConnectable() )
+         {
+             SocketChannel channel = ( SocketChannel ) selected.channel();
+             ConnectionRequest request = ( ConnectionRequest ) selected.attachment();
+
+             try
+             {
+                 channel.finishConnect();
+                 request.session = newSession( channel, request.handler );
+             }
+             catch( Throwable failure )
+             {
+                 request.exception = failure;
+             }
+             finally
+             {
+                 selected.cancel();
+
+                 boolean connected = request.session != null;
+                 if( !connected )
+                 {
+                     try
+                     {
+                         channel.close();
+                     }
+                     catch( IOException e )
+                     {
+                         exceptionMonitor.exceptionCaught( this, e );
+                     }
+                 }
+
+                 synchronized( request )
+                 {
+                     request.done = true;
+                     request.notify();
+                 }
+             }
+         }
+     }
+
+     keys.clear();
+ }
+
+ /**
+  * Fails every still-pending connection attempt whose deadline has passed.
+  * The attempt's key is cancelled, its channel is closed so that the socket
+  * is not leaked, and the thread waiting in <code>connect()</code> is
+  * released with a {@link ConnectException}.
+  *
+  * @param keys all keys currently registered with the selector
+  */
+ private void processTimedOutSessions( Set keys )
+ {
+     long currentTime = System.currentTimeMillis();
+     Iterator it = keys.iterator();
+
+     while( it.hasNext() )
+     {
+         SelectionKey key = ( SelectionKey ) it.next();
+
+         // Keys already handled (and cancelled) in this pass are skipped.
+         if( !key.isValid() )
+         {
+             continue;
+         }
+
+         ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
+
+         if( currentTime < entry.deadline )
+         {
+             continue;
+         }
+
+         entry.exception = new ConnectException();
+
+         key.cancel();
+
+         // The caller only receives the exception, so the half-open
+         // channel would otherwise never be closed.
+         try
+         {
+             key.channel().close();
+         }
+         catch( IOException e )
+         {
+             exceptionMonitor.exceptionCaught( this, e );
+         }
+
+         synchronized( entry )
+         {
+             entry.done = true;
+             entry.notify();
+         }
+     }
+ }
+
+ private SocketSession newSession( SocketChannel ch, IoHandler handler ) throws IOException // wraps a connected channel in a session and hands it to SocketIoProcessor
+ {
+ SocketSession session = new SocketSession( filters, ch, handler );
+ try
+ {
+ handler.sessionCreated( session ); // the handler sees the session before it is registered for I/O
+ }
+ catch( Throwable e )
+ {
+ ExceptionUtil.throwException( e ); // presumably rethrows e, so addSession() below is not reached - verify against ExceptionUtil
+ }
+ SocketIoProcessor.getInstance().addSession( session );
+ return session;
+ }
+
+ private class Worker extends Thread // selector loop that completes or times out pending connection attempts
+ {
+ public Worker()
+ {
+ super( "SocketConnector-" + id );
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ try
+ {
+ int nKeys = selector.select( 1000 );
+ // register channels queued by connect() since the last pass
+ registerNew();
+
+ if( nKeys > 0 )
+ {
+ processSessions( selector.selectedKeys() );
+ }
+ // the one second select timeout above bounds how late a deadline is noticed
+ processTimedOutSessions( selector.keys() );
+ // exit when idle; re-checked under the connector lock because connect() enqueues and calls startupWorker() under the same lock
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( SocketConnector.this )
+ {
+ if( selector.keys().isEmpty() &&
+ connectQueue.isEmpty() )
+ {
+ worker = null;
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ exceptionMonitor.exceptionCaught( SocketConnector.this, e );
+ // pause before the next pass after a selector failure
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ }
+ }
+ }
+ }
+ }
+
+ private static class ConnectionRequest // one pending connect(), handed from the calling thread to the worker via connectQueue
+ {
+ private final SocketChannel channel;
+ // absolute time in milliseconds after which processTimedOutSessions() fails the attempt
+ private final long deadline;
+
+ private final IoHandler handler;
+ // set by processSessions() when the connection was established
+ private SocketSession session;
+ // set to true once the attempt has finished either way; connect() waits for it
+ private boolean done;
+ // failure to report to the caller of connect(), if any
+ private Throwable exception;
+
+ private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler )
+ {
+ this.channel = channel;
+ this.deadline = System.currentTimeMillis() + timeout * 1000L; // timeout is in seconds; the long literal keeps the product from overflowing int
+ this.handler = handler;
+ }
+ }
+
+ public IoFilterChain getFilterChain() // returns the chain that is also passed to every SocketSession created by this connector
+ {
+ return filters;
+ }
+}
\ No newline at end of file
Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Mon May 9 02:21:50 2005
@@ -0,0 +1,544 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.mina.util.Queue;
+
+/**
+ * Performs all I/O operations for sockets which are connected or bound.
+ * This class is used by MINA internally.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$,
+ */
+class SocketIoProcessor
+{
+ private static final SocketIoProcessor instance;
+ // the single instance is created when the class is initialized; a selector failure is rethrown as an InternalError with the IOException as its cause
+ static
+ {
+ SocketIoProcessor tmp;
+
+ try
+ {
+ tmp = new SocketIoProcessor();
+ }
+ catch( IOException e )
+ {
+ InternalError error = new InternalError(
+ "Failed to open selector." );
+ error.initCause( e );
+ throw error;
+ }
+
+ instance = tmp;
+ }
+
+ private final Selector selector;
+
+ private final Queue newSessions = new Queue();
+
+ private final Queue removingSessions = new Queue();
+
+ private final Queue flushingSessions = new Queue();
+
+ private final Queue readableSessions = new Queue();
+
+ private Worker worker;
+
+ private long lastIdleCheckTime = System.currentTimeMillis();
+
+ private SocketIoProcessor() throws IOException // private: only the static initializer creates an instance
+ {
+ selector = Selector.open();
+ }
+
+ static SocketIoProcessor getInstance() // returns the instance created by the static initializer
+ {
+ return instance;
+ }
+
+ void addSession( SocketSession session ) // queues the session for registration by the worker thread
+ {
+ synchronized( this ) // same lock the worker takes before it decides to exit, so the queued session cannot be missed
+ {
+ synchronized( newSessions )
+ {
+ newSessions.push( session );
+ }
+ startupWorker();
+ }
+ // interrupt a blocking select() so the new session is picked up promptly
+ selector.wakeup();
+ }
+
+ void removeSession( SocketSession session ) // queues the session for closing by the worker thread
+ {
+ scheduleRemove( session );
+ startupWorker(); // make sure a worker is running to process the removal
+ selector.wakeup();
+ }
+
+ /**
+  * Starts the worker thread unless one is already registered in the
+  * <code>worker</code> field.
+  */
+ private synchronized void startupWorker()
+ {
+     if( worker != null )
+     {
+         return;
+     }
+
+     Worker created = new Worker();
+     worker = created;
+     created.start();
+ }
+
+ void flushSession( SocketSession session ) // queues the session for flushing and wakes the selector; does not start a worker itself
+ {
+ scheduleFlush( session );
+ selector.wakeup();
+ }
+
+ void addReadableSession( SocketSession session ) // NOTE(review): nothing in this class pops readableSessions, so entries only accumulate - confirm the intended consumer
+ {
+ synchronized( readableSessions )
+ {
+ readableSessions.push( session );
+ }
+ selector.wakeup();
+ }
+
+ /**
+  * Drains the queue of new sessions.  Each session's channel is switched to
+  * non-blocking mode and registered for read events; on success
+  * <code>sessionOpened</code> is fired, on an {@link IOException} the
+  * failure is reported through <code>exceptionCaught</code> instead.
+  */
+ private void addSessions()
+ {
+     if( newSessions.isEmpty() )
+     {
+         return;
+     }
+
+     while( true )
+     {
+         SocketSession pending;
+
+         synchronized( newSessions )
+         {
+             pending = ( SocketSession ) newSessions.pop();
+         }
+
+         if( pending == null )
+         {
+             return;
+         }
+
+         SocketChannel channel = pending.getChannel();
+
+         try
+         {
+             channel.configureBlocking( false );
+             SelectionKey registration =
+                 channel.register( selector, SelectionKey.OP_READ, pending );
+             pending.setSelectionKey( registration );
+         }
+         catch( IOException e )
+         {
+             pending.getManagerFilterChain().exceptionCaught( pending, e );
+             continue;
+         }
+
+         pending.getManagerFilterChain().sessionOpened( pending );
+     }
+ }
+
+ /**
+  * Drains the queue of sessions scheduled for removal.  A session whose key
+  * is still valid gets its key cancelled and its channel closed; afterwards
+  * its pending write buffers are released, <code>sessionClosed</code> is
+  * fired and the session is notified of the close.  Sessions whose key is
+  * no longer valid are skipped.
+  */
+ private void removeSessions()
+ {
+     if( removingSessions.isEmpty() )
+     {
+         return;
+     }
+
+     while( true )
+     {
+         SocketSession closing;
+
+         synchronized( removingSessions )
+         {
+             closing = ( SocketSession ) removingSessions.pop();
+         }
+
+         if( closing == null )
+         {
+             return;
+         }
+
+         SocketChannel channel = closing.getChannel();
+         SelectionKey registration = closing.getSelectionKey();
+
+         if( registration.isValid() ) // otherwise the channel is already closed
+         {
+             try
+             {
+                 registration.cancel();
+                 channel.close();
+             }
+             catch( IOException e )
+             {
+                 closing.getManagerFilterChain().exceptionCaught( closing, e );
+             }
+             finally
+             {
+                 releaseWriteBuffers( closing );
+
+                 closing.getManagerFilterChain().sessionClosed( closing );
+                 closing.notifyClose();
+             }
+         }
+     }
+ }
+
+ /**
+  * Handles every selected key: readable sessions are read immediately,
+  * writable sessions are queued for flushing.  The selected-key set is
+  * cleared afterwards.
+  */
+ private void processSessions( Set selectedKeys )
+ {
+     for( Iterator i = selectedKeys.iterator(); i.hasNext(); )
+     {
+         SelectionKey selected = ( SelectionKey ) i.next();
+         SocketSession owner = ( SocketSession ) selected.attachment();
+
+         if( selected.isReadable() )
+         {
+             read( owner );
+         }
+
+         if( selected.isWritable() )
+         {
+             scheduleFlush( owner );
+         }
+     }
+
+     selectedKeys.clear();
+ }
+
+ private void read( SocketSession session ) // reads what is currently available from the session's channel and fires messageReceived
+ {
+ ByteBuffer buf = ByteBuffer.allocate(
+ (( SocketSessionConfig ) session.getConfig()).getSessionReceiveBufferSize() );
+ SocketChannel ch = session.getChannel();
+
+ try
+ {
+ int readBytes = 0;
+ int ret;
+
+ buf.clear();
+ // read until the channel returns 0 (nothing more to read, or the buffer is full) or a negative value (end of stream)
+ try
+ {
+ while( ( ret = ch.read( buf.buf() ) ) > 0 )
+ {
+ readBytes += ret;
+ }
+ }
+ finally
+ {
+ buf.flip(); // flipped even when read() throws
+ }
+ // statistics and idle flags are updated even when zero bytes were read
+ session.increaseReadBytes( readBytes );
+ session.setIdle( IdleStatus.BOTH_IDLE, false );
+ session.setIdle( IdleStatus.READER_IDLE, false );
+ // the data is copied into an exactly sized buffer because buf is released in the finally block below
+ if( readBytes > 0 )
+ {
+ ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
+ newBuf.put( buf );
+ newBuf.flip();
+ session.getManagerFilterChain().messageReceived( session, newBuf );
+ }
+ if( ret < 0 ) // end of stream: the peer closed the connection
+ {
+ scheduleRemove( session );
+ }
+ }
+ catch( Throwable e )
+ {
+ if( e instanceof IOException ) // only I/O failures close the session; other throwables are just reported
+ scheduleRemove( session );
+ session.getManagerFilterChain().exceptionCaught( session, e );
+ }
+ finally
+ {
+ buf.release();
+ }
+ }
+
+ private void scheduleRemove( SocketSession session ) // enqueues the session for removeSessions(); does not wake the selector
+ {
+ synchronized( removingSessions )
+ {
+ removingSessions.push( session );
+ }
+ }
+
+ private void scheduleFlush( SocketSession session ) // enqueues the session for flushSessions(); does not wake the selector
+ {
+ synchronized( flushingSessions )
+ {
+ flushingSessions.push( session );
+ }
+ }
+
+ /**
+  * Runs the idle and write-timeout checks for all registered sessions, at
+  * most once per second.
+  * <p>
+  * Keys that are no longer valid are skipped.  The worker calls
+  * <code>removeSessions()</code> right before this method, and keys
+  * cancelled there stay in the selector's key set until the next select
+  * operation.  Querying the interest set of such a key throws
+  * <code>CancelledKeyException</code>, which would end the worker thread.
+  */
+ private void notifyIdleSessions()
+ {
+     Set keys = selector.keys();
+     Iterator it;
+     SocketSession session;
+
+     // process idle sessions
+     long currentTime = System.currentTimeMillis();
+
+     if( ( keys != null ) && ( ( currentTime - lastIdleCheckTime ) >= 1000 ) )
+     {
+         lastIdleCheckTime = currentTime;
+         it = keys.iterator();
+
+         while( it.hasNext() )
+         {
+             SelectionKey key = ( SelectionKey ) it.next();
+
+             if( !key.isValid() )
+             {
+                 // session is already closed; nothing to notify
+                 continue;
+             }
+
+             session = ( SocketSession ) key.attachment();
+
+             notifyIdleSession( session, currentTime );
+         }
+     }
+ }
+
+ /**
+  * Runs the three idle checks (both, reader, writer) and then the write
+  * timeout check for one session, each against the session's configured
+  * limits.
+  */
+ private void notifyIdleSession( SocketSession session, long currentTime )
+ {
+     IoSessionConfig config = session.getConfig();
+
+     // Limits and timestamps are fetched right before each check, because
+     // an earlier notification may change the session's state.
+     notifyIdleSession0(
+             session, currentTime,
+             config.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
+             IdleStatus.BOTH_IDLE,
+             session.getLastIoTime() );
+
+     notifyIdleSession0(
+             session, currentTime,
+             config.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
+             IdleStatus.READER_IDLE,
+             session.getLastReadTime() );
+
+     notifyIdleSession0(
+             session, currentTime,
+             config.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
+             IdleStatus.WRITER_IDLE,
+             session.getLastWriteTime() );
+
+     notifyWriteTimeoutSession(
+             session, currentTime,
+             config.getWriteTimeoutInMillis(),
+             session.getLastWriteTime() );
+ }
+
+ /**
+  * Marks the session idle for <code>status</code> and fires
+  * <code>sessionIdle</code> when idle detection is enabled
+  * (<code>idleTime</code> &gt; 0), the session is not yet flagged idle for
+  * that status, a last I/O time is known (non-zero) and at least
+  * <code>idleTime</code> milliseconds have passed since then.
+  */
+ private void notifyIdleSession0( SocketSession session, long currentTime,
+                                  long idleTime, IdleStatus status,
+                                  long lastIoTime )
+ {
+     if( idleTime <= 0 || session.isIdle( status ) )
+     {
+         return;
+     }
+
+     if( lastIoTime == 0 || ( currentTime - lastIoTime ) < idleTime )
+     {
+         return;
+     }
+
+     session.setIdle( status, true );
+     session.getManagerFilterChain().sessionIdle( session, status );
+ }
+
+ /**
+  * Reports a {@link WriteTimeoutException} through
+  * <code>exceptionCaught</code> when a write timeout is configured, at
+  * least that much time has passed since <code>lastIoTime</code> and the
+  * session is still registered for write events.
+  */
+ private void notifyWriteTimeoutSession( SocketSession session,
+                                         long currentTime,
+                                         long writeTimeout, long lastIoTime )
+ {
+     if( writeTimeout <= 0 )
+     {
+         return;
+     }
+
+     if( ( currentTime - lastIoTime ) < writeTimeout )
+     {
+         return;
+     }
+
+     int interestOps = session.getSelectionKey().interestOps();
+     if( ( interestOps & SelectionKey.OP_WRITE ) == 0 )
+     {
+         return;
+     }
+
+     session.getManagerFilterChain().exceptionCaught( session, new WriteTimeoutException() );
+ }
+
+ /**
+  * Drains the queue of sessions scheduled for flushing.  Disconnected
+  * sessions only get their pending write buffers released; connected ones
+  * are flushed, and a session whose flush fails with an
+  * {@link IOException} is scheduled for removal and the failure reported.
+  */
+ private void flushSessions()
+ {
+     if( flushingSessions.size() == 0 )
+     {
+         return;
+     }
+
+     while( true )
+     {
+         SocketSession pending;
+
+         synchronized( flushingSessions )
+         {
+             pending = ( SocketSession ) flushingSessions.pop();
+         }
+
+         if( pending == null )
+         {
+             return;
+         }
+
+         if( pending.isConnected() )
+         {
+             try
+             {
+                 flush( pending );
+             }
+             catch( IOException e )
+             {
+                 scheduleRemove( pending );
+                 pending.getManagerFilterChain().exceptionCaught( pending, e );
+             }
+         }
+         else
+         {
+             releaseWriteBuffers( pending );
+         }
+     }
+ }
+
+ /**
+  * Releases every buffer that is still waiting in the session's write
+  * buffer queue.  The queue is only touched while holding its monitor,
+  * like every other access to it in this class.  A buffer that refuses to
+  * be released is reported through <code>exceptionCaught</code> and the
+  * remaining buffers are still processed.
+  */
+ private void releaseWriteBuffers( SocketSession session )
+ {
+     Queue writeBufferQueue = session.getWriteBufferQueue();
+
+     for( ;; )
+     {
+         ByteBuffer buf;
+
+         synchronized( writeBufferQueue )
+         {
+             buf = ( ByteBuffer ) writeBufferQueue.pop();
+         }
+
+         if( buf == null )
+         {
+             break;
+         }
+
+         try
+         {
+             buf.release();
+         }
+         catch( IllegalStateException e )
+         {
+             session.getManagerFilterChain().exceptionCaught( session, e );
+         }
+     }
+ }
+
+ /**
+  * Writes as many queued buffers of the session as the channel accepts.
+  * Fully written buffers are removed from the queue and reported through
+  * <code>messageSent</code>.  When the channel cannot take the rest of a
+  * buffer, the session is registered for write events and the method
+  * returns; once a buffer has been written completely the write interest
+  * is cleared again.
+  * <p>
+  * The bookkeeping runs after the write rather than in a
+  * <code>finally</code> block: leaving a <code>finally</code> block with
+  * <code>break</code> discards a pending exception, which would hide every
+  * write failure from the caller.
+  *
+  * @throws IOException if writing to the channel fails
+  */
+ private void flush( SocketSession session ) throws IOException
+ {
+     SocketChannel ch = session.getChannel();
+
+     Queue writeBufferQueue = session.getWriteBufferQueue();
+
+     ByteBuffer buf;
+     for( ;; )
+     {
+         synchronized( writeBufferQueue )
+         {
+             buf = ( ByteBuffer ) writeBufferQueue.first();
+         }
+
+         if( buf == null )
+         {
+             break;
+         }
+
+         if( buf.remaining() == 0 )
+         {
+             synchronized( writeBufferQueue )
+             {
+                 writeBufferQueue.pop();
+             }
+
+             // FIXME buffer is not released
+             session.getManagerFilterChain().messageSent( session, buf );
+             continue;
+         }
+
+         // An IOException from here propagates to the caller.
+         int writtenBytes = ch.write( buf.buf() );
+
+         if( writtenBytes > 0 )
+         {
+             session.increaseWrittenBytes( writtenBytes );
+             session.setIdle( IdleStatus.BOTH_IDLE, false );
+             session.setIdle( IdleStatus.WRITER_IDLE, false );
+         }
+
+         SelectionKey key = session.getSelectionKey();
+         if( buf.hasRemaining() )
+         {
+             // Kernel buffer is full
+             key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+             break;
+         }
+
+         key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
+     }
+ }
+
+ private class Worker extends Thread // selector loop that performs all registration, read, flush, close and idle processing
+ {
+ public Worker()
+ {
+ super( "SocketIoProcessor" );
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ try
+ {
+ int nKeys = selector.select( 1000 ); // wakes at least once per second so the idle checks below can run
+ addSessions();
+
+ if( nKeys > 0 )
+ {
+ processSessions( selector.selectedKeys() );
+ }
+ // queued work is processed on every pass, whether or not any key was selected
+ flushSessions();
+ removeSessions();
+ notifyIdleSessions();
+ // exit when no session is registered; re-checked under the processor lock because addSession() enqueues and calls startupWorker() under the same lock
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( SocketIoProcessor.this )
+ {
+ if( selector.keys().isEmpty() &&
+ newSessions.isEmpty() )
+ {
+ worker = null;
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ e.printStackTrace();
+ // pause before the next pass after a selector failure
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision