You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/07/26 16:33:16 UTC

svn commit: r559833 - /mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java

Author: jvermillard
Date: Thu Jul 26 07:33:15 2007
New Revision: 559833

URL: http://svn.apache.org/viewvc?view=rev&rev=559833
Log:
less write synchronize, and call Poll.add for writing in the filterchain, for better latency (was previously done in the APRioP Worker thread)

Modified:
    mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java

Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java?view=diff&rev=559833&r1=559832&r2=559833
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java Thu Jul 26 07:33:15 2007
@@ -40,8 +40,6 @@
 
 	private final Queue<APRSessionImpl> removingSessions = new ConcurrentLinkedQueue<APRSessionImpl>();
 
-	private final Queue<APRSessionImpl> flushingSessions = new ConcurrentLinkedQueue<APRSessionImpl>();
-
 	private final Queue<APRSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<APRSessionImpl>();
 
 	private final Map<Long, APRSessionImpl> managedSessions = new HashMap<Long, APRSessionImpl>();
@@ -61,9 +59,7 @@
 		try {
 
 			// TODO : optimize/parametrize those values
-			synchronized (this) {
-				pollset = Poll.create(32, pool, /* obviously doesn't work..*/ Poll.APR_POLLSET_THREADSAFE, 10000000);	
-			}
+			pollset = Poll.create(32, pool, /* obviously doesn't work..*/ Poll.APR_POLLSET_THREADSAFE, 10000000);	
 			
 		} catch (Error e) {
 			logger.error("APR Error : " + e.getDescription(), e);
@@ -92,18 +88,14 @@
 	}
 
 	void flush(APRSessionImpl session) {
-		scheduleFlush(session);
+		// re-add the session to polling with POLLOUT flag 
+		pollOutSession(session);
 	}
     
 	private void scheduleRemove(APRSessionImpl session) {
 		removingSessions.offer(session);
 	}
 
-	private void scheduleFlush(APRSessionImpl session) {
-		flushingSessions.offer(session);
-	}
-
-	
 	// TODO : do something with traffic control 
 	private void scheduleTrafficControl(APRSessionImpl session) {
 		trafficControllingSessions.offer(session);
@@ -122,11 +114,7 @@
 			System.err.println("pollset : "+pollset);
 			System.err.println("Socket : "+session.getAPRSocket());
 			int rv;
-			synchronized (this) {
-				rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
-		
-				/*| Poll.APR_POLLOUT*/);
-			}
+				rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN/*| Poll.APR_POLLOUT*/);
 			if (rv == Status.APR_SUCCESS) {
 				System.out.println("Added worker to pollset");
 				managedSessions.put(session.getAPRSocket(), session);
@@ -151,9 +139,7 @@
 			}
 
 			// remove of the pollset
-			synchronized (this) {
-				Poll.remove(pollset, session.getAPRSocket());	
-			}
+			Poll.remove(pollset, session.getAPRSocket());	
 			
 			// close the socket
 			Socket.close(session.getAPRSocket());
@@ -162,43 +148,19 @@
 		}
 	}
 	
-    private void doFlush() {
-        if (flushingSessions.size() == 0) {
-            return;
-        }
-
-        for (;;) {
-            APRSessionImpl session = flushingSessions.poll();
-
-            if (session == null) {
-                break;
-            }
-
-            if (!session.isConnected()) {
-                clearWriteRequestQueue(session);
-                continue;
-            }
-            
-            // add the descriptor as POLLOUT
-        	synchronized (this) {
-        			int rv= Poll.remove(pollset, session.getAPRSocket());
-        			if(rv!=Status.APR_SUCCESS) {
-        				System.err.println("poll.remove Error : "+Error.strerror(rv));
-        			}
-        			rv = Poll.add(pollset, session.getAPRSocket(),Poll.APR_POLLIN | Poll.APR_POLLOUT);
-        			if (rv == Status.APR_SUCCESS) {
-        				// ok
-        			} else {
-        				System.err.println("poll.add Error : "+Error.strerror(rv));
-        			}
-
-        	}
-        	
-            
-        }
+    private void pollOutSession(APRSessionImpl session) {
+	    int rv= Poll.remove(pollset, session.getAPRSocket());
+		if(rv!=Status.APR_SUCCESS) {
+			System.err.println("poll.remove Error : "+Error.strerror(rv));
+		}
+		rv = Poll.add(pollset, session.getAPRSocket(),Poll.APR_POLLIN | Poll.APR_POLLOUT);
+		if (rv == Status.APR_SUCCESS) {
+			// ok
+		} else {
+			System.err.println("poll.add Error : "+Error.strerror(rv));
+		}
     }
 
-
 	private void read(APRSessionImpl session) {
 		byte[] buf = session.getReadBuffer();
 		// FIXME : hardcoded read value for testing
@@ -363,12 +325,7 @@
 					long[] desc = new long[socketCount * 2];
 
 					/* use 100 milliseconds poll timeout, TODO : parametrize for more latency/CPU usage control*/
-					int rv;
-					synchronized (this) {
-						rv = Poll.poll(pollset, 100000, desc, false);
-					}
-					//System.err.println("rv poll : "+rv+" - "+Thread.currentThread());
-					
+					int rv = Poll.poll(pollset, 100000, desc, false);
 					if (rv > 0) {
 						for (int n = 0; n < rv; n++) {
 							long clientSock = desc[n * 2 + 1];
@@ -388,7 +345,7 @@
 								write(session);
 						}
 					}
-					doFlush();
+				//	doFlush();
 					notifyIdleness();
 					doRemove();
 				} catch (Throwable t) {