You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2008/01/17 17:46:07 UTC
svn commit: r612862 - in
/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http:
impl/nio/reactor/AbstractMultiworkerIOReactor.java
nio/reactor/IOReactor.java nio/reactor/IOReactorStatus.java
Author: olegk
Date: Thu Jan 17 08:46:03 2008
New Revision: 612862
URL: http://svn.apache.org/viewvc?rev=612862&view=rev
Log:
Made several changes to the way I/O reactors perform graceful shutdown:
* Public #shutdown methods no longer release shared resources; they are intended to initiate the shutdown sequence and block awaiting its completion
* The shutdown code is now executed on the I/O dispatch thread
Modified:
httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java
httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactorStatus.java
Modified: httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java?rev=612862&r1=612861&r2=612862&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java Thu Jan 17 08:46:03 2008
@@ -66,11 +66,13 @@
private final BaseIOReactor[] dispatchers;
private final Worker[] workers;
private final Thread[] threads;
+ private final long gracePeriod;
+ private final Object shutdownMutex;
protected IOReactorExceptionHandler exceptionHandler;
private int currentWorker = 0;
-
+
public AbstractMultiworkerIOReactor(
int workerCount,
final ThreadFactory threadFactory,
@@ -89,6 +91,8 @@
}
this.params = params;
this.selectTimeout = NIOReactorParams.getSelectInterval(params);
+ this.gracePeriod = 500;
+ this.shutdownMutex = new Object();
this.workerCount = workerCount;
if (threadFactory != null) {
this.threadFactory = threadFactory;
@@ -96,9 +100,6 @@
this.threadFactory = new DefaultThreadFactory();
}
this.dispatchers = new BaseIOReactor[workerCount];
- for (int i = 0; i < this.dispatchers.length; i++) {
- this.dispatchers[i] = new BaseIOReactor(selectTimeout);
- }
this.workers = new Worker[workerCount];
this.threads = new Thread[workerCount];
this.status = IOReactorStatus.INACTIVE;
@@ -110,10 +111,6 @@
public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
- for (int i = 0; i < this.workerCount; i++) {
- BaseIOReactor dispatcher = this.dispatchers[i];
- dispatcher.setExceptionHandler(exceptionHandler);
- }
}
protected abstract void processEvents(int count) throws IOReactorException;
@@ -128,6 +125,11 @@
this.status = IOReactorStatus.ACTIVE;
// Start I/O dispatchers
+ for (int i = 0; i < this.dispatchers.length; i++) {
+ BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout);
+ dispatcher.setExceptionHandler(exceptionHandler);
+ this.dispatchers[i] = dispatcher;
+ }
for (int i = 0; i < this.workerCount; i++) {
BaseIOReactor dispatcher = this.dispatchers[i];
this.workers[i] = new Worker(dispatcher, eventDispatch);
@@ -143,7 +145,6 @@
try {
for (;;) {
-
int readyCount;
try {
readyCount = this.selector.select(this.selectTimeout);
@@ -181,15 +182,15 @@
} finally {
// Shutdown
try {
- shutdown(500);
+ doShutdown();
} catch (IOException ex) {
throw new IOReactorException(ex.getMessage(), ex);
}
}
}
- public void shutdown(long gracePeriod) throws IOException {
- if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
+ protected void doShutdown() throws IOException {
+ if (this.status.compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
return;
}
this.status = IOReactorStatus.SHUTTING_DOWN;
@@ -225,7 +226,7 @@
for (int i = 0; i < this.workerCount; i++) {
BaseIOReactor dispatcher = this.dispatchers[i];
if (dispatcher.getStatus() != IOReactorStatus.INACTIVE) {
- dispatcher.awaitShutdown(gracePeriod);
+ dispatcher.awaitShutdown(this.gracePeriod);
}
if (dispatcher.getStatus() != IOReactorStatus.SHUT_DOWN) {
dispatcher.hardShutdown();
@@ -235,20 +236,19 @@
for (int i = 0; i < this.workerCount; i++) {
Thread t = this.threads[i];
if (t != null) {
- t.join(gracePeriod);
+ t.join(this.gracePeriod);
}
}
} catch (InterruptedException ex) {
throw new InterruptedIOException(ex.getMessage());
} finally {
- this.status = IOReactorStatus.SHUT_DOWN;
+ synchronized (this.shutdownMutex) {
+ this.status = IOReactorStatus.SHUT_DOWN;
+ this.shutdownMutex.notifyAll();
+ }
}
}
- public void shutdown() throws IOException {
- shutdown(500);
- }
-
protected void addChannel(final ChannelEntry entry) {
// Distribute new channels among the workers
this.dispatchers[this.currentWorker++ % this.workerCount].addChannel(entry);
@@ -265,6 +265,38 @@
int linger = HttpConnectionParams.getLinger(this.params);
if (linger >= 0) {
socket.setSoLinger(linger > 0, linger);
+ }
+ }
+
+ protected void awaitShutdown(long timeout) throws InterruptedException {
+ synchronized (this.shutdownMutex) {
+ long deadline = System.currentTimeMillis() + timeout;
+ long remaining = timeout;
+ while (this.status != IOReactorStatus.SHUT_DOWN) {
+ this.shutdownMutex.wait(remaining);
+ if (timeout > 0) {
+ remaining = deadline - System.currentTimeMillis();
+ if (remaining <= 0) {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ public void shutdown() throws IOException {
+ shutdown(2000);
+ }
+
+ public void shutdown(long waitMs) throws IOException {
+ if (this.status != IOReactorStatus.ACTIVE) {
+ return;
+ }
+ this.status = IOReactorStatus.SHUTDOWN_REQUEST;
+ this.selector.wakeup();
+ try {
+ awaitShutdown(waitMs);
+ } catch (InterruptedException ignore) {
}
}
Modified: httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java?rev=612862&r1=612861&r2=612862&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java (original)
+++ httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java Thu Jan 17 08:46:03 2008
@@ -40,7 +40,7 @@
void execute(IOEventDispatch eventDispatch)
throws IOException;
- void shutdown(long gracePeriod)
+ void shutdown(long waitMs)
throws IOException;
void shutdown()
Modified: httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactorStatus.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactorStatus.java?rev=612862&r1=612861&r2=612862&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactorStatus.java (original)
+++ httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactorStatus.java Thu Jan 17 08:46:03 2008
@@ -35,6 +35,7 @@
INACTIVE,
ACTIVE,
+ SHUTDOWN_REQUEST,
SHUTTING_DOWN,
SHUT_DOWN;