You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/09/18 18:31:54 UTC

svn commit: r576972 - in /mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr: APRConnector.java APRIoProcessor.java APRSessionImpl.java

Author: jvermillard
Date: Tue Sep 18 09:31:49 2007
New Revision: 576972

URL: http://svn.apache.org/viewvc?rev=576972&view=rev
Log:
APRconnector compiling now, but doesn't work for now

Modified:
    mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
    mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
    mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java

Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java?rev=576972&r1=576971&r2=576972&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java Tue Sep 18 09:31:49 2007
@@ -34,6 +34,7 @@
 import org.apache.mina.util.NewThreadExecutor;
 import org.apache.tomcat.jni.Address;
 import org.apache.tomcat.jni.Socket;
+import org.apache.tomcat.jni.Status;
 
 /**
  * An {@link IoConnector} implementation using the Apache Portable Runtime 
@@ -128,13 +129,17 @@
 
             // FIXME : error checking
             int ret = Socket.connect(clientSock, inetAddr);
-            System.err.println("Socket.connect : " + ret);
+            if(ret!=Status.APR_SUCCESS)
+                System.err.println("Error Socket.connect : " + ret);
+            
             if (localAddress != null) {
                 // TODO, check if it's possible to bind to a local address
             }
 
             ConnectFuture future = new DefaultConnectFuture();
-            APRSessionImpl session = new APRSessionImpl(this, nextProcessor(),
+            APRIoProcessor proc=nextProcessor();
+            System.err.println("proc : "+proc);
+            APRSessionImpl session = new APRSessionImpl(this,proc ,
                     clientSock, sockAddr, (InetSocketAddress) localAddress);
 
             try {
@@ -151,7 +156,7 @@
 
             // Forward the remaining process to the APRIoProcessor.
             // it's will validate the COnnectFuture when the session is in the poll set
-            session.getIoProcessor().addNew(session);
+            session.getIoProcessor().add(session);
 
             success = true;
             return future;
@@ -166,11 +171,10 @@
     }
 
     private APRIoProcessor nextProcessor() {
-        if (processorDistributor++ < 0) {
-            processorDistributor = 0;
+        if (this.processorDistributor == Integer.MAX_VALUE) {
+            this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
         }
-
-        return ioProcessors[processorDistributor % processorCount];
+        return ioProcessors[processorDistributor++ % processorCount];
     }
 
     public TransportMetadata getTransportMetadata() {

Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java?rev=576972&r1=576971&r2=576972&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java Tue Sep 18 09:31:49 2007
@@ -19,10 +19,13 @@
  */
 package org.apache.mina.transport.apr;
 
+import java.nio.channels.SelectionKey;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
@@ -52,17 +55,63 @@
 
 class APRIoProcessor extends AbstractIoProcessor {
 
+    protected static class IoSessionIterator implements Iterator<AbstractIoSession> {
+        private final Iterator<APRSessionImpl> i;
+        private IoSessionIterator(Collection<APRSessionImpl> sessions) {
+            i = sessions.iterator();
+        }
+        public boolean hasNext() {
+            return i.hasNext();
+        }
+
+        public AbstractIoSession next() {
+            APRSessionImpl sess = i.next();
+            return (AbstractIoSession) sess;
+        }
+
+        public void remove() {
+            i.remove();
+        }
+    }
+
+    protected class PollSetIterator implements Iterator<AbstractIoSession> {
+        private long[] pollResult;
+        
+        int index=0;
+        public PollSetIterator(long[] pollResult) {
+            this.pollResult=pollResult;
+        }
+
+        public boolean hasNext() {
+            return index*2< pollResult.length;
+        }
+
+        public AbstractIoSession next() {
+            
+            AbstractIoSession  sess=managedSessions.get(pollResult[index*2]);
+            index++;
+            return sess;
+        }
+
+        public void remove() {
+            throw new UnsupportedOperationException("remove");            
+        }
+    }
+
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     private final Object lock = new Object();
 
     private long pool = 0; // memory pool
 
+//    private Worker worker;
+
     private long pollset = 0; // socket poller
 
+
     private final Map<Long, APRSessionImpl> managedSessions = new HashMap<Long, APRSessionImpl>();
 
-//    private Worker worker;
+    private long[] pollResult;
 
     APRIoProcessor(String threadName, Executor executor) {
         super(threadName,executor);
@@ -85,15 +134,17 @@
         }
     }
 
-
     @Override
     protected Iterator<AbstractIoSession> allSessions() throws Exception {
+        
         // TODO Auto-generated method stub
-        return null;
+        return new IoSessionIterator(managedSessions.values());
     }
+    
 
     @Override
     protected void doAdd(IoSession sess) throws Exception {
+        System.err.println("doAdd");
         APRSessionImpl session = (APRSessionImpl) sess;
         int rv;
         rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
@@ -103,97 +154,20 @@
         } else
             throw new RuntimeException("APR error while Poll.add(..) : "+Error.strerror(-1*rv)+" ( code : "+rv+")");
     }
-
+                 
     @Override
     protected void doRemove(IoSession session) throws Exception {
+        System.err.println("doRemove");
         remove(session); // will schedule it
     }
 
     @Override
-    protected int read(IoSession sess, ByteBuffer buffer) throws Exception {
-        APRSessionImpl session=(APRSessionImpl)sess;
-        
-        byte[] buf = session.getReadBuffer();
-        // FIXME : hardcoded read value for testing
-        int bytes = Socket.recv(session.getAPRSocket(), buf, 0, 1024);
-        if (bytes > 0) {
-            buffer.put(buf);
-        } else if (bytes < 0) {
-            logger.debug("Read {} bytes, need closing ?", bytes);
-            return -1;
-        }
-        return bytes;
-    }
-    
-
-    private long[] pollResult;
-                 
-    @Override
-    protected boolean select(int timeout) throws Exception {
-        // poll the socket descriptors
-        /* is it OK ? : Two times size of the created pollset */
-        long[] pollResult = new long[managedSessions.size() * 2];
-
-        int rv = Poll.poll(pollset, 1000 * timeout, pollResult, false);
-        if (rv > 0) {
-            return true;
-        } else if(rv<0) {
-            if(rv!=-100002) { // timeout ( FIXME : can't find the good constant in APR)
-                System.err.println("APR Poll error : "+Error.strerror(-1*rv)+" "+rv);
-                throw new RuntimeException("APR polling error : "+Error.strerror(-1*rv)+" ( code : "+rv+")");
-            }
-        }
-        return false;
-    }
-
-    @Override
-    protected Iterator<AbstractIoSession> selectedSessions() throws Exception {
-        return new PollSetIterator(pollResult);
-    }
-
-    @Override
-    protected SessionState state(IoSession session) {
-        long socket=((APRSessionImpl)session).getAPRSocket();
-        if(socket>0)
-            return SessionState.OPEN;
-        else if(managedSessions.get(socket)!=null)
-            return SessionState.PREPARING; // will occur ?
-        else
-            return SessionState.CLOSED;
-    }
-
-    @Override
-    protected long transferFile(IoSession session, FileRegion region)
-            throws Exception {
-        throw new UnsupportedOperationException("Not supposed for APR (TODO)");
-    }
-
-    @Override
-    protected void wakeup() {
-        // FIXME : is it possible to interrupt a Poll.poll ?
-        
+    protected void finalize() throws Throwable {
+        // TODO : necessary I think, need to check APR doc
+        Pool.clear(pool);
     }
 
     @Override
-    protected int write(IoSession session, ByteBuffer buf) throws Exception {
-        // be sure APR_SO_NONBLOCK was set, or it will block
-        int toWrite = buf.remaining();
-
-        int writtenBytes;
-        // APR accept ByteBuffer, only if they are Direct ones, due to native code
-        if (buf.isDirect()) {
-            writtenBytes = Socket.sendb( ((APRSessionImpl)session).getAPRSocket(), buf.buf(),
-                    0, toWrite);
-        } else {
-            writtenBytes = Socket.send( ((APRSessionImpl)session).getAPRSocket(), buf.array(),
-                    0, toWrite);
-            // FIXME : kludgy ?
-            buf.position(buf.position() + writtenBytes);
-        }
-        return writtenBytes;
-    }
-    
-    @Override
     protected boolean isOpRead(IoSession sess) throws Exception {
         APRSessionImpl session=(APRSessionImpl)sess;
         long[] descriptors=new long[managedSessions.size()*2];
@@ -246,9 +220,55 @@
         }
         return false;
     }
+    
+    @Override
+    protected int read(IoSession sess, ByteBuffer buffer) throws Exception {
+        APRSessionImpl session=(APRSessionImpl)sess;
+        
+        byte[] buf = session.getReadBuffer();
+        // FIXME : hardcoded read value for testing
+        int bytes = Socket.recv(session.getAPRSocket(), buf, 0, 1024);
+        if (bytes > 0) {
+            buffer.put(buf);
+        } else if (bytes < 0) {
+            logger.debug("Read {} bytes, need closing ?", bytes);
+            return -1;
+        }
+        return bytes;
+    }
+
+    public void remove(IoSession session) {
+        Poll.remove(pollset, ((APRSessionImpl)session).getAPRSocket());
+        Socket.close(((APRSessionImpl)session).getAPRSocket());
+    }
+
+    @Override
+    protected boolean select(int timeout) throws Exception {
+        System.err.println("select");
+        // poll the socket descriptors
+        /* is it OK ? : Two times size of the created pollset */
+        long[] pollResult = new long[managedSessions.size() * 2];
+
+        int rv = Poll.poll(pollset, 1000 * timeout, pollResult, false);
+        if (rv > 0) {
+            return true;
+        } else if(rv<0) {
+            if(rv!=-120001) { // timeout ( FIXME : can't find the good constant in APR)
+                System.err.println("APR Poll error : "+Error.strerror(-1*rv)+" "+rv);
+                throw new RuntimeException("APR polling error : "+Error.strerror(-1*rv)+" ( code : "+rv+")");
+            }
+        }
+        return false;
+    }
+
+    @Override
+    protected Iterator<AbstractIoSession> selectedSessions() throws Exception {
+        return new PollSetIterator(pollResult);
+    }
 
     @Override
     protected void setOpRead(IoSession sess, boolean value) throws Exception {
+        System.err.println("setOpRead : "+value);
         APRSessionImpl session=(APRSessionImpl)sess;
         int rv = Poll.remove(pollset, session.getAPRSocket());
         if (rv != Status.APR_SUCCESS) {
@@ -272,6 +292,7 @@
     @Override
     protected void setOpWrite(IoSession sess, boolean value)
             throws Exception {
+        System.err.println("setOpWrite : "+value);
         APRSessionImpl session=(APRSessionImpl)sess;
         int rv = Poll.remove(pollset, session.getAPRSocket());
         if (rv != Status.APR_SUCCESS) {
@@ -292,9 +313,16 @@
         }        
     }
 
-    public void remove(IoSession session) {
-        Poll.remove(pollset, ((APRSessionImpl)session).getAPRSocket());
-        Socket.close(((APRSessionImpl)session).getAPRSocket());
+    @Override
+    protected SessionState state(IoSession session) {
+        System.err.println("state?");
+        long socket=((APRSessionImpl)session).getAPRSocket();
+        if(socket>0)
+            return SessionState.OPEN;
+        else if(managedSessions.get(socket)!=null)
+            return SessionState.PREPARING; // will occur ?
+        else
+            return SessionState.CLOSED;
     }
 
   
@@ -370,32 +398,35 @@
 
 
     @Override
-    protected void finalize() throws Throwable {
-        // TODO : necessary I think, need to check APR doc
-        Pool.clear(pool);
+    protected long transferFile(IoSession session, FileRegion region)
+            throws Exception {
+        throw new UnsupportedOperationException("Not supposed for APR (TODO)");
     }
 
-    protected class PollSetIterator implements Iterator<AbstractIoSession> {
-        private long[] pollResult;
+    @Override
+    protected void wakeup() {
+        System.err.println("wakeup");
+        // FIXME : is it possible to interrupt a Poll.poll ?
         
-        int index=0;
-        public PollSetIterator(long[] pollResult) {
-            this.pollResult=pollResult;
-        }
-
-        public boolean hasNext() {
-            return index*2< pollResult.length;
-        }
-
-        public AbstractIoSession next() {
-            
-            AbstractIoSession  sess=managedSessions.get(pollResult[index*2]);
-            index++;
-            return sess;
-        }
+    }
+    
+    @Override
+    protected int write(IoSession session, ByteBuffer buf) throws Exception {
+        System.err.println("write");
+        // be sure APR_SO_NONBLOCK was set, or it will block
+        int toWrite = buf.remaining();
 
-        public void remove() {
-            throw new UnsupportedOperationException("remove");            
+        int writtenBytes;
+        // APR accept ByteBuffer, only if they are Direct ones, due to native code
+        if (buf.isDirect()) {
+            writtenBytes = Socket.sendb( ((APRSessionImpl)session).getAPRSocket(), buf.buf(),
+                    0, toWrite);
+        } else {
+            writtenBytes = Socket.send( ((APRSessionImpl)session).getAPRSocket(), buf.array(),
+                    0, toWrite);
+            // FIXME : kludgy ?
+            buf.position(buf.position() + writtenBytes);
         }
+        return writtenBytes;
     }
-}
+}
\ No newline at end of file

Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java?rev=576972&r1=576971&r2=576972&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java Tue Sep 18 09:31:49 2007
@@ -52,8 +52,6 @@
 
     private final IoFilterChain filterChain = new DefaultIoFilterChain(this);
 
-    private final Queue<WriteRequest> writeRequestQueue;
-
     private final IoHandler handler;
 
     private byte[] readBuffer = new byte[1024]; //FIXME : fixed rcvd buffer, need to change that to a config value
@@ -73,7 +71,6 @@
             InetSocketAddress remoteAddress, InetSocketAddress localAddress) {
         this.service = service;
         this.ioProcessor = ioProcessor;
-        this.writeRequestQueue = new LinkedList<WriteRequest>();
         this.handler = service.getHandler();
         this.remoteAddress = remoteAddress;
         this.localAddress = localAddress;
@@ -225,7 +222,6 @@
 
     @Override
     protected IoProcessor getProcessor() {
-        // TODO Auto-generated method stub
-        return null;
+        return ioProcessor;
     }
 }