You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/12/28 03:49:51 UTC
svn commit: r359355 [3/4] - in /directory/network/trunk: ./
src/java/org/apache/mina/common/
src/java/org/apache/mina/transport/socket/nio/support/
src/java/org/apache/mina/transport/vmpipe/support/
src/test/org/apache/mina/common/ xdocs/
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java?rev=359355&r1=359354&r2=359355&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java Tue Dec 27 18:49:31 2005
@@ -1,378 +1,378 @@
-/*
- * @(#) $Id$
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.transport.socket.nio.support;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoFilterChainBuilder;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.support.BaseIoConnector;
-import org.apache.mina.transport.socket.nio.SocketSessionManager;
-import org.apache.mina.util.ExceptionUtil;
-import org.apache.mina.util.Queue;
-
-/**
- * {@link IoConnector} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public class SocketConnectorDelegate extends BaseIoConnector implements SocketSessionManager
-{
- private static volatile int nextId = 0;
-
- private final IoConnector wrapper;
- private final int id = nextId++;
- private final String threadName = "SocketConnector-" + id;
- private Selector selector;
- private final Queue connectQueue = new Queue();
- private Worker worker;
-
- /**
- * Creates a new instance.
- */
- public SocketConnectorDelegate( IoConnector wrapper )
- {
- this.wrapper = wrapper;
- }
-
- public ConnectFuture connect( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
- {
- return connect( address, null, handler, filterChainBuilder );
- }
-
- public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
- IoHandler handler, IoFilterChainBuilder filterChainBuilder )
- {
- if( address == null )
- throw new NullPointerException( "address" );
- if( handler == null )
- throw new NullPointerException( "handler" );
-
- if( ! ( address instanceof InetSocketAddress ) )
- throw new IllegalArgumentException( "Unexpected address type: "
- + address.getClass() );
-
- if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
- throw new IllegalArgumentException( "Unexpected local address type: "
- + localAddress.getClass() );
-
- if( filterChainBuilder == null )
- {
- filterChainBuilder = IoFilterChainBuilder.NOOP;
- }
-
- SocketChannel ch = null;
- boolean success = false;
- try
- {
- ch = SocketChannel.open();
- ch.socket().setReuseAddress( true );
- if( localAddress != null )
- {
- ch.socket().bind( localAddress );
- }
-
- ch.configureBlocking( false );
-
- if( ch.connect( address ) )
- {
- SocketSessionImpl session = newSession( ch, handler, filterChainBuilder );
- success = true;
- ConnectFuture future = new ConnectFuture();
- future.setSession( session );
- return future;
- }
-
- success = true;
- }
- catch( IOException e )
- {
- return ConnectFuture.newFailedFuture( e );
- }
- finally
- {
- if( !success && ch != null )
- {
- try
- {
- ch.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- }
- }
-
- ConnectionRequest request = new ConnectionRequest( ch, getConnectTimeout(), handler, filterChainBuilder );
- synchronized( this )
- {
- try
- {
- startupWorker();
- }
- catch( IOException e )
- {
- try
- {
- ch.close();
- }
- catch( IOException e2 )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e2 );
- }
-
- return ConnectFuture.newFailedFuture( e );
- }
- synchronized( connectQueue )
- {
- connectQueue.push( request );
- }
- selector.wakeup();
- }
-
- return request;
- }
-
- private synchronized void startupWorker() throws IOException
- {
- if( worker == null )
- {
- selector = Selector.open();
- worker = new Worker();
- worker.start();
- }
- }
-
- private void registerNew()
- {
- if( connectQueue.isEmpty() )
- return;
-
- for( ;; )
- {
- ConnectionRequest req;
- synchronized( connectQueue )
- {
- req = ( ConnectionRequest ) connectQueue.pop();
- }
-
- if( req == null )
- break;
-
- SocketChannel ch = req.channel;
- try
- {
- ch.register( selector, SelectionKey.OP_CONNECT, req );
- }
- catch( IOException e )
- {
- req.setException( e );
- }
- }
- }
-
- private void processSessions( Set keys )
- {
- Iterator it = keys.iterator();
-
- while( it.hasNext() )
- {
- SelectionKey key = ( SelectionKey ) it.next();
-
- if( !key.isConnectable() )
- continue;
-
- SocketChannel ch = ( SocketChannel ) key.channel();
- ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
-
- boolean success = false;
- try
- {
- ch.finishConnect();
- SocketSessionImpl session = newSession( ch, entry.handler, entry.filterChainBuilder );
- entry.setSession( session );
- success = true;
- }
- catch( Throwable e )
- {
- entry.setException( e );
- }
- finally
- {
- key.cancel();
- if( !success )
- {
- try
- {
- ch.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- }
- }
- }
-
- keys.clear();
- }
-
- private void processTimedOutSessions( Set keys )
- {
- long currentTime = System.currentTimeMillis();
- Iterator it = keys.iterator();
-
- while( it.hasNext() )
- {
- SelectionKey key = ( SelectionKey ) it.next();
-
- if( !key.isValid() )
- continue;
-
- ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
-
- if( currentTime >= entry.deadline )
- {
- entry.setException( new ConnectException() );
- key.cancel();
- }
- }
- }
-
- private SocketSessionImpl newSession( SocketChannel ch, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) throws IOException
- {
- SocketSessionImpl session = new SocketSessionImpl( wrapper, ch, handler );
- try
- {
- this.filterChainBuilder.buildFilterChain( session.getFilterChain() );
- filterChainBuilder.buildFilterChain( session.getFilterChain() );
- ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session );
- }
- catch( Throwable e )
- {
- ExceptionUtil.throwException( e );
- }
- session.getIoProcessor().addNew( session );
- return session;
- }
-
- public int getProcessors()
- {
- throw new UnsupportedOperationException();
- }
-
- public void setProcessors( int nProcessor )
- {
- throw new UnsupportedOperationException();
- }
-
- private class Worker extends Thread
- {
- public Worker()
- {
- super( SocketConnectorDelegate.this.threadName );
- }
-
- public void run()
- {
- for( ;; )
- {
- try
- {
- int nKeys = selector.select( 1000 );
-
- registerNew();
-
- if( nKeys > 0 )
- {
- processSessions( selector.selectedKeys() );
- }
-
- processTimedOutSessions( selector.keys() );
-
- if( selector.keys().isEmpty() )
- {
- synchronized( SocketConnectorDelegate.this )
- {
- if( selector.keys().isEmpty() &&
- connectQueue.isEmpty() )
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
-
- try
- {
- Thread.sleep( 1000 );
- }
- catch( InterruptedException e1 )
- {
- }
- }
- }
- }
- }
-
- private static class ConnectionRequest extends ConnectFuture
- {
- private final SocketChannel channel;
- private final long deadline;
- private final IoHandler handler;
- private final IoFilterChainBuilder filterChainBuilder;
-
- private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
- {
- this.channel = channel;
- this.deadline = System.currentTimeMillis() + timeout * 1000L;
- this.handler = handler;
- this.filterChainBuilder = filterChainBuilder;
- }
- }
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChainBuilder;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.transport.socket.nio.SocketSessionManager;
+import org.apache.mina.util.ExceptionUtil;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class SocketConnectorDelegate extends BaseIoConnector implements SocketSessionManager
+{
+ /**
+  * Number handed to the next connector instance. It is read and advanced
+  * only inside {@link #allocateId()}, which serializes access.
+  */
+ private static int nextId = 0;
+
+ /** Connector passed to every {@link SocketSessionImpl} this delegate creates. */
+ private final IoConnector wrapper;
+ /** Per-instance number used to build {@link #threadName}. */
+ private final int id = allocateId();
+ /** Name given to the worker thread of this instance. */
+ private final String threadName = "SocketConnector-" + id;
+ /** Opened by startupWorker() and reset to null by the worker when it shuts down. */
+ private Selector selector;
+ /** Connection requests waiting to be registered with the selector. */
+ private final Queue connectQueue = new Queue();
+ /** Currently running worker thread, or null when none is running. */
+ private Worker worker;
+
+ /**
+  * Returns a number that no other instance of this class has received.
+  * <p>
+  * A plain <code>nextId++</code> on a volatile field is a separate read and
+  * write, so two instances constructed at the same time could observe the
+  * same value. Doing the increment under the class lock prevents that.
+  */
+ private static synchronized int allocateId()
+ {
+     return nextId++;
+ }
+
+ /**
+  * Creates a new instance.
+  *
+  * @param wrapper the connector passed to every session created by this delegate
+  */
+ public SocketConnectorDelegate( IoConnector wrapper )
+ {
+     this.wrapper = wrapper;
+ }
+
+ /**
+  * Connects to <code>address</code> without requesting a specific local
+  * address. Delegates to the four-argument overload with a
+  * <code>null</code> local address.
+  */
+ public ConnectFuture connect( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
+ {
+     final SocketAddress noLocalAddress = null;
+     return connect( address, noLocalAddress, handler, filterChainBuilder );
+ }
+
+ /**
+  * Starts a non-blocking connection attempt to <code>address</code>.
+  * <p>
+  * If the channel connects immediately, a session is created right away and
+  * an already-completed future is returned. Otherwise the attempt is queued
+  * for the worker thread and the returned future is completed later.
+  *
+  * @param address            remote address; must be an {@link InetSocketAddress}
+  * @param localAddress       local address to bind to, or <code>null</code> to skip binding
+  * @param handler            handler for the new session
+  * @param filterChainBuilder per-connection chain builder; <code>null</code> is
+  *                           replaced by {@link IoFilterChainBuilder#NOOP}
+  * @return a future for the connection attempt; a failed future when opening,
+  *         configuring or connecting the channel, or starting the worker,
+  *         raised an {@link IOException}
+  * @throws NullPointerException     if <code>address</code> or <code>handler</code> is null
+  * @throws IllegalArgumentException if either address is not an {@link InetSocketAddress}
+  */
+ public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
+                               IoHandler handler, IoFilterChainBuilder filterChainBuilder )
+ {
+     if( address == null )
+     {
+         throw new NullPointerException( "address" );
+     }
+     if( handler == null )
+     {
+         throw new NullPointerException( "handler" );
+     }
+     if( !( address instanceof InetSocketAddress ) )
+     {
+         throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+     }
+     if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+     {
+         throw new IllegalArgumentException( "Unexpected local address type: " + localAddress.getClass() );
+     }
+
+     if( filterChainBuilder == null )
+     {
+         filterChainBuilder = IoFilterChainBuilder.NOOP;
+     }
+
+     SocketChannel channel = null;
+     // Stays false until the channel has an owner (a session or a queued request);
+     // the finally block closes the channel while it is still false.
+     boolean keepChannel = false;
+     try
+     {
+         channel = SocketChannel.open();
+         channel.socket().setReuseAddress( true );
+         if( localAddress != null )
+         {
+             channel.socket().bind( localAddress );
+         }
+
+         channel.configureBlocking( false );
+
+         if( channel.connect( address ) )
+         {
+             // Connected immediately: no need to involve the worker.
+             SocketSessionImpl session = newSession( channel, handler, filterChainBuilder );
+             keepChannel = true;
+             ConnectFuture connected = new ConnectFuture();
+             connected.setSession( session );
+             return connected;
+         }
+
+         keepChannel = true;
+     }
+     catch( IOException e )
+     {
+         return ConnectFuture.newFailedFuture( e );
+     }
+     finally
+     {
+         if( !keepChannel && channel != null )
+         {
+             try
+             {
+                 channel.close();
+             }
+             catch( IOException e )
+             {
+                 ExceptionMonitor.getInstance().exceptionCaught( e );
+             }
+         }
+     }
+
+     // Connection is still pending: hand it to the worker thread.
+     ConnectionRequest pending = new ConnectionRequest( channel, getConnectTimeout(), handler, filterChainBuilder );
+     synchronized( this )
+     {
+         try
+         {
+             startupWorker();
+         }
+         catch( IOException e )
+         {
+             try
+             {
+                 channel.close();
+             }
+             catch( IOException e2 )
+             {
+                 ExceptionMonitor.getInstance().exceptionCaught( e2 );
+             }
+
+             return ConnectFuture.newFailedFuture( e );
+         }
+         synchronized( connectQueue )
+         {
+             connectQueue.push( pending );
+         }
+         // Still holding this object's lock, which the worker also takes
+         // before it clears the selector field.
+         selector.wakeup();
+     }
+
+     return pending;
+ }
+
+ /**
+  * Opens the selector and starts the worker thread, unless a worker is
+  * already recorded in the <code>worker</code> field.
+  *
+  * @throws IOException if the selector cannot be opened
+  */
+ private synchronized void startupWorker() throws IOException
+ {
+     if( worker != null )
+     {
+         return;
+     }
+
+     selector = Selector.open();
+     worker = new Worker();
+     worker.start();
+ }
+
+ /**
+  * Drains the connect queue, registering each queued channel with the
+  * selector for <code>OP_CONNECT</code> with its request as the attachment.
+  * A request whose registration raises an {@link IOException} is failed
+  * with that exception.
+  */
+ private void registerNew()
+ {
+     if( connectQueue.isEmpty() )
+     {
+         return;
+     }
+
+     while( true )
+     {
+         ConnectionRequest pending;
+         synchronized( connectQueue )
+         {
+             pending = ( ConnectionRequest ) connectQueue.pop();
+         }
+
+         // A null result from pop() is treated as "queue drained".
+         if( pending == null )
+         {
+             return;
+         }
+
+         try
+         {
+             pending.channel.register( selector, SelectionKey.OP_CONNECT, pending );
+         }
+         catch( IOException e )
+         {
+             pending.setException( e );
+         }
+     }
+ }
+
+ /**
+  * Completes the connection for every connectable key in <code>keys</code>
+  * and then empties the set.
+  * <p>
+  * On success a session is created and stored in the request attached to
+  * the key. On any failure the request is failed and the channel is closed.
+  * The key is cancelled in both cases.
+  *
+  * @param keys the selector's selected-key set
+  */
+ private void processSessions( Set keys )
+ {
+     for( Iterator i = keys.iterator(); i.hasNext(); )
+     {
+         SelectionKey key = ( SelectionKey ) i.next();
+         if( !key.isConnectable() )
+         {
+             continue;
+         }
+
+         SocketChannel channel = ( SocketChannel ) key.channel();
+         ConnectionRequest request = ( ConnectionRequest ) key.attachment();
+
+         boolean connected = false;
+         try
+         {
+             channel.finishConnect();
+             SocketSessionImpl session = newSession( channel, request.handler, request.filterChainBuilder );
+             request.setSession( session );
+             connected = true;
+         }
+         catch( Throwable cause )
+         {
+             request.setException( cause );
+         }
+         finally
+         {
+             // This selector is only used for OP_CONNECT, so the key is dropped either way.
+             key.cancel();
+             if( !connected )
+             {
+                 try
+                 {
+                     channel.close();
+                 }
+                 catch( IOException e )
+                 {
+                     ExceptionMonitor.getInstance().exceptionCaught( e );
+                 }
+             }
+         }
+     }
+
+     keys.clear();
+ }
+
+ /**
+  * Fails every pending connection whose deadline has passed and releases
+  * its socket.
+  * <p>
+  * Cancelling the key alone would leave the channel open with nothing
+  * referring to it any more, so the channel is closed here as well.
+  *
+  * @param keys the selector's full key set; each valid key is expected to
+  *             carry its {@link ConnectionRequest} as attachment
+  */
+ private void processTimedOutSessions( Set keys )
+ {
+     long currentTime = System.currentTimeMillis();
+     Iterator it = keys.iterator();
+
+     while( it.hasNext() )
+     {
+         SelectionKey key = ( SelectionKey ) it.next();
+
+         // Keys already cancelled (e.g. by processSessions) need no further handling.
+         if( !key.isValid() )
+         {
+             continue;
+         }
+
+         ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
+
+         if( currentTime >= entry.deadline )
+         {
+             entry.setException( new ConnectException() );
+             try
+             {
+                 // Release the socket; the request has been failed and
+                 // nobody else will close this channel.
+                 key.channel().close();
+             }
+             catch( IOException e )
+             {
+                 ExceptionMonitor.getInstance().exceptionCaught( e );
+             }
+             finally
+             {
+                 key.cancel();
+             }
+         }
+     }
+ }
+
+ /**
+  * Creates a session for <code>ch</code>, builds its filter chain, fires
+  * <code>sessionCreated</code> and hands the session to its I/O processor.
+  * <p>
+  * The chain is populated first by <code>this.filterChainBuilder</code>
+  * (a field not declared in this class, presumably inherited from
+  * BaseIoConnector) and then by the builder passed as argument.
+  * Anything thrown while doing so is passed to
+  * {@link ExceptionUtil#throwException(Throwable)}.
+  *
+  * @param ch                 channel backing the session
+  * @param handler            handler of the new session
+  * @param filterChainBuilder per-connection chain builder
+  * @return the newly created session
+  * @throws IOException declared by this method; which calls raise it is not visible here
+  */
+ private SocketSessionImpl newSession( SocketChannel ch, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) throws IOException
+ {
+     final SocketSessionImpl created = new SocketSessionImpl( wrapper, ch, handler );
+     try
+     {
+         this.filterChainBuilder.buildFilterChain( created.getFilterChain() );
+         filterChainBuilder.buildFilterChain( created.getFilterChain() );
+         ( ( SocketFilterChain ) created.getFilterChain() ).sessionCreated( created );
+     }
+     catch( Throwable cause )
+     {
+         ExceptionUtil.throwException( cause );
+     }
+
+     created.getIoProcessor().addNew( created );
+     return created;
+ }
+
+ /**
+  * Not supported by this class.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ public int getProcessors()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Not supported by this class; the argument is ignored.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ public void setProcessors( int nProcessor )
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Thread that drives pending connections: it selects with a one second
+  * timeout, registers newly queued requests, completes connectable
+  * channels and fails requests whose deadline has passed.
+  * <p>
+  * When the selector has no keys and the connect queue is empty, the worker
+  * clears the <code>worker</code> field, closes the selector and terminates.
+  */
+ private class Worker extends Thread
+ {
+     public Worker()
+     {
+         super( SocketConnectorDelegate.this.threadName );
+     }
+
+     public void run()
+     {
+         boolean running = true;
+         while( running )
+         {
+             try
+             {
+                 int readyCount = selector.select( 1000 );
+
+                 registerNew();
+
+                 if( readyCount > 0 )
+                 {
+                     processSessions( selector.selectedKeys() );
+                 }
+
+                 processTimedOutSessions( selector.keys() );
+
+                 if( selector.keys().isEmpty() )
+                 {
+                     // Re-check under the delegate's lock, which connect()
+                     // also holds while it queues a request.
+                     synchronized( SocketConnectorDelegate.this )
+                     {
+                         if( selector.keys().isEmpty() && connectQueue.isEmpty() )
+                         {
+                             worker = null;
+                             try
+                             {
+                                 selector.close();
+                             }
+                             catch( IOException e )
+                             {
+                                 ExceptionMonitor.getInstance().exceptionCaught( e );
+                             }
+                             finally
+                             {
+                                 selector = null;
+                             }
+                             running = false;
+                         }
+                     }
+                 }
+             }
+             catch( IOException e )
+             {
+                 ExceptionMonitor.getInstance().exceptionCaught( e );
+
+                 // Back off for a second before selecting again.
+                 try
+                 {
+                     Thread.sleep( 1000 );
+                 }
+                 catch( InterruptedException e1 )
+                 {
+                     // An interrupt during the back-off is ignored; the loop simply continues.
+                 }
+             }
+         }
+     }
+ }
+
+ /**
+  * A {@link ConnectFuture} for a connection that did not complete
+  * immediately. It carries the channel, the handler and chain builder for
+  * the session to create, and the time after which the attempt is failed.
+  */
+ private static class ConnectionRequest extends ConnectFuture
+ {
+     private final SocketChannel channel;
+     /** Absolute time, in <code>System.currentTimeMillis()</code> terms, at which the attempt times out. */
+     private final long deadline;
+     private final IoHandler handler;
+     private final IoFilterChainBuilder filterChainBuilder;
+
+     /**
+      * @param timeout connect timeout; multiplied by 1000 to obtain
+      *                milliseconds, i.e. treated as seconds
+      */
+     private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
+     {
+         final long timeoutMillis = timeout * 1000L;
+         this.deadline = System.currentTimeMillis() + timeoutMillis;
+         this.channel = channel;
+         this.handler = handler;
+         this.filterChainBuilder = filterChainBuilder;
+     }
+ }
}
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java?rev=359355&r1=359354&r2=359355&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java Tue Dec 27 18:49:31 2005
@@ -1,47 +1,47 @@
-package org.apache.mina.transport.socket.nio.support;
-
-import java.io.IOException;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.mina.util.Queue;
-
-/**
- * An {@link IoFilterChain} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project
- */
-class SocketFilterChain extends AbstractIoFilterChain {
-
- public SocketFilterChain( IoSession parent )
- {
- super( parent );
- }
-
- protected void doWrite( IoSession session, WriteRequest writeRequest )
- {
- SocketSessionImpl s = ( SocketSessionImpl ) session;
- Queue writeRequestQueue = s.getWriteRequestQueue();
-
- ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.push( writeRequest );
- if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
- {
- // Notify SocketIoProcessor only when writeRequestQueue was empty.
- s.getIoProcessor().flush( s );
- }
- }
- }
-
- protected void doClose( IoSession session, CloseFuture closeFuture ) throws IOException
- {
- SocketSessionImpl s = ( SocketSessionImpl ) session;
- s.getIoProcessor().remove( s );
- }
-}
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * The {@link IoFilterChain} used by socket (TCP/IP) sessions. Writes are
+ * queued on the session and closes are forwarded to the session's
+ * I/O processor.
+ *
+ * @author The Apache Directory Project
+ */
+class SocketFilterChain extends AbstractIoFilterChain
+{
+    public SocketFilterChain( IoSession parent )
+    {
+        super( parent );
+    }
+
+    /**
+     * Marks the message buffer, appends the request to the session's write
+     * queue and asks the I/O processor to flush when this request is the
+     * only one queued and the session's traffic mask allows writing.
+     */
+    protected void doWrite( IoSession session, WriteRequest writeRequest )
+    {
+        SocketSessionImpl socketSession = ( SocketSessionImpl ) session;
+        Queue pendingWrites = socketSession.getWriteRequestQueue();
+
+        ByteBuffer message = ( ByteBuffer ) writeRequest.getMessage();
+        message.mark();
+
+        synchronized( pendingWrites )
+        {
+            pendingWrites.push( writeRequest );
+
+            // Notify SocketIoProcessor only when the queue was empty before this push.
+            if( pendingWrites.size() == 1 )
+            {
+                if( session.getTrafficMask().isWritable() )
+                {
+                    socketSession.getIoProcessor().flush( socketSession );
+                }
+            }
+        }
+    }
+
+    /**
+     * Asks the session's I/O processor to remove the session. The
+     * <code>closeFuture</code> argument is not used here.
+     */
+    protected void doClose( IoSession session, CloseFuture closeFuture ) throws IOException
+    {
+        SocketSessionImpl socketSession = ( SocketSessionImpl ) session;
+        socketSession.getIoProcessor().remove( socketSession );
+    }
+}
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java?rev=359355&r1=359354&r2=359355&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java Tue Dec 27 18:49:31 2005
@@ -1,673 +1,673 @@
-/*
- * @(#) $Id$
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.transport.socket.nio.support;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.WriteTimeoutException;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.util.Queue;
-
-/**
- * Performs all I/O operations for sockets which is connected or bound.
- * This class is used by MINA internally.
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$,
- */
-class SocketIoProcessor
-{
- private static final String PROCESSORS_PROPERTY = "mina.socket.processors";
- private static final String THREAD_PREFIX = "SocketIoProcessor-";
- private static final int DEFAULT_PROCESSORS = 1;
- private static final int PROCESSOR_COUNT;
- private static final SocketIoProcessor[] PROCESSORS;
-
- private static int nextId;
-
- static
- {
- PROCESSOR_COUNT = configureProcessorCount();
- PROCESSORS = createProcessors();
- }
-
- /**
- * Returns the {@link SocketIoProcessor} to be used for a newly
- * created session
- *
- * @return The processor to be employed
- */
- static synchronized SocketIoProcessor getInstance()
- {
- SocketIoProcessor processor = PROCESSORS[ nextId ++ ];
- nextId %= PROCESSOR_COUNT;
- return processor;
- }
-
- private final String threadName;
- private Selector selector;
-
- private final Queue newSessions = new Queue();
- private final Queue removingSessions = new Queue();
- private final Queue flushingSessions = new Queue();
- private final Queue trafficControllingSessions = new Queue();
-
- private Worker worker;
- private long lastIdleCheckTime = System.currentTimeMillis();
-
- private SocketIoProcessor( String threadName )
- {
- this.threadName = threadName;
- }
-
- void addNew( SocketSessionImpl session ) throws IOException
- {
- synchronized( this )
- {
- synchronized( newSessions )
- {
- newSessions.push( session );
- }
- startupWorker();
- }
-
- selector.wakeup();
- }
-
- void remove( SocketSessionImpl session ) throws IOException
- {
- scheduleRemove( session );
- startupWorker();
- selector.wakeup();
- }
-
- private synchronized void startupWorker() throws IOException
- {
- if( worker == null )
- {
- selector = Selector.open();
- worker = new Worker();
- worker.start();
- }
- }
-
- void flush( SocketSessionImpl session )
- {
- scheduleFlush( session );
- Selector selector = this.selector;
- if( selector != null )
- {
- selector.wakeup();
- }
- }
-
- void updateTrafficMask( SocketSessionImpl session )
- {
- scheduleTrafficControl( session );
- Selector selector = this.selector;
- if( selector != null )
- {
- selector.wakeup();
- }
- }
-
- private void scheduleRemove( SocketSessionImpl session )
- {
- synchronized( removingSessions )
- {
- removingSessions.push( session );
- }
- }
-
- private void scheduleFlush( SocketSessionImpl session )
- {
- synchronized( flushingSessions )
- {
- flushingSessions.push( session );
- }
- }
-
- private void scheduleTrafficControl( SocketSessionImpl session )
- {
- synchronized( trafficControllingSessions )
- {
- trafficControllingSessions.push( session );
- }
- }
-
- private void doAddNew()
- {
- if( newSessions.isEmpty() )
- return;
-
- SocketSessionImpl session;
-
- for( ;; )
- {
- synchronized( newSessions )
- {
- session = ( SocketSessionImpl ) newSessions.pop();
- }
-
- if( session == null )
- break;
-
- SocketChannel ch = session.getChannel();
- boolean registered;
-
- try
- {
- ch.configureBlocking( false );
- session.setSelectionKey( ch.register( selector,
- SelectionKey.OP_READ,
- session ) );
- registered = true;
- }
- catch( IOException e )
- {
- registered = false;
- ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
- }
-
- if( registered )
- {
- ( ( SocketFilterChain ) session.getFilterChain() ).sessionOpened( session );
- }
- }
- }
-
- private void doRemove()
- {
- if( removingSessions.isEmpty() )
- return;
-
- for( ;; )
- {
- SocketSessionImpl session;
-
- synchronized( removingSessions )
- {
- session = ( SocketSessionImpl ) removingSessions.pop();
- }
-
- if( session == null )
- break;
-
- SocketChannel ch = session.getChannel();
- SelectionKey key = session.getSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.close() is called before addSession() is processed)
- if( key == null )
- {
- scheduleRemove( session );
- break;
- }
- // skip if channel is already closed
- if( !key.isValid() )
- {
- continue;
- }
-
- try
- {
- key.cancel();
- ch.close();
- }
- catch( IOException e )
- {
- ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
- }
- finally
- {
- releaseWriteBuffers( session );
-
- ( ( SocketFilterChain ) session.getFilterChain() ).sessionClosed( session );
- session.getCloseFuture().setClosed();
- }
- }
- }
-
- private void process( Set selectedKeys )
- {
- Iterator it = selectedKeys.iterator();
-
- while( it.hasNext() )
- {
- SelectionKey key = ( SelectionKey ) it.next();
- SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
-
- if( key.isReadable() && session.getTrafficMask().isReadable() )
- {
- read( session );
- }
-
- if( key.isWritable() && session.getTrafficMask().isWritable() )
- {
- scheduleFlush( session );
- }
- }
-
- selectedKeys.clear();
- }
-
- private void read( SocketSessionImpl session )
- {
- ByteBuffer buf = ByteBuffer.allocate( session.getSessionReceiveBufferSize() );
- SocketChannel ch = session.getChannel();
-
- try
- {
- int readBytes = 0;
- int ret;
-
- buf.clear();
-
- try
- {
- while( ( ret = ch.read( buf.buf() ) ) > 0 )
- {
- readBytes += ret;
- }
- }
- finally
- {
- buf.flip();
- }
-
- session.increaseReadBytes( readBytes );
-
- if( readBytes > 0 )
- {
- ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
- newBuf.put( buf );
- newBuf.flip();
- ( ( SocketFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf );
- }
- if( ret < 0 )
- {
- scheduleRemove( session );
- }
- }
- catch( Throwable e )
- {
- if( e instanceof IOException )
- scheduleRemove( session );
- ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
- }
- finally
- {
- buf.release();
- }
- }
-
- private void notifyIdleness()
- {
- // process idle sessions
- long currentTime = System.currentTimeMillis();
- if( ( currentTime - lastIdleCheckTime ) >= 1000 )
- {
- lastIdleCheckTime = currentTime;
- Set keys = selector.keys();
- if( keys != null )
- {
- for( Iterator it = keys.iterator(); it.hasNext(); )
- {
- SelectionKey key = ( SelectionKey ) it.next();
- SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
- notifyIdleness( session, currentTime );
- }
- }
- }
- }
-
- private void notifyIdleness( SocketSessionImpl session, long currentTime )
- {
- notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
- IdleStatus.BOTH_IDLE,
- Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
- notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
- IdleStatus.READER_IDLE,
- Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
- notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
- IdleStatus.WRITER_IDLE,
- Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
-
- notifyWriteTimeout( session, currentTime, session
- .getWriteTimeoutInMillis(), session.getLastWriteTime() );
- }
-
- private void notifyIdleness0( SocketSessionImpl session, long currentTime,
- long idleTime, IdleStatus status,
- long lastIoTime )
- {
- if( idleTime > 0 && lastIoTime != 0
- && ( currentTime - lastIoTime ) >= idleTime )
- {
- session.increaseIdleCount( status );
- ( ( SocketFilterChain ) session.getFilterChain() ).sessionIdle( session, status );
- }
- }
-
- private void notifyWriteTimeout( SocketSessionImpl session,
- long currentTime,
- long writeTimeout, long lastIoTime )
- {
- SelectionKey key = session.getSelectionKey();
- if( writeTimeout > 0
- && ( currentTime - lastIoTime ) >= writeTimeout
- && key != null && key.isValid()
- && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 )
- {
- ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, new WriteTimeoutException() );
- }
- }
-
- private void doFlush()
- {
- if( flushingSessions.size() == 0 )
- return;
-
- for( ;; )
- {
- SocketSessionImpl session;
-
- synchronized( flushingSessions )
- {
- session = ( SocketSessionImpl ) flushingSessions.pop();
- }
-
- if( session == null )
- break;
-
- if( !session.isConnected() )
- {
- releaseWriteBuffers( session );
- continue;
- }
-
- SelectionKey key = session.getSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.write() is called before addSession() is processed)
- if( key == null )
- {
- scheduleFlush( session );
- break;
- }
- // skip if channel is already closed
- if( !key.isValid() )
- {
- continue;
- }
-
- try
- {
- doFlush( session );
- }
- catch( IOException e )
- {
- scheduleRemove( session );
- ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
- }
- }
- }
-
- private void releaseWriteBuffers( SocketSessionImpl session )
- {
- Queue writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
-
- while( ( req = ( WriteRequest ) writeRequestQueue.pop() ) != null )
- {
- try
- {
- ( ( ByteBuffer ) req.getMessage() ).release();
- }
- catch( IllegalStateException e )
- {
- ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
- }
- finally
- {
- req.getFuture().setWritten( false );
- }
- }
- }
-
- private void doFlush( SocketSessionImpl session ) throws IOException
- {
- // Clear OP_WRITE
- SelectionKey key = session.getSelectionKey();
- key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
-
- SocketChannel ch = session.getChannel();
- Queue writeRequestQueue = session.getWriteRequestQueue();
-
- WriteRequest req;
- for( ;; )
- {
- synchronized( writeRequestQueue )
- {
- req = ( WriteRequest ) writeRequestQueue.first();
- }
-
- if( req == null )
- break;
-
- ByteBuffer buf = ( ByteBuffer ) req.getMessage();
- if( buf.remaining() == 0 )
- {
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.pop();
- }
-
- req.getFuture().setWritten( true );
- session.increaseWrittenWriteRequests();
- ( ( SocketFilterChain ) session.getFilterChain() ).messageSent( session, buf.reset() );
- continue;
- }
-
- int writtenBytes = ch.write( buf.buf() );
- if( writtenBytes > 0 )
- {
- session.increaseWrittenBytes( writtenBytes );
- }
-
- if( buf.hasRemaining() )
- {
- // Kernel buffer is full
- key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
- break;
- }
- }
- }
-
- private void doUpdateTrafficMask()
- {
- if( trafficControllingSessions.isEmpty() )
- return;
-
- for( ;; )
- {
- SocketSessionImpl session;
-
- synchronized( trafficControllingSessions )
- {
- session = ( SocketSessionImpl ) trafficControllingSessions.pop();
- }
-
- if( session == null )
- break;
-
- SelectionKey key = session.getSelectionKey();
- // Retry later if session is not yet fully initialized.
- // (In case that Session.suspend??() or session.resume??() is
- // called before addSession() is processed)
- if( key == null )
- {
- scheduleTrafficControl( session );
- break;
- }
- // skip if channel is already closed
- if( !key.isValid() )
- {
- continue;
- }
-
- // The normal is OP_READ and, if there are write requests in the
- // session's write queue, set OP_WRITE to trigger flushing.
- int ops = SelectionKey.OP_READ;
- Queue writeRequestQueue = session.getWriteRequestQueue();
- synchronized( writeRequestQueue )
- {
- if( !writeRequestQueue.isEmpty() )
- {
- ops |= SelectionKey.OP_WRITE;
- }
- }
-
- // Now mask the preferred ops with the mask of the current session
- int mask = session.getTrafficMask().getInterestOps();
- key.interestOps( ops & mask );
- }
- }
-
- /**
- * Configures the number of processors employed.
- * We first check for a system property "mina.IoProcessors". If this
- * property is present and can be interpreted as an integer value greater
- * or equal to 1, this value is used as the number of processors.
- * Otherwise a default of 1 processor is employed.
- *
- * @return The nubmer of processors to employ
- */
- private static int configureProcessorCount()
- {
- int processors = DEFAULT_PROCESSORS;
- String processorProperty = System.getProperty( PROCESSORS_PROPERTY );
- if ( processorProperty != null )
- {
- try
- {
- processors = Integer.parseInt( processorProperty );
- }
- catch ( NumberFormatException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- processors = Math.max( processors, 1 );
-
- System.setProperty( PROCESSORS_PROPERTY, String.valueOf( processors ) );
- }
-
- return processors;
- }
-
- private static SocketIoProcessor[] createProcessors()
- {
- SocketIoProcessor[] processors = new SocketIoProcessor[ PROCESSOR_COUNT ];
- for ( int i = 0; i < PROCESSOR_COUNT; i ++ )
- {
- processors[i] = new SocketIoProcessor( THREAD_PREFIX + i );
- }
- return processors;
- }
-
- private class Worker extends Thread
- {
- public Worker()
- {
- super( SocketIoProcessor.this.threadName );
- }
-
- public void run()
- {
- for( ;; )
- {
- try
- {
- int nKeys = selector.select( 1000 );
- doAddNew();
- doUpdateTrafficMask();
-
- if( nKeys > 0 )
- {
- process( selector.selectedKeys() );
- }
-
- doFlush();
- doRemove();
- notifyIdleness();
-
- if( selector.keys().isEmpty() )
- {
- synchronized( SocketIoProcessor.this )
- {
- if( selector.keys().isEmpty() &&
- newSessions.isEmpty() )
- {
- worker = null;
- try
- {
- selector.close();
- }
- catch( IOException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- finally
- {
- selector = null;
- }
- break;
- }
- }
- }
- }
- catch( Throwable t )
- {
- ExceptionMonitor.getInstance().exceptionCaught( t );
-
- try
- {
- Thread.sleep( 1000 );
- }
- catch( InterruptedException e1 )
- {
- }
- }
- }
- }
- }
-
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.util.Queue;
+
+/**
+ * Performs all I/O operations for sockets which are connected or bound.
+ * This class is used by MINA internally.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$,
+ */
+class SocketIoProcessor
+{
+ private static final String PROCESSORS_PROPERTY = "mina.socket.processors";
+ private static final String THREAD_PREFIX = "SocketIoProcessor-";
+ private static final int DEFAULT_PROCESSORS = 1;
+ private static final int PROCESSOR_COUNT;
+ private static final SocketIoProcessor[] PROCESSORS;
+
+ private static int nextId;
+
+ static
+ {
+ PROCESSOR_COUNT = configureProcessorCount();
+ PROCESSORS = createProcessors();
+ }
+
+ /**
+ * Returns the {@link SocketIoProcessor} to be used for a newly
+ * created session
+ *
+ * @return The processor to be employed
+ */
+ static synchronized SocketIoProcessor getInstance()
+ {
+ SocketIoProcessor processor = PROCESSORS[ nextId ++ ];
+ nextId %= PROCESSOR_COUNT;
+ return processor;
+ }
+
+ private final String threadName;
+ private Selector selector;
+
+ private final Queue newSessions = new Queue();
+ private final Queue removingSessions = new Queue();
+ private final Queue flushingSessions = new Queue();
+ private final Queue trafficControllingSessions = new Queue();
+
+ private Worker worker;
+ private long lastIdleCheckTime = System.currentTimeMillis();
+
+ private SocketIoProcessor( String threadName )
+ {
+ this.threadName = threadName;
+ }
+
+ /**
+  * Queues a newly created session for registration with the selector and
+  * makes sure a worker thread is running to pick it up, then wakes the
+  * selector so the registration is not delayed by a pending select.
+  *
+  * @param session the session to register
+  * @throws IOException if a worker has to be started and opening its
+  *         selector fails
+  */
+ void addNew( SocketSessionImpl session ) throws IOException
+ {
+     Selector selector;
+     synchronized( this )
+     {
+         synchronized( newSessions )
+         {
+             newSessions.push( session );
+         }
+         startupWorker();
+         // Take the reference while the lock is held: the worker sets the
+         // field to null under this same lock when it shuts down, so an
+         // unguarded read after leaving the block could observe null.
+         selector = this.selector;
+     }
+
+     selector.wakeup();
+ }
+
+ /**
+  * Queues a session for removal and wakes the worker up so that the
+  * removal is processed, starting a worker first if none is running.
+  *
+  * @param session the session to remove
+  * @throws IOException if a worker has to be started and opening its
+  *         selector fails
+  */
+ void remove( SocketSessionImpl session ) throws IOException
+ {
+     scheduleRemove( session );
+
+     Selector selector;
+     synchronized( this )
+     {
+         startupWorker();
+         // Read the field under the lock the worker holds when it shuts
+         // down and nulls it. Reading it after startupWorker() has returned
+         // leaves a window in which the worker can finish the removal, find
+         // no keys left and clear the field, which made the wakeup below
+         // throw NullPointerException.
+         selector = this.selector;
+     }
+
+     selector.wakeup();
+ }
+
+ private synchronized void startupWorker() throws IOException
+ {
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ worker.start();
+ }
+ }
+
+ void flush( SocketSessionImpl session )
+ {
+ scheduleFlush( session );
+ Selector selector = this.selector;
+ if( selector != null )
+ {
+ selector.wakeup();
+ }
+ }
+
+ void updateTrafficMask( SocketSessionImpl session )
+ {
+ scheduleTrafficControl( session );
+ Selector selector = this.selector;
+ if( selector != null )
+ {
+ selector.wakeup();
+ }
+ }
+
+ private void scheduleRemove( SocketSessionImpl session )
+ {
+ synchronized( removingSessions )
+ {
+ removingSessions.push( session );
+ }
+ }
+
+ private void scheduleFlush( SocketSessionImpl session )
+ {
+ synchronized( flushingSessions )
+ {
+ flushingSessions.push( session );
+ }
+ }
+
+ private void scheduleTrafficControl( SocketSessionImpl session )
+ {
+ synchronized( trafficControllingSessions )
+ {
+ trafficControllingSessions.push( session );
+ }
+ }
+
+ private void doAddNew()
+ {
+ if( newSessions.isEmpty() )
+ return;
+
+ SocketSessionImpl session;
+
+ for( ;; )
+ {
+ synchronized( newSessions )
+ {
+ session = ( SocketSessionImpl ) newSessions.pop();
+ }
+
+ if( session == null )
+ break;
+
+ SocketChannel ch = session.getChannel();
+ boolean registered;
+
+ try
+ {
+ ch.configureBlocking( false );
+ session.setSelectionKey( ch.register( selector,
+ SelectionKey.OP_READ,
+ session ) );
+ registered = true;
+ }
+ catch( IOException e )
+ {
+ registered = false;
+ ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+ }
+
+ if( registered )
+ {
+ ( ( SocketFilterChain ) session.getFilterChain() ).sessionOpened( session );
+ }
+ }
+ }
+
+ private void doRemove()
+ {
+ if( removingSessions.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ SocketSessionImpl session;
+
+ synchronized( removingSessions )
+ {
+ session = ( SocketSessionImpl ) removingSessions.pop();
+ }
+
+ if( session == null )
+ break;
+
+ SocketChannel ch = session.getChannel();
+ SelectionKey key = session.getSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.close() is called before addSession() is processed)
+ if( key == null )
+ {
+ scheduleRemove( session );
+ break;
+ }
+ // skip if channel is already closed
+ if( !key.isValid() )
+ {
+ continue;
+ }
+
+ try
+ {
+ key.cancel();
+ ch.close();
+ }
+ catch( IOException e )
+ {
+ ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+ }
+ finally
+ {
+ releaseWriteBuffers( session );
+
+ ( ( SocketFilterChain ) session.getFilterChain() ).sessionClosed( session );
+ session.getCloseFuture().setClosed();
+ }
+ }
+ }
+
+ private void process( Set selectedKeys )
+ {
+ Iterator it = selectedKeys.iterator();
+
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+ SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
+
+ if( key.isReadable() && session.getTrafficMask().isReadable() )
+ {
+ read( session );
+ }
+
+ if( key.isWritable() && session.getTrafficMask().isWritable() )
+ {
+ scheduleFlush( session );
+ }
+ }
+
+ selectedKeys.clear();
+ }
+
+ private void read( SocketSessionImpl session )
+ {
+ ByteBuffer buf = ByteBuffer.allocate( session.getSessionReceiveBufferSize() );
+ SocketChannel ch = session.getChannel();
+
+ try
+ {
+ int readBytes = 0;
+ int ret;
+
+ buf.clear();
+
+ try
+ {
+ while( ( ret = ch.read( buf.buf() ) ) > 0 )
+ {
+ readBytes += ret;
+ }
+ }
+ finally
+ {
+ buf.flip();
+ }
+
+ session.increaseReadBytes( readBytes );
+
+ if( readBytes > 0 )
+ {
+ ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
+ newBuf.put( buf );
+ newBuf.flip();
+ ( ( SocketFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf );
+ }
+ if( ret < 0 )
+ {
+ scheduleRemove( session );
+ }
+ }
+ catch( Throwable e )
+ {
+ if( e instanceof IOException )
+ scheduleRemove( session );
+ ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+ }
+ finally
+ {
+ buf.release();
+ }
+ }
+
+ private void notifyIdleness()
+ {
+ // process idle sessions
+ long currentTime = System.currentTimeMillis();
+ if( ( currentTime - lastIdleCheckTime ) >= 1000 )
+ {
+ lastIdleCheckTime = currentTime;
+ Set keys = selector.keys();
+ if( keys != null )
+ {
+ for( Iterator it = keys.iterator(); it.hasNext(); )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+ SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
+ notifyIdleness( session, currentTime );
+ }
+ }
+ }
+ }
+
+ private void notifyIdleness( SocketSessionImpl session, long currentTime )
+ {
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
+ IdleStatus.BOTH_IDLE,
+ Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
+ IdleStatus.READER_IDLE,
+ Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
+ notifyIdleness0(
+ session, currentTime,
+ session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
+ IdleStatus.WRITER_IDLE,
+ Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
+
+ notifyWriteTimeout( session, currentTime, session
+ .getWriteTimeoutInMillis(), session.getLastWriteTime() );
+ }
+
+ private void notifyIdleness0( SocketSessionImpl session, long currentTime,
+ long idleTime, IdleStatus status,
+ long lastIoTime )
+ {
+ if( idleTime > 0 && lastIoTime != 0
+ && ( currentTime - lastIoTime ) >= idleTime )
+ {
+ session.increaseIdleCount( status );
+ ( ( SocketFilterChain ) session.getFilterChain() ).sessionIdle( session, status );
+ }
+ }
+
+ private void notifyWriteTimeout( SocketSessionImpl session,
+ long currentTime,
+ long writeTimeout, long lastIoTime )
+ {
+ SelectionKey key = session.getSelectionKey();
+ if( writeTimeout > 0
+ && ( currentTime - lastIoTime ) >= writeTimeout
+ && key != null && key.isValid()
+ && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 )
+ {
+ ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, new WriteTimeoutException() );
+ }
+ }
+
+ private void doFlush()
+ {
+ if( flushingSessions.size() == 0 )
+ return;
+
+ for( ;; )
+ {
+ SocketSessionImpl session;
+
+ synchronized( flushingSessions )
+ {
+ session = ( SocketSessionImpl ) flushingSessions.pop();
+ }
+
+ if( session == null )
+ break;
+
+ if( !session.isConnected() )
+ {
+ releaseWriteBuffers( session );
+ continue;
+ }
+
+ SelectionKey key = session.getSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.write() is called before addSession() is processed)
+ if( key == null )
+ {
+ scheduleFlush( session );
+ break;
+ }
+ // skip if channel is already closed
+ if( !key.isValid() )
+ {
+ continue;
+ }
+
+ try
+ {
+ doFlush( session );
+ }
+ catch( IOException e )
+ {
+ scheduleRemove( session );
+ ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+ }
+ }
+ }
+
+ /**
+  * Drains the write request queue of the given session: releases the
+  * buffer of every pending request and marks its future as not written.
+  * A failure to release a buffer is reported to the session's filter
+  * chain and does not stop the drain.
+  *
+  * @param session the session whose pending write requests are discarded
+  */
+ private void releaseWriteBuffers( SocketSessionImpl session )
+ {
+     Queue writeRequestQueue = session.getWriteRequestQueue();
+
+     for( ;; )
+     {
+         WriteRequest req;
+
+         // Pop under the queue's monitor, like every other access to this
+         // queue in this class. NOTE(review): assumes Queue does no locking
+         // of its own - confirm against org.apache.mina.util.Queue.
+         synchronized( writeRequestQueue )
+         {
+             req = ( WriteRequest ) writeRequestQueue.pop();
+         }
+
+         if( req == null )
+             break;
+
+         // Callbacks run outside the monitor so that filter and future
+         // code is never invoked while the queue is locked.
+         try
+         {
+             ( ( ByteBuffer ) req.getMessage() ).release();
+         }
+         catch( IllegalStateException e )
+         {
+             ( ( SocketFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+         }
+         finally
+         {
+             req.getFuture().setWritten( false );
+         }
+     }
+ }
+
+ private void doFlush( SocketSessionImpl session ) throws IOException
+ {
+ // Clear OP_WRITE
+ SelectionKey key = session.getSelectionKey();
+ key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
+
+ SocketChannel ch = session.getChannel();
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+
+ WriteRequest req;
+ for( ;; )
+ {
+ synchronized( writeRequestQueue )
+ {
+ req = ( WriteRequest ) writeRequestQueue.first();
+ }
+
+ if( req == null )
+ break;
+
+ ByteBuffer buf = ( ByteBuffer ) req.getMessage();
+ if( buf.remaining() == 0 )
+ {
+ synchronized( writeRequestQueue )
+ {
+ writeRequestQueue.pop();
+ }
+
+ req.getFuture().setWritten( true );
+ session.increaseWrittenWriteRequests();
+ ( ( SocketFilterChain ) session.getFilterChain() ).messageSent( session, buf.reset() );
+ continue;
+ }
+
+ int writtenBytes = ch.write( buf.buf() );
+ if( writtenBytes > 0 )
+ {
+ session.increaseWrittenBytes( writtenBytes );
+ }
+
+ if( buf.hasRemaining() )
+ {
+ // Kernel buffer is full
+ key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+ break;
+ }
+ }
+ }
+
+ private void doUpdateTrafficMask()
+ {
+ if( trafficControllingSessions.isEmpty() )
+ return;
+
+ for( ;; )
+ {
+ SocketSessionImpl session;
+
+ synchronized( trafficControllingSessions )
+ {
+ session = ( SocketSessionImpl ) trafficControllingSessions.pop();
+ }
+
+ if( session == null )
+ break;
+
+ SelectionKey key = session.getSelectionKey();
+ // Retry later if session is not yet fully initialized.
+ // (In case that Session.suspend??() or session.resume??() is
+ // called before addSession() is processed)
+ if( key == null )
+ {
+ scheduleTrafficControl( session );
+ break;
+ }
+ // skip if channel is already closed
+ if( !key.isValid() )
+ {
+ continue;
+ }
+
+ // The normal is OP_READ and, if there are write requests in the
+ // session's write queue, set OP_WRITE to trigger flushing.
+ int ops = SelectionKey.OP_READ;
+ Queue writeRequestQueue = session.getWriteRequestQueue();
+ synchronized( writeRequestQueue )
+ {
+ if( !writeRequestQueue.isEmpty() )
+ {
+ ops |= SelectionKey.OP_WRITE;
+ }
+ }
+
+ // Now mask the preferred ops with the mask of the current session
+ int mask = session.getTrafficMask().getInterestOps();
+ key.interestOps( ops & mask );
+ }
+ }
+
+ /**
+ * Configures the number of processors employed.
+ * We first check for a system property "mina.socket.processors". If this
+ * property is present and can be interpreted as an integer value greater
+ * or equal to 1, this value is used as the number of processors.
+ * Otherwise a default of 1 processor is employed.
+ *
+ * @return The number of processors to employ
+ */
+ private static int configureProcessorCount()
+ {
+ int processors = DEFAULT_PROCESSORS;
+ String processorProperty = System.getProperty( PROCESSORS_PROPERTY );
+ if ( processorProperty != null )
+ {
+ try
+ {
+ processors = Integer.parseInt( processorProperty );
+ }
+ catch ( NumberFormatException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ processors = Math.max( processors, 1 );
+
+ System.setProperty( PROCESSORS_PROPERTY, String.valueOf( processors ) );
+ }
+
+ return processors;
+ }
+
+ private static SocketIoProcessor[] createProcessors()
+ {
+ SocketIoProcessor[] processors = new SocketIoProcessor[ PROCESSOR_COUNT ];
+ for ( int i = 0; i < PROCESSOR_COUNT; i ++ )
+ {
+ processors[i] = new SocketIoProcessor( THREAD_PREFIX + i );
+ }
+ return processors;
+ }
+
+ private class Worker extends Thread
+ {
+ public Worker()
+ {
+ super( SocketIoProcessor.this.threadName );
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ try
+ {
+ int nKeys = selector.select( 1000 );
+ doAddNew();
+ doUpdateTrafficMask();
+
+ if( nKeys > 0 )
+ {
+ process( selector.selectedKeys() );
+ }
+
+ doFlush();
+ doRemove();
+ notifyIdleness();
+
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( SocketIoProcessor.this )
+ {
+ if( selector.keys().isEmpty() &&
+ newSessions.isEmpty() )
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ catch( Throwable t )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( t );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ }
+ }
+ }
+ }
+ }
+
}
Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java?rev=359355&r1=359354&r2=359355&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java Tue Dec 27 18:49:31 2005
@@ -1,247 +1,247 @@
-/*
- * @(#) $Id$
- *
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.mina.transport.socket.nio.support;
-
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionManager;
-import org.apache.mina.common.TransportType;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.BaseIoSession;
-import org.apache.mina.transport.socket.nio.SocketSession;
-import org.apache.mina.util.Queue;
-
-/**
- * An {@link IoSession} for socket transport (TCP/IP).
- *
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-class SocketSessionImpl extends BaseIoSession implements SocketSession
-{
- private static final int DEFAULT_READ_BUFFER_SIZE = 1024;
-
- private final IoSessionManager manager;
- private final SocketIoProcessor ioProcessor;
- private final SocketFilterChain filterChain;
- private final SocketChannel ch;
- private final Queue writeRequestQueue;
- private final IoHandler handler;
- private final SocketAddress remoteAddress;
- private final SocketAddress localAddress;
- private SelectionKey key;
- private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
-
- /**
- * Creates a new instance.
- */
- public SocketSessionImpl(
- IoSessionManager manager,
- SocketChannel ch, IoHandler defaultHandler )
- {
- this.manager = manager;
- this.ioProcessor = SocketIoProcessor.getInstance();
- this.filterChain = new SocketFilterChain( this );
- this.ch = ch;
- this.writeRequestQueue = new Queue();
- this.handler = defaultHandler;
- this.remoteAddress = ch.socket().getRemoteSocketAddress();
- this.localAddress = ch.socket().getLocalSocketAddress();
- }
-
- public IoSessionManager getManager()
- {
- return manager;
- }
-
- SocketIoProcessor getIoProcessor()
- {
- return ioProcessor;
- }
-
- public IoFilterChain getFilterChain()
- {
- return filterChain;
- }
-
- SocketChannel getChannel()
- {
- return ch;
- }
-
- SelectionKey getSelectionKey()
- {
- return key;
- }
-
- void setSelectionKey( SelectionKey key )
- {
- this.key = key;
- }
-
- public IoHandler getHandler()
- {
- return handler;
- }
-
- protected void close0( CloseFuture closeFuture )
- {
- filterChain.filterClose( this, closeFuture );
- }
-
- Queue getWriteRequestQueue()
- {
- return writeRequestQueue;
- }
-
- public int getScheduledWriteRequests()
- {
- synchronized( writeRequestQueue )
- {
- return writeRequestQueue.size();
- }
- }
-
- protected void write0( WriteRequest writeRequest )
- {
- filterChain.filterWrite( this, writeRequest );
- }
-
- public TransportType getTransportType()
- {
- return TransportType.SOCKET;
- }
-
- public SocketAddress getRemoteAddress()
- {
- return remoteAddress;
- }
-
- public SocketAddress getLocalAddress()
- {
- return localAddress;
- }
-
- public boolean getKeepAlive() throws SocketException
- {
- return ch.socket().getKeepAlive();
- }
-
- public void setKeepAlive( boolean on ) throws SocketException
- {
- ch.socket().setKeepAlive( on );
- }
-
- public boolean getOOBInline() throws SocketException
- {
- return ch.socket().getOOBInline();
- }
-
- public void setOOBInline( boolean on ) throws SocketException
- {
- ch.socket().setOOBInline( on );
- }
-
- public boolean getReuseAddress() throws SocketException
- {
- return ch.socket().getReuseAddress();
- }
-
- public void setReuseAddress( boolean on ) throws SocketException
- {
- ch.socket().setReuseAddress( on );
- }
-
- public int getSoLinger() throws SocketException
- {
- return ch.socket().getSoLinger();
- }
-
- public void setSoLinger( boolean on, int linger ) throws SocketException
- {
- ch.socket().setSoLinger( on, linger );
- }
-
- public boolean getTcpNoDelay() throws SocketException
- {
- return ch.socket().getTcpNoDelay();
- }
-
- public void setTcpNoDelay( boolean on ) throws SocketException
- {
- ch.socket().setTcpNoDelay( on );
- }
-
- public int getTrafficClass() throws SocketException
- {
- return ch.socket().getTrafficClass();
- }
-
- public void setTrafficClass( int tc ) throws SocketException
- {
- ch.socket().setTrafficClass( tc );
- }
-
- public int getSendBufferSize() throws SocketException
- {
- return ch.socket().getSendBufferSize();
- }
-
- public void setSendBufferSize( int size ) throws SocketException
- {
- ch.socket().setSendBufferSize( size );
- }
-
- public int getReceiveBufferSize() throws SocketException
- {
- return ch.socket().getReceiveBufferSize();
- }
-
- public void setReceiveBufferSize( int size ) throws SocketException
- {
- ch.socket().setReceiveBufferSize( size );
- }
-
- public int getSessionReceiveBufferSize()
- {
- return readBufferSize;
- }
-
- public void setSessionReceiveBufferSize( int size )
- {
- if( size <= 0 )
- {
- throw new IllegalArgumentException( "Invalid session receive buffer size: " + size );
- }
-
- this.readBufferSize = size;
- }
-
- protected void updateTrafficMask()
- {
- this.ioProcessor.updateTrafficMask( this );
- }
-}
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionManager;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.BaseIoSession;
+import org.apache.mina.transport.socket.nio.SocketSession;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoSession} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+class SocketSessionImpl extends BaseIoSession implements SocketSession
+{
+ private static final int DEFAULT_READ_BUFFER_SIZE = 1024;
+
+ private final IoSessionManager manager;
+ private final SocketIoProcessor ioProcessor;
+ private final SocketFilterChain filterChain;
+ private final SocketChannel ch;
+ private final Queue writeRequestQueue;
+ private final IoHandler handler;
+ private final SocketAddress remoteAddress;
+ private final SocketAddress localAddress;
+ private SelectionKey key;
+ private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
+
+ /**
+ * Creates a new instance.
+ */
+ public SocketSessionImpl(
+ IoSessionManager manager,
+ SocketChannel ch, IoHandler defaultHandler )
+ {
+ this.manager = manager;
+ this.ioProcessor = SocketIoProcessor.getInstance();
+ this.filterChain = new SocketFilterChain( this );
+ this.ch = ch;
+ this.writeRequestQueue = new Queue();
+ this.handler = defaultHandler;
+ this.remoteAddress = ch.socket().getRemoteSocketAddress();
+ this.localAddress = ch.socket().getLocalSocketAddress();
+ }
+
+ public IoSessionManager getManager()
+ {
+ return manager;
+ }
+
+ SocketIoProcessor getIoProcessor()
+ {
+ return ioProcessor;
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return filterChain;
+ }
+
+ SocketChannel getChannel()
+ {
+ return ch;
+ }
+
+ SelectionKey getSelectionKey()
+ {
+ return key;
+ }
+
+ void setSelectionKey( SelectionKey key )
+ {
+ this.key = key;
+ }
+
+ public IoHandler getHandler()
+ {
+ return handler;
+ }
+
+ protected void close0( CloseFuture closeFuture )
+ {
+ filterChain.filterClose( this, closeFuture );
+ }
+
+ Queue getWriteRequestQueue()
+ {
+ return writeRequestQueue;
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ synchronized( writeRequestQueue )
+ {
+ return writeRequestQueue.size();
+ }
+ }
+
+ protected void write0( WriteRequest writeRequest )
+ {
+ filterChain.filterWrite( this, writeRequest );
+ }
+
+ public TransportType getTransportType()
+ {
+ return TransportType.SOCKET;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return remoteAddress;
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return localAddress;
+ }
+
+ public boolean getKeepAlive() throws SocketException
+ {
+ return ch.socket().getKeepAlive();
+ }
+
+ public void setKeepAlive( boolean on ) throws SocketException
+ {
+ ch.socket().setKeepAlive( on );
+ }
+
+ public boolean getOOBInline() throws SocketException
+ {
+ return ch.socket().getOOBInline();
+ }
+
+ public void setOOBInline( boolean on ) throws SocketException
+ {
+ ch.socket().setOOBInline( on );
+ }
+
+ public boolean getReuseAddress() throws SocketException
+ {
+ return ch.socket().getReuseAddress();
+ }
+
+ public void setReuseAddress( boolean on ) throws SocketException
+ {
+ ch.socket().setReuseAddress( on );
+ }
+
+ public int getSoLinger() throws SocketException
+ {
+ return ch.socket().getSoLinger();
+ }
+
+ public void setSoLinger( boolean on, int linger ) throws SocketException
+ {
+ ch.socket().setSoLinger( on, linger );
+ }
+
+ public boolean getTcpNoDelay() throws SocketException
+ {
+ return ch.socket().getTcpNoDelay();
+ }
+
+ public void setTcpNoDelay( boolean on ) throws SocketException
+ {
+ ch.socket().setTcpNoDelay( on );
+ }
+
+ public int getTrafficClass() throws SocketException
+ {
+ return ch.socket().getTrafficClass();
+ }
+
+ public void setTrafficClass( int tc ) throws SocketException
+ {
+ ch.socket().setTrafficClass( tc );
+ }
+
+ public int getSendBufferSize() throws SocketException
+ {
+ return ch.socket().getSendBufferSize();
+ }
+
+ public void setSendBufferSize( int size ) throws SocketException
+ {
+ ch.socket().setSendBufferSize( size );
+ }
+
+ public int getReceiveBufferSize() throws SocketException
+ {
+ return ch.socket().getReceiveBufferSize();
+ }
+
+ public void setReceiveBufferSize( int size ) throws SocketException
+ {
+ ch.socket().setReceiveBufferSize( size );
+ }
+
+ public int getSessionReceiveBufferSize()
+ {
+ return readBufferSize;
+ }
+
+ public void setSessionReceiveBufferSize( int size )
+ {
+ if( size <= 0 )
+ {
+ throw new IllegalArgumentException( "Invalid session receive buffer size: " + size );
+ }
+
+ this.readBufferSize = size;
+ }
+
+ protected void updateTrafficMask()
+ {
+ this.ioProcessor.updateTrafficMask( this );
+ }
+}