You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2006/11/20 20:35:58 UTC
svn commit: r477304 - in
/jakarta/httpcomponents/httpcore/trunk/module-nio/src:
examples/org/apache/http/nio/examples/ main/java/org/apache/http/nio/impl/
main/java/org/apache/http/nio/impl/reactor/
main/java/org/apache/http/nio/reactor/
Author: olegk
Date: Mon Nov 20 11:35:57 2006
New Revision: 477304
URL: http://svn.apache.org/viewvc?view=rev&rev=477304
Log:
Added default implementations of the client side (connecting) and server side (listening) multithreaded I/O reactors
Added:
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelEntry.java (with props)
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelQueue.java (with props)
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultConnectingIOReactor.java
- copied, changed from r476807, jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultListeningIOReactor.java
- copied, changed from r476807, jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java
Removed:
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java
Modified:
jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/AsyncHttpServer.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/ElementalEchoServer.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpClient.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpServer.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/DefaultClientIOEventDispatch.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractMultiworkerIOReactor.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOSession.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/SessionRequest.java
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/AsyncHttpServer.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/AsyncHttpServer.java?view=diff&rev=477304&r1=477303&r2=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/AsyncHttpServer.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/AsyncHttpServer.java Mon Nov 20 11:35:57 2006
@@ -28,7 +28,7 @@
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.NHttpServiceHandler;
import org.apache.http.nio.impl.DefaultServerIOEventDispatch;
-import org.apache.http.nio.impl.reactor.DefaultIOReactor;
+import org.apache.http.nio.impl.reactor.DefaultListeningIOReactor;
import org.apache.http.nio.protocol.AsyncHttpService;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.ListeningIOReactor;
@@ -62,7 +62,7 @@
.setBooleanParameter(HttpConnectionParams.TCP_NODELAY, true)
.setParameter(HttpProtocolParams.ORIGIN_SERVER, "Jakarta-HttpComponents-NIO/1.1");
- ListeningIOReactor ioReactor = new DefaultIOReactor(params);
+ ListeningIOReactor ioReactor = new DefaultListeningIOReactor(2, params);
// Set up request handlers
HttpRequestHandlerRegistry reqistry = new HttpRequestHandlerRegistry();
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/ElementalEchoServer.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/ElementalEchoServer.java?view=diff&rev=477304&r1=477303&r2=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/ElementalEchoServer.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/ElementalEchoServer.java Mon Nov 20 11:35:57 2006
@@ -6,7 +6,7 @@
import java.nio.ByteBuffer;
import org.apache.http.impl.DefaultHttpParams;
-import org.apache.http.nio.impl.reactor.DefaultIOReactor;
+import org.apache.http.nio.impl.reactor.DefaultListeningIOReactor;
import org.apache.http.nio.reactor.EventMask;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOSession;
@@ -18,7 +18,7 @@
public static void main(String[] args) throws Exception {
HttpParams params = new DefaultHttpParams();
IOEventDispatch ioEventDispatch = new DefaultIoEventDispatch();
- ListeningIOReactor ioReactor = new DefaultIOReactor(params);
+ ListeningIOReactor ioReactor = new DefaultListeningIOReactor(2, params);
ioReactor.listen(new InetSocketAddress(8080));
try {
ioReactor.execute(ioEventDispatch);
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpClient.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpClient.java?view=diff&rev=477304&r1=477303&r2=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpClient.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpClient.java Mon Nov 20 11:35:57 2006
@@ -24,7 +24,7 @@
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.NHttpClientHandler;
import org.apache.http.nio.impl.DefaultClientIOEventDispatch;
-import org.apache.http.nio.impl.reactor.DefaultIOReactor;
+import org.apache.http.nio.impl.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.SessionRequest;
@@ -55,7 +55,7 @@
.setBooleanParameter(HttpConnectionParams.TCP_NODELAY, true)
.setParameter(HttpProtocolParams.USER_AGENT, "Jakarta-HttpComponents-NIO/1.1");
- ConnectingIOReactor ioReactor = new DefaultIOReactor(params);
+ ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(2, params);
SessionRequest[] reqs = new SessionRequest[3];
reqs[0] = ioReactor.connect(
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpServer.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpServer.java?view=diff&rev=477304&r1=477303&r2=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpServer.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/examples/org/apache/http/nio/examples/NHttpServer.java Mon Nov 20 11:35:57 2006
@@ -33,7 +33,7 @@
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.NHttpServiceHandler;
import org.apache.http.nio.impl.DefaultServerIOEventDispatch;
-import org.apache.http.nio.impl.reactor.DefaultIOReactor;
+import org.apache.http.nio.impl.reactor.DefaultListeningIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.params.HttpConnectionParams;
@@ -65,7 +65,7 @@
.setBooleanParameter(HttpConnectionParams.TCP_NODELAY, true)
.setParameter(HttpProtocolParams.ORIGIN_SERVER, "Jakarta-HttpComponents-NIO/1.1");
- ListeningIOReactor ioReactor = new DefaultIOReactor(params);
+ ListeningIOReactor ioReactor = new DefaultListeningIOReactor(2, params);
NHttpServiceHandler handler = new MyNHttpServiceHandler(args[0], params);
IOEventDispatch ioEventDispatch = new DefaultServerIOEventDispatch(handler, params);
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/DefaultClientIOEventDispatch.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/DefaultClientIOEventDispatch.java?view=diff&rev=477304&r1=477303&r2=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/DefaultClientIOEventDispatch.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/DefaultClientIOEventDispatch.java Mon Nov 20 11:35:57 2006
@@ -4,7 +4,6 @@
import org.apache.http.nio.NHttpClientHandler;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOSession;
-import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.params.HttpParams;
public class DefaultClientIOEventDispatch implements IOEventDispatch {
@@ -33,9 +32,8 @@
this.params);
session.setAttribute(NHTTP_CONN, conn);
- SessionRequest request = (SessionRequest) session.getAttribute(SessionRequest.ATTRIB_KEY);
-
- this.handler.connected(conn, request.getAttachment());
+ Object attachment = session.getAttribute(IOSession.ATTACHMENT_KEY);
+ this.handler.connected(conn, attachment);
}
public void disconnected(final IOSession session) {
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java?view=diff&rev=477304&r1=477303&r2=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractIOReactor.java Mon Nov 20 11:35:57 2006
@@ -50,6 +50,7 @@
private final Selector selector;
private final SessionSet sessions;
private final SessionQueue closedSessions;
+ private final ChannelQueue newChannels;
private long lastTimeoutCheck;
@@ -60,6 +61,7 @@
this.selector = Selector.open();
this.sessions = new SessionSet();
this.closedSessions = new SessionQueue();
+ this.newChannels = new ChannelQueue();
this.lastTimeoutCheck = System.currentTimeMillis();
}
@@ -73,25 +75,19 @@
protected abstract void timeoutCheck(SelectionKey key, long now);
- protected SelectionKey registerChannel(final SocketChannel channel)
- throws IOException {
- channel.configureBlocking(false);
- return channel.register(this.selector, 0);
- }
+ protected abstract void keyCreated(final SelectionKey key, final IOSession session);
- protected IOSession newSession(final SelectionKey key) {
- IOSession session = new IOSessionImpl(key, new SessionClosedCallback() {
-
- public void sessionClosed(IOSession session) {
- closedSessions.push(session);
- }
-
- });
- this.sessions.add(session);
- return session;
+ protected abstract IOSession keyCancelled(final SelectionKey key);
+
+ public void addChannel(final ChannelEntry channelEntry) {
+ if (channelEntry == null) {
+ throw new IllegalArgumentException("Channel entry may not be null");
+ }
+ this.newChannels.push(channelEntry);
+ this.selector.wakeup();
}
- public void execute(final IOEventDispatch eventDispatch) {
+ public void execute(final IOEventDispatch eventDispatch) throws IOException {
if (eventDispatch == null) {
throw new IllegalArgumentException("Event dispatcher may not be null");
}
@@ -100,15 +96,13 @@
try {
for (;;) {
- int readyCount = 0;
- try {
- readyCount = this.selector.select(TIMEOUT_CHECK_INTERVAL);
- } catch (IOException ex) {
- this.closed = true;
- }
+ int readyCount = this.selector.select(TIMEOUT_CHECK_INTERVAL);
if (this.closed) {
break;
}
+
+ processNewChannels();
+
if (readyCount > 0) {
processEvents(this.selector.selectedKeys());
}
@@ -155,13 +149,11 @@
writable(key);
}
} catch (CancelledKeyException ex) {
- Object attachment = key.attachment();
- if (attachment instanceof SessionHandle) {
- SessionHandle handle = (SessionHandle) attachment;
- IOSession session = handle.getSession();
+ IOSession session = keyCancelled(key);
+ if (session != null) {
this.closedSessions.push(session);
- key.attach(null);
}
+ key.attach(null);
}
}
@@ -170,6 +162,29 @@
for (Iterator it = keys.iterator(); it.hasNext();) {
SelectionKey key = (SelectionKey) it.next();
timeoutCheck(key, now);
+ }
+ }
+
+ private void processNewChannels() throws IOException {
+ ChannelEntry entry;
+ while ((entry = this.newChannels.pop()) != null) {
+
+ SocketChannel channel = entry.getChannel();
+ channel.configureBlocking(false);
+ SelectionKey key = channel.register(this.selector, 0);
+
+ IOSession session = new IOSessionImpl(key, new SessionClosedCallback() {
+
+ public void sessionClosed(IOSession session) {
+ closedSessions.push(session);
+ }
+
+ });
+ session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
+ session.setSocketTimeout(channel.socket().getSoTimeout());
+ this.sessions.add(session);
+ keyCreated(key, session);
+ this.eventDispatch.connected(session);
}
}
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractMultiworkerIOReactor.java?view=diff&rev=477304&r1=477303&r2=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractMultiworkerIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/AbstractMultiworkerIOReactor.java Mon Nov 20 11:35:57 2006
@@ -31,7 +31,6 @@
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.nio.channels.SocketChannel;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactor;
@@ -40,7 +39,7 @@
private final int workerCount;
private final BaseIOReactor[] ioReactors;
- private final Thread[] threads;
+ private final WorkerThread[] threads;
private int currentWorker = 0;
@@ -51,16 +50,13 @@
}
this.workerCount = workerCount;
this.ioReactors = new BaseIOReactor[workerCount];
- this.threads = new Thread[workerCount];
+ this.threads = new WorkerThread[workerCount];
for (int i = 0; i < this.ioReactors.length; i++) {
this.ioReactors[i] = new BaseIOReactor();
}
}
protected void startWorkers(final IOEventDispatch eventDispatch) {
- if (eventDispatch == null) {
- throw new IllegalArgumentException("Event dispatcher may not be null");
- }
for (int i = 0; i < this.workerCount; i++) {
BaseIOReactor ioReactor = this.ioReactors[i];
this.threads[i] = new WorkerThread(ioReactor, eventDispatch);
@@ -83,9 +79,21 @@
}
}
- protected void addChannel(final SocketChannel channel) throws IOException {
+ protected void verifyWorkers() throws IOException {
+ for (int i = 0; i < this.workerCount; i++) {
+ WorkerThread worker = this.threads[i];
+ if (!worker.isAlive()) {
+ IOException ex = worker.getException();
+ if (ex != null) {
+ throw ex;
+ }
+ }
+ }
+ }
+
+ protected void addChannel(final ChannelEntry entry) throws IOException {
// Distribute new channels among the workers
- this.ioReactors[this.currentWorker++ % this.workerCount].addChannel(channel);
+ this.ioReactors[this.currentWorker++ % this.workerCount].addChannel(entry);
}
static class WorkerThread extends Thread {
@@ -93,6 +101,8 @@
final BaseIOReactor ioReactor;
final IOEventDispatch eventDispatch;
+ private volatile IOException exception;
+
public WorkerThread(final BaseIOReactor ioReactor, final IOEventDispatch eventDispatch) {
super();
this.ioReactor = ioReactor;
@@ -102,12 +112,19 @@
public void run() {
try {
this.ioReactor.execute(this.eventDispatch);
+ } catch (IOException ex) {
+ this.exception = ex;
} finally {
try {
this.ioReactor.shutdown();
- } catch (IOException ignore) {
+ } catch (IOException ex2) {
+ this.exception = ex2;
}
}
+ }
+
+ public IOException getException() {
+ return this.exception;
}
}
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java?view=diff&rev=477304&r1=477303&r2=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/BaseIOReactor.java Mon Nov 20 11:35:57 2006
@@ -31,7 +31,6 @@
import java.io.IOException;
import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
import org.apache.http.nio.reactor.IOSession;
@@ -77,14 +76,19 @@
}
}
- protected void addChannel(final SocketChannel channel) throws IOException {
- SelectionKey key = registerChannel(channel);
-
- IOSession session = newSession(key);
+ protected void keyCreated(final SelectionKey key, final IOSession session) {
SessionHandle handle = new SessionHandle(session);
key.attach(handle);
-
- this.eventDispatch.connected(session);
+ }
+
+ protected IOSession keyCancelled(final SelectionKey key) {
+ Object attachment = key.attachment();
+ if (attachment instanceof SessionHandle) {
+ SessionHandle handle = (SessionHandle) attachment;
+ return handle.getSession();
+ } else {
+ return null;
+ }
}
}
Added: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelEntry.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelEntry.java?view=auto&rev=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelEntry.java (added)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelEntry.java Mon Nov 20 11:35:57 2006
@@ -0,0 +1,60 @@
+/*
+ * $HeadURL$
+ * $Revision$
+ * $Date$
+ *
+ * ====================================================================
+ *
+ * Copyright 1999-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.http.nio.impl.reactor;
+
+import java.nio.channels.SocketChannel;
+
+public class ChannelEntry {
+
+ private final SocketChannel channel;
+ private final Object attachment;
+
+ public ChannelEntry(final SocketChannel channel, final Object attachment) {
+ super();
+ if (channel == null) {
+ throw new IllegalArgumentException("Socket channel may not be null");
+ }
+ this.channel = channel;
+ this.attachment = attachment;
+ }
+
+ public ChannelEntry(final SocketChannel channel) {
+ this(channel, null);
+ }
+
+ public Object getAttachment() {
+ return this.attachment;
+ }
+
+ public SocketChannel getChannel() {
+ return this.channel;
+ }
+
+}
Propchange: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelEntry.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelEntry.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelQueue.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelQueue.java?view=auto&rev=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelQueue.java (added)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelQueue.java Mon Nov 20 11:35:57 2006
@@ -0,0 +1,58 @@
+/*
+ * $HeadURL$
+ * $Revision$
+ * $Date$
+ *
+ * ====================================================================
+ *
+ * Copyright 1999-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.http.nio.impl.reactor;
+
+import java.util.LinkedList;
+
+public class ChannelQueue {
+
+ private final LinkedList list;
+
+ public ChannelQueue() {
+ super();
+ this.list = new LinkedList();
+ }
+
+ public synchronized void push(final ChannelEntry entry) {
+ if (entry == null) {
+ return;
+ }
+ this.list.addLast(entry);
+ }
+
+ public synchronized ChannelEntry pop() {
+ if (!this.list.isEmpty()) {
+ return (ChannelEntry) this.list.removeFirst();
+ } else {
+ return null;
+ }
+ }
+
+}
Propchange: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelQueue.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/ChannelQueue.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultConnectingIOReactor.java (from r476807, jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java)
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultConnectingIOReactor.java?view=diff&rev=477304&p1=jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java&r1=476807&p2=jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultConnectingIOReactor.java&r2=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultConnectingIOReactor.java Mon Nov 20 11:35:57 2006
@@ -35,20 +35,18 @@
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
-import org.apache.http.nio.reactor.IOSession;
-import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
-public class DefaultIOReactor implements ListeningIOReactor, ConnectingIOReactor {
+public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
+ implements ConnectingIOReactor {
public static int TIMEOUT_CHECK_INTERVAL = 1000;
@@ -56,22 +54,17 @@
private final HttpParams params;
private final Selector selector;
- private final SessionSet sessions;
- private final SessionQueue closedSessions;
private long lastTimeoutCheck;
- private IOEventDispatch eventDispatch = null;
-
- public DefaultIOReactor(final HttpParams params) throws IOException {
- super();
+ public DefaultConnectingIOReactor(int workerCount, final HttpParams params)
+ throws IOException {
+ super(workerCount);
if (params == null) {
throw new IllegalArgumentException("HTTP parameters may not be null");
}
this.params = params;
this.selector = Selector.open();
- this.sessions = new SessionSet();
- this.closedSessions = new SessionQueue();
this.lastTimeoutCheck = System.currentTimeMillis();
}
@@ -79,32 +72,25 @@
if (eventDispatch == null) {
throw new IllegalArgumentException("Event dispatcher may not be null");
}
- this.eventDispatch = eventDispatch;
-
- try {
- for (;;) {
- int readyCount = this.selector.select(TIMEOUT_CHECK_INTERVAL);
- if (this.closed) {
- break;
- }
- if (readyCount > 0) {
- processEvents(this.selector.selectedKeys());
- }
-
- long currentTime = System.currentTimeMillis();
- if( (currentTime - this.lastTimeoutCheck) >= TIMEOUT_CHECK_INTERVAL) {
- this.lastTimeoutCheck = currentTime;
- Set keys = this.selector.keys();
- if (keys != null) {
- processTimeouts(keys);
- }
+ startWorkers(eventDispatch);
+ for (;;) {
+ int readyCount = this.selector.select(TIMEOUT_CHECK_INTERVAL);
+ if (this.closed) {
+ break;
+ }
+ if (readyCount > 0) {
+ processEvents(this.selector.selectedKeys());
+ }
+
+ long currentTime = System.currentTimeMillis();
+ if( (currentTime - this.lastTimeoutCheck) >= TIMEOUT_CHECK_INTERVAL) {
+ this.lastTimeoutCheck = currentTime;
+ Set keys = this.selector.keys();
+ if (keys != null) {
+ processTimeouts(keys);
}
-
- processClosedSessions();
-
}
- } finally {
- closeSessions();
+ verifyWorkers();
}
}
@@ -121,104 +107,34 @@
private void processEvent(final SelectionKey key) throws IOException {
try {
- if (key.isAcceptable()) {
-
- ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
- SocketChannel socketChannel = serverChannel.accept();
- if (socketChannel != null) {
- // Configure new socket
- onNewSocket(socketChannel.socket());
- // Register new channel with the selector
- socketChannel.configureBlocking(false);
- SelectionKey newkey = socketChannel.register(this.selector, 0);
- // Set up new session
- IOSession session = newSession(newkey);
-
- // Attach session handle to the selection key
- SessionHandle handle = new SessionHandle(session);
- newkey.attach(handle);
-
- this.eventDispatch.connected(session);
- }
- }
-
if (key.isConnectable()) {
- SocketChannel socketChannel = (SocketChannel) key.channel();
+ SocketChannel channel = (SocketChannel) key.channel();
// Get request handle
SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
// Finish connection process
try {
- socketChannel.finishConnect();
+ channel.finishConnect();
} catch (IOException ex) {
sessionRequest.failed(ex);
- key.cancel();
- return;
}
-
- // Configure new socket
- onNewSocket(socketChannel.socket());
- // Set up new session
- IOSession session = newSession(key);
-
- // Attach session handle to the selection key
- SessionHandle handle = new SessionHandle(session);
- key.attach(handle);
-
- // Add the session request to the session context
- session.setAttribute(SessionRequest.ATTRIB_KEY, sessionRequest);
-
- // Fire the request completion notification first
- sessionRequest.completed(session);
-
- // Followed by session connected notification
- this.eventDispatch.connected(session);
-
- }
-
- if (key.isReadable()) {
- SessionHandle handle = (SessionHandle) key.attachment();
- IOSession session = handle.getSession();
- handle.resetLastRead();
-
- this.eventDispatch.inputReady(session);
- }
-
- if (key.isWritable()) {
- SessionHandle handle = (SessionHandle) key.attachment();
- IOSession session = handle.getSession();
- handle.resetLastWrite();
-
- this.eventDispatch.outputReady(session);
+ key.cancel();
+ if (channel.isConnected()) {
+ prepareSocket(channel.socket());
+ Object attachment = sessionRequest.getAttachment();
+ ChannelEntry entry = new ChannelEntry(channel, attachment);
+ addChannel(entry);
+ }
}
-
+
} catch (CancelledKeyException ex) {
- Object attachment = key.attachment();
- if (attachment instanceof SessionHandle) {
- SessionHandle handle = (SessionHandle) attachment;
- IOSession session = handle.getSession();
- this.closedSessions.push(session);
- key.attach(null);
- }
+ key.attach(null);
}
}
- private IOSession newSession(final SelectionKey key) throws IOException {
- IOSession session = new IOSessionImpl(key, new SessionClosedCallback() {
-
- public void sessionClosed(IOSession session) {
- closedSessions.push(session);
- }
-
- });
- session.setSocketTimeout(HttpConnectionParams.getSoTimeout(this.params));
- this.sessions.add(session);
- return session;
- }
-
- protected void onNewSocket(final Socket socket) throws IOException {
+ protected void prepareSocket(final Socket socket) throws IOException {
socket.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(this.params));
socket.setSoTimeout(HttpConnectionParams.getSoTimeout(this.params));
int linger = HttpConnectionParams.getLinger(this.params);
@@ -226,24 +142,13 @@
socket.setSoLinger(linger > 0, linger);
}
}
-
+
private void processTimeouts(final Set keys) {
long now = System.currentTimeMillis();
for (Iterator it = keys.iterator(); it.hasNext();) {
SelectionKey key = (SelectionKey) it.next();
Object attachment = key.attachment();
- if (attachment instanceof SessionHandle) {
- SessionHandle handle = (SessionHandle) key.attachment();
- IOSession session = handle.getSession();
- int timeout = session.getSocketTimeout();
- if (timeout > 0) {
- if (handle.getLastReadTime() + timeout < now) {
- this.eventDispatch.timeout(session);
- }
- }
- }
-
if (attachment instanceof SessionRequestHandle) {
SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
SessionRequestImpl sessionRequest = handle.getSessionRequest();
@@ -258,38 +163,6 @@
}
}
- private void processClosedSessions() {
- IOSession session;
- while ((session = this.closedSessions.pop()) != null) {
- if (this.sessions.remove(session)) {
- this.eventDispatch.disconnected(session);
- }
- }
- }
-
- private void closeSessions() {
- synchronized (this.sessions) {
- for (Iterator it = this.sessions.iterator(); it.hasNext(); ) {
- IOSession session = (IOSession) it.next();
- if (!session.isClosed()) {
-
- session.close();
- this.eventDispatch.disconnected(session);
- }
- }
- this.sessions.clear();
- }
- }
-
- public void listen(
- final SocketAddress address) throws IOException {
- ServerSocketChannel serverChannel = ServerSocketChannel.open();
- serverChannel.configureBlocking(false);
- serverChannel.socket().bind(address);
- SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
- key.attach(null);
- }
-
public SessionRequest connect(
final SocketAddress remoteAddress,
final SocketAddress localAddress,
@@ -318,6 +191,8 @@
this.closed = true;
// Stop dispatching I/O events
this.selector.close();
+ // Stop the workers
+ stopWorkers(500);
}
}
Copied: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultListeningIOReactor.java (from r476807, jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java)
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultListeningIOReactor.java?view=diff&rev=477304&p1=jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java&r1=476807&p2=jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultListeningIOReactor.java&r2=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultIOReactor.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/impl/reactor/DefaultListeningIOReactor.java Mon Nov 20 11:35:57 2006
@@ -40,15 +40,13 @@
import java.util.Iterator;
import java.util.Set;
-import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
-import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.reactor.ListeningIOReactor;
-import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
-public class DefaultIOReactor implements ListeningIOReactor, ConnectingIOReactor {
+public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
+ implements ListeningIOReactor {
public static int TIMEOUT_CHECK_INTERVAL = 1000;
@@ -56,55 +54,32 @@
private final HttpParams params;
private final Selector selector;
- private final SessionSet sessions;
- private final SessionQueue closedSessions;
- private long lastTimeoutCheck;
-
- private IOEventDispatch eventDispatch = null;
-
- public DefaultIOReactor(final HttpParams params) throws IOException {
- super();
+ public DefaultListeningIOReactor(int workerCount, final HttpParams params)
+ throws IOException {
+ super(workerCount);
if (params == null) {
throw new IllegalArgumentException("HTTP parameters may not be null");
}
this.params = params;
this.selector = Selector.open();
- this.sessions = new SessionSet();
- this.closedSessions = new SessionQueue();
- this.lastTimeoutCheck = System.currentTimeMillis();
}
- public void execute(final IOEventDispatch eventDispatch) throws IOException {
+ public void execute(final IOEventDispatch eventDispatch)
+ throws IOException {
if (eventDispatch == null) {
throw new IllegalArgumentException("Event dispatcher may not be null");
}
- this.eventDispatch = eventDispatch;
-
- try {
- for (;;) {
- int readyCount = this.selector.select(TIMEOUT_CHECK_INTERVAL);
- if (this.closed) {
- break;
- }
- if (readyCount > 0) {
- processEvents(this.selector.selectedKeys());
- }
-
- long currentTime = System.currentTimeMillis();
- if( (currentTime - this.lastTimeoutCheck) >= TIMEOUT_CHECK_INTERVAL) {
- this.lastTimeoutCheck = currentTime;
- Set keys = this.selector.keys();
- if (keys != null) {
- processTimeouts(keys);
- }
- }
-
- processClosedSessions();
-
+ startWorkers(eventDispatch);
+ for (;;) {
+ int readyCount = this.selector.select(TIMEOUT_CHECK_INTERVAL);
+ if (this.closed) {
+ break;
+ }
+ if (readyCount > 0) {
+ processEvents(this.selector.selectedKeys());
}
- } finally {
- closeSessions();
+ verifyWorkers();
}
}
@@ -126,99 +101,18 @@
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverChannel.accept();
if (socketChannel != null) {
- // Configure new socket
- onNewSocket(socketChannel.socket());
- // Register new channel with the selector
- socketChannel.configureBlocking(false);
- SelectionKey newkey = socketChannel.register(this.selector, 0);
- // Set up new session
- IOSession session = newSession(newkey);
-
- // Attach session handle to the selection key
- SessionHandle handle = new SessionHandle(session);
- newkey.attach(handle);
-
- this.eventDispatch.connected(session);
- }
- }
-
- if (key.isConnectable()) {
-
- SocketChannel socketChannel = (SocketChannel) key.channel();
- // Get request handle
- SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
- SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
-
- // Finish connection process
- try {
- socketChannel.finishConnect();
- } catch (IOException ex) {
- sessionRequest.failed(ex);
- key.cancel();
- return;
+ prepareSocket(socketChannel.socket());
+ ChannelEntry entry = new ChannelEntry(socketChannel);
+ addChannel(entry);
}
-
- // Configure new socket
- onNewSocket(socketChannel.socket());
- // Set up new session
- IOSession session = newSession(key);
-
- // Attach session handle to the selection key
- SessionHandle handle = new SessionHandle(session);
- key.attach(handle);
-
- // Add the session request to the session context
- session.setAttribute(SessionRequest.ATTRIB_KEY, sessionRequest);
-
- // Fire the request completion notification first
- sessionRequest.completed(session);
-
- // Followed by session connected notification
- this.eventDispatch.connected(session);
-
- }
-
- if (key.isReadable()) {
- SessionHandle handle = (SessionHandle) key.attachment();
- IOSession session = handle.getSession();
- handle.resetLastRead();
-
- this.eventDispatch.inputReady(session);
- }
-
- if (key.isWritable()) {
- SessionHandle handle = (SessionHandle) key.attachment();
- IOSession session = handle.getSession();
- handle.resetLastWrite();
-
- this.eventDispatch.outputReady(session);
}
} catch (CancelledKeyException ex) {
- Object attachment = key.attachment();
- if (attachment instanceof SessionHandle) {
- SessionHandle handle = (SessionHandle) attachment;
- IOSession session = handle.getSession();
- this.closedSessions.push(session);
- key.attach(null);
- }
+ key.attach(null);
}
}
- private IOSession newSession(final SelectionKey key) throws IOException {
- IOSession session = new IOSessionImpl(key, new SessionClosedCallback() {
-
- public void sessionClosed(IOSession session) {
- closedSessions.push(session);
- }
-
- });
- session.setSocketTimeout(HttpConnectionParams.getSoTimeout(this.params));
- this.sessions.add(session);
- return session;
- }
-
- protected void onNewSocket(final Socket socket) throws IOException {
+ protected void prepareSocket(final Socket socket) throws IOException {
socket.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(this.params));
socket.setSoTimeout(HttpConnectionParams.getSoTimeout(this.params));
int linger = HttpConnectionParams.getLinger(this.params);
@@ -227,60 +121,6 @@
}
}
- private void processTimeouts(final Set keys) {
- long now = System.currentTimeMillis();
- for (Iterator it = keys.iterator(); it.hasNext();) {
- SelectionKey key = (SelectionKey) it.next();
- Object attachment = key.attachment();
-
- if (attachment instanceof SessionHandle) {
- SessionHandle handle = (SessionHandle) key.attachment();
- IOSession session = handle.getSession();
- int timeout = session.getSocketTimeout();
- if (timeout > 0) {
- if (handle.getLastReadTime() + timeout < now) {
- this.eventDispatch.timeout(session);
- }
- }
- }
-
- if (attachment instanceof SessionRequestHandle) {
- SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
- SessionRequestImpl sessionRequest = handle.getSessionRequest();
- int timeout = sessionRequest.getConnectTimeout();
- if (timeout > 0) {
- if (handle.getRequestTime() + timeout < now) {
- sessionRequest.timeout();
- }
- }
- }
-
- }
- }
-
- private void processClosedSessions() {
- IOSession session;
- while ((session = this.closedSessions.pop()) != null) {
- if (this.sessions.remove(session)) {
- this.eventDispatch.disconnected(session);
- }
- }
- }
-
- private void closeSessions() {
- synchronized (this.sessions) {
- for (Iterator it = this.sessions.iterator(); it.hasNext(); ) {
- IOSession session = (IOSession) it.next();
- if (!session.isClosed()) {
-
- session.close();
- this.eventDispatch.disconnected(session);
- }
- }
- this.sessions.clear();
- }
- }
-
public void listen(
final SocketAddress address) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
@@ -290,27 +130,6 @@
key.attach(null);
}
- public SessionRequest connect(
- final SocketAddress remoteAddress,
- final SocketAddress localAddress,
- final Object attachment) throws IOException {
- SocketChannel socketChannel = SocketChannel.open();
- socketChannel.configureBlocking(false);
- if (localAddress != null) {
- socketChannel.socket().bind(localAddress);
- }
- socketChannel.connect(remoteAddress);
- SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
-
- SessionRequestImpl sessionRequest = new SessionRequestImpl(
- remoteAddress, localAddress, attachment, key);
- sessionRequest.setConnectTimeout(HttpConnectionParams.getConnectionTimeout(this.params));
-
- SessionRequestHandle requestHandle = new SessionRequestHandle(sessionRequest);
- key.attach(requestHandle);
- return sessionRequest;
- }
-
public void shutdown() throws IOException {
if (this.closed) {
return;
@@ -318,6 +137,8 @@
this.closed = true;
// Stop dispatching I/O events
this.selector.close();
+ // Stop the workers
+ stopWorkers(500);
}
}
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOSession.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOSession.java?view=diff&rev=477304&r1=477303&r2=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOSession.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOSession.java Mon Nov 20 11:35:57 2006
@@ -34,6 +34,8 @@
public interface IOSession {
+ public static final String ATTACHMENT_KEY = "http.session.attachment";
+
ByteChannel channel();
SocketAddress getRemoteAddress();
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/SessionRequest.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/SessionRequest.java?view=diff&rev=477304&r1=477303&r2=477304
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/SessionRequest.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/SessionRequest.java Mon Nov 20 11:35:57 2006
@@ -34,8 +34,6 @@
public interface SessionRequest {
- public static final String ATTRIB_KEY = "http.connection-request";
-
SocketAddress getRemoteAddress();
SocketAddress getLocalAddress();