You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by as...@apache.org on 2007/12/13 19:54:06 UTC
svn commit: r603976 - in
/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http:
impl/nio/reactor/DefaultListeningIOReactor.java
impl/nio/reactor/ListenerEndpointImpl.java
nio/reactor/ListeningIOReactor.java
Author: asankha
Date: Thu Dec 13 10:54:05 2007
New Revision: 603976
URL: http://svn.apache.org/viewvc?rev=603976&view=rev
Log:
Implement HTTPCORE-127, which allows a ListeningIOReactor to be paused and resumed. When paused, the reactor stops accepting new connections but continues to serve existing connections until they are completed. This allows a system to appear offline to an external load balancer before being brought down for maintenance.
Modified:
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java?rev=603976&r1=603975&r2=603976&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java Thu Dec 13 10:54:05 2007
@@ -55,6 +55,7 @@
implements ListeningIOReactor {
private final Queue<ListenerEndpointImpl> requestQueue;
+ private final Queue<ListenerEndpointImpl> pausedQueue;
public DefaultListeningIOReactor(
int workerCount,
@@ -62,6 +63,7 @@
final HttpParams params) throws IOReactorException {
super(workerCount, threadFactory, params);
this.requestQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
+ this.pausedQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
}
public DefaultListeningIOReactor(
@@ -159,7 +161,7 @@
throw new IOReactorException("Failure registering channel " +
"with the selector", ex);
}
- request.completed(serverChannel.socket().getLocalSocketAddress());
+ request.completed(serverChannel);
}
}
@@ -179,5 +181,30 @@
}
return list.toArray(new ListenerEndpoint[list.size()]);
}
-
+
+ public void pause() throws IOException {
+ if (this.selector.isOpen()) {
+ Set<SelectionKey> keys = this.selector.keys();
+ for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
+ SelectionKey key = it.next();
+ if (key.isValid()) {
+ ListenerEndpointImpl endpoint = (ListenerEndpointImpl) key.attachment();
+ if (endpoint != null) {
+ endpoint.close();
+ this.pausedQueue.add(endpoint);
+ }
+ }
+ }
+ }
+ }
+
+ public void resume() throws IOException {
+ ListenerEndpointImpl endpoint;
+ while ((endpoint = this.pausedQueue.poll()) != null) {
+ ListenerEndpointImpl request = new ListenerEndpointImpl(endpoint.getAddress());
+ this.requestQueue.add(request);
+ }
+ this.pausedQueue.clear();
+ this.selector.wakeup();
+ }
}
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java?rev=603976&r1=603975&r2=603976&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java Thu Dec 13 10:54:05 2007
@@ -34,6 +34,7 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
import org.apache.http.nio.reactor.ListenerEndpoint;
@@ -43,6 +44,7 @@
private volatile boolean closed;
private volatile SelectionKey key;
private SocketAddress address;
+ private ServerSocketChannel serverChannel = null;
private IOException exception = null;
@@ -77,7 +79,7 @@
}
}
- public void completed(final SocketAddress address) {
+ public void completed(final ServerSocketChannel serverChannel) {
if (address == null) {
throw new IllegalArgumentException("Address may not be null");
}
@@ -86,7 +88,8 @@
}
this.completed = true;
synchronized (this) {
- this.address = address;
+ this.address = serverChannel.socket().getLocalSocketAddress();
+ this.serverChannel = serverChannel;
notifyAll();
}
}
@@ -119,6 +122,11 @@
}
this.completed = true;
this.closed = true;
+ if (this.serverChannel != null && this.serverChannel.isOpen()) {
+ try {
+ this.serverChannel.close();
+ } catch (IOException ignore) {}
+ }
if (this.key != null) {
this.key.cancel();
}
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java?rev=603976&r1=603975&r2=603976&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java Thu Dec 13 10:54:05 2007
@@ -32,11 +32,18 @@
package org.apache.http.nio.reactor;
import java.net.SocketAddress;
+import java.io.IOException;
public interface ListeningIOReactor extends IOReactor {
ListenerEndpoint listen(SocketAddress address);
-
+
+ void pause()
+ throws IOException;
+
+ void resume()
+ throws IOException;
+
ListenerEndpoint[] getEndpoints();
}