You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/06/27 17:55:38 UTC
svn commit: r551212 - in /mina/sandbox/jvermillard/apr/src:
main/java/org/apache/mina/transport/apr/ test/ test/java/ test/java/org/
test/java/org/apache/ test/java/org/apache/mina/
test/java/org/apache/mina/transport/ test/java/org/apache/mina/transpo...
Author: jvermillard
Date: Wed Jun 27 08:55:37 2007
New Revision: 551212
URL: http://svn.apache.org/viewvc?view=rev&rev=551212
Log:
start to work APR connector
Added:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/DefaultAPRSessionConfig.java
mina/sandbox/jvermillard/apr/src/test/
mina/sandbox/jvermillard/apr/src/test/java/
mina/sandbox/jvermillard/apr/src/test/java/org/
mina/sandbox/jvermillard/apr/src/test/java/org/apache/
mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/
mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/
mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/
mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java
Modified:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java?view=diff&rev=551212&r1=551211&r2=551212
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java Wed Jun 27 08:55:37 2007
@@ -16,131 +16,131 @@
import org.apache.tomcat.jni.Pool;
import org.apache.tomcat.jni.Socket;
-public class APRConnector extends BaseIoConnector {
+public class APRConnector extends BaseIoConnector {
- /**
- * @noinspection StaticNonFinalField
- */
- private static volatile int nextId = 0;
-
- private final Object lock = new Object();
- private final int id = nextId++;
- //private final String threadName = "APRConnector-" + id;
- private final int processorCount;
- private final Executor executor;
- private final APRIoProcessor[] ioProcessors;
- private int processorDistributor = 0;
-
- // APR memory pool (package wide mother pool)
- static long pool = -1;
-
- /**
- * Create a connector with a single processing thread using a NewThreadExecutor
- */
- public APRConnector()
- {
- this( 1, new NewThreadExecutor() );
- }
-
- /**
- * Create a connector with the desired number of processing threads
- *
- * @param processorCount Number of processing threads
- * @param executor Executor to use for launching threads
- */
- public APRConnector( int processorCount, Executor executor )
- {
- super( null ); // TODO : DEFAULT CONFIG
- if( processorCount < 1 )
- {
- throw new IllegalArgumentException( "Must have at least one processor" );
- }
-
- if(pool==-1)
- pool = Pool.create(0);
-
- this.executor = executor;
- this.processorCount = processorCount;
- ioProcessors = new APRIoProcessor[processorCount];
-
- for( int i = 0; i < processorCount; i++ )
- {
- ioProcessors[i] = new APRIoProcessor( "APRConnectorIoProcessor-" + id + "." + i, executor );
- }
- }
+ /**
+ * @noinspection StaticNonFinalField
+ */
+ private static volatile int nextId = 0;
+
+ private final Object lock = new Object();
+
+ private final int id = nextId++;
+
+ // private final String threadName = "APRConnector-" + id;
+ private final int processorCount;
+
+ private final Executor executor;
+
+ private final APRIoProcessor[] ioProcessors;
+
+ private int processorDistributor = 0;
+
+ // APR memory pool (package wide mother pool)
+ static long pool = -1;
+
+ /**
+ * Create a connector with a single processing thread using a
+ * NewThreadExecutor
+ */
+ public APRConnector() {
+ this(1, new NewThreadExecutor());
+ }
+
+ /**
+ * Create a connector with the desired number of processing threads
+ *
+ * @param processorCount
+ * Number of processing threads
+ * @param executor
+ * Executor to use for launching threads
+ */
+ public APRConnector(int processorCount, Executor executor) {
+ super(new DefaultAPRSessionConfig());
+ if (processorCount < 1) {
+ throw new IllegalArgumentException(
+ "Must have at least one processor");
+ }
+
+ if (pool == -1)
+ pool = Pool.create(0);
+
+ this.executor = executor;
+ this.processorCount = processorCount;
+ ioProcessors = new APRIoProcessor[processorCount];
+
+ for (int i = 0; i < processorCount; i++) {
+ ioProcessors[i] = new APRIoProcessor("APRConnectorIoProcessor-"
+ + id + "." + i, executor);
+ }
+ }
-
-
@Override
- protected ConnectFuture doConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
- boolean success = false;
- try
- {
- InetSocketAddress sockAddr=(InetSocketAddress)remoteAddress;
- pool = Pool.create(pool);
- long inetAddr=0;
- inetAddr = Address.info( sockAddr.getHostName(), Socket.APR_INET,
- sockAddr.getPort(), 0, pool);
-
- // FIXME : type of socket need to be configurable
-
- long clientSock=Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
-
-
- // TODO: error checking ???
- int ret=Socket.connect(clientSock, inetAddr);
- System.err.println("Socket.connect : "+ret);
- if( localAddress != null ) {
- // TODO, check if it's possible to bind to a local address
- }
-
- ConnectFuture future = new DefaultConnectFuture();
- APRSessionImpl session=new APRSessionImpl(this,nextProcessor(),clientSock);
-
- try
- {
- getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
- }
- catch( Throwable e )
- {
- throw ( IOException ) new IOException( "Failed to create a session." ).initCause( e );
- }
-
- // Set the ConnectFuture of the specified session, which will be
- // removed and notified by AbstractIoFilterChain eventually.
- session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future );
-
- // Forward the remaining process to the SocketIoProcessor.
- session.getIoProcessor().addNew( session );
- future.setSession(session);
-
- success = true;
- return future;
- }
- catch( Exception e )
- {
- return DefaultConnectFuture.newFailedFuture( e );
- }
+ protected ConnectFuture doConnect(SocketAddress remoteAddress,
+ SocketAddress localAddress) {
+ boolean success = false;
+ try {
+ InetSocketAddress sockAddr = (InetSocketAddress) remoteAddress;
+ pool = Pool.create(pool);
+ long inetAddr = 0;
+ inetAddr = Address.info(sockAddr.getHostName(), Socket.APR_INET,
+ sockAddr.getPort(), 0, pool);
+
+ // FIXME : type of socket need to be configurable
+
+ long clientSock = Socket.create(Socket.APR_INET,
+ Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
+ System.err.println("Socket.create(..) : "+clientSock);
+
+ // TODO: error checking ???
+ int ret = Socket.connect(clientSock, inetAddr);
+ System.err.println("Socket.connect : " + ret);
+ if (localAddress != null) {
+ // TODO, check if it's possible to bind to a local address
+ }
+
+ ConnectFuture future = new DefaultConnectFuture();
+ APRSessionImpl session = new APRSessionImpl(this, nextProcessor(),
+ clientSock,sockAddr,(InetSocketAddress)localAddress);
+
+ try {
+ getFilterChainBuilder().buildFilterChain(
+ session.getFilterChain());
+ } catch (Throwable e) {
+ throw (IOException) new IOException(
+ "Failed to create a session.").initCause(e);
+ }
+
+ // Set the ConnectFuture of the specified session, which will be
+ // removed and notified by AbstractIoFilterChain eventually.
+ session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, future);
+
+ // Forward the remaining process to the APRIoProcessor.
+ session.getIoProcessor().addNew(session);
+ future.setSession(session);
+
+ success = true;
+ return future;
+ } catch (Exception e) {
+ return DefaultConnectFuture.newFailedFuture(e);
+ }
}
public TransportType getTransportType() {
return APRTransportType.APR_SOCKET;
}
-
- @Override
- protected IoServiceListenerSupport getListeners()
- {
- return super.getListeners();
- }
-
- private APRIoProcessor nextProcessor()
- {
- if ( processorDistributor++ < 0 )
- {
- processorDistributor = 0;
- }
- return ioProcessors[processorDistributor % processorCount];
- }
+ @Override
+ protected IoServiceListenerSupport getListeners() {
+ return super.getListeners();
+ }
+
+ private APRIoProcessor nextProcessor() {
+ if (processorDistributor++ < 0) {
+ processorDistributor = 0;
+ }
+
+ return ioProcessors[processorDistributor % processorCount];
+ }
}
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java?view=diff&rev=551212&r1=551211&r2=551212
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java Wed Jun 27 08:55:37 2007
@@ -10,35 +10,33 @@
public class APRFilterChain extends AbstractIoFilterChain {
- APRFilterChain( IoSession parent )
- {
- super( parent );
- }
+ APRFilterChain(IoSession parent) {
+ super(parent);
+ }
- @Override
- protected void doWrite( IoSession session, WriteRequest writeRequest )
- {
- APRSessionImpl s = ( APRSessionImpl ) session;
- Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
-
- // SocketIoProcessor.doFlush() will reset it after write is finished
- // because the buffer will be passed with messageSent event.
- ( ( ByteBuffer ) writeRequest.getMessage() ).mark();
- synchronized( writeRequestQueue )
- {
- writeRequestQueue.offer( writeRequest );
- if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
- {
- // Notify SocketIoProcessor only when writeRequestQueue was empty.
- s.getIoProcessor().flush( s );
- }
- }
- }
+ @Override
+ protected void doWrite(IoSession session, WriteRequest writeRequest) {
+ System.err.println("dowrite ????");
+ APRSessionImpl s = (APRSessionImpl) session;
+ Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
- @Override
- protected void doClose( IoSession session ) throws IOException
- {
- APRSessionImpl s = ( APRSessionImpl ) session;
- s.getIoProcessor().remove( s );
- }
+ // SocketIoProcessor.doFlush() will reset it after write is finished
+ // because the buffer will be passed with messageSent event.
+ ((ByteBuffer) writeRequest.getMessage()).mark();
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.offer(writeRequest);
+ if (writeRequestQueue.size() == 1
+ && session.getTrafficMask().isWritable()) {
+ // Notify SocketIoProcessor only when writeRequestQueue was
+ // empty.
+ s.getIoProcessor().flush(s);
+ }
+ }
+ }
+
+ @Override
+ protected void doClose(IoSession session) throws IOException {
+ APRSessionImpl s = (APRSessionImpl) session;
+ s.getIoProcessor().remove(s);
+ }
}
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java?view=diff&rev=551212&r1=551211&r2=551212
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java Wed Jun 27 08:55:37 2007
@@ -115,6 +115,8 @@
// polling the socket for write and read
// FIXME : perhaps we should oll write only if needed for save CPU, but actually it's too complex for me :)
+ System.err.println("pollset : "+pollset);
+ System.err.println("Socket : "+session.getAPRSocket());
int rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
| Poll.APR_POLLOUT);
if (rv == Status.APR_SUCCESS) {
@@ -152,7 +154,8 @@
private void read(APRSessionImpl session) {
byte[] buf = session.getReadBuffer();
- int bytes = Socket.recv(session.getAPRSocket(), buf, 0, -1);
+ // FIXME : hardcoded read value for testing
+ int bytes = Socket.recv(session.getAPRSocket(), buf, 0, 1024);
if (bytes > 0) {
ByteBuffer bbuf = ByteBuffer.allocate(bytes);
bbuf.put(buf, 0, bytes);
@@ -166,10 +169,12 @@
}
private void write(APRSessionImpl session) {
+ //System.err.println("Do write");
if (session.getWriteRequestQueue().size() <= 0)
return;
+ System.err.println("Ok something in the queue");
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
-
+
for (;;) {
WriteRequest req;
@@ -292,7 +297,7 @@
if (rv > 0) {
for (int n = 0; n < rv; n++) {
long clientSock = desc[n * 2 + 1];
- logger.debug("Poll flags " + desc[n * 2]);
+ //logger.debug("Poll flags " + desc[n * 2]);
APRSessionImpl session = managedSessions
.get(clientSock);
@@ -302,9 +307,9 @@
continue;
}
- if (desc[n * 2] == Poll.APR_POLLIN)
+ if ( (desc[n * 2] & Poll.APR_POLLIN) >0 )
read(session);
- if (desc[n * 2] == Poll.APR_POLLOUT)
+ if ( (desc[n * 2] & Poll.APR_POLLOUT) >0 )
write(session);
}
}
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java?view=diff&rev=551212&r1=551211&r2=551212
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java Wed Jun 27 08:55:37 2007
@@ -5,8 +5,11 @@
import org.apache.mina.common.IoSession;
public interface APRSession extends IoSession {
- APRSessionConfig getConfig();
- InetSocketAddress getRemoteAddress();
- InetSocketAddress getLocalAddress();
- InetSocketAddress getServiceAddress();
+ APRSessionConfig getConfig();
+
+ InetSocketAddress getRemoteAddress();
+
+ InetSocketAddress getLocalAddress();
+
+ InetSocketAddress getServiceAddress();
}
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java?view=diff&rev=551212&r1=551211&r2=551212
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java Wed Jun 27 08:55:37 2007
@@ -17,34 +17,41 @@
public class APRSessionImpl extends BaseIoSession implements APRSession {
private long socket;
- private final IoService service;
- private final APRSessionConfig config = new APRSessionConfigImpl();
- private final APRIoProcessor ioProcessor;
- private final APRFilterChain filterChain;
- private final Queue<WriteRequest> writeRequestQueue;
- private final IoHandler handler;
- private byte[] readBuffer;
- private int readBufferSize;
- private InetSocketAddress remoteAddress;
- private InetSocketAddress localAddress;
- /**
- * Creates a new instance.
- */
- APRSessionImpl(IoService service, APRIoProcessor ioProcessor, long socket) {
- this.service = service;
- this.ioProcessor = ioProcessor;
- this.filterChain = new APRFilterChain( this );
- this.writeRequestQueue = new LinkedList<WriteRequest>();
- this.handler = service.getHandler();
-
- }
-
- void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
- this.remoteAddress=(InetSocketAddress)remoteAddress;
- this.localAddress=(InetSocketAddress)localAddress;
- // TODO : connect
+ private final IoService service;
+
+ private final APRSessionConfig config = new APRSessionConfigImpl();
+
+ private final APRIoProcessor ioProcessor;
+
+ private final APRFilterChain filterChain;
+
+ private final Queue<WriteRequest> writeRequestQueue;
+
+ private final IoHandler handler;
+
+ private byte[] readBuffer=new byte[1024]; //FIXME : fixed rcvd buffer, need to change that to a config value
+
+ private int readBufferSize;
+
+ private final InetSocketAddress remoteAddress;
+
+ private final InetSocketAddress localAddress;
+
+ /**
+ * Creates a new instance.
+ */
+ APRSessionImpl(IoService service, APRIoProcessor ioProcessor, long socket,
+ InetSocketAddress remoteAddress, InetSocketAddress localAddress) {
+ this.service = service;
+ this.ioProcessor = ioProcessor;
+ this.filterChain = new APRFilterChain(this);
+ this.writeRequestQueue = new LinkedList<WriteRequest>();
+ this.handler = service.getHandler();
+ this.remoteAddress = remoteAddress;
+ this.localAddress = localAddress;
+ this.socket=socket;
}
-
+
long getAPRSocket() {
return socket;
}
@@ -80,33 +87,27 @@
}
public IoHandler getHandler() {
- return service.getHandler();
+ return handler;
}
- public int getScheduledWriteMessages()
- {
- synchronized( writeRequestQueue )
- {
- return writeRequestQueue.size();
- }
- }
-
- public int getScheduledWriteBytes()
- {
- int size = 0;
- synchronized( writeRequestQueue )
- {
- for( Object o: writeRequestQueue )
- {
- if( o instanceof ByteBuffer )
- {
- size += ( ( ByteBuffer ) o ).remaining();
- }
- }
- }
-
- return size;
- }
+ public int getScheduledWriteMessages() {
+ synchronized (writeRequestQueue) {
+ return writeRequestQueue.size();
+ }
+ }
+
+ public int getScheduledWriteBytes() {
+ int size = 0;
+ synchronized (writeRequestQueue) {
+ for (Object o : writeRequestQueue) {
+ if (o instanceof ByteBuffer) {
+ size += ((ByteBuffer) o).remaining();
+ }
+ }
+ }
+
+ return size;
+ }
public IoService getService() {
return service;
@@ -116,17 +117,17 @@
return APRTransportType.APR_SOCKET;
}
- APRIoProcessor getIoProcessor()
- {
- return ioProcessor;
- }
-
- @Override
- public InetSocketAddress getServiceAddress() {
- return (InetSocketAddress) super.getServiceAddress();
- }
-
- private class APRSessionConfigImpl extends BaseIoSessionConfig implements APRSessionConfig {
-
- }
+ APRIoProcessor getIoProcessor() {
+ return ioProcessor;
+ }
+
+ @Override
+ public InetSocketAddress getServiceAddress() {
+ return (InetSocketAddress) super.getServiceAddress();
+ }
+
+ private class APRSessionConfigImpl extends BaseIoSessionConfig implements
+ APRSessionConfig {
+
+ }
}
Added: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/DefaultAPRSessionConfig.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/DefaultAPRSessionConfig.java?view=auto&rev=551212
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/DefaultAPRSessionConfig.java (added)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/DefaultAPRSessionConfig.java Wed Jun 27 08:55:37 2007
@@ -0,0 +1,7 @@
+package org.apache.mina.transport.apr;
+
+import org.apache.mina.common.support.BaseIoSessionConfig;
+
+public class DefaultAPRSessionConfig extends BaseIoSessionConfig implements APRSessionConfig {
+
+}
Added: mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java?view=auto&rev=551212
==============================================================================
--- mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java (added)
+++ mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java Wed Jun 27 08:55:37 2007
@@ -0,0 +1,67 @@
+package org.apache.mina.transport.apr;
+
+import java.net.InetSocketAddress;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.tomcat.jni.Library;
+
+public class TestCnx extends TestCase {
+
+ public void testCnx() throws Exception {
+ BasicConfigurator.configure();
+ Library.initialize(null);
+ APRConnector cnx=new APRConnector();
+ cnx.setHandler(new IoHandler(){
+
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+ System.err.println("Exception : "+cause);
+ cause.printStackTrace();
+
+ }
+
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ System.err.println("Rcvd : "+message);
+
+ }
+
+ public void messageSent(IoSession session, Object message) throws Exception {
+ System.err.println("Sent : "+message);
+
+ }
+
+ public void sessionClosed(IoSession session) throws Exception {
+ System.err.println("Session closed");
+
+ }
+
+ public void sessionCreated(IoSession session) throws Exception {
+ System.err.println("sesssion created");
+
+ }
+
+ public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void sessionOpened(IoSession session) throws Exception {
+ System.err.println("sesssion opened");
+ }});
+ ConnectFuture f=cnx.connect(new InetSocketAddress("towel.blinkenlights.nl",23));
+ f.awaitUninterruptibly();
+
+ assertTrue(f.getSession().isConnected());
+ //System.err.println("writing hello");
+ //f.getSession().write( ByteBuffer.wrap("HELLO\n".getBytes()) );
+
+ Thread.sleep(4000);
+ System.err.println("Done");
+ }
+}