You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by pr...@apache.org on 2006/04/29 01:36:29 UTC
svn commit: r398039 - in
/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio:
./ support/
Author: proyal
Date: Fri Apr 28 16:36:27 2006
New Revision: 398039
URL: http://svn.apache.org/viewcvs?rev=398039&view=rev
Log:
DIRMINA-209 - The number of processors for socket connectors and acceptors can be specified via a cxtor parameter. By default, connectors and acceptors each use their own processor.
Additionally, the delegates have been removed helping to simplify the code.
Added:
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java (contents, props changed)
- copied, changed from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (contents, props changed)
- copied, changed from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionConfigImpl.java (contents, props changed)
- copied, changed from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionConfigImpl.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (contents, props changed)
- copied, changed from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
Removed:
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionConfigImpl.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
Modified:
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptorConfig.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnectorConfig.java
directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/package.html
Added: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=398039&view=auto
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Fri Apr 28 16:36:27 2006
@@ -0,0 +1,651 @@
+/*
+ * @(#) $Id: SocketAcceptor.java 389042 2006-03-27 07:49:41Z trustin $
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoAcceptorConfig;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.util.IdentityHashSet;
+import org.apache.mina.util.Queue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * {@link IoAcceptor} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
+ */
+public class SocketAcceptor extends BaseIoAcceptor
+{
+ /**
+ * @noinspection StaticNonFinalField
+ */
+ private static volatile int nextId = 0;
+
+ private final Object lock = new Object();
+ private final int id = nextId ++;
+ private final String threadName = "SocketAcceptor-" + id;
+ private final IoServiceConfig defaultConfig = new SocketAcceptorConfig();
+ private final Map channels = new HashMap();
+ private final Hashtable sessions = new Hashtable();
+
+ private final Queue registerQueue = new Queue();
+ private final Queue cancelQueue = new Queue();
+
+ private final SocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+
+ /**
+ * @noinspection FieldAccessedSynchronizedAndUnsynchronized
+ */
+ private Selector selector;
+ private Worker worker;
+ private int processorDistributor = 0;
+
+ /**
+ * Create an acceptor with a single processing thread
+ */
+ public SocketAcceptor()
+ {
+ this( 1 );
+ }
+
+ /**
+ * Create an acceptor with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ */
+ public SocketAcceptor( int processorCount )
+ {
+ if( processorCount < 1 )
+ {
+ throw new IllegalArgumentException( "Must have at least one processor" );
+ }
+
+ this.processorCount = processorCount;
+ ioProcessors = new SocketIoProcessor[processorCount];
+
+ for( int i = 0; i < processorCount; i++ )
+ {
+ ioProcessors[i] = new SocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i );
+ }
+ }
+
+ /**
+ * Binds to the specified <code>address</code> and handles incoming connections with the specified
+ * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property.
+ *
+ * @throws IOException if failed to bind
+ */
+ public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException
+ {
+ if( address == null )
+ {
+ throw new NullPointerException( "address" );
+ }
+
+ if( handler == null )
+ {
+ throw new NullPointerException( "handler" );
+ }
+
+ if( !( address instanceof InetSocketAddress ) )
+ {
+ throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+ }
+
+ if( ( ( InetSocketAddress ) address ).getPort() == 0 )
+ {
+ throw new IllegalArgumentException( "Unsupported port number: 0" );
+ }
+
+ if( config == null )
+ {
+ config = getDefaultConfig();
+ }
+
+ RegistrationRequest request = new RegistrationRequest( address, handler, config );
+
+ synchronized( registerQueue )
+ {
+ registerQueue.push( request );
+ }
+
+ startupWorker();
+
+ selector.wakeup();
+
+ synchronized( request )
+ {
+ while( !request.done )
+ {
+ try
+ {
+ request.wait();
+ }
+ catch( InterruptedException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+
+ if( request.exception != null )
+ {
+ throw request.exception;
+ }
+ }
+
+
+ private synchronized void startupWorker() throws IOException
+ {
+ synchronized( lock )
+ {
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+
+ worker.start();
+ }
+ }
+ }
+
+ public Set getManagedSessions( SocketAddress address )
+ {
+ if( address == null )
+ {
+ throw new NullPointerException( "address" );
+ }
+
+ Set managedSessions = ( Set ) sessions.get( address );
+
+ if( managedSessions == null )
+ {
+ throw new IllegalArgumentException( "Address not bound: " + address );
+ }
+
+ return Collections.unmodifiableSet(
+ new IdentityHashSet( Arrays.asList( managedSessions.toArray() ) ) );
+ }
+
+ public void unbind( SocketAddress address )
+ {
+ if( address == null )
+ {
+ throw new NullPointerException( "address" );
+ }
+
+ final Set managedSessions = ( Set ) sessions.get( address );
+ CancellationRequest request = new CancellationRequest( address );
+
+ try
+ {
+ startupWorker();
+ }
+ catch( IOException e )
+ {
+ // IOException is thrown only when Worker thread is not
+ // running and failed to open a selector. We simply throw
+ // IllegalArgumentException here because we can simply
+ // conclude that nothing is bound to the selector.
+ throw new IllegalArgumentException( "Address not bound: " + address );
+ }
+
+ synchronized( cancelQueue )
+ {
+ cancelQueue.push( request );
+ }
+
+ selector.wakeup();
+
+ synchronized( request )
+ {
+ while( !request.done )
+ {
+ try
+ {
+ request.wait();
+ }
+ catch( InterruptedException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+
+ if( request.exception != null )
+ {
+ request.exception.fillInStackTrace();
+
+ throw request.exception;
+ }
+
+ // Disconnect all clients
+ IoServiceConfig cfg = request.registrationRequest.config;
+ boolean disconnectOnUnbind;
+ if( cfg instanceof IoAcceptorConfig )
+ {
+ disconnectOnUnbind = ( ( IoAcceptorConfig ) cfg ).isDisconnectOnUnbind();
+ }
+ else
+ {
+ disconnectOnUnbind = ( ( IoAcceptorConfig ) getDefaultConfig() ).isDisconnectOnUnbind();
+ }
+
+ if( disconnectOnUnbind && managedSessions != null )
+ {
+ IoSession[] tempSessions =
+ ( IoSession[] ) managedSessions.toArray( new IoSession[ managedSessions.size() ] );
+
+ final Object lock = new Object();
+
+ for( int i = 0; i < tempSessions.length; i++ )
+ {
+ if( !managedSessions.contains( tempSessions[i] ) )
+ {
+ // The session has already been closed and have been
+ // removed from managedSessions by the SocketIoProcessor.
+ continue;
+ }
+ tempSessions[i].close().setCallback( new IoFuture.Callback()
+ {
+ public void operationComplete( IoFuture future )
+ {
+ synchronized( lock )
+ {
+ //noinspection NakedNotify
+ lock.notifyAll();
+ }
+ }
+ } );
+ }
+
+ try
+ {
+ synchronized( lock )
+ {
+ while( !managedSessions.isEmpty() )
+ {
+ lock.wait( 1000 );
+ }
+ }
+ }
+ catch( InterruptedException ie )
+ {
+ // Ignored
+ }
+
+ }
+ }
+
+ public void unbindAll()
+ {
+ List addresses;
+ synchronized( channels )
+ {
+ addresses = new ArrayList( channels.keySet() );
+ }
+
+ for( Iterator i = addresses.iterator(); i.hasNext(); )
+ {
+ unbind( ( SocketAddress ) i.next() );
+ }
+ }
+
+ public boolean isBound( SocketAddress address )
+ {
+ synchronized( channels )
+ {
+ return channels.containsKey( address );
+ }
+ }
+
+ private class Worker extends Thread
+ {
+ Worker()
+ {
+ super( SocketAcceptor.this.threadName );
+ }
+
+ public void run()
+ {
+ for( ; ; )
+ {
+ try
+ {
+ int nKeys = selector.select();
+
+ registerNew();
+ cancelKeys();
+
+ if( nKeys > 0 )
+ {
+ processSessions( selector.selectedKeys() );
+ }
+
+ if( selector.keys().isEmpty() )
+ {
+ synchronized( lock )
+ {
+ if( selector.keys().isEmpty() &&
+ registerQueue.isEmpty() &&
+ cancelQueue.isEmpty() )
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e1 );
+ }
+ }
+ }
+ }
+
+ private void processSessions( Set keys ) throws IOException
+ {
+ Iterator it = keys.iterator();
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+
+ it.remove();
+
+ if( !key.isAcceptable() )
+ {
+ continue;
+ }
+
+ ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
+
+ SocketChannel ch = ssc.accept();
+
+ if( ch == null )
+ {
+ continue;
+ }
+
+ boolean success = false;
+ SocketSessionImpl session = null;
+ try
+ {
+ RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+ session = new SocketSessionImpl( SocketAcceptor.this,
+ nextProcessor(),
+ ( Set ) sessions.get( req.address ),
+ req.config.getSessionConfig(),
+ ch,
+ req.handler,
+ req.address );
+ getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+ req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+ req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
+ ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session );
+ session.getManagedSessions().add( session );
+ session.getIoProcessor().addNew( session );
+ success = true;
+ }
+ catch( Throwable t )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( t );
+ }
+ finally
+ {
+ if( !success )
+ {
+ if( session != null )
+ {
+ session.getManagedSessions().remove( session );
+ }
+ ch.close();
+ }
+ }
+ }
+ }
+ }
+
+ private SocketIoProcessor nextProcessor()
+ {
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+
+ public IoServiceConfig getDefaultConfig()
+ {
+ return defaultConfig;
+ }
+
+ private void registerNew()
+ {
+ if( registerQueue.isEmpty() )
+ {
+ return;
+ }
+
+ for( ; ; )
+ {
+ RegistrationRequest req;
+
+ synchronized( registerQueue )
+ {
+ req = ( RegistrationRequest ) registerQueue.pop();
+ }
+
+ if( req == null )
+ {
+ break;
+ }
+
+ ServerSocketChannel ssc = null;
+
+ try
+ {
+ ssc = ServerSocketChannel.open();
+ ssc.configureBlocking( false );
+
+ // Configure the server socket,
+ SocketAcceptorConfig cfg;
+ if( req.config instanceof SocketAcceptorConfig )
+ {
+ cfg = ( SocketAcceptorConfig ) req.config;
+ }
+ else
+ {
+ cfg = ( SocketAcceptorConfig ) getDefaultConfig();
+ }
+
+ ssc.socket().setReuseAddress( cfg.isReuseAddress() );
+ ssc.socket().setReceiveBufferSize(
+ ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() );
+
+ // and bind.
+ ssc.socket().bind( req.address, cfg.getBacklog() );
+ ssc.register( selector, SelectionKey.OP_ACCEPT, req );
+
+ synchronized( channels )
+ {
+ channels.put( req.address, ssc );
+ }
+ sessions.put( req.address, Collections.synchronizedSet( new HashSet() ) );
+ }
+ catch( IOException e )
+ {
+ req.exception = e;
+ }
+ finally
+ {
+ synchronized( req )
+ {
+ req.done = true;
+
+ req.notifyAll();
+ }
+
+ if( ssc != null && req.exception != null )
+ {
+ try
+ {
+ ssc.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+ }
+ }
+
+
+ private void cancelKeys()
+ {
+ if( cancelQueue.isEmpty() )
+ {
+ return;
+ }
+
+ for( ; ; )
+ {
+ CancellationRequest request;
+
+ synchronized( cancelQueue )
+ {
+ request = ( CancellationRequest ) cancelQueue.pop();
+ }
+
+ if( request == null )
+ {
+ break;
+ }
+
+ sessions.remove( request.address );
+ ServerSocketChannel ssc;
+ synchronized( channels )
+ {
+ ssc = ( ServerSocketChannel ) channels.remove( request.address );
+ }
+
+ // close the channel
+ try
+ {
+ if( ssc == null )
+ {
+ request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
+ }
+ else
+ {
+ SelectionKey key = ssc.keyFor( selector );
+ request.registrationRequest = ( RegistrationRequest ) key.attachment();
+ key.cancel();
+
+ selector.wakeup(); // wake up again to trigger thread death
+
+ ssc.close();
+ }
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ synchronized( request )
+ {
+ request.done = true;
+ request.notifyAll();
+ }
+ }
+ }
+ }
+
+ private static class RegistrationRequest
+ {
+ private final SocketAddress address;
+ private final IoHandler handler;
+ private final IoServiceConfig config;
+ private IOException exception;
+ private boolean done;
+
+ private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config )
+ {
+ this.address = address;
+ this.handler = handler;
+ this.config = config;
+ }
+ }
+
+
+ private static class CancellationRequest
+ {
+ private final SocketAddress address;
+ private boolean done;
+ private RegistrationRequest registrationRequest;
+ private RuntimeException exception;
+
+ private CancellationRequest( SocketAddress address )
+ {
+ this.address = address;
+ }
+ }
+}
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptorConfig.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptorConfig.java?rev=398039&r1=398038&r2=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptorConfig.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptorConfig.java Fri Apr 28 16:36:27 2006
@@ -18,15 +18,14 @@
*/
package org.apache.mina.transport.socket.nio;
-import java.io.IOException;
-import java.net.ServerSocket;
-
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoAcceptorConfig;
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.support.BaseIoAcceptorConfig;
-import org.apache.mina.transport.socket.nio.support.SocketSessionConfigImpl;
+
+import java.io.IOException;
+import java.net.ServerSocket;
/**
* An {@link IoAcceptorConfig} for {@link SocketAcceptor}.
@@ -39,7 +38,7 @@
private IoSessionConfig sessionConfig = new SocketSessionConfigImpl();
private int backlog = 50;
private boolean reuseAddress;
-
+
/**
* Creates a new instance.
*
@@ -85,7 +84,7 @@
{
return reuseAddress;
}
-
+
/**
* @see ServerSocket#setReuseAddress(boolean)
*/
@@ -93,12 +92,12 @@
{
this.reuseAddress = reuseAddress;
}
-
+
public int getBacklog()
{
return backlog;
}
-
+
public void setBacklog( int backlog )
{
this.backlog = backlog;
Added: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?rev=398039&view=auto
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Fri Apr 28 16:36:27 2006
@@ -0,0 +1,466 @@
+/*
+ * @(#) $Id: SocketConnector.java 389042 2006-03-27 07:49:41Z trustin $
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.util.Queue;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
+ */
+public class SocketConnector extends BaseIoConnector
+{
+ /**
+ * @noinspection StaticNonFinalField
+ */
+ private static volatile int nextId = 0;
+
+ private final Object lock = new Object();
+ private final int id = nextId++;
+ private final String threadName = "SocketConnector-" + id;
+ private final IoServiceConfig defaultConfig = new SocketConnectorConfig();
+ private final Queue connectQueue = new Queue();
+ private final Set managedSessions = Collections.synchronizedSet( new HashSet() );
+ private final SocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+
+ /**
+ * @noinspection FieldAccessedSynchronizedAndUnsynchronized
+ */
+ private Selector selector;
+ private Worker worker;
+ private int processorDistributor = 0;
+ private long workerTimeout = 1000L * 60;
+
+ /**
+ * Create a connector with a single processing thread
+ */
+ public SocketConnector()
+ {
+ this( 1 );
+ }
+
+ /**
+ * Create a connector with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ */
+ public SocketConnector( int processorCount )
+ {
+ if( processorCount < 1 )
+ {
+ throw new IllegalArgumentException( "Must have at least one processor" );
+ }
+
+ this.processorCount = processorCount;
+ ioProcessors = new SocketIoProcessor[processorCount];
+
+ for( int i = 0; i < processorCount; i++ )
+ {
+ ioProcessors[i] = new SocketIoProcessor( "SocketConnectorIoProcessor-" + id + "." + i );
+ }
+ }
+
+ /**
+ * How many milliseconds to keep the connection thread alive between connection requests
+ *
+ * @return Number of milliseconds to keep connection thread alive
+ */
+ public long getWorkerTimeout()
+ {
+ return workerTimeout;
+ }
+
+ /**
+ * Set how many milliseconds the connection worker thread should remain alive once idle before terminating itself.
+ *
+ * @param workerTimeout Number of milliseconds to keep thread alive. Must be >=0
+ */
+ public void setWorkerTimeout( long workerTimeout )
+ {
+ if( workerTimeout < 0 )
+ {
+ throw new IllegalArgumentException( "Must be > 0" );
+ }
+ this.workerTimeout = workerTimeout;
+ }
+
+ public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config )
+ {
+ return connect( address, null, handler, config );
+ }
+
+ public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
+ IoHandler handler, IoServiceConfig config )
+ {
+ if( address == null )
+ throw new NullPointerException( "address" );
+ if( handler == null )
+ throw new NullPointerException( "handler" );
+
+ if( ! ( address instanceof InetSocketAddress ) )
+ throw new IllegalArgumentException( "Unexpected address type: "
+ + address.getClass() );
+
+ if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+ throw new IllegalArgumentException( "Unexpected local address type: "
+ + localAddress.getClass() );
+
+ if( config == null )
+ {
+ config = getDefaultConfig();
+ }
+
+ SocketChannel ch = null;
+ boolean success = false;
+ try
+ {
+ ch = SocketChannel.open();
+ ch.socket().setReuseAddress( true );
+ if( localAddress != null )
+ {
+ ch.socket().bind( localAddress );
+ }
+
+ ch.configureBlocking( false );
+
+ if( ch.connect( address ) )
+ {
+ SocketSessionImpl session = newSession( ch, handler, config );
+ success = true;
+ ConnectFuture future = new ConnectFuture();
+ future.setSession( session );
+ return future;
+ }
+
+ success = true;
+ }
+ catch( IOException e )
+ {
+ return ConnectFuture.newFailedFuture( e );
+ }
+ finally
+ {
+ if( !success && ch != null )
+ {
+ try
+ {
+ ch.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+
+ ConnectionRequest request = new ConnectionRequest( ch, handler, config );
+ synchronized( lock )
+ {
+ try
+ {
+ startupWorker();
+ }
+ catch( IOException e )
+ {
+ try
+ {
+ ch.close();
+ }
+ catch( IOException e2 )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e2 );
+ }
+
+ return ConnectFuture.newFailedFuture( e );
+ }
+ }
+
+ synchronized( connectQueue )
+ {
+ connectQueue.push( request );
+ }
+ selector.wakeup();
+
+ return request;
+ }
+
+ public IoServiceConfig getDefaultConfig()
+ {
+ return defaultConfig;
+ }
+
+ private synchronized void startupWorker() throws IOException
+ {
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ worker.start();
+ }
+ }
+
+ private void registerNew()
+ {
+ if( connectQueue.isEmpty() )
+ return;
+
+ for( ; ; )
+ {
+ ConnectionRequest req;
+ synchronized( connectQueue )
+ {
+ req = ( ConnectionRequest ) connectQueue.pop();
+ }
+
+ if( req == null )
+ break;
+
+ SocketChannel ch = req.channel;
+ try
+ {
+ ch.register( selector, SelectionKey.OP_CONNECT, req );
+ }
+ catch( IOException e )
+ {
+ req.setException( e );
+ }
+ }
+ }
+
+ private void processSessions( Set keys )
+ {
+ Iterator it = keys.iterator();
+
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+
+ if( !key.isConnectable() )
+ continue;
+
+ SocketChannel ch = ( SocketChannel ) key.channel();
+ ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
+
+ boolean success = false;
+ try
+ {
+ ch.finishConnect();
+ SocketSessionImpl session = newSession( ch, entry.handler, entry.config );
+ entry.setSession( session );
+ success = true;
+ }
+ catch( Throwable e )
+ {
+ entry.setException( e );
+ }
+ finally
+ {
+ key.cancel();
+ if( !success )
+ {
+ try
+ {
+ ch.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ }
+ }
+ }
+
+ keys.clear();
+ }
+
+ private void processTimedOutSessions( Set keys )
+ {
+ long currentTime = System.currentTimeMillis();
+ Iterator it = keys.iterator();
+
+ while( it.hasNext() )
+ {
+ SelectionKey key = ( SelectionKey ) it.next();
+
+ if( !key.isValid() )
+ continue;
+
+ ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
+
+ if( currentTime >= entry.deadline )
+ {
+ entry.setException( new ConnectException() );
+ key.cancel();
+ }
+ }
+ }
+
+ private SocketSessionImpl newSession( SocketChannel ch, IoHandler handler, IoServiceConfig config )
+ throws IOException
+ {
+ SocketSessionImpl session = new SocketSessionImpl( this,
+ nextProcessor(),
+ managedSessions,
+ config.getSessionConfig(),
+ ch,
+ handler,
+ ch.socket().getRemoteSocketAddress() );
+ try
+ {
+ getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+ config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+ config.getThreadModel().buildFilterChain( session.getFilterChain() );
+ ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session );
+ }
+ catch( Throwable e )
+ {
+ throw ( IOException ) new IOException( "Failed to create a session." ).initCause( e );
+ }
+ session.getManagedSessions().add( session );
+ session.getIoProcessor().addNew( session );
+ return session;
+ }
+
+ private SocketIoProcessor nextProcessor()
+ {
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+
+ private class Worker extends Thread
+ {
+ private long lastActive = System.currentTimeMillis();
+
+ Worker()
+ {
+ super( SocketConnector.this.threadName );
+ }
+
+ public void run()
+ {
+ for( ; ; )
+ {
+ try
+ {
+ int nKeys = selector.select( 1000 );
+
+ registerNew();
+
+ if( nKeys > 0 )
+ {
+ processSessions( selector.selectedKeys() );
+ }
+
+ processTimedOutSessions( selector.keys() );
+
+ if( selector.keys().isEmpty() )
+ {
+ if( System.currentTimeMillis() - lastActive > workerTimeout )
+ {
+ synchronized( lock )
+ {
+ if( selector.keys().isEmpty() &&
+ connectQueue.isEmpty() )
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ else
+ {
+ lastActive = System.currentTimeMillis();
+ }
+ }
+ catch( IOException e )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e );
+
+ try
+ {
+ Thread.sleep( 1000 );
+ }
+ catch( InterruptedException e1 )
+ {
+ ExceptionMonitor.getInstance().exceptionCaught( e1 );
+ }
+ }
+ }
+ }
+ }
+
+ private class ConnectionRequest extends ConnectFuture
+ {
+ private final SocketChannel channel;
+ private final long deadline;
+ private final IoHandler handler;
+ private final IoServiceConfig config;
+
+ private ConnectionRequest( SocketChannel channel, IoHandler handler, IoServiceConfig config )
+ {
+ this.channel = channel;
+ long timeout;
+ if( config instanceof IoConnectorConfig )
+ {
+ timeout = ( ( IoConnectorConfig ) config ).getConnectTimeoutMillis();
+ }
+ else
+ {
+ timeout = ( ( IoConnectorConfig ) getDefaultConfig() ).getConnectTimeoutMillis();
+ }
+ this.deadline = System.currentTimeMillis() + timeout;
+ this.handler = handler;
+ this.config = config;
+ }
+ }
+}
\ No newline at end of file
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnectorConfig.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnectorConfig.java?rev=398039&r1=398038&r2=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnectorConfig.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnectorConfig.java Fri Apr 28 16:36:27 2006
@@ -22,7 +22,6 @@
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.support.BaseIoConnectorConfig;
-import org.apache.mina.transport.socket.nio.support.SocketSessionConfigImpl;
/**
* An {@link IoConnectorConfig} for {@link SocketConnector}.
Copied: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java (from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java)
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?p2=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java&p1=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java&r1=397283&r2=398039&rev=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java Fri Apr 28 16:36:27 2006
@@ -16,7 +16,7 @@
* limitations under the License.
*
*/
-package org.apache.mina.transport.socket.nio.support;
+package org.apache.mina.transport.socket.nio;
import java.io.IOException;
@@ -34,7 +34,7 @@
*/
class SocketFilterChain extends AbstractIoFilterChain {
- public SocketFilterChain( IoSession parent )
+ SocketFilterChain( IoSession parent )
{
super( parent );
}
Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Copied: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java)
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?p2=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java&p1=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java&r1=397283&r2=398039&rev=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Apr 28 16:36:27 2006
@@ -16,7 +16,14 @@
* limitations under the License.
*
*/
-package org.apache.mina.transport.socket.nio.support;
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.mina.util.Queue;
import java.io.IOException;
import java.nio.channels.SelectionKey;
@@ -25,50 +32,20 @@
import java.util.Iterator;
import java.util.Set;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.WriteTimeoutException;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.util.Queue;
-
/**
- * Performs all I/O operations for sockets which is connected or bound.
- * This class is used by MINA internally.
- *
+ * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
+ *
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$,
*/
class SocketIoProcessor
{
- private static final String PROCESSORS_PROPERTY = "mina.socket.processors";
- private static final String THREAD_PREFIX = "SocketIoProcessor-";
- private static final int DEFAULT_PROCESSORS = 1;
- private static final int PROCESSOR_COUNT;
- private static final SocketIoProcessor[] PROCESSORS;
-
- private static int nextId;
-
- static
- {
- PROCESSOR_COUNT = configureProcessorCount();
- PROCESSORS = createProcessors();
- }
-
+ private final Object lock = new Object();
+
+ private final String threadName;
/**
- * Returns the {@link SocketIoProcessor} to be used for a newly
- * created session
- *
- * @return The processor to be employed
+ * @noinspection FieldAccessedSynchronizedAndUnsynchronized
*/
- static synchronized SocketIoProcessor getInstance()
- {
- SocketIoProcessor processor = PROCESSORS[ nextId ++ ];
- nextId %= PROCESSOR_COUNT;
- return processor;
- }
-
- private final String threadName;
private Selector selector;
private final Queue newSessions = new Queue();
@@ -79,22 +56,20 @@
private Worker worker;
private long lastIdleCheckTime = System.currentTimeMillis();
- private SocketIoProcessor( String threadName )
+ SocketIoProcessor( String threadName )
{
this.threadName = threadName;
}
void addNew( SocketSessionImpl session ) throws IOException
{
- synchronized( this )
+ synchronized( newSessions )
{
- synchronized( newSessions )
- {
- newSessions.push( session );
- }
- startupWorker();
+ newSessions.push( session );
}
+ startupWorker();
+
selector.wakeup();
}
@@ -105,13 +80,16 @@
selector.wakeup();
}
- private synchronized void startupWorker() throws IOException
+ private void startupWorker() throws IOException
{
- if( worker == null )
+ synchronized( lock )
{
- selector = Selector.open();
- worker = new Worker();
- worker.start();
+ if( worker == null )
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ worker.start();
+ }
}
}
@@ -164,10 +142,10 @@
if( newSessions.isEmpty() )
return;
- SocketSessionImpl session;
-
- for( ;; )
+ for( ; ; )
{
+ SocketSessionImpl session;
+
synchronized( newSessions )
{
session = ( SocketSessionImpl ) newSessions.pop();
@@ -206,7 +184,7 @@
if( removingSessions.isEmpty() )
return;
- for( ;; )
+ for( ; ; )
{
SocketSessionImpl session;
@@ -277,16 +255,16 @@
private void read( SocketSessionImpl session )
{
- ByteBuffer buf = ByteBuffer.allocate( session.getReadBufferSize() );
+ ByteBuffer buf = ByteBuffer.allocate( session.getReadBufferSize() );
SocketChannel ch = session.getChannel();
try
{
+ buf.clear();
+
int readBytes = 0;
int ret;
- buf.clear();
-
try
{
while( ( ret = ch.read( buf.buf() ) ) > 0 )
@@ -347,28 +325,28 @@
private void notifyIdleness( SocketSessionImpl session, long currentTime )
{
notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
- IdleStatus.BOTH_IDLE,
- Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
+ session, currentTime,
+ session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
+ IdleStatus.BOTH_IDLE,
+ Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
- IdleStatus.READER_IDLE,
- Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
+ session, currentTime,
+ session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
+ IdleStatus.READER_IDLE,
+ Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
notifyIdleness0(
- session, currentTime,
- session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
- IdleStatus.WRITER_IDLE,
- Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
+ session, currentTime,
+ session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
+ IdleStatus.WRITER_IDLE,
+ Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
notifyWriteTimeout( session, currentTime, session
- .getWriteTimeoutInMillis(), session.getLastWriteTime() );
+ .getWriteTimeoutInMillis(), session.getLastWriteTime() );
}
private void notifyIdleness0( SocketSessionImpl session, long currentTime,
- long idleTime, IdleStatus status,
- long lastIoTime )
+ long idleTime, IdleStatus status,
+ long lastIoTime )
{
if( idleTime > 0 && lastIoTime != 0
&& ( currentTime - lastIoTime ) >= idleTime )
@@ -379,8 +357,8 @@
}
private void notifyWriteTimeout( SocketSessionImpl session,
- long currentTime,
- long writeTimeout, long lastIoTime )
+ long currentTime,
+ long writeTimeout, long lastIoTime )
{
SelectionKey key = session.getSelectionKey();
if( writeTimeout > 0
@@ -397,7 +375,7 @@
if( flushingSessions.size() == 0 )
return;
- for( ;; )
+ for( ; ; )
{
SocketSessionImpl session;
@@ -414,7 +392,7 @@
releaseWriteBuffers( session );
continue;
}
-
+
SelectionKey key = session.getSelectionKey();
// Retry later if session is not yet fully initialized.
// (In case that Session.write() is called before addSession() is processed)
@@ -440,12 +418,12 @@
}
}
}
-
+
private void releaseWriteBuffers( SocketSessionImpl session )
{
Queue writeRequestQueue = session.getWriteRequestQueue();
WriteRequest req;
-
+
while( ( req = ( WriteRequest ) writeRequestQueue.pop() ) != null )
{
try
@@ -472,9 +450,10 @@
SocketChannel ch = session.getChannel();
Queue writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
- for( ;; )
+ for( ; ; )
{
+ WriteRequest req;
+
synchronized( writeRequestQueue )
{
req = ( WriteRequest ) writeRequestQueue.first();
@@ -490,9 +469,9 @@
{
writeRequestQueue.pop();
}
-
+
session.increaseWrittenWriteRequests();
-
+
buf.reset();
( ( SocketFilterChain ) session.getFilterChain() ).messageSent( session, req );
continue;
@@ -513,12 +492,12 @@
}
}
- private void doUpdateTrafficMask()
+ private void doUpdateTrafficMask()
{
if( trafficControllingSessions.isEmpty() )
return;
- for( ;; )
+ for( ; ; )
{
SocketSessionImpl session;
@@ -562,65 +541,25 @@
key.interestOps( ops & mask );
}
}
-
- /**
- * Configures the number of processors employed.
- * We first check for a system property "mina.IoProcessors". If this
- * property is present and can be interpreted as an integer value greater
- * or equal to 1, this value is used as the number of processors.
- * Otherwise a default of 1 processor is employed.
- *
- * @return The nubmer of processors to employ
- */
- private static int configureProcessorCount()
- {
- int processors = DEFAULT_PROCESSORS;
- String processorProperty = System.getProperty( PROCESSORS_PROPERTY );
- if ( processorProperty != null )
- {
- try
- {
- processors = Integer.parseInt( processorProperty );
- }
- catch ( NumberFormatException e )
- {
- ExceptionMonitor.getInstance().exceptionCaught( e );
- }
- processors = Math.max( processors, 1 );
-
- System.setProperty( PROCESSORS_PROPERTY, String.valueOf( processors ) );
- }
- return processors;
- }
-
- private static SocketIoProcessor[] createProcessors()
- {
- SocketIoProcessor[] processors = new SocketIoProcessor[ PROCESSOR_COUNT ];
- for ( int i = 0; i < PROCESSOR_COUNT; i ++ )
- {
- processors[i] = new SocketIoProcessor( THREAD_PREFIX + i );
- }
- return processors;
- }
-
+
private class Worker extends Thread
{
- public Worker()
+ Worker()
{
super( SocketIoProcessor.this.threadName );
}
public void run()
{
- for( ;; )
+ for( ; ; )
{
try
{
int nKeys = selector.select( 1000 );
doAddNew();
doUpdateTrafficMask();
-
+
if( nKeys > 0 )
{
process( selector.selectedKeys() );
@@ -632,12 +571,12 @@
if( selector.keys().isEmpty() )
{
- synchronized( SocketIoProcessor.this )
+ synchronized( lock )
{
- if( selector.keys().isEmpty() &&
- newSessions.isEmpty() )
+ if( selector.keys().isEmpty() && newSessions.isEmpty() )
{
worker = null;
+
try
{
selector.close();
@@ -650,6 +589,7 @@
{
selector = null;
}
+
break;
}
}
@@ -665,10 +605,11 @@
}
catch( InterruptedException e1 )
{
+ ExceptionMonitor.getInstance().exceptionCaught( e1 );
}
}
}
}
}
-
+
}
Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Copied: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionConfigImpl.java (from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionConfigImpl.java)
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionConfigImpl.java?p2=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionConfigImpl.java&p1=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionConfigImpl.java&r1=397283&r2=398039&rev=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionConfigImpl.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionConfigImpl.java Fri Apr 28 16:36:27 2006
@@ -16,17 +16,15 @@
* limitations under the License.
*
*/
-package org.apache.mina.transport.socket.nio.support;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketException;
+package org.apache.mina.transport.socket.nio;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnectorConfig;
import org.apache.mina.common.support.BaseIoSessionConfig;
-import org.apache.mina.transport.socket.nio.SocketConnector;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
/**
* An {@link IoConnectorConfig} for {@link SocketConnector}.
Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionConfigImpl.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Copied: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java)
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?p2=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java&p1=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java&r1=397283&r2=398039&rev=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Fri Apr 28 16:36:27 2006
@@ -16,14 +16,9 @@
* limitations under the License.
*
*/
-package org.apache.mina.transport.socket.nio.support;
-
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.Set;
+package org.apache.mina.transport.socket.nio;
+import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
@@ -31,15 +26,19 @@
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.TransportType;
-import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.BaseIoSession;
import org.apache.mina.common.support.BaseIoSessionConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.Queue;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+
/**
* An {@link IoSession} for socket transport (TCP/IP).
- *
+ *
* @author The Apache Directory Project (mina-dev@directory.apache.org)
* @version $Rev$, $Date$
*/
@@ -55,22 +54,24 @@
private final SocketAddress remoteAddress;
private final SocketAddress localAddress;
private final SocketAddress serviceAddress;
- private final Set managedSessions;
+ private final Set managedSessions;
private SelectionKey key;
private int readBufferSize;
/**
* Creates a new instance.
*/
- public SocketSessionImpl(
- IoService manager, Set managedSessions,
- IoSessionConfig config,
- SocketChannel ch, IoHandler defaultHandler,
- SocketAddress serviceAddress )
+ SocketSessionImpl( IoService manager,
+ SocketIoProcessor ioProcessor,
+ Set managedSessions,
+ IoSessionConfig config,
+ SocketChannel ch,
+ IoHandler defaultHandler,
+ SocketAddress serviceAddress )
{
this.manager = manager;
this.managedSessions = managedSessions;
- this.ioProcessor = SocketIoProcessor.getInstance();
+ this.ioProcessor = ioProcessor;
this.filterChain = new SocketFilterChain( this );
this.ch = ch;
this.writeRequestQueue = new Queue();
@@ -78,7 +79,7 @@
this.remoteAddress = ch.socket().getRemoteSocketAddress();
this.localAddress = ch.socket().getLocalSocketAddress();
this.serviceAddress = serviceAddress;
-
+
// Apply the initial session settings
if( config instanceof SocketSessionConfig )
{
@@ -98,22 +99,22 @@
}
}
}
-
+
public IoService getService()
{
return manager;
}
-
+
public IoSessionConfig getConfig()
{
return config;
}
-
+
SocketIoProcessor getIoProcessor()
{
return ioProcessor;
}
-
+
public IoFilterChain getFilterChain()
{
return filterChain;
@@ -143,12 +144,12 @@
{
return handler;
}
-
+
protected void close0()
{
filterChain.filterClose( this );
}
-
+
Queue getWriteRequestQueue()
{
return writeRequestQueue;
@@ -169,7 +170,7 @@
return writeRequestQueue.byteSize();
}
}
-
+
protected void write0( WriteRequest writeRequest )
{
filterChain.filterWrite( this, writeRequest );
@@ -189,17 +190,17 @@
{
return localAddress;
}
-
+
public SocketAddress getServiceAddress()
{
return serviceAddress;
}
-
+
protected void updateTrafficMask()
{
this.ioProcessor.updateTrafficMask( this );
}
-
+
int getReadBufferSize()
{
return readBufferSize;
@@ -218,7 +219,7 @@
throw new RuntimeIOException( e );
}
}
-
+
public void setKeepAlive( boolean on )
{
try
@@ -230,7 +231,7 @@
throw new RuntimeIOException( e );
}
}
-
+
public boolean isOobInline()
{
try
@@ -242,7 +243,7 @@
throw new RuntimeIOException( e );
}
}
-
+
public void setOobInline( boolean on )
{
try
@@ -254,7 +255,7 @@
throw new RuntimeIOException( e );
}
}
-
+
public boolean isReuseAddress()
{
try
@@ -266,7 +267,7 @@
throw new RuntimeIOException( e );
}
}
-
+
public void setReuseAddress( boolean on )
{
try
@@ -278,7 +279,7 @@
throw new RuntimeIOException( e );
}
}
-
+
public int getSoLinger()
{
try
@@ -290,7 +291,7 @@
throw new RuntimeIOException( e );
}
}
-
+
public void setSoLinger( int linger )
{
try
@@ -309,7 +310,7 @@
throw new RuntimeIOException( e );
}
}
-
+
public boolean isTcpNoDelay()
{
try
@@ -321,7 +322,7 @@
throw new RuntimeIOException( e );
}
}
-
+
public void setTcpNoDelay( boolean on )
{
try
@@ -333,41 +334,41 @@
throw new RuntimeIOException( e );
}
}
-
+
public int getTrafficClass()
{
- if( SocketSessionConfigImpl.isGetTrafficClassAvailable() )
- {
- try
- {
- return ch.socket().getTrafficClass();
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
- else
- {
- return 0;
- }
+ if( SocketSessionConfigImpl.isGetTrafficClassAvailable() )
+ {
+ try
+ {
+ return ch.socket().getTrafficClass();
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
+ else
+ {
+ return 0;
+ }
}
-
+
public void setTrafficClass( int tc )
{
- if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
- {
- try
- {
- ch.socket().setTrafficClass( tc );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
+ if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
+ {
+ try
+ {
+ ch.socket().setTrafficClass( tc );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
}
-
+
public int getSendBufferSize()
{
try
@@ -379,22 +380,22 @@
throw new RuntimeIOException( e );
}
}
-
+
public void setSendBufferSize( int size )
{
- if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() )
- {
- try
- {
- ch.socket().setSendBufferSize( size );
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
+ if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() )
+ {
+ try
+ {
+ ch.socket().setSendBufferSize( size );
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
}
-
+
public int getReceiveBufferSize()
{
try
@@ -406,21 +407,21 @@
throw new RuntimeIOException( e );
}
}
-
+
public void setReceiveBufferSize( int size )
{
- if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
- {
- try
- {
- ch.socket().setReceiveBufferSize( size );
- SocketSessionImpl.this.readBufferSize = size;
- }
- catch( SocketException e )
- {
- throw new RuntimeIOException( e );
- }
- }
+ if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
+ {
+ try
+ {
+ ch.socket().setReceiveBufferSize( size );
+ SocketSessionImpl.this.readBufferSize = size;
+ }
+ catch( SocketException e )
+ {
+ throw new RuntimeIOException( e );
+ }
+ }
}
}
}
Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/package.html
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/package.html?rev=398039&r1=398038&r2=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/package.html (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/package.html Fri Apr 28 16:36:27 2006
@@ -3,16 +3,14 @@
<head>
</head>
<body>
-Socket (TCP/IP) and Datagram (UDP/IP) support based on Java <a href="http://java.sun.com/j2se/1.5.0/docs/guide/nio/">NIO (New I/O) API</a>.
+Socket (TCP/IP) and Datagram (UDP/IP) support based on Java
+<a href="http://java.sun.com/j2se/1.5.0/docs/guide/nio/">NIO (New I/O) API</a>.
<h3>Configuring the number of NIO selector loops</h3>
<p>
You can specify the number of Socket I/O thread to utilize multi-processors
-efficiently, using a Java system property, '<tt>mina.socket.processors':
-<pre>
-java -Dmina.socket.processors=2 com.example.mina.Main
-</pre>
-The default number of processors is '<tt>1</tt>'.
+efficiently by specifying the number of processing threads in the constructor. The default is <tt>1</tt>
+</p>
</body>
</html>