You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/09/19 11:52:28 UTC

svn commit: r577220 - in /mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr: APRConnector.java APRIoProcessor.java APRSessionImpl.java

Author: jvermillard
Date: Wed Sep 19 02:52:27 2007
New Revision: 577220

URL: http://svn.apache.org/viewvc?rev=577220&view=rev
Log:
APR transport is now working; only wakeup() is still missing.

Modified:
    mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
    mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
    mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java

Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java?rev=577220&r1=577219&r2=577220&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java Wed Sep 19 02:52:27 2007
@@ -127,11 +127,17 @@
                     protocol.socketType, protocol.codeProto, APRLibrary
                             .getLibrary().getPool());
 
+            
             // FIXME : error checking
             int ret = Socket.connect(clientSock, inetAddr);
             if(ret!=Status.APR_SUCCESS)
                 System.err.println("Error Socket.connect : " + ret);
             
+            ret=Socket.optSet(clientSock, Socket.APR_SO_NONBLOCK, 1);
+
+            if(ret!=Status.APR_SUCCESS)
+                System.err.println("Error Socket.optSet : " + ret);
+
             if (localAddress != null) {
                 // TODO, check if it's possible to bind to a local address
             }

Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java?rev=577220&r1=577219&r2=577220&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java Wed Sep 19 02:52:27 2007
@@ -19,21 +19,15 @@
  */
 package org.apache.mina.transport.apr;
 
-import java.nio.channels.SelectionKey;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
 import org.apache.mina.common.AbstractIoProcessor;
 import org.apache.mina.common.AbstractIoSession;
 import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.DefaultIoFilterChain;
 import org.apache.mina.common.FileRegion;
 import org.apache.mina.common.IoSession;
 import org.apache.tomcat.jni.Error;
@@ -44,8 +38,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.sun.org.apache.bcel.internal.generic.GETSTATIC;
-
 /**
  * The class in charge of processing socket level IO events for the {@link APRConnector}
  * 
@@ -87,28 +79,23 @@
         }
 
         public AbstractIoSession next() {
-            
-            AbstractIoSession  sess=managedSessions.get(pollResult[index*2]);
+            AbstractIoSession  sess=managedSessions.get(pollResult[index*2+1]);
             index++;
+            System.err.println("sess : "+((APRSessionImpl)sess).getAPRSocket());
             return sess;
         }
 
         public void remove() {
-            throw new UnsupportedOperationException("remove");            
+            //throw new UnsupportedOperationException("remove");            
         }
     }
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private final Object lock = new Object();
-
     private long pool = 0; // memory pool
 
-//    private Worker worker;
-
     private long pollset = 0; // socket poller
 
-
     private final Map<Long, APRSessionImpl> managedSessions = new HashMap<Long, APRSessionImpl>();
 
     private long[] pollResult;
@@ -144,12 +131,13 @@
 
     @Override
     protected void doAdd(IoSession sess) throws Exception {
-        System.err.println("doAdd");
+        logger.debug("doAdd");
         APRSessionImpl session = (APRSessionImpl) sess;
         int rv;
         rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
         if (rv == Status.APR_SUCCESS) {
-            System.out.println("Added session to pollset");
+            logger.debug("sesion added to pollset");
+            session.setOpRead(true);
             managedSessions.put(session.getAPRSocket(), session);
         } else
             throw new RuntimeException("APR error while Poll.add(..) : "+Error.strerror(-1*rv)+" ( code : "+rv+")");
@@ -157,100 +145,104 @@
                  
     @Override
     protected void doRemove(IoSession session) throws Exception {
-        System.err.println("doRemove");
-        remove(session); // will schedule it
+        logger.debug("doRemove");
+        int ret=Poll.remove(pollset, ((APRSessionImpl)session).getAPRSocket());
+        if(ret!=Status.APR_SUCCESS) {
+            logger.error("removing of pollset error");
+        }
+        ret=Socket.close(((APRSessionImpl)session).getAPRSocket());
+        if(ret!=Status.APR_SUCCESS) {
+            logger.error("closing socket error");
+        }
+
     }
 
     @Override
     protected void finalize() throws Throwable {
         // TODO : necessary I think, need to check APR doc
+        logger.debug("finalize, freeing the pool");
         Pool.clear(pool);
     }
 
     @Override
     protected boolean isOpRead(IoSession sess) throws Exception {
+        logger.debug("isOpRead?");
         APRSessionImpl session=(APRSessionImpl)sess;
-        long[] descriptors=new long[managedSessions.size()*2];
-        Poll.pollset(pollset, descriptors);
-        for(int i=0;i<managedSessions.size();i++) {
-            if(descriptors[i*2+1]== session.getAPRSocket()) {
-                return (descriptors[i*2] & Poll.APR_POLLOUT) >0;
-            }
-        }
-        return false;
+        logger.debug("isOpRead : "+session.isOpRead());
+        return session.isOpRead();
     }
 
     @Override
     protected boolean isOpWrite(IoSession sess) throws Exception {
+        logger.debug("isOpWrite?");
         APRSessionImpl session=(APRSessionImpl)sess;
-        long[] descriptors=new long[managedSessions.size()*2];
-        Poll.pollset(pollset, descriptors);
-        for(int i=0;i<managedSessions.size();i++) {
-            if(descriptors[i*2+1]== session.getAPRSocket()) {
-                return (descriptors[i*2] & Poll.APR_POLLIN) >0;
-            }
-        }
-        return false;
+        logger.debug("isOpWrite : "+session.isOpWrite());
+        return session.isOpWrite();
     }
 
     @Override
     protected boolean isReadable(IoSession session) throws Exception {
+        logger.debug("isReadable?");
         long socket=((APRSessionImpl)session).getAPRSocket();
         for(int i=0;i<pollResult.length/2;i++) {
-            if(pollResult[i+1]==socket) {
-                if( (pollResult[i]&Poll.APR_POLLIN) >0 )
+            if(pollResult[i*2+1]==socket) {
+                if( (pollResult[i*2]&Poll.APR_POLLIN) >0 ) {
+                    logger.debug("isReadable : true");
                     return true;
-                else
+                } else {
+                    logger.debug("isReadable : false");
                     return false;
+                }
             }
         }
+        logger.debug("isReadable : false (socket not found)");
         return false;
     }
 
     @Override
     protected boolean isWritable(IoSession session) throws Exception {
+        logger.debug("isWritable?");
         long socket=((APRSessionImpl)session).getAPRSocket();
         for(int i=0;i<pollResult.length/2;i++) {
-            if(pollResult[i+1]==socket) {
-                if( (pollResult[i]&Poll.APR_POLLOUT) >0 )
+            if(pollResult[i*2+1]==socket) {
+                if( (pollResult[i*2]&Poll.APR_POLLOUT) >0 ) {
+                    logger.debug("isWritable : true");
                     return true;
-                else
+                } else {
+                    logger.debug("isWritable : false");   
                     return false;
+                }
             }
         }
+        logger.debug("isWritable : false (socket not found)");
         return false;
     }
     
     @Override
     protected int read(IoSession sess, ByteBuffer buffer) throws Exception {
+        logger.debug("read");
         APRSessionImpl session=(APRSessionImpl)sess;
         
         byte[] buf = session.getReadBuffer();
         // FIXME : hardcoded read value for testing
         int bytes = Socket.recv(session.getAPRSocket(), buf, 0, 1024);
+        logger.debug("read bytes : "+bytes);
         if (bytes > 0) {
             buffer.put(buf);
-        } else if (bytes < 0) {
-            logger.debug("Read {} bytes, need closing ?", bytes);
-            return -1;
         }
         return bytes;
     }
 
-    public void remove(IoSession session) {
-        Poll.remove(pollset, ((APRSessionImpl)session).getAPRSocket());
-        Socket.close(((APRSessionImpl)session).getAPRSocket());
-    }
-
     @Override
     protected boolean select(int timeout) throws Exception {
-        System.err.println("select");
+        logger.debug("select?");
         // poll the socket descriptors
         /* is it OK ? : Two times size of the created pollset */
-        long[] pollResult = new long[managedSessions.size() * 2];
+        pollResult = new long[managedSessions.size() * 2];
 
         int rv = Poll.poll(pollset, 1000 * timeout, pollResult, false);
         if (rv > 0) {
+            logger.debug("select : true");    
             return true;
         } else if(rv<0) {
             if(rv!=-120001) { // timeout ( FIXME : can't find the good constant in APR)
@@ -258,6 +250,7 @@
                 throw new RuntimeException("APR polling error : "+Error.strerror(-1*rv)+" ( code : "+rv+")");
             }
         }
+        logger.debug("select : false");
         return false;
     }
 
@@ -268,54 +261,50 @@
 
     @Override
     protected void setOpRead(IoSession sess, boolean value) throws Exception {
-        System.err.println("setOpRead : "+value);
+        logger.debug("setOpRead : "+value);
         APRSessionImpl session=(APRSessionImpl)sess;
         int rv = Poll.remove(pollset, session.getAPRSocket());
         if (rv != Status.APR_SUCCESS) {
             System.err.println("poll.remove Error : " + Error.strerror(rv));
         }
-        boolean write=isOpWrite(sess);
         
-        if(write)
-            rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
-                    | Poll.APR_POLLOUT);
-        else
-            rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
+        int flags=(value?Poll.APR_POLLIN:0) | (session.isOpWrite()?Poll.APR_POLLOUT:0);
+        
+        rv = Poll.add(pollset, session.getAPRSocket(), flags);
         
         if (rv == Status.APR_SUCCESS) {
             // ok
+            session.setOpRead(value);
         } else {
-            System.err.println("poll.add Error : " + Error.strerror(rv));
+            logger.error("poll.add Error : " + Error.strerror(rv));
         }
     }
 
     @Override
     protected void setOpWrite(IoSession sess, boolean value)
             throws Exception {
-        System.err.println("setOpWrite : "+value);
+        logger.debug("setOpWrite : "+value);
         APRSessionImpl session=(APRSessionImpl)sess;
         int rv = Poll.remove(pollset, session.getAPRSocket());
         if (rv != Status.APR_SUCCESS) {
             System.err.println("poll.remove Error : " + Error.strerror(rv));
         }
-        boolean read=isOpRead(sess);
         
-        if(read)
-            rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
-                    | Poll.APR_POLLOUT);
-        else
-            rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLOUT);
+        int flags=(session.isOpRead()?Poll.APR_POLLIN:0) | (value?Poll.APR_POLLOUT:0);
+        
+        rv = Poll.add(pollset, session.getAPRSocket(), flags);
         
         if (rv == Status.APR_SUCCESS) {
             // ok
+            session.setOpWrite(value);
         } else {
-            System.err.println("poll.add Error : " + Error.strerror(rv));
-        }        
+            logger.error("poll.add Error : " + Error.strerror(rv));
+        }
     }
 
     @Override
     protected SessionState state(IoSession session) {
-        System.err.println("state?");
+        logger.debug("state?");
         long socket=((APRSessionImpl)session).getAPRSocket();
         if(socket>0)
             return SessionState.OPEN;
@@ -325,78 +314,6 @@
             return SessionState.CLOSED;
     }
 
-  
-
-//    private void write(APRSessionImpl session) {
-//        if (session.getWriteRequestQueue().size() <= 0)
-//            return;
-//        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
-//
-//        for (;;) {
-//
-//            WriteRequest req;
-//
-//            synchronized (writeRequestQueue) {
-//                req = writeRequestQueue.peek();
-//            }
-//
-//            if (req == null) {
-//                // remove of write polling
-//                int rv = Poll.remove(pollset, session.getAPRSocket());
-//                if (rv != Status.APR_SUCCESS) {
-//                    System.err.println("poll.remove Error : "
-//                            + Error.strerror(rv));
-//                }
-//                rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
-//                if (rv == Status.APR_SUCCESS) {
-//                    // ok
-//                } else {
-//                    System.err
-//                            .println("poll.add Error : " + Error.strerror(rv));
-//                }
-//                break;
-//            }
-//
-//            ByteBuffer buf = (ByteBuffer) req.getMessage();
-//            if (buf.remaining() == 0) {
-//                synchronized (writeRequestQueue) {
-//                    writeRequestQueue.poll();
-//                }
-//                session.increaseWrittenMessages();
-//                buf.reset();
-//                session.getFilterChain().fireMessageSent(req);
-//                continue;
-//            }
-//            // be sure APR_SO_NONBLOCK was set, or it will block
-//            int toWrite = buf.remaining();
-//
-//            int writtenBytes;
-//            // APR accept ByteBuffer, only if they are Direct ones, due to native code
-//            if (buf.isDirect()) {
-//                writtenBytes = Socket.sendb(session.getAPRSocket(), buf.buf(),
-//                        0, toWrite);
-//            } else {
-//                writtenBytes = Socket.send(session.getAPRSocket(), buf.array(),
-//                        0, toWrite);
-//                // FIXME : kludgy ?
-//                buf.position(buf.position() + writtenBytes);
-//            }
-//            if (writtenBytes > 0) {
-//                // increase
-//
-//                session.increaseWrittenBytes(writtenBytes);
-//            } else {
-//                // FIXME : send the exception
-//                System.err.println(Error.strerror(writtenBytes * -1));
-//            }
-//
-//            // kernel buffer full for this socket, wait next polling
-//            if (buf.hasRemaining())
-//                break;
-//        }
-//    }
-
-
     @Override
     protected long transferFile(IoSession session, FileRegion region)
             throws Exception {
@@ -405,14 +322,14 @@
 
     @Override
     protected void wakeup() {
-        System.err.println("wakeup");
+        logger.debug("wakeup");
         // FIXME : is it possible to interrupt a Poll.poll ?
         
     }
     
     @Override
     protected int write(IoSession session, ByteBuffer buf) throws Exception {
-        System.err.println("write");
+        logger.debug("write");
         // be sure APR_SO_NONBLOCK was set, or it will block
         int toWrite = buf.remaining();
 
@@ -427,6 +344,7 @@
             // FIXME : kludgy ?
             buf.position(buf.position() + writtenBytes);
         }
+        logger.debug("write : "+writtenBytes);
         return writtenBytes;
     }
 }

Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java?rev=577220&r1=577219&r2=577220&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java Wed Sep 19 02:52:27 2007
@@ -64,6 +64,9 @@
             "Apache Portable Runtime socket", false, true,
             InetSocketAddress.class, APRSessionConfig.class, ByteBuffer.class);
 
+    private boolean isOpRead=false;
+    
+    private boolean isOpWrite=false;
     /**
      * Creates a new instance.
      */
@@ -223,5 +226,21 @@
     @Override
     protected IoProcessor getProcessor() {
         return ioProcessor;
+    }
+
+    public boolean isOpRead() {
+        return isOpRead;
+    }
+
+    public void setOpRead(boolean isOpRead) {
+        this.isOpRead = isOpRead;
+    }
+
+    public boolean isOpWrite() {
+        return isOpWrite;
+    }
+
+    public void setOpWrite(boolean isOpWrite) {
+        this.isOpWrite = isOpWrite;
     }
 }