You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/09/18 18:31:54 UTC
svn commit: r576972 - in
/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr:
APRConnector.java APRIoProcessor.java APRSessionImpl.java
Author: jvermillard
Date: Tue Sep 18 09:31:49 2007
New Revision: 576972
URL: http://svn.apache.org/viewvc?rev=576972&view=rev
Log:
APRconnector compiling now, but doesn't work for now
Modified:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java?rev=576972&r1=576971&r2=576972&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java Tue Sep 18 09:31:49 2007
@@ -34,6 +34,7 @@
import org.apache.mina.util.NewThreadExecutor;
import org.apache.tomcat.jni.Address;
import org.apache.tomcat.jni.Socket;
+import org.apache.tomcat.jni.Status;
/**
* An {@link IoConnector} implementation using the Apache Portable Runtime
@@ -128,13 +129,17 @@
// FIXME : error checking
int ret = Socket.connect(clientSock, inetAddr);
- System.err.println("Socket.connect : " + ret);
+ if(ret!=Status.APR_SUCCESS)
+ System.err.println("Error Socket.connect : " + ret);
+
if (localAddress != null) {
// TODO, check if it's possible to bind to a local address
}
ConnectFuture future = new DefaultConnectFuture();
- APRSessionImpl session = new APRSessionImpl(this, nextProcessor(),
+ APRIoProcessor proc=nextProcessor();
+ System.err.println("proc : "+proc);
+ APRSessionImpl session = new APRSessionImpl(this,proc ,
clientSock, sockAddr, (InetSocketAddress) localAddress);
try {
@@ -151,7 +156,7 @@
// Forward the remaining process to the APRIoProcessor.
// it's will validate the COnnectFuture when the session is in the poll set
- session.getIoProcessor().addNew(session);
+ session.getIoProcessor().add(session);
success = true;
return future;
@@ -166,11 +171,10 @@
}
private APRIoProcessor nextProcessor() {
- if (processorDistributor++ < 0) {
- processorDistributor = 0;
+ if (this.processorDistributor == Integer.MAX_VALUE) {
+ this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
}
-
- return ioProcessors[processorDistributor % processorCount];
+ return ioProcessors[processorDistributor++ % processorCount];
}
public TransportMetadata getTransportMetadata() {
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java?rev=576972&r1=576971&r2=576972&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java Tue Sep 18 09:31:49 2007
@@ -19,10 +19,13 @@
*/
package org.apache.mina.transport.apr;
+import java.nio.channels.SelectionKey;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
@@ -52,17 +55,63 @@
class APRIoProcessor extends AbstractIoProcessor {
+ protected static class IoSessionIterator implements Iterator<AbstractIoSession> {
+ private final Iterator<APRSessionImpl> i;
+ private IoSessionIterator(Collection<APRSessionImpl> sessions) {
+ i = sessions.iterator();
+ }
+ public boolean hasNext() {
+ return i.hasNext();
+ }
+
+ public AbstractIoSession next() {
+ APRSessionImpl sess = i.next();
+ return (AbstractIoSession) sess;
+ }
+
+ public void remove() {
+ i.remove();
+ }
+ }
+
+ protected class PollSetIterator implements Iterator<AbstractIoSession> {
+ private long[] pollResult;
+
+ int index=0;
+ public PollSetIterator(long[] pollResult) {
+ this.pollResult=pollResult;
+ }
+
+ public boolean hasNext() {
+ return index*2< pollResult.length;
+ }
+
+ public AbstractIoSession next() {
+
+ AbstractIoSession sess=managedSessions.get(pollResult[index*2]);
+ index++;
+ return sess;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+ }
+
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Object lock = new Object();
private long pool = 0; // memory pool
+// private Worker worker;
+
private long pollset = 0; // socket poller
+
private final Map<Long, APRSessionImpl> managedSessions = new HashMap<Long, APRSessionImpl>();
-// private Worker worker;
+ private long[] pollResult;
APRIoProcessor(String threadName, Executor executor) {
super(threadName,executor);
@@ -85,15 +134,17 @@
}
}
-
@Override
protected Iterator<AbstractIoSession> allSessions() throws Exception {
+
// TODO Auto-generated method stub
- return null;
+ return new IoSessionIterator(managedSessions.values());
}
+
@Override
protected void doAdd(IoSession sess) throws Exception {
+ System.err.println("doAdd");
APRSessionImpl session = (APRSessionImpl) sess;
int rv;
rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
@@ -103,97 +154,20 @@
} else
throw new RuntimeException("APR error while Poll.add(..) : "+Error.strerror(-1*rv)+" ( code : "+rv+")");
}
-
+
@Override
protected void doRemove(IoSession session) throws Exception {
+ System.err.println("doRemove");
remove(session); // will schedule it
}
@Override
- protected int read(IoSession sess, ByteBuffer buffer) throws Exception {
- APRSessionImpl session=(APRSessionImpl)sess;
-
- byte[] buf = session.getReadBuffer();
- // FIXME : hardcoded read value for testing
- int bytes = Socket.recv(session.getAPRSocket(), buf, 0, 1024);
- if (bytes > 0) {
- buffer.put(buf);
- } else if (bytes < 0) {
- logger.debug("Read {} bytes, need closing ?", bytes);
- return -1;
- }
- return bytes;
- }
-
-
- private long[] pollResult;
-
- @Override
- protected boolean select(int timeout) throws Exception {
- // poll the socket descriptors
- /* is it OK ? : Two times size of the created pollset */
- long[] pollResult = new long[managedSessions.size() * 2];
-
- int rv = Poll.poll(pollset, 1000 * timeout, pollResult, false);
- if (rv > 0) {
- return true;
- } else if(rv<0) {
- if(rv!=-100002) { // timeout ( FIXME : can't find the good constant in APR)
- System.err.println("APR Poll error : "+Error.strerror(-1*rv)+" "+rv);
- throw new RuntimeException("APR polling error : "+Error.strerror(-1*rv)+" ( code : "+rv+")");
- }
- }
- return false;
- }
-
- @Override
- protected Iterator<AbstractIoSession> selectedSessions() throws Exception {
- return new PollSetIterator(pollResult);
- }
-
- @Override
- protected SessionState state(IoSession session) {
- long socket=((APRSessionImpl)session).getAPRSocket();
- if(socket>0)
- return SessionState.OPEN;
- else if(managedSessions.get(socket)!=null)
- return SessionState.PREPARING; // will occur ?
- else
- return SessionState.CLOSED;
- }
-
- @Override
- protected long transferFile(IoSession session, FileRegion region)
- throws Exception {
- throw new UnsupportedOperationException("Not supposed for APR (TODO)");
- }
-
- @Override
- protected void wakeup() {
- // FIXME : is it possible to interrupt a Poll.poll ?
-
+ protected void finalize() throws Throwable {
+ // TODO : necessary I think, need to check APR doc
+ Pool.clear(pool);
}
@Override
- protected int write(IoSession session, ByteBuffer buf) throws Exception {
- // be sure APR_SO_NONBLOCK was set, or it will block
- int toWrite = buf.remaining();
-
- int writtenBytes;
- // APR accept ByteBuffer, only if they are Direct ones, due to native code
- if (buf.isDirect()) {
- writtenBytes = Socket.sendb( ((APRSessionImpl)session).getAPRSocket(), buf.buf(),
- 0, toWrite);
- } else {
- writtenBytes = Socket.send( ((APRSessionImpl)session).getAPRSocket(), buf.array(),
- 0, toWrite);
- // FIXME : kludgy ?
- buf.position(buf.position() + writtenBytes);
- }
- return writtenBytes;
- }
-
- @Override
protected boolean isOpRead(IoSession sess) throws Exception {
APRSessionImpl session=(APRSessionImpl)sess;
long[] descriptors=new long[managedSessions.size()*2];
@@ -246,9 +220,55 @@
}
return false;
}
+
+ @Override
+ protected int read(IoSession sess, ByteBuffer buffer) throws Exception {
+ APRSessionImpl session=(APRSessionImpl)sess;
+
+ byte[] buf = session.getReadBuffer();
+ // FIXME : hardcoded read value for testing
+ int bytes = Socket.recv(session.getAPRSocket(), buf, 0, 1024);
+ if (bytes > 0) {
+ buffer.put(buf);
+ } else if (bytes < 0) {
+ logger.debug("Read {} bytes, need closing ?", bytes);
+ return -1;
+ }
+ return bytes;
+ }
+
+ public void remove(IoSession session) {
+ Poll.remove(pollset, ((APRSessionImpl)session).getAPRSocket());
+ Socket.close(((APRSessionImpl)session).getAPRSocket());
+ }
+
+ @Override
+ protected boolean select(int timeout) throws Exception {
+ System.err.println("select");
+ // poll the socket descriptors
+ /* is it OK ? : Two times size of the created pollset */
+ long[] pollResult = new long[managedSessions.size() * 2];
+
+ int rv = Poll.poll(pollset, 1000 * timeout, pollResult, false);
+ if (rv > 0) {
+ return true;
+ } else if(rv<0) {
+ if(rv!=-120001) { // timeout ( FIXME : can't find the good constant in APR)
+ System.err.println("APR Poll error : "+Error.strerror(-1*rv)+" "+rv);
+ throw new RuntimeException("APR polling error : "+Error.strerror(-1*rv)+" ( code : "+rv+")");
+ }
+ }
+ return false;
+ }
+
+ @Override
+ protected Iterator<AbstractIoSession> selectedSessions() throws Exception {
+ return new PollSetIterator(pollResult);
+ }
@Override
protected void setOpRead(IoSession sess, boolean value) throws Exception {
+ System.err.println("setOpRead : "+value);
APRSessionImpl session=(APRSessionImpl)sess;
int rv = Poll.remove(pollset, session.getAPRSocket());
if (rv != Status.APR_SUCCESS) {
@@ -272,6 +292,7 @@
@Override
protected void setOpWrite(IoSession sess, boolean value)
throws Exception {
+ System.err.println("setOpWrite : "+value);
APRSessionImpl session=(APRSessionImpl)sess;
int rv = Poll.remove(pollset, session.getAPRSocket());
if (rv != Status.APR_SUCCESS) {
@@ -292,9 +313,16 @@
}
}
- public void remove(IoSession session) {
- Poll.remove(pollset, ((APRSessionImpl)session).getAPRSocket());
- Socket.close(((APRSessionImpl)session).getAPRSocket());
+ @Override
+ protected SessionState state(IoSession session) {
+ System.err.println("state?");
+ long socket=((APRSessionImpl)session).getAPRSocket();
+ if(socket>0)
+ return SessionState.OPEN;
+ else if(managedSessions.get(socket)!=null)
+ return SessionState.PREPARING; // will occur ?
+ else
+ return SessionState.CLOSED;
}
@@ -370,32 +398,35 @@
@Override
- protected void finalize() throws Throwable {
- // TODO : necessary I think, need to check APR doc
- Pool.clear(pool);
+ protected long transferFile(IoSession session, FileRegion region)
+ throws Exception {
+ throw new UnsupportedOperationException("Not supposed for APR (TODO)");
}
- protected class PollSetIterator implements Iterator<AbstractIoSession> {
- private long[] pollResult;
+ @Override
+ protected void wakeup() {
+ System.err.println("wakeup");
+ // FIXME : is it possible to interrupt a Poll.poll ?
- int index=0;
- public PollSetIterator(long[] pollResult) {
- this.pollResult=pollResult;
- }
-
- public boolean hasNext() {
- return index*2< pollResult.length;
- }
-
- public AbstractIoSession next() {
-
- AbstractIoSession sess=managedSessions.get(pollResult[index*2]);
- index++;
- return sess;
- }
+ }
+
+ @Override
+ protected int write(IoSession session, ByteBuffer buf) throws Exception {
+ System.err.println("write");
+ // be sure APR_SO_NONBLOCK was set, or it will block
+ int toWrite = buf.remaining();
- public void remove() {
- throw new UnsupportedOperationException("remove");
+ int writtenBytes;
+ // APR accept ByteBuffer, only if they are Direct ones, due to native code
+ if (buf.isDirect()) {
+ writtenBytes = Socket.sendb( ((APRSessionImpl)session).getAPRSocket(), buf.buf(),
+ 0, toWrite);
+ } else {
+ writtenBytes = Socket.send( ((APRSessionImpl)session).getAPRSocket(), buf.array(),
+ 0, toWrite);
+ // FIXME : kludgy ?
+ buf.position(buf.position() + writtenBytes);
}
+ return writtenBytes;
}
-}
+}
\ No newline at end of file
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java?rev=576972&r1=576971&r2=576972&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java Tue Sep 18 09:31:49 2007
@@ -52,8 +52,6 @@
private final IoFilterChain filterChain = new DefaultIoFilterChain(this);
- private final Queue<WriteRequest> writeRequestQueue;
-
private final IoHandler handler;
private byte[] readBuffer = new byte[1024]; //FIXME : fixed rcvd buffer, need to change that to a config value
@@ -73,7 +71,6 @@
InetSocketAddress remoteAddress, InetSocketAddress localAddress) {
this.service = service;
this.ioProcessor = ioProcessor;
- this.writeRequestQueue = new LinkedList<WriteRequest>();
this.handler = service.getHandler();
this.remoteAddress = remoteAddress;
this.localAddress = localAddress;
@@ -225,7 +222,6 @@
@Override
protected IoProcessor getProcessor() {
- // TODO Auto-generated method stub
- return null;
+ return ioProcessor;
}
}