You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/07/26 16:24:09 UTC

svn commit: r559826 - in /mina/sandbox/jvermillard/apr/src: main/java/org/apache/mina/transport/apr/ test/java/org/apache/mina/transport/apr/

Author: jvermillard
Date: Thu Jul 26 07:24:09 2007
New Revision: 559826

URL: http://svn.apache.org/viewvc?view=rev&rev=559826
Log:
write works now, without big optimization (lot of useless synchronize)

Modified:
    mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
    mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
    mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
    mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java

Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java?view=diff&rev=559826&r1=559825&r2=559826
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java Thu Jul 26 07:24:09 2007
@@ -16,7 +16,6 @@
 
 	@Override
 	protected void doWrite(IoSession session, WriteRequest writeRequest) {
-		System.err.println("dowrite ????");
 		APRSessionImpl s = (APRSessionImpl) session;
 		Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
 

Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java?view=diff&rev=559826&r1=559825&r2=559826
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java Thu Jul 26 07:24:09 2007
@@ -61,7 +61,10 @@
 		try {
 
 			// TODO : optimize/parametrize those values
-			pollset = Poll.create(32, pool, Poll.APR_POLLSET_THREADSAFE, 10000000);
+			synchronized (this) {
+				pollset = Poll.create(32, pool, /* obviously doesn't work..*/ Poll.APR_POLLSET_THREADSAFE, 10000000);	
+			}
+			
 		} catch (Error e) {
 			logger.error("APR Error : " + e.getDescription(), e);
 			// TODO : send that to the good logger
@@ -89,27 +92,16 @@
 	}
 
 	void flush(APRSessionImpl session) {
-		//scheduleFlush(session);
-
-		// add the descriptor as POLLOUT
-		
-		int rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
-		| Poll.APR_POLLOUT);
-		if (rv == Status.APR_SUCCESS) {
-			System.err.println("pollout Ok");
-		} else {
-			System.err.println("");
-		}
-
+		scheduleFlush(session);
 	}
-
+    
 	private void scheduleRemove(APRSessionImpl session) {
 		removingSessions.offer(session);
 	}
 
-//	private void scheduleFlush(APRSessionImpl session) {
-//		flushingSessions.offer(session);
-//	}
+	private void scheduleFlush(APRSessionImpl session) {
+		flushingSessions.offer(session);
+	}
 
 	
 	// TODO : do something with traffic control 
@@ -129,8 +121,12 @@
 			// FIXME : perhaps we should oll write only if needed for save CPU, but actually it's too complex for me :)
 			System.err.println("pollset : "+pollset);
 			System.err.println("Socket : "+session.getAPRSocket());
-			int rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
-					/*| Poll.APR_POLLOUT*/);
+			int rv;
+			synchronized (this) {
+				rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
+		
+				/*| Poll.APR_POLLOUT*/);
+			}
 			if (rv == Status.APR_SUCCESS) {
 				System.out.println("Added worker to pollset");
 				managedSessions.put(session.getAPRSocket(), session);
@@ -142,7 +138,7 @@
 				// FIXME: find a way to bring the real APR error from returned codes
 				session.getFilterChain().fireExceptionCaught(session,
 						new RuntimeException("APR Error"));
-			}
+			}	
 		}
 	}
 
@@ -155,14 +151,53 @@
 			}
 
 			// remove of the pollset
-			Poll.remove(pollset, session.getAPRSocket());
-
+			synchronized (this) {
+				Poll.remove(pollset, session.getAPRSocket());	
+			}
+			
 			// close the socket
 			Socket.close(session.getAPRSocket());
 			clearWriteRequestQueue(session);
 			getServiceListeners(session).fireSessionDestroyed(session);
 		}
 	}
+	
+    private void doFlush() {
+        if (flushingSessions.size() == 0) {
+            return;
+        }
+
+        for (;;) {
+            APRSessionImpl session = flushingSessions.poll();
+
+            if (session == null) {
+                break;
+            }
+
+            if (!session.isConnected()) {
+                clearWriteRequestQueue(session);
+                continue;
+            }
+            
+            // add the descriptor as POLLOUT
+        	synchronized (this) {
+        			int rv= Poll.remove(pollset, session.getAPRSocket());
+        			if(rv!=Status.APR_SUCCESS) {
+        				System.err.println("poll.remove Error : "+Error.strerror(rv));
+        			}
+        			rv = Poll.add(pollset, session.getAPRSocket(),Poll.APR_POLLIN | Poll.APR_POLLOUT);
+        			if (rv == Status.APR_SUCCESS) {
+        				// ok
+        			} else {
+        				System.err.println("poll.add Error : "+Error.strerror(rv));
+        			}
+
+        	}
+        	
+            
+        }
+    }
+
 
 	private void read(APRSessionImpl session) {
 		byte[] buf = session.getReadBuffer();
@@ -181,10 +216,8 @@
 	}
 
 	private void write(APRSessionImpl session) {
-		//System.err.println("Do write");
 		if (session.getWriteRequestQueue().size() <= 0)
 			return;
-		System.err.println("Ok something in the queue");
 		Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
 		
 		for (;;) {
@@ -196,6 +229,17 @@
 			}
 
 			if (req == null) {
+				// remove of write polling
+       			int rv= Poll.remove(pollset, session.getAPRSocket());
+    			if(rv!=Status.APR_SUCCESS) {
+    				System.err.println("poll.remove Error : "+Error.strerror(rv));
+    			}
+    			rv = Poll.add(pollset, session.getAPRSocket(),Poll.APR_POLLIN);
+    			if (rv == Status.APR_SUCCESS) {
+    				// ok
+    			} else {
+    				System.err.println("poll.add Error : "+Error.strerror(rv));
+    			}
 				break;
 			}
 
@@ -211,11 +255,25 @@
 			}
 			// be sure APR_SO_NONBLOCK was set, or it will block
 			int toWrite = buf.remaining();
-			int writtenBytes = Socket.sendb(session.getAPRSocket(), buf.buf(),
-					0, toWrite);
+			
+			int writtenBytes;
+			// APR accept ByteBuffer, only if they are Direct ones, due to native code
+			if(buf.isDirect()) {
+				writtenBytes = Socket.sendb(session.getAPRSocket(), buf.buf(),
+						0, toWrite);
+			} else {
+				writtenBytes = Socket.send(session.getAPRSocket(), buf.array(),
+						0, toWrite);
+				// FIXME : kludgy ?
+				buf.position(buf.position()+writtenBytes);
+			}
 			if (writtenBytes > 0) {
 				// increase
+				
 				session.increaseWrittenBytes(writtenBytes);
+			} else {
+				// FIXME : send the exception
+				System.err.println(Error.strerror(writtenBytes*-1));
 			}
 
 			// kernel buffer full for this socket, wait next polling
@@ -305,7 +363,12 @@
 					long[] desc = new long[socketCount * 2];
 
 					/* use 100 milliseconds poll timeout, TODO : parametrize for more latency/CPU usage control*/
-					int rv = Poll.poll(pollset, 100000, desc, false);
+					int rv;
+					synchronized (this) {
+						rv = Poll.poll(pollset, 100000, desc, false);
+					}
+					//System.err.println("rv poll : "+rv+" - "+Thread.currentThread());
+					
 					if (rv > 0) {
 						for (int n = 0; n < rv; n++) {
 							long clientSock = desc[n * 2 + 1];
@@ -325,6 +388,7 @@
 								write(session);
 						}
 					}
+					doFlush();
 					notifyIdleness();
 					doRemove();
 				} catch (Throwable t) {

Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java?view=diff&rev=559826&r1=559825&r2=559826
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java Thu Jul 26 07:24:09 2007
@@ -130,4 +130,8 @@
 			APRSessionConfig {
 
 	}
+	@Override
+	protected void write0(WriteRequest writeRequest) {
+		filterChain.fireFilterWrite(this, writeRequest);
+	}
 }

Modified: mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java?view=diff&rev=559826&r1=559825&r2=559826
==============================================================================
--- mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java (original)
+++ mina/sandbox/jvermillard/apr/src/test/java/org/apache/mina/transport/apr/TestCnx.java Thu Jul 26 07:24:09 2007
@@ -12,6 +12,7 @@
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.SocketConnector;
 import org.apache.tomcat.jni.Library;
 
 public class TestCnx extends TestCase {
@@ -20,6 +21,7 @@
 		BasicConfigurator.configure();
 		Library.initialize(null);
 		APRConnector cnx=new APRConnector();
+		//SocketConnector cnx=new SocketConnector(); 
 		cnx.setHandler(new IoHandler(){
 
 			public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
@@ -66,7 +68,6 @@
 		Thread.sleep(1000);
 		System.err.println("writing hello");
 		f.getSession().write(  ByteBuffer.wrap("HELLO\n".getBytes()).rewind() );
-		
 		Thread.sleep(4000);
 		System.err.println("Done");
 	}