You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2007/09/14 15:55:04 UTC
svn commit: r575682 - in
/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http:
impl/nio/reactor/ nio/reactor/
Author: olegk
Date: Fri Sep 14 06:55:02 2007
New Revision: 575682
URL: http://svn.apache.org/viewvc?rev=575682&view=rev
Log:
HTTPCORE-109: Refactored the default connecting and listening I/O reactor implementations; moved more of the code common to both into the abstract base class
Modified:
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java?rev=575682&r1=575681&r2=575682&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java Fri Sep 14 06:55:02 2007
@@ -31,18 +31,34 @@
package org.apache.http.impl.nio.reactor;
+import java.io.IOException;
import java.io.InterruptedIOException;
+import java.net.Socket;
+import java.nio.channels.Channel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
import org.apache.http.util.concurrent.ThreadFactory;
+import org.apache.http.nio.params.HttpNIOParams;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactor;
import org.apache.http.nio.reactor.IOReactorException;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
public abstract class AbstractMultiworkerIOReactor implements IOReactor {
- private volatile int status;
+ protected volatile int status;
- private final long selectTimeout;
+ protected final HttpParams params;
+ protected final Selector selector;
+ protected final long selectTimeout;
+
private final int workerCount;
private final ThreadFactory threadFactory;
private final BaseIOReactor[] dispatchers;
@@ -52,14 +68,23 @@
private int currentWorker = 0;
public AbstractMultiworkerIOReactor(
- long selectTimeout,
int workerCount,
- final ThreadFactory threadFactory) throws IOReactorException {
+ final ThreadFactory threadFactory,
+ final HttpParams params) throws IOReactorException {
super();
if (workerCount <= 0) {
throw new IllegalArgumentException("Worker count may not be negative or zero");
}
- this.selectTimeout = selectTimeout;
+ if (params == null) {
+ throw new IllegalArgumentException("HTTP parameters may not be negative or zero");
+ }
+ try {
+ this.selector = Selector.open();
+ } catch (IOException ex) {
+ throw new IOReactorException("Failure opening selector", ex);
+ }
+ this.params = params;
+ this.selectTimeout = HttpNIOParams.getSelectInterval(params);
this.workerCount = workerCount;
if (threadFactory != null) {
this.threadFactory = threadFactory;
@@ -79,12 +104,17 @@
return this.status;
}
- protected long getSelectTimeout() {
- return this.selectTimeout;
- }
+ protected abstract void processEvents(int count) throws IOReactorException;
- protected void startWorkers(final IOEventDispatch eventDispatch) {
+ public void execute(
+ final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
+ if (eventDispatch == null) {
+ throw new IllegalArgumentException("Event dispatcher may not be null");
+ }
+
this.status = ACTIVE;
+
+ // Start I/O dispatchers
for (int i = 0; i < this.workerCount; i++) {
BaseIOReactor dispatcher = this.dispatchers[i];
this.workers[i] = new Worker(dispatcher, eventDispatch);
@@ -96,61 +126,121 @@
}
this.threads[i].start();
}
+
+ for (;;) {
+
+ int readyCount;
+ try {
+ readyCount = this.selector.select(this.selectTimeout);
+ } catch (ClosedSelectorException ex) {
+ return;
+ } catch (InterruptedIOException ex) {
+ throw ex;
+ } catch (IOException ex) {
+ throw new IOReactorException("Unexpected selector failure", ex);
+ }
+
+ if (this.status == SHUT_DOWN) {
+ break;
+ }
+ processEvents(readyCount);
+
+ // Verify I/O dispatchers
+ for (int i = 0; i < this.workerCount; i++) {
+ Worker worker = this.workers[i];
+ Thread thread = this.threads[i];
+ if (!thread.isAlive()) {
+ Exception ex = worker.getException();
+ if (ex instanceof IOReactorException) {
+ throw (IOReactorException) ex;
+ } else if (ex instanceof InterruptedIOException) {
+ throw (InterruptedIOException) ex;
+ } else if (ex instanceof RuntimeException) {
+ throw (RuntimeException) ex;
+ } else {
+ throw new IOReactorException(ex.getMessage(), ex);
+ }
+ }
+ }
+ }
}
- protected void stopWorkers(int timeout)
- throws InterruptedException, IOReactorException {
+ public void shutdown(long gracePeriod) throws IOException {
+ if (this.status > ACTIVE) {
+ return;
+ }
+ this.status = SHUTTING_DOWN;
+
+ // Close out all channels
+ Set keys = this.selector.keys();
+ for (Iterator it = keys.iterator(); it.hasNext(); ) {
+ try {
+ SelectionKey key = (SelectionKey) it.next();
+ Channel channel = key.channel();
+ if (channel != null) {
+ channel.close();
+ }
+ } catch (IOException ignore) {
+ }
+ }
+ // Stop dispatching I/O events
+ this.selector.close();
// Attempt to shut down I/O dispatchers gracefully
for (int i = 0; i < this.workerCount; i++) {
BaseIOReactor dispatcher = this.dispatchers[i];
dispatcher.gracefulShutdown();
}
- // Force shut down I/O dispatchers if they fail to terminate
- // in time
- for (int i = 0; i < this.workerCount; i++) {
- BaseIOReactor dispatcher = this.dispatchers[i];
- if (dispatcher.getStatus() != INACTIVE) {
- dispatcher.awaitShutdown(timeout);
- }
- if (dispatcher.getStatus() != SHUT_DOWN) {
- dispatcher.hardShutdown();
- }
- }
- // Join worker threads
- for (int i = 0; i < this.workerCount; i++) {
- Thread t = this.threads[i];
- if (t != null) {
- t.join(timeout);
+
+ try {
+ // Force shut down I/O dispatchers if they fail to terminate
+ // in time
+ for (int i = 0; i < this.workerCount; i++) {
+ BaseIOReactor dispatcher = this.dispatchers[i];
+ if (dispatcher.getStatus() != INACTIVE) {
+ dispatcher.awaitShutdown(gracePeriod);
+ }
+ if (dispatcher.getStatus() != SHUT_DOWN) {
+ dispatcher.hardShutdown();
+ }
}
- }
- }
-
- protected void verifyWorkers()
- throws InterruptedIOException, IOReactorException {
- for (int i = 0; i < this.workerCount; i++) {
- Worker worker = this.workers[i];
- Thread thread = this.threads[i];
- if (!thread.isAlive()) {
- Exception ex = worker.getException();
- if (ex instanceof IOReactorException) {
- throw (IOReactorException) ex;
- } else if (ex instanceof InterruptedIOException) {
- throw (InterruptedIOException) ex;
- } else if (ex instanceof RuntimeException) {
- throw (RuntimeException) ex;
- } else {
- throw new IOReactorException(ex.getMessage(), ex);
+ // Join worker threads
+ for (int i = 0; i < this.workerCount; i++) {
+ Thread t = this.threads[i];
+ if (t != null) {
+ t.join(gracePeriod);
}
}
+ } catch (InterruptedException ex) {
+ throw new InterruptedIOException(ex.getMessage());
+ } finally {
+ this.status = SHUT_DOWN;
}
}
+
+ public void shutdown() throws IOException {
+ shutdown(500);
+ }
protected void addChannel(final ChannelEntry entry) {
// Distribute new channels among the workers
this.dispatchers[this.currentWorker++ % this.workerCount].addChannel(entry);
}
-
+
+ protected SelectionKey registerChannel(
+ final SelectableChannel channel, int ops) throws ClosedChannelException {
+ return channel.register(this.selector, ops);
+ }
+
+ protected void prepareSocket(final Socket socket) throws IOException {
+ socket.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(this.params));
+ socket.setSoTimeout(HttpConnectionParams.getSoTimeout(this.params));
+ int linger = HttpConnectionParams.getLinger(this.params);
+ if (linger >= 0) {
+ socket.setSoLinger(linger > 0, linger);
+ }
+ }
+
static class Worker implements Runnable {
final BaseIOReactor dispatcher;
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java?rev=575682&r1=575681&r2=575682&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java Fri Sep 14 06:55:02 2007
@@ -32,36 +32,26 @@
package org.apache.http.impl.nio.reactor;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
-import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
-import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
-import org.apache.http.util.concurrent.ThreadFactory;
-import org.apache.http.nio.params.HttpNIOParams;
import org.apache.http.nio.reactor.ConnectingIOReactor;
-import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.nio.reactor.SessionRequestCallback;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
+import org.apache.http.util.concurrent.ThreadFactory;
public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
implements ConnectingIOReactor {
- private volatile boolean closed = false;
-
- private final HttpParams params;
- private final Selector selector;
private final SessionRequestQueue requestQueue;
private long lastTimeoutCheck;
@@ -70,15 +60,9 @@
int workerCount,
final ThreadFactory threadFactory,
final HttpParams params) throws IOReactorException {
- super(HttpNIOParams.getSelectInterval(params), workerCount, threadFactory);
- this.params = params;
+ super(workerCount, threadFactory, params);
this.requestQueue = new SessionRequestQueue();
this.lastTimeoutCheck = System.currentTimeMillis();
- try {
- this.selector = Selector.open();
- } catch (IOException ex) {
- throw new IOReactorException("Failure opening selector", ex);
- }
}
public DefaultConnectingIOReactor(
@@ -87,52 +71,28 @@
this(workerCount, null, params);
}
- public void execute(final IOEventDispatch eventDispatch)
- throws InterruptedIOException, IOReactorException {
- if (eventDispatch == null) {
- throw new IllegalArgumentException("Event dispatcher may not be null");
- }
- startWorkers(eventDispatch);
- for (;;) {
- int readyCount;
- try {
- readyCount = this.selector.select(getSelectTimeout());
- } catch (InterruptedIOException ex) {
- throw ex;
- } catch (IOException ex) {
- throw new IOReactorException("Unexpected selector failure", ex);
- }
-
- if (this.closed) {
- break;
- }
-
- processSessionRequests();
-
- if (readyCount > 0) {
- processEvents(this.selector.selectedKeys());
- }
-
- long currentTime = System.currentTimeMillis();
- if( (currentTime - this.lastTimeoutCheck) >= getSelectTimeout()) {
- this.lastTimeoutCheck = currentTime;
- Set keys = this.selector.keys();
- if (keys != null) {
- processTimeouts(keys);
- }
+ protected void processEvents(int readyCount) throws IOReactorException {
+ processSessionRequests();
+
+ if (readyCount > 0) {
+ Set selectedKeys = this.selector.selectedKeys();
+ for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) {
+
+ SelectionKey key = (SelectionKey) it.next();
+ processEvent(key);
+
}
- verifyWorkers();
+ selectedKeys.clear();
}
- }
-
- private void processEvents(final Set selectedKeys) {
- for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) {
-
- SelectionKey key = (SelectionKey) it.next();
- processEvent(key);
-
+
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
+ this.lastTimeoutCheck = currentTime;
+ Set keys = this.selector.keys();
+ if (keys != null) {
+ processTimeouts(keys);
+ }
}
- selectedKeys.clear();
}
private void processEvent(final SelectionKey key) {
@@ -168,15 +128,6 @@
}
}
- protected void prepareSocket(final Socket socket) throws IOException {
- socket.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(this.params));
- socket.setSoTimeout(HttpConnectionParams.getSoTimeout(this.params));
- int linger = HttpConnectionParams.getLinger(this.params);
- if (linger >= 0) {
- socket.setSoLinger(linger > 0, linger);
- }
- }
-
private void processTimeouts(final Set keys) {
long now = System.currentTimeMillis();
for (Iterator it = keys.iterator(); it.hasNext();) {
@@ -203,7 +154,7 @@
final Object attachment,
final SessionRequestCallback callback) {
- if (this.closed) {
+ if (this.status > ACTIVE) {
throw new IllegalStateException("I/O reactor has been shut down");
}
SessionRequestImpl sessionRequest = new SessionRequestImpl(
@@ -279,31 +230,4 @@
}
}
- public void shutdown() throws IOException {
- if (this.closed) {
- return;
- }
- this.closed = true;
- // Close out all channels
- Set keys = this.selector.keys();
- for (Iterator it = keys.iterator(); it.hasNext(); ) {
- try {
- SelectionKey key = (SelectionKey) it.next();
- Channel channel = key.channel();
- if (channel != null) {
- channel.close();
- }
- } catch (IOException ignore) {
- }
- }
- // Stop dispatching I/O events
- this.selector.close();
- // Stop the workers
- try {
- stopWorkers(500);
- } catch (InterruptedException ex) {
- throw new InterruptedIOException(ex.getMessage());
- }
- }
-
}
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java?rev=575682&r1=575681&r2=575682&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java Fri Sep 14 06:55:02 2007
@@ -32,48 +32,30 @@
package org.apache.http.impl.nio.reactor;
import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
-import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
-import org.apache.http.util.concurrent.ThreadFactory;
-import org.apache.http.nio.params.HttpNIOParams;
-import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.IOReactorExceptionHandler;
import org.apache.http.nio.reactor.ListeningIOReactor;
-import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
+import org.apache.http.util.concurrent.ThreadFactory;
public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
implements ListeningIOReactor {
- private volatile boolean closed = false;
-
- private final HttpParams params;
- private final Selector selector;
-
private IOReactorExceptionHandler exceptionHandler;
public DefaultListeningIOReactor(
int workerCount,
final ThreadFactory threadFactory,
final HttpParams params) throws IOReactorException {
- super(HttpNIOParams.getSelectInterval(params), workerCount, threadFactory);
- this.params = params;
- try {
- this.selector = Selector.open();
- } catch (IOException ex) {
- throw new IOReactorException("Failure opening selector", ex);
- }
+ super(workerCount, threadFactory, params);
}
public DefaultListeningIOReactor(
@@ -86,42 +68,17 @@
this.exceptionHandler = exceptionHandler;
}
- public void execute(final IOEventDispatch eventDispatch)
- throws InterruptedIOException, IOReactorException {
- if (eventDispatch == null) {
- throw new IllegalArgumentException("Event dispatcher may not be null");
- }
- startWorkers(eventDispatch);
- for (;;) {
-
- int readyCount;
- try {
- readyCount = this.selector.select(getSelectTimeout());
- } catch (InterruptedIOException ex) {
- throw ex;
- } catch (IOException ex) {
- throw new IOReactorException("Unexpected selector failure", ex);
- }
-
- if (this.closed) {
- break;
- }
- if (readyCount > 0) {
- processEvents(this.selector.selectedKeys());
+ protected void processEvents(int readyCount) throws IOReactorException {
+ if (readyCount > 0) {
+ Set selectedKeys = this.selector.selectedKeys();
+ for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) {
+
+ SelectionKey key = (SelectionKey) it.next();
+ processEvent(key);
+
}
- verifyWorkers();
- }
- }
-
- private void processEvents(final Set selectedKeys)
- throws IOReactorException {
- for (Iterator it = selectedKeys.iterator(); it.hasNext(); ) {
-
- SelectionKey key = (SelectionKey) it.next();
- processEvent(key);
-
+ selectedKeys.clear();
}
- selectedKeys.clear();
}
private void processEvent(final SelectionKey key)
@@ -158,18 +115,9 @@
}
}
- protected void prepareSocket(final Socket socket) throws IOException {
- socket.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(this.params));
- socket.setSoTimeout(HttpConnectionParams.getSoTimeout(this.params));
- int linger = HttpConnectionParams.getLinger(this.params);
- if (linger >= 0) {
- socket.setSoLinger(linger > 0, linger);
- }
- }
-
public SocketAddress listen(
final SocketAddress address) throws IOException {
- if (this.closed) {
+ if (this.status > ACTIVE) {
throw new IllegalStateException("I/O reactor has been shut down");
}
ServerSocketChannel serverChannel = ServerSocketChannel.open();
@@ -178,34 +126,6 @@
SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
key.attach(null);
return serverChannel.socket().getLocalSocketAddress();
- }
-
- public void shutdown() throws IOException {
- if (this.closed) {
- return;
- }
- this.closed = true;
-
- // Close out all channels
- Set keys = this.selector.keys();
- for (Iterator it = keys.iterator(); it.hasNext(); ) {
- try {
- SelectionKey key = (SelectionKey) it.next();
- Channel channel = key.channel();
- if (channel != null) {
- channel.close();
- }
- } catch (IOException ignore) {
- }
- }
- // Stop dispatching I/O events
- this.selector.close();
- // Stop the workers
- try {
- stopWorkers(500);
- } catch (InterruptedException ex) {
- throw new InterruptedIOException(ex.getMessage());
- }
}
}
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java?rev=575682&r1=575681&r2=575682&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java Fri Sep 14 06:55:02 2007
@@ -45,6 +45,9 @@
void execute(IOEventDispatch eventDispatch)
throws IOException;
+ void shutdown(long gracePeriod)
+ throws IOException;
+
void shutdown()
throws IOException;