You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2007/09/14 15:55:04 UTC

svn commit: r575682 - in /jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http: impl/nio/reactor/ nio/reactor/

Author: olegk
Date: Fri Sep 14 06:55:02 2007
New Revision: 575682

URL: http://svn.apache.org/viewvc?rev=575682&view=rev
Log:
HTTPCORE-109: Refactored the default connecting and listening I/O reactor impls; more common code from both moved to the abstract base class

Modified:
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java?rev=575682&r1=575681&r2=575682&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java Fri Sep 14 06:55:02 2007
@@ -31,18 +31,34 @@
 
 package org.apache.http.impl.nio.reactor;
 
+import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.Socket;
+import java.nio.channels.Channel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
 
 import org.apache.http.util.concurrent.ThreadFactory;
+import org.apache.http.nio.params.HttpNIOParams;
 import org.apache.http.nio.reactor.IOEventDispatch;
 import org.apache.http.nio.reactor.IOReactor;
 import org.apache.http.nio.reactor.IOReactorException;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
 
 public abstract class AbstractMultiworkerIOReactor implements IOReactor {
 
-    private volatile int status;
+    protected volatile int status;
     
-    private final long selectTimeout;
+    protected final HttpParams params;
+    protected final Selector selector;
+    protected final long selectTimeout;
+
     private final int workerCount;
     private final ThreadFactory threadFactory;
     private final BaseIOReactor[] dispatchers;
@@ -52,14 +68,23 @@
     private int currentWorker = 0;
     
     public AbstractMultiworkerIOReactor(
-            long selectTimeout, 
             int workerCount, 
-            final ThreadFactory threadFactory) throws IOReactorException {
+            final ThreadFactory threadFactory,
+            final HttpParams params) throws IOReactorException {
         super();
         if (workerCount <= 0) {
             throw new IllegalArgumentException("Worker count may not be negative or zero");
         }
-        this.selectTimeout = selectTimeout;
+        if (params == null) {
+            throw new IllegalArgumentException("HTTP parameters may not be negative or zero");
+        }
+        try {
+            this.selector = Selector.open();
+        } catch (IOException ex) {
+            throw new IOReactorException("Failure opening selector", ex);
+        }
+        this.params = params;
+        this.selectTimeout = HttpNIOParams.getSelectInterval(params);
         this.workerCount = workerCount;
         if (threadFactory != null) {
             this.threadFactory = threadFactory;
@@ -79,12 +104,17 @@
         return this.status;
     }
 
-    protected long getSelectTimeout() {
-        return this.selectTimeout;
-    }
+    protected abstract void processEvents(int count) throws IOReactorException;
     
-    protected void startWorkers(final IOEventDispatch eventDispatch) {
+    public void execute(
+            final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
+        if (eventDispatch == null) {
+            throw new IllegalArgumentException("Event dispatcher may not be null");
+        }
+
         this.status = ACTIVE;
+        
+        // Start I/O dispatchers
         for (int i = 0; i < this.workerCount; i++) {
             BaseIOReactor dispatcher = this.dispatchers[i];
             this.workers[i] = new Worker(dispatcher, eventDispatch);
@@ -96,61 +126,121 @@
             }
             this.threads[i].start();
         }
+        
+        for (;;) {
+
+            int readyCount;
+            try {
+                readyCount = this.selector.select(this.selectTimeout);
+            } catch (ClosedSelectorException ex) {
+                return;
+            } catch (InterruptedIOException ex) {
+                throw ex;
+            } catch (IOException ex) {
+                throw new IOReactorException("Unexpected selector failure", ex);
+            }
+            
+            if (this.status == SHUT_DOWN) {
+                break;
+            }
+            processEvents(readyCount);
+
+            // Verify I/O dispatchers
+            for (int i = 0; i < this.workerCount; i++) {
+                Worker worker = this.workers[i];
+                Thread thread = this.threads[i];
+                if (!thread.isAlive()) {
+                    Exception ex = worker.getException();
+                    if (ex instanceof IOReactorException) {
+                        throw (IOReactorException) ex;
+                    } else if (ex instanceof InterruptedIOException) {
+                        throw (InterruptedIOException) ex;
+                    } else if (ex instanceof RuntimeException) {
+                        throw (RuntimeException) ex;
+                    } else {
+                        throw new IOReactorException(ex.getMessage(), ex);
+                    }
+                }
+            }
+        }
     }
 
-    protected void stopWorkers(int timeout) 
-            throws InterruptedException, IOReactorException {
+    public void shutdown(long gracePeriod) throws IOException {
+        if (this.status > ACTIVE) {
+            return;
+        }
+        this.status = SHUTTING_DOWN;        
+        
+        // Close out all channels
+        Set keys = this.selector.keys();
+        for (Iterator it = keys.iterator(); it.hasNext(); ) {
+            try {
+                SelectionKey key = (SelectionKey) it.next();
+                Channel channel = key.channel();
+                if (channel != null) {
+                    channel.close();
+                }
+            } catch (IOException ignore) {
+            }
+        }
+        // Stop dispatching I/O events
+        this.selector.close();
 
         // Attempt to shut down I/O dispatchers gracefully
         for (int i = 0; i < this.workerCount; i++) {
             BaseIOReactor dispatcher = this.dispatchers[i];
             dispatcher.gracefulShutdown();
         }
-        // Force shut down I/O dispatchers if they fail to terminate
-        // in time
-        for (int i = 0; i < this.workerCount; i++) {
-            BaseIOReactor dispatcher = this.dispatchers[i];
-            if (dispatcher.getStatus() != INACTIVE) {
-                dispatcher.awaitShutdown(timeout);
-            }
-            if (dispatcher.getStatus() != SHUT_DOWN) {
-                dispatcher.hardShutdown();
-            }
-        }
-        // Join worker threads
-        for (int i = 0; i < this.workerCount; i++) {
-            Thread t = this.threads[i];
-            if (t != null) {
-                t.join(timeout);
+
+        try {
+            // Force shut down I/O dispatchers if they fail to terminate
+            // in time
+            for (int i = 0; i < this.workerCount; i++) {
+                BaseIOReactor dispatcher = this.dispatchers[i];
+                if (dispatcher.getStatus() != INACTIVE) {
+                    dispatcher.awaitShutdown(gracePeriod);
+                }
+                if (dispatcher.getStatus() != SHUT_DOWN) {
+                    dispatcher.hardShutdown();
+                }
             }
-        }
-    }
-    
-    protected void verifyWorkers() 
-            throws InterruptedIOException, IOReactorException {
-        for (int i = 0; i < this.workerCount; i++) {
-            Worker worker = this.workers[i];
-            Thread thread = this.threads[i];
-            if (!thread.isAlive()) {
-                Exception ex = worker.getException();
-                if (ex instanceof IOReactorException) {
-                    throw (IOReactorException) ex;
-                } else if (ex instanceof InterruptedIOException) {
-                    throw (InterruptedIOException) ex;
-                } else if (ex instanceof RuntimeException) {
-                    throw (RuntimeException) ex;
-                } else {
-                    throw new IOReactorException(ex.getMessage(), ex);
+            // Join worker threads
+            for (int i = 0; i < this.workerCount; i++) {
+                Thread t = this.threads[i];
+                if (t != null) {
+                    t.join(gracePeriod);
                 }
             }
+        } catch (InterruptedException ex) {
+            throw new InterruptedIOException(ex.getMessage());
+        } finally {
+            this.status = SHUT_DOWN;        
         }
     }
+
+    public void shutdown() throws IOException {
+        shutdown(500);
+    }
     
     protected void addChannel(final ChannelEntry entry) {
         // Distribute new channels among the workers
         this.dispatchers[this.currentWorker++ % this.workerCount].addChannel(entry);
     }
-        
+    
+    protected SelectionKey registerChannel(
+            final SelectableChannel channel, int ops) throws ClosedChannelException {
+        return channel.register(this.selector, ops);
+    }
+
+    protected void prepareSocket(final Socket socket) throws IOException {
+        socket.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(this.params));
+        socket.setSoTimeout(HttpConnectionParams.getSoTimeout(this.params));
+        int linger = HttpConnectionParams.getLinger(this.params);
+        if (linger >= 0) {
+            socket.setSoLinger(linger > 0, linger);
+        }
+    }
+
     static class Worker implements Runnable {
 
         final BaseIOReactor dispatcher;

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java?rev=575682&r1=575681&r2=575682&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java Fri Sep 14 06:55:02 2007
@@ -32,36 +32,26 @@
 package org.apache.http.impl.nio.reactor;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.channels.CancelledKeyException;
-import java.nio.channels.Channel;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.http.util.concurrent.ThreadFactory;
-import org.apache.http.nio.params.HttpNIOParams;
 import org.apache.http.nio.reactor.ConnectingIOReactor;
-import org.apache.http.nio.reactor.IOEventDispatch;
 import org.apache.http.nio.reactor.IOReactorException;
 import org.apache.http.nio.reactor.SessionRequest;
 import org.apache.http.nio.reactor.SessionRequestCallback;
 import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.params.HttpParams;
+import org.apache.http.util.concurrent.ThreadFactory;
 
 public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor 
         implements ConnectingIOReactor {
 
-    private volatile boolean closed = false;
-    
-    private final HttpParams params;
-    private final Selector selector;
     private final SessionRequestQueue requestQueue;
     
     private long lastTimeoutCheck;
@@ -70,15 +60,9 @@
             int workerCount, 
             final ThreadFactory threadFactory,
             final HttpParams params) throws IOReactorException {
-        super(HttpNIOParams.getSelectInterval(params), workerCount, threadFactory);
-        this.params = params;
+        super(workerCount, threadFactory, params);
         this.requestQueue = new SessionRequestQueue();
         this.lastTimeoutCheck = System.currentTimeMillis();
-        try {
-            this.selector = Selector.open();
-        } catch (IOException ex) {
-            throw new IOReactorException("Failure opening selector", ex);
-        }
     }
 
     public DefaultConnectingIOReactor(
@@ -87,52 +71,28 @@
         this(workerCount, null, params);
     }
     
-    public void execute(final IOEventDispatch eventDispatch) 
-            throws InterruptedIOException, IOReactorException {
-        if (eventDispatch == null) {
-            throw new IllegalArgumentException("Event dispatcher may not be null");
-        }
-        startWorkers(eventDispatch);
-        for (;;) {
-            int readyCount;
-            try {
-                readyCount = this.selector.select(getSelectTimeout());
-            } catch (InterruptedIOException ex) {
-                throw ex;
-            } catch (IOException ex) {
-                throw new IOReactorException("Unexpected selector failure", ex);
-            }
-
-            if (this.closed) {
-                break;
-            }
-            
-            processSessionRequests();
-            
-            if (readyCount > 0) {
-                processEvents(this.selector.selectedKeys());
-            }
-            
-            long currentTime = System.currentTimeMillis();
-            if( (currentTime - this.lastTimeoutCheck) >= getSelectTimeout()) {
-                this.lastTimeoutCheck = currentTime;
-                Set keys = this.selector.keys();
-                if (keys != null) {
-                    processTimeouts(keys);
-                }
+    protected void processEvents(int readyCount) throws IOReactorException {
+        processSessionRequests();
+        
+        if (readyCount > 0) {
+            Set selectedKeys = this.selector.selectedKeys();
+            for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) {
+                
+                SelectionKey key = (SelectionKey) it.next();
+                processEvent(key);
+                
             }
-            verifyWorkers();
+            selectedKeys.clear();
         }
-    }
-    
-    private void processEvents(final Set selectedKeys) {
-        for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) {
-            
-            SelectionKey key = (SelectionKey) it.next();
-            processEvent(key);
-            
+        
+        long currentTime = System.currentTimeMillis();
+        if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
+            this.lastTimeoutCheck = currentTime;
+            Set keys = this.selector.keys();
+            if (keys != null) {
+                processTimeouts(keys);
+            }
         }
-        selectedKeys.clear();
     }
 
     private void processEvent(final SelectionKey key) {
@@ -168,15 +128,6 @@
         }
     }
 
-    protected void prepareSocket(final Socket socket) throws IOException {
-        socket.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(this.params));
-        socket.setSoTimeout(HttpConnectionParams.getSoTimeout(this.params));
-        int linger = HttpConnectionParams.getLinger(this.params);
-        if (linger >= 0) {
-            socket.setSoLinger(linger > 0, linger);
-        }
-    }
-    
     private void processTimeouts(final Set keys) {
         long now = System.currentTimeMillis();
         for (Iterator it = keys.iterator(); it.hasNext();) {
@@ -203,7 +154,7 @@
             final Object attachment,
             final SessionRequestCallback callback) {
 
-        if (this.closed) {
+        if (this.status > ACTIVE) {
             throw new IllegalStateException("I/O reactor has been shut down");
         }
         SessionRequestImpl sessionRequest = new SessionRequestImpl(
@@ -279,31 +230,4 @@
         }
     }
 
-    public void shutdown() throws IOException {
-        if (this.closed) {
-            return;
-        }
-        this.closed = true;
-        // Close out all channels
-        Set keys = this.selector.keys();
-        for (Iterator it = keys.iterator(); it.hasNext(); ) {
-            try {
-                SelectionKey key = (SelectionKey) it.next();
-                Channel channel = key.channel();
-                if (channel != null) {
-                    channel.close();
-                }
-            } catch (IOException ignore) {
-            }
-        }
-        // Stop dispatching I/O events
-        this.selector.close();
-        // Stop the workers
-        try {
-            stopWorkers(500);
-        } catch (InterruptedException ex) {
-            throw new InterruptedIOException(ex.getMessage());
-        }
-    }
-        
 }

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java?rev=575682&r1=575681&r2=575682&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java Fri Sep 14 06:55:02 2007
@@ -32,48 +32,30 @@
 package org.apache.http.impl.nio.reactor;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.Socket;
 import java.net.SocketAddress;
 import java.nio.channels.CancelledKeyException;
-import java.nio.channels.Channel;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.http.util.concurrent.ThreadFactory;
-import org.apache.http.nio.params.HttpNIOParams;
-import org.apache.http.nio.reactor.IOEventDispatch;
 import org.apache.http.nio.reactor.IOReactorException;
 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
 import org.apache.http.nio.reactor.ListeningIOReactor;
-import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.params.HttpParams;
+import org.apache.http.util.concurrent.ThreadFactory;
 
 public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor 
         implements ListeningIOReactor {
 
-    private volatile boolean closed = false;
-    
-    private final HttpParams params;
-    private final Selector selector;
-    
     private IOReactorExceptionHandler exceptionHandler;
     
     public DefaultListeningIOReactor(
             int workerCount, 
             final ThreadFactory threadFactory,
             final HttpParams params) throws IOReactorException {
-        super(HttpNIOParams.getSelectInterval(params), workerCount, threadFactory);
-        this.params = params;
-        try {
-            this.selector = Selector.open();
-        } catch (IOException ex) {
-            throw new IOReactorException("Failure opening selector", ex);
-        }
+        super(workerCount, threadFactory, params);
     }
 
     public DefaultListeningIOReactor(
@@ -86,42 +68,17 @@
         this.exceptionHandler = exceptionHandler;
     }
     
-    public void execute(final IOEventDispatch eventDispatch) 
-            throws InterruptedIOException, IOReactorException {
-        if (eventDispatch == null) {
-            throw new IllegalArgumentException("Event dispatcher may not be null");
-        }
-        startWorkers(eventDispatch);
-        for (;;) {
-
-            int readyCount;
-            try {
-                readyCount = this.selector.select(getSelectTimeout());
-            } catch (InterruptedIOException ex) {
-                throw ex;
-            } catch (IOException ex) {
-                throw new IOReactorException("Unexpected selector failure", ex);
-            }
-            
-            if (this.closed) {
-                break;
-            }
-            if (readyCount > 0) {
-                processEvents(this.selector.selectedKeys());
+    protected void processEvents(int readyCount) throws IOReactorException {
+        if (readyCount > 0) {
+            Set selectedKeys = this.selector.selectedKeys();
+            for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) {
+                
+                SelectionKey key = (SelectionKey) it.next();
+                processEvent(key);
+                
             }
-            verifyWorkers();
-        }
-    }
-    
-    private void processEvents(final Set selectedKeys)
-            throws IOReactorException {
-        for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) {
-            
-            SelectionKey key = (SelectionKey) it.next();
-            processEvent(key);
-            
+            selectedKeys.clear();
         }
-        selectedKeys.clear();
     }
 
     private void processEvent(final SelectionKey key) 
@@ -158,18 +115,9 @@
         }
     }
 
-    protected void prepareSocket(final Socket socket) throws IOException {
-        socket.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(this.params));
-        socket.setSoTimeout(HttpConnectionParams.getSoTimeout(this.params));
-        int linger = HttpConnectionParams.getLinger(this.params);
-        if (linger >= 0) {
-            socket.setSoLinger(linger > 0, linger);
-        }
-    }
-
     public SocketAddress listen(
             final SocketAddress address) throws IOException {
-        if (this.closed) {
+        if (this.status > ACTIVE) {
             throw new IllegalStateException("I/O reactor has been shut down");
         }
         ServerSocketChannel serverChannel = ServerSocketChannel.open();
@@ -178,34 +126,6 @@
         SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
         key.attach(null);
         return serverChannel.socket().getLocalSocketAddress();
-    }
-
-    public void shutdown() throws IOException {
-        if (this.closed) {
-            return;
-        }
-        this.closed = true;
-        
-        // Close out all channels
-        Set keys = this.selector.keys();
-        for (Iterator it = keys.iterator(); it.hasNext(); ) {
-            try {
-                SelectionKey key = (SelectionKey) it.next();
-                Channel channel = key.channel();
-                if (channel != null) {
-                    channel.close();
-                }
-            } catch (IOException ignore) {
-            }
-        }
-        // Stop dispatching I/O events
-        this.selector.close();
-        // Stop the workers
-        try {
-            stopWorkers(500);
-        } catch (InterruptedException ex) {
-            throw new InterruptedIOException(ex.getMessage());
-        }
     }
 
 }

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java?rev=575682&r1=575681&r2=575682&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java Fri Sep 14 06:55:02 2007
@@ -45,6 +45,9 @@
     void execute(IOEventDispatch eventDispatch) 
         throws IOException;
 
+    void shutdown(long gracePeriod) 
+        throws IOException;
+
     void shutdown() 
         throws IOException;