You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2006/11/19 15:38:07 UTC

svn commit: r476827 - in /jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio: impl/reactor/AbstractIOReactor.java impl/reactor/BaseIOReactor.java reactor/IOReactor.java

Author: olegk
Date: Sun Nov 19 06:38:07 2006
New Revision: 476827

URL: http://svn.apache.org/viewvc?view=rev&rev=476827
Log:
Laying groundwork for more complex I/O reactors

Added:
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java
      - copied, changed from r476807, jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java   (with props)
Modified:
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java

Copied: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java (from r476807, jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java)
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java?view=diff&rev=476827&p1=jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java&r1=476807&p2=jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java&r2=476827
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java Sun Nov 19 06:38:07 2006
@@ -30,51 +30,67 @@
 package org.apache.http.nio.impl.reactor;
 
 import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketAddress;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.http.nio.reactor.ConnectingIOReactor;
 import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.reactor.IOReactor;
 import org.apache.http.nio.reactor.IOSession;
-import org.apache.http.nio.reactor.ListeningIOReactor;
-import org.apache.http.nio.reactor.SessionRequest;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
 
-public class DefaultIOReactor implements ListeningIOReactor, ConnectingIOReactor {
+public abstract class AbstractIOReactor implements IOReactor {
 
     public static int TIMEOUT_CHECK_INTERVAL = 1000;
     
     private volatile boolean closed = false;
     
-    private final HttpParams params;
     private final Selector selector;
     private final SessionSet sessions;
     private final SessionQueue closedSessions;
     
     private long lastTimeoutCheck;
     
-    private IOEventDispatch eventDispatch = null;
+    protected IOEventDispatch eventDispatch = null;
     
-    public DefaultIOReactor(final HttpParams params) throws IOException {
+    public AbstractIOReactor() throws IOException {
         super();
-        if (params == null) {
-            throw new IllegalArgumentException("HTTP parameters may not be null");
-        }
-        this.params = params;
         this.selector = Selector.open();
         this.sessions = new SessionSet();
         this.closedSessions = new SessionQueue();
         this.lastTimeoutCheck = System.currentTimeMillis();
     }
 
+    protected abstract void acceptable(SelectionKey key);
+    
+    protected abstract void connectable(SelectionKey key);
+
+    protected abstract void readable(SelectionKey key);
+
+    protected abstract void writable(SelectionKey key);
+    
+    protected abstract void timeoutCheck(SelectionKey key, long now);
+
+    protected SelectionKey registerChannel(final SocketChannel channel) 
+            throws IOException {
+        channel.configureBlocking(false);
+        return channel.register(this.selector, 0);
+    }
+    
+    protected IOSession newSession(final SelectionKey key) throws IOException {
+        IOSession session = new IOSessionImpl(key, new SessionClosedCallback() {
+
+            public void sessionClosed(IOSession session) {
+                closedSessions.push(session);
+            }
+            
+        });
+        this.sessions.add(session);
+        return session;
+    }
+    
     public void execute(final IOEventDispatch eventDispatch) throws IOException {
         if (eventDispatch == null) {
             throw new IllegalArgumentException("Event dispatcher may not be null");
@@ -120,80 +136,18 @@
 
     private void processEvent(final SelectionKey key) throws IOException {
         try {
-            
             if (key.isAcceptable()) {
-                
-                ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
-                SocketChannel socketChannel = serverChannel.accept();
-                if (socketChannel != null) {
-                    // Configure new socket
-                    onNewSocket(socketChannel.socket());
-                    // Register new channel with the selector
-                    socketChannel.configureBlocking(false);
-                    SelectionKey newkey = socketChannel.register(this.selector, 0);
-                    // Set up new session
-                    IOSession session = newSession(newkey);
-
-                    // Attach session handle to the selection key
-                    SessionHandle handle = new SessionHandle(session); 
-                    newkey.attach(handle);
-                    
-                    this.eventDispatch.connected(session);
-                }
+                acceptable(key);
             }
-            
             if (key.isConnectable()) {
-
-                SocketChannel socketChannel = (SocketChannel) key.channel();
-                // Get request handle
-                SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
-                SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
-                
-                // Finish connection process
-                try {
-                    socketChannel.finishConnect();
-                } catch (IOException ex) {
-                    sessionRequest.failed(ex);
-                    key.cancel();
-                    return;
-                }
-
-                // Configure new socket
-                onNewSocket(socketChannel.socket());
-                // Set up new session
-                IOSession session = newSession(key);
-
-                // Attach session handle to the selection key
-                SessionHandle handle = new SessionHandle(session); 
-                key.attach(handle);
-                
-                // Add the session request to the session context
-                session.setAttribute(SessionRequest.ATTRIB_KEY, sessionRequest);
-                
-                // Fire the request completion notification first
-                sessionRequest.completed(session);
-                
-                // Followed by session connected notification
-                this.eventDispatch.connected(session);
-
+                connectable(key);
             }
-            
             if (key.isReadable()) {
-                SessionHandle handle = (SessionHandle) key.attachment();
-                IOSession session = handle.getSession();
-                handle.resetLastRead();
-
-                this.eventDispatch.inputReady(session);
+                readable(key);
             }
-            
             if (key.isWritable()) {
-                SessionHandle handle = (SessionHandle) key.attachment();
-                IOSession session = handle.getSession();
-                handle.resetLastWrite();
-                
-                this.eventDispatch.outputReady(session);
+                writable(key);
             }
-            
         } catch (CancelledKeyException ex) {
             Object attachment = key.attachment();
             if (attachment instanceof SessionHandle) {
@@ -205,56 +159,11 @@
         }
     }
 
-    private IOSession newSession(final SelectionKey key) throws IOException {
-        IOSession session = new IOSessionImpl(key, new SessionClosedCallback() {
-
-            public void sessionClosed(IOSession session) {
-                closedSessions.push(session);
-            }
-            
-        });
-        session.setSocketTimeout(HttpConnectionParams.getSoTimeout(this.params));
-        this.sessions.add(session);
-        return session;
-    }
-    
-    protected void onNewSocket(final Socket socket) throws IOException {
-        socket.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(this.params));
-        socket.setSoTimeout(HttpConnectionParams.getSoTimeout(this.params));
-        int linger = HttpConnectionParams.getLinger(this.params);
-        if (linger >= 0) {
-            socket.setSoLinger(linger > 0, linger);
-        }
-    }
-
     private void processTimeouts(final Set keys) {
         long now = System.currentTimeMillis();
         for (Iterator it = keys.iterator(); it.hasNext();) {
             SelectionKey key = (SelectionKey) it.next();
-            Object attachment = key.attachment();
-            
-            if (attachment instanceof SessionHandle) {
-                SessionHandle handle = (SessionHandle) key.attachment();
-                IOSession session = handle.getSession();
-                int timeout = session.getSocketTimeout();
-                if (timeout > 0) {
-                    if (handle.getLastReadTime() + timeout < now) {
-                        this.eventDispatch.timeout(session);
-                    }
-                }
-            }
-
-            if (attachment instanceof SessionRequestHandle) {
-                SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
-                SessionRequestImpl sessionRequest = handle.getSessionRequest();
-                int timeout = sessionRequest.getConnectTimeout();
-                if (timeout > 0) {
-                    if (handle.getRequestTime() + timeout < now) {
-                        sessionRequest.timeout();
-                    }
-                }
-            }
-            
+            timeoutCheck(key, now);
         }
     }
 
@@ -281,36 +190,6 @@
         }
     }
     
-    public void listen(
-            final SocketAddress address) throws IOException {
-        ServerSocketChannel serverChannel = ServerSocketChannel.open();
-        serverChannel.configureBlocking(false);
-        serverChannel.socket().bind(address);
-        SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
-        key.attach(null);
-    }
-
-    public SessionRequest connect(
-            final SocketAddress remoteAddress, 
-            final SocketAddress localAddress,
-            final Object attachment) throws IOException {
-        SocketChannel socketChannel = SocketChannel.open();
-        socketChannel.configureBlocking(false);
-        if (localAddress != null) {
-            socketChannel.socket().bind(localAddress);
-        }
-        socketChannel.connect(remoteAddress);
-        SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
-        
-        SessionRequestImpl sessionRequest = new SessionRequestImpl(
-                remoteAddress, localAddress, attachment, key);
-        sessionRequest.setConnectTimeout(HttpConnectionParams.getConnectionTimeout(this.params));
-
-        SessionRequestHandle requestHandle = new SessionRequestHandle(sessionRequest); 
-        key.attach(requestHandle);
-        return sessionRequest;
-    }
-
     public void shutdown() throws IOException {
         if (this.closed) {
             return;

Added: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java?view=auto&rev=476827
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java (added)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java Sun Nov 19 06:38:07 2006
@@ -0,0 +1,106 @@
+/*
+ * $HeadURL$
+ * $Revision$
+ * $Date$
+ *
+ * ====================================================================
+ *
+ *  Copyright 1999-2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.http.nio.impl.reactor;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+
+public class BaseIOReactor extends AbstractIOReactor {
+
+    private final HttpParams params;
+    
+    public BaseIOReactor(final HttpParams params) throws IOException {
+        super();
+        if (params == null) {
+            throw new IllegalArgumentException("HTTP parameters may not be null");
+        }
+        this.params = params;
+    }
+
+    protected void acceptable(final SelectionKey key) {
+    }
+
+    protected void connectable(final SelectionKey key) {
+    }
+
+    protected void readable(final SelectionKey key) {
+        SessionHandle handle = (SessionHandle) key.attachment();
+        IOSession session = handle.getSession();
+        handle.resetLastRead();
+
+        this.eventDispatch.inputReady(session);
+    }
+
+    protected void writable(final SelectionKey key) {
+        SessionHandle handle = (SessionHandle) key.attachment();
+        IOSession session = handle.getSession();
+        handle.resetLastWrite();
+        
+        this.eventDispatch.outputReady(session);
+    }
+
+    protected void timeoutCheck(final SelectionKey key, long now) {
+        Object attachment = key.attachment();
+        if (attachment instanceof SessionHandle) {
+            SessionHandle handle = (SessionHandle) key.attachment();
+            IOSession session = handle.getSession();
+            int timeout = session.getSocketTimeout();
+            if (timeout > 0) {
+                if (handle.getLastReadTime() + timeout < now) {
+                    this.eventDispatch.timeout(session);
+                }
+            }
+        }
+    }
+
+    public void addChannel(final SocketChannel channel) throws IOException {
+        Socket socket = channel.socket();
+        socket.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(this.params));
+        socket.setSoTimeout(HttpConnectionParams.getSoTimeout(this.params));
+        int linger = HttpConnectionParams.getLinger(this.params);
+        if (linger >= 0) {
+            socket.setSoLinger(linger > 0, linger);
+        }
+        SelectionKey key = registerChannel(channel);
+
+        IOSession session = newSession(key);
+        SessionHandle handle = new SessionHandle(session); 
+        key.attach(handle);
+        
+        this.eventDispatch.connected(session);
+    }
+    
+}

Propchange: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java?view=diff&rev=476827&r1=476826&r2=476827
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java Sun Nov 19 06:38:07 2006
@@ -30,7 +30,6 @@
 package org.apache.http.nio.reactor;
 
 import java.io.IOException;
-import java.net.SocketAddress;
 
 public interface IOReactor {
 
@@ -38,13 +37,6 @@
         throws IOException;
 
     void shutdown() 
-        throws IOException;
-    
-    SessionRequest connect(SocketAddress remoteAddress, 
-            SocketAddress localAddress, Object attachment) 
-        throws IOException;
-        
-    void listen(SocketAddress address) 
         throws IOException;
     
 }