You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/07/26 16:24:09 UTC
svn commit: r559826 - in /mina/sandbox/jvermillard/apr/src:
main/java/org/apache/mina/transport/apr/
test/java/org/apache/mina/transport/apr/
Author: jvermillard
Date: Thu Jul 26 07:24:09 2007
New Revision: 559826
URL: http://svn.apache.org/viewvc?view=rev&rev=559826
Log:
write works now, without big optimization (lot of useless synchronize)
Modified:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java?view=diff&rev=559826&r1=559825&r2=559826
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java Thu Jul 26 07:24:09 2007
@@ -16,7 +16,6 @@
@Override
protected void doWrite(IoSession session, WriteRequest writeRequest) {
- System.err.println("dowrite ????");
APRSessionImpl s = (APRSessionImpl) session;
Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java?view=diff&rev=559826&r1=559825&r2=559826
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java Thu Jul 26 07:24:09 2007
@@ -61,7 +61,10 @@
try {
// TODO : optimize/parametrize those values
- pollset = Poll.create(32, pool, Poll.APR_POLLSET_THREADSAFE, 10000000);
+ synchronized (this) {
+ pollset = Poll.create(32, pool, /* obviously doesn't work..*/ Poll.APR_POLLSET_THREADSAFE, 10000000);
+ }
+
} catch (Error e) {
logger.error("APR Error : " + e.getDescription(), e);
// TODO : send that to the good logger
@@ -89,27 +92,16 @@
}
void flush(APRSessionImpl session) {
- //scheduleFlush(session);
-
- // add the descriptor as POLLOUT
-
- int rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
- | Poll.APR_POLLOUT);
- if (rv == Status.APR_SUCCESS) {
- System.err.println("pollout Ok");
- } else {
- System.err.println("");
- }
-
+ scheduleFlush(session);
}
-
+
private void scheduleRemove(APRSessionImpl session) {
removingSessions.offer(session);
}
-// private void scheduleFlush(APRSessionImpl session) {
-// flushingSessions.offer(session);
-// }
+ private void scheduleFlush(APRSessionImpl session) {
+ flushingSessions.offer(session);
+ }
// TODO : do something with traffic control
@@ -129,8 +121,12 @@
// FIXME : perhaps we should oll write only if needed for save CPU, but actually it's too complex for me :)
System.err.println("pollset : "+pollset);
System.err.println("Socket : "+session.getAPRSocket());
- int rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
- /*| Poll.APR_POLLOUT*/);
+ int rv;
+ synchronized (this) {
+ rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
+
+ /*| Poll.APR_POLLOUT*/);
+ }
if (rv == Status.APR_SUCCESS) {
System.out.println("Added worker to pollset");
managedSessions.put(session.getAPRSocket(), session);
@@ -142,7 +138,7 @@
// FIXME: find a way to bring the real APR error from returned codes
session.getFilterChain().fireExceptionCaught(session,
new RuntimeException("APR Error"));
- }
+ }
}
}
@@ -155,14 +151,53 @@
}
// remove of the pollset
- Poll.remove(pollset, session.getAPRSocket());
-
+ synchronized (this) {
+ Poll.remove(pollset, session.getAPRSocket());
+ }
+
// close the socket
Socket.close(session.getAPRSocket());
clearWriteRequestQueue(session);
getServiceListeners(session).fireSessionDestroyed(session);
}
}
+
+ private void doFlush() {
+ if (flushingSessions.size() == 0) {
+ return;
+ }
+
+ for (;;) {
+ APRSessionImpl session = flushingSessions.poll();
+
+ if (session == null) {
+ break;
+ }
+
+ if (!session.isConnected()) {
+ clearWriteRequestQueue(session);
+ continue;
+ }
+
+ // add the descriptor as POLLOUT
+ synchronized (this) {
+ int rv= Poll.remove(pollset, session.getAPRSocket());
+ if(rv!=Status.APR_SUCCESS) {
+ System.err.println("poll.remove Error : "+Error.strerror(rv));
+ }
+ rv = Poll.add(pollset, session.getAPRSocket(),Poll.APR_POLLIN | Poll.APR_POLLOUT);
+ if (rv == Status.APR_SUCCESS) {
+ // ok
+ } else {
+ System.err.println("poll.add Error : "+Error.strerror(rv));
+ }
+
+ }
+
+
+ }
+ }
+
private void read(APRSessionImpl session) {
byte[] buf = session.getReadBuffer();
@@ -181,10 +216,8 @@
}
private void write(APRSessionImpl session) {
- //System.err.println("Do write");
if (session.getWriteRequestQueue().size() <= 0)
return;
- System.err.println("Ok something in the queue");
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
for (;;) {
@@ -196,6 +229,17 @@
}
if (req == null) {
+ // remove of write polling
+ int rv= Poll.remove(pollset, session.getAPRSocket());
+ if(rv!=Status.APR_SUCCESS) {
+ System.err.println("poll.remove Error : "+Error.strerror(rv));
+ }
+ rv = Poll.add(pollset, session.getAPRSocket(),Poll.APR_POLLIN);
+ if (rv == Status.APR_SUCCESS) {
+ // ok
+ } else {
+ System.err.println("poll.add Error : "+Error.strerror(rv));
+ }
break;
}
@@ -211,11 +255,25 @@
}
// be sure APR_SO_NONBLOCK was set, or it will block
int toWrite = buf.remaining();
- int writtenBytes = Socket.sendb(session.getAPRSocket(), buf.buf(),
- 0, toWrite);
+
+ int writtenBytes;
+ // APR accept ByteBuffer, only if they are Direct ones, due to native code
+ if(buf.isDirect()) {
+ writtenBytes = Socket.sendb(session.getAPRSocket(), buf.buf(),
+ 0, toWrite);
+ } else {
+ writtenBytes = Socket.send(session.getAPRSocket(), buf.array(),
+ 0, toWrite);
+ // FIXME : kludgy ?
+ buf.position(buf.position()+writtenBytes);
+ }
if (writtenBytes > 0) {
// increase
+
session.increaseWrittenBytes(writtenBytes);
+ } else {
+ // FIXME : send the exception
+ System.err.println(Error.strerror(writtenBytes*-1));
}
// kernel buffer full for this socket, wait next polling
@@ -305,7 +363,12 @@
long[] desc = new long[socketCount * 2];
/* use 100 milliseconds poll timeout, TODO : parametrize for more latency/CPU usage control*/
- int rv = Poll.poll(pollset, 100000, desc, false);
+ int rv;
+ synchronized (this) {
+ rv = Poll.poll(pollset, 100000, desc, false);
+ }
+ //System.err.println("rv poll : "+rv+" - "+Thread.currentThread());
+
if (rv > 0) {
for (int n = 0; n < rv; n++) {
long clientSock = desc[n * 2 + 1];
@@ -325,6 +388,7 @@
write(session);
}
}
+ doFlush();
notifyIdleness();
doRemove();
} catch (Throwable t) {
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java?view=diff&rev=559826&r1=559825&r2=559826
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java Thu Jul 26 07:24:09 2007
@@ -130,4 +130,8 @@
APRSessionConfig {
}
+ @Override
+ protected void write0(WriteRequest writeRequest) {
+ filterChain.fireFilterWrite(this, writeRequest);
+ }
}
Modified: mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java?view=diff&rev=559826&r1=559825&r2=559826
==============================================================================
--- mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java (original)
+++ mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java Thu Jul 26 07:24:09 2007
@@ -12,6 +12,7 @@
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.tomcat.jni.Library;
public class TestCnx extends TestCase {
@@ -20,6 +21,7 @@
BasicConfigurator.configure();
Library.initialize(null);
APRConnector cnx=new APRConnector();
+ //SocketConnector cnx=new SocketConnector();
cnx.setHandler(new IoHandler(){
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
@@ -66,7 +68,6 @@
Thread.sleep(1000);
System.err.println("writing hello");
f.getSession().write( ByteBuffer.wrap("HELLO\n".getBytes()).rewind() );
-
Thread.sleep(4000);
System.err.println("Done");
}