You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2006/11/19 20:15:46 UTC

svn commit: r476890 - in /jakarta/httpcomponents/httpcore/trunk/module-nio/src: examples/org/apache/http/nio/examples/ main/java/org/apache/http/nio/impl/reactor/

Author: olegk
Date: Sun Nov 19 11:15:45 2006
New Revision: 476890

URL: http://svn.apache.org/viewvc?view=rev&rev=476890
Log:
* Fixed samples
* Added abstract multithreaded I/O reactor

Added:
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractMultiworkerIOReactor.java   (with props)
Modified:
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/AsyncHttpServer.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/ElementalEchoServer.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpClient.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpServer.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/AsyncHttpServer.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/AsyncHttpServer.java?view=diff&rev=476890&r1=476889&r2=476890
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/AsyncHttpServer.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/AsyncHttpServer.java Sun Nov 19 11:15:45 2006
@@ -31,7 +31,7 @@
 import org.apache.http.nio.impl.reactor.DefaultIOReactor;
 import org.apache.http.nio.protocol.AsyncHttpService;
 import org.apache.http.nio.reactor.IOEventDispatch;
-import org.apache.http.nio.reactor.IOReactor;
+import org.apache.http.nio.reactor.ListeningIOReactor;
 import org.apache.http.nio.util.ContentInputBuffer;
 import org.apache.http.nio.util.ContentOutputBuffer;
 import org.apache.http.params.HttpConnectionParams;
@@ -62,7 +62,7 @@
             .setBooleanParameter(HttpConnectionParams.TCP_NODELAY, true)
             .setParameter(HttpProtocolParams.ORIGIN_SERVER, "Jakarta-HttpComponents-NIO/1.1");
 
-        IOReactor ioReactor = new DefaultIOReactor(params);
+        ListeningIOReactor ioReactor = new DefaultIOReactor(params);
 
         // Set up request handlers
         HttpRequestHandlerRegistry reqistry = new HttpRequestHandlerRegistry();

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/ElementalEchoServer.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/ElementalEchoServer.java?view=diff&rev=476890&r1=476889&r2=476890
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/ElementalEchoServer.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/ElementalEchoServer.java Sun Nov 19 11:15:45 2006
@@ -9,8 +9,8 @@
 import org.apache.http.nio.impl.reactor.DefaultIOReactor;
 import org.apache.http.nio.reactor.EventMask;
 import org.apache.http.nio.reactor.IOEventDispatch;
-import org.apache.http.nio.reactor.IOReactor;
 import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.reactor.ListeningIOReactor;
 import org.apache.http.params.HttpParams;
 
 public class ElementalEchoServer {
@@ -18,7 +18,7 @@
     public static void main(String[] args) throws Exception {
         HttpParams params = new DefaultHttpParams(); 
         IOEventDispatch ioEventDispatch = new DefaultIoEventDispatch();
-        IOReactor ioReactor = new DefaultIOReactor(params);
+        ListeningIOReactor ioReactor = new DefaultIOReactor(params);
         ioReactor.listen(new InetSocketAddress(8080));
         try {
             ioReactor.execute(ioEventDispatch);

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpClient.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpClient.java?view=diff&rev=476890&r1=476889&r2=476890
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpClient.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpClient.java Sun Nov 19 11:15:45 2006
@@ -25,8 +25,8 @@
 import org.apache.http.nio.NHttpClientHandler;
 import org.apache.http.nio.impl.DefaultClientIOEventDispatch;
 import org.apache.http.nio.impl.reactor.DefaultIOReactor;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
 import org.apache.http.nio.reactor.IOEventDispatch;
-import org.apache.http.nio.reactor.IOReactor;
 import org.apache.http.nio.reactor.SessionRequest;
 import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.params.HttpParams;
@@ -55,7 +55,7 @@
             .setBooleanParameter(HttpConnectionParams.TCP_NODELAY, true)
             .setParameter(HttpProtocolParams.USER_AGENT, "Jakarta-HttpComponents-NIO/1.1");
 
-        IOReactor ioReactor = new DefaultIOReactor(params);
+        ConnectingIOReactor ioReactor = new DefaultIOReactor(params);
 
         SessionRequest[] reqs = new SessionRequest[3];
         reqs[0] = ioReactor.connect(

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpServer.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpServer.java?view=diff&rev=476890&r1=476889&r2=476890
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpServer.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpServer.java Sun Nov 19 11:15:45 2006
@@ -35,7 +35,7 @@
 import org.apache.http.nio.impl.DefaultServerIOEventDispatch;
 import org.apache.http.nio.impl.reactor.DefaultIOReactor;
 import org.apache.http.nio.reactor.IOEventDispatch;
-import org.apache.http.nio.reactor.IOReactor;
+import org.apache.http.nio.reactor.ListeningIOReactor;
 import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.params.HttpParams;
 import org.apache.http.params.HttpProtocolParams;
@@ -65,7 +65,7 @@
             .setBooleanParameter(HttpConnectionParams.TCP_NODELAY, true)
             .setParameter(HttpProtocolParams.ORIGIN_SERVER, "Jakarta-HttpComponents-NIO/1.1");
 
-        IOReactor ioReactor = new DefaultIOReactor(params);
+        ListeningIOReactor ioReactor = new DefaultIOReactor(params);
 
         NHttpServiceHandler handler = new MyNHttpServiceHandler(args[0], params);
         IOEventDispatch ioEventDispatch = new DefaultServerIOEventDispatch(handler, params);

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java?view=diff&rev=476890&r1=476889&r2=476890
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java Sun Nov 19 11:15:45 2006
@@ -79,7 +79,7 @@
         return channel.register(this.selector, 0);
     }
     
-    protected IOSession newSession(final SelectionKey key) throws IOException {
+    protected IOSession newSession(final SelectionKey key) {
         IOSession session = new IOSessionImpl(key, new SessionClosedCallback() {
 
             public void sessionClosed(IOSession session) {
@@ -91,7 +91,7 @@
         return session;
     }
     
-    public void execute(final IOEventDispatch eventDispatch) throws IOException {
+    public void execute(final IOEventDispatch eventDispatch) {
         if (eventDispatch == null) {
             throw new IllegalArgumentException("Event dispatcher may not be null");
         }
@@ -99,7 +99,13 @@
         
         try {
             for (;;) {
-                int readyCount = this.selector.select(TIMEOUT_CHECK_INTERVAL);
+                
+                int readyCount = 0;
+                try {
+                    readyCount = this.selector.select(TIMEOUT_CHECK_INTERVAL);
+                } catch (IOException ex) {
+                    this.closed = true;
+                }
                 if (this.closed) {
                     break;
                 }
@@ -124,7 +130,7 @@
         }
     }
     
-    private void processEvents(final Set selectedKeys) throws IOException {
+    private void processEvents(final Set selectedKeys) {
         for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) {
             
             SelectionKey key = (SelectionKey) it.next();
@@ -134,7 +140,7 @@
         selectedKeys.clear();
     }
 
-    private void processEvent(final SelectionKey key) throws IOException {
+    private void processEvent(final SelectionKey key) {
         try {
             if (key.isAcceptable()) {
                 acceptable(key);

Added: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractMultiworkerIOReactor.java?view=auto&rev=476890
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractMultiworkerIOReactor.java (added)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractMultiworkerIOReactor.java Sun Nov 19 11:15:45 2006
@@ -0,0 +1,115 @@
+/*
+ * $HeadURL$
+ * $Revision$
+ * $Date$
+ *
+ * ====================================================================
+ *
+ *  Copyright 1999-2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.http.nio.impl.reactor;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.channels.SocketChannel;
+
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.reactor.IOReactor;
+
+public abstract class AbstractMultiworkerIOReactor implements IOReactor {
+
+    private final int workerCount;
+    private final BaseIOReactor[] ioReactors;
+    private final Thread[] threads;
+    
+    private int currentWorker = 0;
+    
+    public AbstractMultiworkerIOReactor(int workerCount) throws IOException {
+        super();
+        if (workerCount <= 0) {
+            throw new IllegalArgumentException("Worker count may not be negative or zero");
+        }
+        this.workerCount = workerCount;
+        this.ioReactors = new BaseIOReactor[workerCount];
+        this.threads = new Thread[workerCount];
+        for (int i = 0; i < this.ioReactors.length; i++) {
+            this.ioReactors[i] = new BaseIOReactor();
+        }
+    }
+
+    protected void startWorkers(final IOEventDispatch eventDispatch) {
+        if (eventDispatch == null) {
+            throw new IllegalArgumentException("Event dispatcher may not be null");
+        }
+        for (int i = 0; i < this.workerCount; i++) {
+            BaseIOReactor ioReactor = this.ioReactors[i];
+            this.threads[i] = new WorkerThread(ioReactor, eventDispatch);
+        }
+        for (int i = 0; i < this.workerCount; i++) {
+            this.threads[i].start();
+        }
+    }
+
+    protected void stopWorkers(int millis) throws IOException {
+        for (int i = 0; i < this.workerCount; i++) {
+            this.ioReactors[i].shutdown();
+        }
+        for (int i = 0; i < this.workerCount; i++) {
+            try {
+                this.threads[i].join(millis);
+            } catch (InterruptedException ex) {
+                throw new InterruptedIOException(ex.getMessage());
+            }
+        }
+    }
+    
+    protected void addChannel(final SocketChannel channel) throws IOException {
+        // Distribute new channels among the workers
+        this.ioReactors[this.currentWorker++ % this.workerCount].addChannel(channel);
+    }
+        
+    static class WorkerThread extends Thread {
+
+        final BaseIOReactor ioReactor;
+        final IOEventDispatch eventDispatch;
+        
+        public WorkerThread(final BaseIOReactor ioReactor, final IOEventDispatch eventDispatch) {
+            super();
+            this.ioReactor = ioReactor;
+            this.eventDispatch = eventDispatch;
+        }
+        
+        public void run() {
+            try {
+                this.ioReactor.execute(this.eventDispatch);
+            } finally {
+                try {
+                    this.ioReactor.shutdown();
+                } catch (IOException ignore) {
+                }
+            }
+        }
+        
+    }
+
+}

Propchange: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractMultiworkerIOReactor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractMultiworkerIOReactor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java?view=diff&rev=476890&r1=476889&r2=476890
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java Sun Nov 19 11:15:45 2006
@@ -30,24 +30,15 @@
 package org.apache.http.nio.impl.reactor;
 
 import java.io.IOException;
-import java.net.Socket;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
 import org.apache.http.nio.reactor.IOSession;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
 
 public class BaseIOReactor extends AbstractIOReactor {
 
-    private final HttpParams params;
-    
-    public BaseIOReactor(final HttpParams params) throws IOException {
+    public BaseIOReactor() throws IOException {
         super();
-        if (params == null) {
-            throw new IllegalArgumentException("HTTP parameters may not be null");
-        }
-        this.params = params;
     }
 
     protected void acceptable(final SelectionKey key) {
@@ -86,14 +77,7 @@
         }
     }
 
-    public void addChannel(final SocketChannel channel) throws IOException {
-        Socket socket = channel.socket();
-        socket.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(this.params));
-        socket.setSoTimeout(HttpConnectionParams.getSoTimeout(this.params));
-        int linger = HttpConnectionParams.getLinger(this.params);
-        if (linger >= 0) {
-            socket.setSoLinger(linger > 0, linger);
-        }
+    protected void addChannel(final SocketChannel channel) throws IOException {
         SelectionKey key = registerChannel(channel);
 
         IOSession session = newSession(key);