You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by as...@apache.org on 2007/12/13 19:54:06 UTC

svn commit: r603976 - in /jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http: impl/nio/reactor/DefaultListeningIOReactor.java impl/nio/reactor/ListenerEndpointImpl.java nio/reactor/ListeningIOReactor.java

Author: asankha
Date: Thu Dec 13 10:54:05 2007
New Revision: 603976

URL: http://svn.apache.org/viewvc?rev=603976&view=rev
Log:
Implement HTTPCORE-127, which allows a ListeningIOReactor to be paused and resumed. When paused, the reactor stops accepting new connections but continues to serve existing connections until they complete. This allows a system to appear offline to an external load balancer before being brought down for maintenance.

Modified:
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java?rev=603976&r1=603975&r2=603976&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java Thu Dec 13 10:54:05 2007
@@ -55,6 +55,7 @@
         implements ListeningIOReactor {
 
     private final Queue<ListenerEndpointImpl> requestQueue;
+    private final Queue<ListenerEndpointImpl> pausedQueue;
     
     public DefaultListeningIOReactor(
             int workerCount, 
@@ -62,6 +63,7 @@
             final HttpParams params) throws IOReactorException {
         super(workerCount, threadFactory, params);
         this.requestQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
+        this.pausedQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
     }
 
     public DefaultListeningIOReactor(
@@ -159,7 +161,7 @@
                 throw new IOReactorException("Failure registering channel " +
                         "with the selector", ex);
             }
-            request.completed(serverChannel.socket().getLocalSocketAddress());
+            request.completed(serverChannel);
         }
     }
     
@@ -179,5 +181,30 @@
         }
         return list.toArray(new ListenerEndpoint[list.size()]);
     }
-    
+
+    public void pause() throws IOException {
+        if (this.selector.isOpen()) {
+            Set<SelectionKey> keys = this.selector.keys();
+            for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
+                SelectionKey key = it.next();
+                if (key.isValid()) {
+                    ListenerEndpointImpl endpoint = (ListenerEndpointImpl) key.attachment();
+                    if (endpoint != null) {
+                        endpoint.close();
+                        this.pausedQueue.add(endpoint);
+                    }
+                }
+            }
+        }
+    }
+
+    public void resume() throws IOException {
+        ListenerEndpointImpl endpoint;
+        while ((endpoint = this.pausedQueue.poll()) != null) {
+            ListenerEndpointImpl request = new ListenerEndpointImpl(endpoint.getAddress());
+            this.requestQueue.add(request);
+        }
+        this.pausedQueue.clear();
+        this.selector.wakeup();
+    }
 }

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java?rev=603976&r1=603975&r2=603976&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java Thu Dec 13 10:54:05 2007
@@ -34,6 +34,7 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
 
 import org.apache.http.nio.reactor.ListenerEndpoint;
 
@@ -43,6 +44,7 @@
     private volatile boolean closed;
     private volatile SelectionKey key;
     private SocketAddress address;
+    private ServerSocketChannel serverChannel = null;
 
     private IOException exception = null;
     
@@ -77,7 +79,7 @@
         }
     }
     
-    public void completed(final SocketAddress address) {
+    public void completed(final ServerSocketChannel serverChannel) {
         if (address == null) {
             throw new IllegalArgumentException("Address may not be null");
         }
@@ -86,7 +88,8 @@
         }
         this.completed = true;
         synchronized (this) {
-            this.address = address;
+            this.address = serverChannel.socket().getLocalSocketAddress();
+            this.serverChannel = serverChannel;
             notifyAll();
         }
     }
@@ -119,6 +122,11 @@
         }
         this.completed = true;
         this.closed = true;
+        if (this.serverChannel != null && this.serverChannel.isOpen()) {
+            try {
+                this.serverChannel.close();
+            } catch (IOException ignore) {}
+        }
         if (this.key != null) {
             this.key.cancel();
         }

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java?rev=603976&r1=603975&r2=603976&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java Thu Dec 13 10:54:05 2007
@@ -32,11 +32,18 @@
 package org.apache.http.nio.reactor;
 
 import java.net.SocketAddress;
+import java.io.IOException;
 
 public interface ListeningIOReactor extends IOReactor {
 
     ListenerEndpoint listen(SocketAddress address);
-    
+
+    void pause()
+        throws IOException;
+
+    void resume()
+        throws IOException;
+
     ListenerEndpoint[] getEndpoints();
     
 }