You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/07/26 16:33:16 UTC
svn commit: r559833 -
/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
Author: jvermillard
Date: Thu Jul 26 07:33:15 2007
New Revision: 559833
URL: http://svn.apache.org/viewvc?view=rev&rev=559833
Log:
Less synchronization on writes; Poll.add for writing is now called from the filter chain for better latency (it was previously done in the APRIoProcessor worker thread).
Modified:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java?view=diff&rev=559833&r1=559832&r2=559833
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java Thu Jul 26 07:33:15 2007
@@ -40,8 +40,6 @@
private final Queue<APRSessionImpl> removingSessions = new ConcurrentLinkedQueue<APRSessionImpl>();
- private final Queue<APRSessionImpl> flushingSessions = new ConcurrentLinkedQueue<APRSessionImpl>();
-
private final Queue<APRSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<APRSessionImpl>();
private final Map<Long, APRSessionImpl> managedSessions = new HashMap<Long, APRSessionImpl>();
@@ -61,9 +59,7 @@
try {
// TODO : optimize/parametrize those values
- synchronized (this) {
- pollset = Poll.create(32, pool, /* obviously doesn't work..*/ Poll.APR_POLLSET_THREADSAFE, 10000000);
- }
+ pollset = Poll.create(32, pool, /* obviously doesn't work..*/ Poll.APR_POLLSET_THREADSAFE, 10000000);
} catch (Error e) {
logger.error("APR Error : " + e.getDescription(), e);
@@ -92,18 +88,14 @@
}
void flush(APRSessionImpl session) {
- scheduleFlush(session);
+ // re-add the session to polling with POLLOUT flag
+ pollOutSession(session);
}
private void scheduleRemove(APRSessionImpl session) {
removingSessions.offer(session);
}
- private void scheduleFlush(APRSessionImpl session) {
- flushingSessions.offer(session);
- }
-
-
// TODO : do something with traffic control
private void scheduleTrafficControl(APRSessionImpl session) {
trafficControllingSessions.offer(session);
@@ -122,11 +114,7 @@
System.err.println("pollset : "+pollset);
System.err.println("Socket : "+session.getAPRSocket());
int rv;
- synchronized (this) {
- rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
-
- /*| Poll.APR_POLLOUT*/);
- }
+ rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN/*| Poll.APR_POLLOUT*/);
if (rv == Status.APR_SUCCESS) {
System.out.println("Added worker to pollset");
managedSessions.put(session.getAPRSocket(), session);
@@ -151,9 +139,7 @@
}
// remove of the pollset
- synchronized (this) {
- Poll.remove(pollset, session.getAPRSocket());
- }
+ Poll.remove(pollset, session.getAPRSocket());
// close the socket
Socket.close(session.getAPRSocket());
@@ -162,43 +148,19 @@
}
}
- private void doFlush() {
- if (flushingSessions.size() == 0) {
- return;
- }
-
- for (;;) {
- APRSessionImpl session = flushingSessions.poll();
-
- if (session == null) {
- break;
- }
-
- if (!session.isConnected()) {
- clearWriteRequestQueue(session);
- continue;
- }
-
- // add the descriptor as POLLOUT
- synchronized (this) {
- int rv= Poll.remove(pollset, session.getAPRSocket());
- if(rv!=Status.APR_SUCCESS) {
- System.err.println("poll.remove Error : "+Error.strerror(rv));
- }
- rv = Poll.add(pollset, session.getAPRSocket(),Poll.APR_POLLIN | Poll.APR_POLLOUT);
- if (rv == Status.APR_SUCCESS) {
- // ok
- } else {
- System.err.println("poll.add Error : "+Error.strerror(rv));
- }
-
- }
-
-
- }
+ private void pollOutSession(APRSessionImpl session) {
+ int rv= Poll.remove(pollset, session.getAPRSocket());
+ if(rv!=Status.APR_SUCCESS) {
+ System.err.println("poll.remove Error : "+Error.strerror(rv));
+ }
+ rv = Poll.add(pollset, session.getAPRSocket(),Poll.APR_POLLIN | Poll.APR_POLLOUT);
+ if (rv == Status.APR_SUCCESS) {
+ // ok
+ } else {
+ System.err.println("poll.add Error : "+Error.strerror(rv));
+ }
}
-
private void read(APRSessionImpl session) {
byte[] buf = session.getReadBuffer();
// FIXME : hardcoded read value for testing
@@ -363,12 +325,7 @@
long[] desc = new long[socketCount * 2];
/* use 100 milliseconds poll timeout, TODO : parametrize for more latency/CPU usage control*/
- int rv;
- synchronized (this) {
- rv = Poll.poll(pollset, 100000, desc, false);
- }
- //System.err.println("rv poll : "+rv+" - "+Thread.currentThread());
-
+ int rv = Poll.poll(pollset, 100000, desc, false);
if (rv > 0) {
for (int n = 0; n < rv; n++) {
long clientSock = desc[n * 2 + 1];
@@ -388,7 +345,7 @@
write(session);
}
}
- doFlush();
+ // doFlush();
notifyIdleness();
doRemove();
} catch (Throwable t) {