You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/09/19 11:52:28 UTC
svn commit: r577220 - in
/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr:
APRConnector.java APRIoProcessor.java APRSessionImpl.java
Author: jvermillard
Date: Wed Sep 19 02:52:27 2007
New Revision: 577220
URL: http://svn.apache.org/viewvc?rev=577220&view=rev
Log:
Working APR, just miss wakeup();
Modified:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java?rev=577220&r1=577219&r2=577220&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java Wed Sep 19 02:52:27 2007
@@ -127,11 +127,17 @@
protocol.socketType, protocol.codeProto, APRLibrary
.getLibrary().getPool());
+
// FIXME : error checking
int ret = Socket.connect(clientSock, inetAddr);
if(ret!=Status.APR_SUCCESS)
System.err.println("Error Socket.connect : " + ret);
+ ret=Socket.optSet(clientSock, Socket.APR_SO_NONBLOCK, 1);
+
+ if(ret!=Status.APR_SUCCESS)
+ System.err.println("Error Socket.optSet : " + ret);
+
if (localAddress != null) {
// TODO, check if it's possible to bind to a local address
}
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java?rev=577220&r1=577219&r2=577220&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java Wed Sep 19 02:52:27 2007
@@ -19,21 +19,15 @@
*/
package org.apache.mina.transport.apr;
-import java.nio.channels.SelectionKey;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.apache.mina.common.AbstractIoProcessor;
import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.DefaultIoFilterChain;
import org.apache.mina.common.FileRegion;
import org.apache.mina.common.IoSession;
import org.apache.tomcat.jni.Error;
@@ -44,8 +38,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.sun.org.apache.bcel.internal.generic.GETSTATIC;
-
/**
* The class in charge of processing socket level IO events for the {@link APRConnector}
*
@@ -87,28 +79,23 @@
}
public AbstractIoSession next() {
-
- AbstractIoSession sess=managedSessions.get(pollResult[index*2]);
+ AbstractIoSession sess=managedSessions.get(pollResult[index*2+1]);
index++;
+ System.err.println("sess : "+((APRSessionImpl)sess).getAPRSocket());
return sess;
}
public void remove() {
- throw new UnsupportedOperationException("remove");
+ //throw new UnsupportedOperationException("remove");
}
}
private final Logger logger = LoggerFactory.getLogger(getClass());
- private final Object lock = new Object();
-
private long pool = 0; // memory pool
-// private Worker worker;
-
private long pollset = 0; // socket poller
-
private final Map<Long, APRSessionImpl> managedSessions = new HashMap<Long, APRSessionImpl>();
private long[] pollResult;
@@ -144,12 +131,13 @@
@Override
protected void doAdd(IoSession sess) throws Exception {
- System.err.println("doAdd");
+ logger.debug("doAdd");
APRSessionImpl session = (APRSessionImpl) sess;
int rv;
rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
if (rv == Status.APR_SUCCESS) {
- System.out.println("Added session to pollset");
+ logger.debug("sesion added to pollset");
+ session.setOpRead(true);
managedSessions.put(session.getAPRSocket(), session);
} else
throw new RuntimeException("APR error while Poll.add(..) : "+Error.strerror(-1*rv)+" ( code : "+rv+")");
@@ -157,100 +145,104 @@
@Override
protected void doRemove(IoSession session) throws Exception {
- System.err.println("doRemove");
- remove(session); // will schedule it
+ logger.debug("doRemove");
+ int ret=Poll.remove(pollset, ((APRSessionImpl)session).getAPRSocket());
+ if(ret!=Status.APR_SUCCESS) {
+ logger.error("removing of pollset error");
+ }
+ ret=Socket.close(((APRSessionImpl)session).getAPRSocket());
+ if(ret!=Status.APR_SUCCESS) {
+ logger.error("closing socket error");
+ }
+
}
@Override
protected void finalize() throws Throwable {
// TODO : necessary I think, need to check APR doc
+ logger.debug("finalize, freeing the pool");
Pool.clear(pool);
}
@Override
protected boolean isOpRead(IoSession sess) throws Exception {
+ logger.debug("isOpRead?");
APRSessionImpl session=(APRSessionImpl)sess;
- long[] descriptors=new long[managedSessions.size()*2];
- Poll.pollset(pollset, descriptors);
- for(int i=0;i<managedSessions.size();i++) {
- if(descriptors[i*2+1]== session.getAPRSocket()) {
- return (descriptors[i*2] & Poll.APR_POLLOUT) >0;
- }
- }
- return false;
+ logger.debug("isOpRead : "+session.isOpRead());
+ return session.isOpRead();
}
@Override
protected boolean isOpWrite(IoSession sess) throws Exception {
+ logger.debug("isOpWrite?");
APRSessionImpl session=(APRSessionImpl)sess;
- long[] descriptors=new long[managedSessions.size()*2];
- Poll.pollset(pollset, descriptors);
- for(int i=0;i<managedSessions.size();i++) {
- if(descriptors[i*2+1]== session.getAPRSocket()) {
- return (descriptors[i*2] & Poll.APR_POLLIN) >0;
- }
- }
- return false;
+ logger.debug("isOpWrite : "+session.isOpWrite());
+ return session.isOpWrite();
}
@Override
protected boolean isReadable(IoSession session) throws Exception {
+ logger.debug("isReadable?");
long socket=((APRSessionImpl)session).getAPRSocket();
for(int i=0;i<pollResult.length/2;i++) {
- if(pollResult[i+1]==socket) {
- if( (pollResult[i]&Poll.APR_POLLIN) >0 )
+ if(pollResult[i*2+1]==socket) {
+ if( (pollResult[i*2]&Poll.APR_POLLIN) >0 ) {
+ logger.debug("isReadable : true");
return true;
- else
+ } else {
+ logger.debug("isReadable : false");
return false;
+ }
}
}
+ logger.debug("isReadable : false (socket not found)");
return false;
}
@Override
protected boolean isWritable(IoSession session) throws Exception {
+ logger.debug("isWritable?");
long socket=((APRSessionImpl)session).getAPRSocket();
for(int i=0;i<pollResult.length/2;i++) {
- if(pollResult[i+1]==socket) {
- if( (pollResult[i]&Poll.APR_POLLOUT) >0 )
+ if(pollResult[i*2+1]==socket) {
+ if( (pollResult[i*2]&Poll.APR_POLLOUT) >0 ) {
+ logger.debug("isWritable : true");
return true;
- else
+ } else {
+ logger.debug("isWritable : false");
return false;
+ }
}
}
+ logger.debug("isWritable : false (socket not found)");
return false;
}
@Override
protected int read(IoSession sess, ByteBuffer buffer) throws Exception {
+ logger.debug("read");
APRSessionImpl session=(APRSessionImpl)sess;
byte[] buf = session.getReadBuffer();
// FIXME : hardcoded read value for testing
int bytes = Socket.recv(session.getAPRSocket(), buf, 0, 1024);
+ logger.debug("read bytes : "+bytes);
if (bytes > 0) {
buffer.put(buf);
- } else if (bytes < 0) {
- logger.debug("Read {} bytes, need closing ?", bytes);
- return -1;
}
return bytes;
}
- public void remove(IoSession session) {
- Poll.remove(pollset, ((APRSessionImpl)session).getAPRSocket());
- Socket.close(((APRSessionImpl)session).getAPRSocket());
- }
-
@Override
protected boolean select(int timeout) throws Exception {
- System.err.println("select");
+ logger.debug("select?");
// poll the socket descriptors
/* is it OK ? : Two times size of the created pollset */
- long[] pollResult = new long[managedSessions.size() * 2];
+ pollResult = new long[managedSessions.size() * 2];
int rv = Poll.poll(pollset, 1000 * timeout, pollResult, false);
if (rv > 0) {
+ logger.debug("select : true");
return true;
} else if(rv<0) {
if(rv!=-120001) { // timeout ( FIXME : can't find the good constant in APR)
@@ -258,6 +250,7 @@
throw new RuntimeException("APR polling error : "+Error.strerror(-1*rv)+" ( code : "+rv+")");
}
}
+ logger.debug("select : false");
return false;
}
@@ -268,54 +261,50 @@
@Override
protected void setOpRead(IoSession sess, boolean value) throws Exception {
- System.err.println("setOpRead : "+value);
+ logger.debug("setOpRead : "+value);
APRSessionImpl session=(APRSessionImpl)sess;
int rv = Poll.remove(pollset, session.getAPRSocket());
if (rv != Status.APR_SUCCESS) {
System.err.println("poll.remove Error : " + Error.strerror(rv));
}
- boolean write=isOpWrite(sess);
- if(write)
- rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
- | Poll.APR_POLLOUT);
- else
- rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
+ int flags=(value?Poll.APR_POLLIN:0) | (session.isOpWrite()?Poll.APR_POLLOUT:0);
+
+ rv = Poll.add(pollset, session.getAPRSocket(), flags);
if (rv == Status.APR_SUCCESS) {
// ok
+ session.setOpRead(value);
} else {
- System.err.println("poll.add Error : " + Error.strerror(rv));
+ logger.error("poll.add Error : " + Error.strerror(rv));
}
}
@Override
protected void setOpWrite(IoSession sess, boolean value)
throws Exception {
- System.err.println("setOpWrite : "+value);
+ logger.debug("setOpWrite : "+value);
APRSessionImpl session=(APRSessionImpl)sess;
int rv = Poll.remove(pollset, session.getAPRSocket());
if (rv != Status.APR_SUCCESS) {
System.err.println("poll.remove Error : " + Error.strerror(rv));
}
- boolean read=isOpRead(sess);
- if(read)
- rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
- | Poll.APR_POLLOUT);
- else
- rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLOUT);
+ int flags=(session.isOpRead()?Poll.APR_POLLIN:0) | (value?Poll.APR_POLLOUT:0);
+
+ rv = Poll.add(pollset, session.getAPRSocket(), flags);
if (rv == Status.APR_SUCCESS) {
// ok
+ session.setOpWrite(value);
} else {
- System.err.println("poll.add Error : " + Error.strerror(rv));
- }
+ logger.error("poll.add Error : " + Error.strerror(rv));
+ }
}
@Override
protected SessionState state(IoSession session) {
- System.err.println("state?");
+ logger.debug("state?");
long socket=((APRSessionImpl)session).getAPRSocket();
if(socket>0)
return SessionState.OPEN;
@@ -325,78 +314,6 @@
return SessionState.CLOSED;
}
-
-
-// private void write(APRSessionImpl session) {
-// if (session.getWriteRequestQueue().size() <= 0)
-// return;
-// Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
-//
-// for (;;) {
-//
-// WriteRequest req;
-//
-// synchronized (writeRequestQueue) {
-// req = writeRequestQueue.peek();
-// }
-//
-// if (req == null) {
-// // remove of write polling
-// int rv = Poll.remove(pollset, session.getAPRSocket());
-// if (rv != Status.APR_SUCCESS) {
-// System.err.println("poll.remove Error : "
-// + Error.strerror(rv));
-// }
-// rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
-// if (rv == Status.APR_SUCCESS) {
-// // ok
-// } else {
-// System.err
-// .println("poll.add Error : " + Error.strerror(rv));
-// }
-// break;
-// }
-//
-// ByteBuffer buf = (ByteBuffer) req.getMessage();
-// if (buf.remaining() == 0) {
-// synchronized (writeRequestQueue) {
-// writeRequestQueue.poll();
-// }
-// session.increaseWrittenMessages();
-// buf.reset();
-// session.getFilterChain().fireMessageSent(req);
-// continue;
-// }
-// // be sure APR_SO_NONBLOCK was set, or it will block
-// int toWrite = buf.remaining();
-//
-// int writtenBytes;
-// // APR accept ByteBuffer, only if they are Direct ones, due to native code
-// if (buf.isDirect()) {
-// writtenBytes = Socket.sendb(session.getAPRSocket(), buf.buf(),
-// 0, toWrite);
-// } else {
-// writtenBytes = Socket.send(session.getAPRSocket(), buf.array(),
-// 0, toWrite);
-// // FIXME : kludgy ?
-// buf.position(buf.position() + writtenBytes);
-// }
-// if (writtenBytes > 0) {
-// // increase
-//
-// session.increaseWrittenBytes(writtenBytes);
-// } else {
-// // FIXME : send the exception
-// System.err.println(Error.strerror(writtenBytes * -1));
-// }
-//
-// // kernel buffer full for this socket, wait next polling
-// if (buf.hasRemaining())
-// break;
-// }
-// }
-
-
@Override
protected long transferFile(IoSession session, FileRegion region)
throws Exception {
@@ -405,14 +322,14 @@
@Override
protected void wakeup() {
- System.err.println("wakeup");
+ logger.debug("wakeup");
// FIXME : is it possible to interrupt a Poll.poll ?
}
@Override
protected int write(IoSession session, ByteBuffer buf) throws Exception {
- System.err.println("write");
+ logger.debug("write");
// be sure APR_SO_NONBLOCK was set, or it will block
int toWrite = buf.remaining();
@@ -427,6 +344,7 @@
// FIXME : kludgy ?
buf.position(buf.position() + writtenBytes);
}
+ logger.debug("write : "+writtenBytes);
return writtenBytes;
}
}
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java?rev=577220&r1=577219&r2=577220&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java Wed Sep 19 02:52:27 2007
@@ -64,6 +64,9 @@
"Apache Portable Runtime socket", false, true,
InetSocketAddress.class, APRSessionConfig.class, ByteBuffer.class);
+ private boolean isOpRead=false;
+
+ private boolean isOpWrite=false;
/**
* Creates a new instance.
*/
@@ -223,5 +226,21 @@
@Override
protected IoProcessor getProcessor() {
return ioProcessor;
+ }
+
+ public boolean isOpRead() {
+ return isOpRead;
+ }
+
+ public void setOpRead(boolean isOpRead) {
+ this.isOpRead = isOpRead;
+ }
+
+ public boolean isOpWrite() {
+ return isOpWrite;
+ }
+
+ public void setOpWrite(boolean isOpWrite) {
+ this.isOpWrite = isOpWrite;
}
}