You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2016/07/16 11:27:18 UTC
svn commit: r1752927 [1/3] - in /httpcomponents/httpcore/trunk/httpcore5/src:
examples/org/apache/hc/core5/http/examples/
main/java/org/apache/hc/core5/http/bootstrap/nio/
main/java/org/apache/hc/core5/http/impl/nio/
main/java/org/apache/hc/core5/http/...
Author: olegk
Date: Sat Jul 16 11:27:17 2016
New Revision: 1752927
URL: http://svn.apache.org/viewvc?rev=1752927&view=rev
Log:
Redesigned and optimized non-blocking I/O session initialization and termination code; reduced public API footprint
Added:
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandler.java
- copied, changed from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIODispatch.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandlerFactory.java (contents, props changed)
- copied, changed from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIODispatch.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandler.java
- copied, changed from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIODispatch.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandlerFactory.java (contents, props changed)
- copied, changed from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIODispatch.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOEventHandler.java (contents, props changed)
- copied, changed from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIODispatch.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandler.java (contents, props changed)
- copied, changed from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventDispatch.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventHandlerFactory.java (contents, props changed)
- copied, changed from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionClosedCallback.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestAsyncHttp.java (contents, props changed)
- copied, changed from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlers.java
Removed:
httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/ElementalEchoServer.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIODispatch.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIODispatch.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/nio/BasicNIOConnFactory.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIODispatch.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ChannelEntry.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOEventDispatch.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionClosedCallback.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlers.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/pool/io/TestBasicConnPool.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/pool/nio/TestBasicNIOConnPool.java
Modified:
httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/NHttpClient.java
httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/NHttpReverseProxy.java
httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/PipeliningHttpClient.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/bootstrap/nio/HttpServer.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/nio/BasicNIOConnPool.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/nio/SocketAddressResolver.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/BaseIOReactor.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactor.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorConfig.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSession.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestClientOutOfSequenceResponse.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestCustomSSL.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestDefaultListeningIOReactor.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlersBrokenExpectContinue.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncHandlersPipelining.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestHttpAsyncTimeout.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestServerSidePipelining.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/testserver/nio/HttpClientNio.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/testserver/nio/HttpCoreNIOTestBase.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/testserver/nio/HttpServerNio.java
httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/testserver/nio/LoggingIOSession.java
Modified: httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/NHttpClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/NHttpClient.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/NHttpClient.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/NHttpClient.java Sat Jul 16 11:27:17 2016
@@ -36,7 +36,7 @@ import org.apache.hc.core5.http.HttpResp
import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.impl.nio.BasicAsyncRequestProducer;
import org.apache.hc.core5.http.impl.nio.BasicAsyncResponseConsumer;
-import org.apache.hc.core5.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.hc.core5.http.impl.nio.DefaultHttpClientIOEventHandlerFactory;
import org.apache.hc.core5.http.impl.nio.HttpAsyncRequestExecutor;
import org.apache.hc.core5.http.impl.nio.HttpAsyncRequester;
import org.apache.hc.core5.http.message.BasicHttpRequest;
@@ -51,7 +51,8 @@ import org.apache.hc.core5.http.protocol
import org.apache.hc.core5.http.protocol.RequestUserAgent;
import org.apache.hc.core5.reactor.ConnectingIOReactor;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
-import org.apache.hc.core5.reactor.IOEventDispatch;
+import org.apache.hc.core5.reactor.IOEventHandlerFactory;
+import org.apache.hc.core5.reactor.IOReactorConfig;
/**
* Minimal asynchronous HTTP/1.1 client.
@@ -74,13 +75,16 @@ public class NHttpClient {
.add(new RequestExpectContinue()).build();
// Create client-side HTTP protocol handler
HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
- // Create client-side I/O event dispatch
- final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler,
+ // Create client-side I/O event handler factory
+ final IOEventHandlerFactory eventHandlerFactory = new DefaultHttpClientIOEventHandlerFactory(
+ protocolHandler,
ConnectionConfig.DEFAULT);
// Create client-side I/O reactor
- final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
+ final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(
+ eventHandlerFactory,
+ IOReactorConfig.DEFAULT);
// Create HTTP connection pool
- BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor, ConnectionConfig.DEFAULT);
+ BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor);
// Limit total number of connections to just two
pool.setDefaultMaxPerRoute(2);
pool.setMaxTotal(2);
@@ -91,7 +95,7 @@ public class NHttpClient {
public void run() {
try {
// Ready to go!
- ioReactor.execute(ioEventDispatch);
+ ioReactor.execute();
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
} catch (IOException e) {
Modified: httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/NHttpReverseProxy.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/NHttpReverseProxy.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/NHttpReverseProxy.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/NHttpReverseProxy.java Sat Jul 16 11:27:17 2016
@@ -37,15 +37,14 @@ import org.apache.hc.core5.http.Connecti
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
-import org.apache.hc.core5.http.HttpRequestInterceptor;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.entity.ContentType;
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
import org.apache.hc.core5.http.impl.nio.BasicAsyncResponseProducer;
-import org.apache.hc.core5.http.impl.nio.DefaultHttpClientIODispatch;
-import org.apache.hc.core5.http.impl.nio.DefaultHttpServerIODispatch;
+import org.apache.hc.core5.http.impl.nio.DefaultHttpClientIOEventHandlerFactory;
+import org.apache.hc.core5.http.impl.nio.DefaultHttpServerIOEventHandlerFactory;
import org.apache.hc.core5.http.impl.nio.HttpAsyncRequestExecutor;
import org.apache.hc.core5.http.impl.nio.HttpAsyncRequester;
import org.apache.hc.core5.http.impl.nio.HttpAsyncService;
@@ -82,11 +81,9 @@ import org.apache.hc.core5.http.protocol
import org.apache.hc.core5.http.protocol.ResponseDate;
import org.apache.hc.core5.http.protocol.ResponseServer;
import org.apache.hc.core5.pool.PoolStats;
-import org.apache.hc.core5.pool.nio.NIOConnFactory;
import org.apache.hc.core5.reactor.ConnectingIOReactor;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
-import org.apache.hc.core5.reactor.IOEventDispatch;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.ListeningIOReactor;
@@ -119,54 +116,49 @@ public class NHttpReverseProxy {
.setSoTimeout(3000)
.setConnectTimeout(3000)
.build();
- final ConnectingIOReactor connectingIOReactor = new DefaultConnectingIOReactor(config);
- final ListeningIOReactor listeningIOReactor = new DefaultListeningIOReactor(config);
-
- // Set up HTTP protocol processor for incoming connections
- HttpProcessor inhttpproc = new ImmutableHttpProcessor(
- new ResponseDate(),
- new ResponseServer("Test/1.1"),
- new ResponseContent(),
- new ResponseConnControl());
// Set up HTTP protocol processor for outgoing connections
HttpProcessor outhttpproc;
outhttpproc = new ImmutableHttpProcessor(
- new HttpRequestInterceptor[] {
- new RequestContent(),
- new RequestTargetHost(),
- new RequestConnControl(),
- new RequestUserAgent("Test/1.1"),
- new RequestExpectContinue()
- });
-
- ProxyClientProtocolHandler clientHandler = new ProxyClientProtocolHandler();
+ new RequestContent(),
+ new RequestTargetHost(),
+ new RequestConnControl(),
+ new RequestUserAgent("Test/1.1"),
+ new RequestExpectContinue());
+
+ ProxyClientProtocolHandler clientProtocolHandler = new ProxyClientProtocolHandler();
+ final ConnectingIOReactor connectingIOReactor = new DefaultConnectingIOReactor(
+ new DefaultHttpClientIOEventHandlerFactory(clientProtocolHandler, ConnectionConfig.DEFAULT));
HttpAsyncRequester executor = new HttpAsyncRequester(outhttpproc);
- ProxyConnPool connPool = new ProxyConnPool(connectingIOReactor, ConnectionConfig.DEFAULT);
+ ProxyConnPool connPool = new ProxyConnPool(connectingIOReactor, 0);
connPool.setMaxTotal(100);
connPool.setDefaultMaxPerRoute(20);
+
+ // Set up HTTP protocol processor for incoming connections
+ HttpProcessor inhttpproc = new ImmutableHttpProcessor(
+ new ResponseDate(),
+ new ResponseServer("Test/1.1"),
+ new ResponseContent(),
+ new ResponseConnControl());
+
UriHttpAsyncRequestHandlerMapper handlerRegistry = new UriHttpAsyncRequestHandlerMapper();
handlerRegistry.register("*", new ProxyRequestHandler(targetHost, executor, connPool));
- ProxyServiceHandler serviceHandler = new ProxyServiceHandler(
+ ProxyServiceHandler serverProtocolHandler = new ProxyServiceHandler(
inhttpproc,
new ProxyIncomingConnectionReuseStrategy(),
handlerRegistry);
-
- final IOEventDispatch connectingEventDispatch = new DefaultHttpClientIODispatch(
- clientHandler, ConnectionConfig.DEFAULT);
-
- final IOEventDispatch listeningEventDispatch = new DefaultHttpServerIODispatch(
- serviceHandler, ConnectionConfig.DEFAULT);
+ final ListeningIOReactor listeningIOReactor = new DefaultListeningIOReactor(
+ new DefaultHttpServerIOEventHandlerFactory(serverProtocolHandler, ConnectionConfig.DEFAULT));
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
- connectingIOReactor.execute(connectingEventDispatch);
+ connectingIOReactor.execute();
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
} catch (IOException ex) {
@@ -184,7 +176,7 @@ public class NHttpReverseProxy {
t.start();
try {
listeningIOReactor.listen(new InetSocketAddress(port));
- listeningIOReactor.execute(listeningEventDispatch);
+ listeningIOReactor.execute();
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
} catch (IOException ex) {
@@ -863,15 +855,8 @@ public class NHttpReverseProxy {
public ProxyConnPool(
final ConnectingIOReactor ioreactor,
- final ConnectionConfig config) {
- super(ioreactor, config);
- }
-
- public ProxyConnPool(
- final ConnectingIOReactor ioreactor,
- final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory,
final int connectTimeout) {
- super(ioreactor, connFactory, connectTimeout);
+ super(ioreactor, connectTimeout);
}
@Override
Modified: httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/PipeliningHttpClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/PipeliningHttpClient.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/PipeliningHttpClient.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/PipeliningHttpClient.java Sat Jul 16 11:27:17 2016
@@ -38,7 +38,7 @@ import org.apache.hc.core5.http.HttpResp
import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.impl.nio.BasicAsyncRequestProducer;
import org.apache.hc.core5.http.impl.nio.BasicAsyncResponseConsumer;
-import org.apache.hc.core5.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.hc.core5.http.impl.nio.DefaultHttpClientIOEventHandlerFactory;
import org.apache.hc.core5.http.impl.nio.HttpAsyncRequestExecutor;
import org.apache.hc.core5.http.impl.nio.HttpAsyncRequester;
import org.apache.hc.core5.http.message.BasicHttpRequest;
@@ -53,7 +53,8 @@ import org.apache.hc.core5.http.protocol
import org.apache.hc.core5.http.protocol.RequestUserAgent;
import org.apache.hc.core5.reactor.ConnectingIOReactor;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
-import org.apache.hc.core5.reactor.IOEventDispatch;
+import org.apache.hc.core5.reactor.IOEventHandlerFactory;
+import org.apache.hc.core5.reactor.IOReactorConfig;
/**
* Minimal pipelining HTTP/1.1 client.
@@ -77,13 +78,16 @@ public class PipeliningHttpClient {
.add(new RequestExpectContinue()).build();
// Create client-side HTTP protocol handler
HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
- // Create client-side I/O event dispatch
- final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler,
+ // Create client-side I/O event handler factory
+ IOEventHandlerFactory eventHandlerFactory = new DefaultHttpClientIOEventHandlerFactory(
+ new HttpAsyncRequestExecutor(),
ConnectionConfig.DEFAULT);
// Create client-side I/O reactor
- final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
+ final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(
+ eventHandlerFactory,
+ IOReactorConfig.DEFAULT);
// Create HTTP connection pool
- BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor, ConnectionConfig.DEFAULT);
+ BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor, 0);
// Limit total number of connections to just two
pool.setDefaultMaxPerRoute(2);
pool.setMaxTotal(2);
@@ -94,7 +98,7 @@ public class PipeliningHttpClient {
public void run() {
try {
// Ready to go!
- ioReactor.execute(ioEventDispatch);
+ ioReactor.execute();
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
} catch (IOException e) {
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/bootstrap/nio/HttpServer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/bootstrap/nio/HttpServer.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/bootstrap/nio/HttpServer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/bootstrap/nio/HttpServer.java Sat Jul 16 11:27:17 2016
@@ -35,12 +35,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.http.ExceptionLogger;
-import org.apache.hc.core5.http.impl.nio.DefaultHttpServerIODispatch;
+import org.apache.hc.core5.http.impl.nio.DefaultHttpServerIOEventHandlerFactory;
import org.apache.hc.core5.http.impl.nio.DefaultNHttpServerConnection;
import org.apache.hc.core5.http.nio.NHttpConnectionFactory;
import org.apache.hc.core5.http.nio.NHttpServerEventHandler;
import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
-import org.apache.hc.core5.reactor.IOEventDispatch;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorException;
import org.apache.hc.core5.reactor.IOReactorExceptionHandler;
@@ -57,7 +56,6 @@ public class HttpServer {
private final InetAddress ifAddress;
private final IOReactorConfig ioReactorConfig;
private final NHttpServerEventHandler serverEventHandler;
- private final NHttpConnectionFactory<? extends DefaultNHttpServerConnection> connectionFactory;
private final ExceptionLogger exceptionLogger;
private final ExecutorService listenerExecutorService;
private final ThreadGroup dispatchThreads;
@@ -77,13 +75,13 @@ public class HttpServer {
this.ifAddress = ifAddress;
this.ioReactorConfig = ioReactorConfig;
this.serverEventHandler = serverEventHandler;
- this.connectionFactory = connectionFactory;
this.exceptionLogger = exceptionLogger;
this.listenerExecutorService = Executors.newSingleThreadExecutor(
new ThreadFactoryImpl("HTTP-listener-" + this.port));
this.dispatchThreads = new ThreadGroup("I/O-dispatchers");
try {
this.ioReactor = new DefaultListeningIOReactor(
+ new DefaultHttpServerIOEventHandlerFactory(this.serverEventHandler, connectionFactory),
this.ioReactorConfig,
new ThreadFactoryImpl("I/O-dispatch", this.dispatchThreads));
} catch (final IOReactorException ex) {
@@ -112,14 +110,12 @@ public class HttpServer {
public void start() {
if (this.status.compareAndSet(Status.READY, Status.ACTIVE)) {
this.endpoint = this.ioReactor.listen(new InetSocketAddress(this.ifAddress, this.port > 0 ? this.port : 0));
- final IOEventDispatch ioEventDispatch = new DefaultHttpServerIODispatch(
- this.serverEventHandler, this.connectionFactory);
this.listenerExecutorService.execute(new Runnable() {
@Override
public void run() {
try {
- ioReactor.execute(ioEventDispatch);
+ ioReactor.execute();
} catch (final Exception ex) {
exceptionLogger.log(ex);
}
Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandler.java (from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIODispatch.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandler.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandler.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIODispatch.java&r1=1752926&r2=1752927&rev=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIODispatch.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandler.java Sat Jul 16 11:27:17 2016
@@ -29,78 +29,35 @@ package org.apache.hc.core5.http.impl.ni
import java.io.IOException;
-import javax.net.ssl.SSLContext;
-
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.nio.NHttpClientEventHandler;
-import org.apache.hc.core5.http.nio.NHttpConnectionFactory;
-import org.apache.hc.core5.reactor.AbstractIODispatch;
+import org.apache.hc.core5.reactor.AbstractIOEventHandler;
import org.apache.hc.core5.reactor.IOSession;
-import org.apache.hc.core5.reactor.ssl.SSLSetupHandler;
import org.apache.hc.core5.util.Args;
/**
- * Default {@link org.apache.hc.core5.reactor.IOEventDispatch} implementation
+ * Default {@link org.apache.hc.core5.reactor.IOEventHandler} implementation
* that supports both plain (non-encrypted) and SSL encrypted client side HTTP
* connections.
*
- * @since 4.2
+ * @since 5.0
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
-public class DefaultHttpClientIODispatch
- extends AbstractIODispatch<DefaultNHttpClientConnection> {
+public class DefaultHttpClientIOEventHandler
+ extends AbstractIOEventHandler<DefaultNHttpClientConnection> {
private final NHttpClientEventHandler handler;
- private final NHttpConnectionFactory<DefaultNHttpClientConnection> connFactory;
/**
* Creates a new instance of this class to be used for dispatching I/O event
* notifications to the given protocol handler.
*
* @param handler the client protocol handler.
- * @param connFactory HTTP client connection factory.
*/
- public DefaultHttpClientIODispatch(
- final NHttpClientEventHandler handler,
- final NHttpConnectionFactory<DefaultNHttpClientConnection> connFactory) {
+ public DefaultHttpClientIOEventHandler(final NHttpClientEventHandler handler) {
super();
this.handler = Args.notNull(handler, "HTTP client handler");
- this.connFactory = Args.notNull(connFactory, "HTTP client connection factory");
- }
-
- /**
- * @since 4.3
- */
- public DefaultHttpClientIODispatch(final NHttpClientEventHandler handler, final ConnectionConfig config) {
- this(handler, new DefaultNHttpClientConnectionFactory(config));
- }
-
- /**
- * @since 4.3
- */
- public DefaultHttpClientIODispatch(
- final NHttpClientEventHandler handler,
- final SSLContext sslcontext,
- final SSLSetupHandler sslHandler,
- final ConnectionConfig config) {
- this(handler, new SSLNHttpClientConnectionFactory(sslcontext, sslHandler, config));
- }
-
- /**
- * @since 4.3
- */
- public DefaultHttpClientIODispatch(
- final NHttpClientEventHandler handler,
- final SSLContext sslcontext,
- final ConnectionConfig config) {
- this(handler, new SSLNHttpClientConnectionFactory(sslcontext, null, config));
- }
-
- @Override
- protected DefaultNHttpClientConnection createConnection(final IOSession session) {
- return this.connFactory.createConnection(session);
}
@Override
Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandlerFactory.java (from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIODispatch.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandlerFactory.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandlerFactory.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIODispatch.java&r1=1752926&r2=1752927&rev=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIODispatch.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandlerFactory.java Sat Jul 16 11:27:17 2016
@@ -27,8 +27,6 @@
package org.apache.hc.core5.http.impl.nio;
-import java.io.IOException;
-
import javax.net.ssl.SSLContext;
import org.apache.hc.core5.annotation.Contract;
@@ -36,21 +34,20 @@ import org.apache.hc.core5.annotation.Th
import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.nio.NHttpClientEventHandler;
import org.apache.hc.core5.http.nio.NHttpConnectionFactory;
-import org.apache.hc.core5.reactor.AbstractIODispatch;
+import org.apache.hc.core5.reactor.AbstractIOEventHandler;
+import org.apache.hc.core5.reactor.IOEventHandler;
+import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ssl.SSLSetupHandler;
import org.apache.hc.core5.util.Args;
/**
- * Default {@link org.apache.hc.core5.reactor.IOEventDispatch} implementation
- * that supports both plain (non-encrypted) and SSL encrypted client side HTTP
- * connections.
+ * Factory for {@link DefaultHttpClientIOEventHandler}.
*
- * @since 4.2
+ * @since 5.0
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
-public class DefaultHttpClientIODispatch
- extends AbstractIODispatch<DefaultNHttpClientConnection> {
+public class DefaultHttpClientIOEventHandlerFactory implements IOEventHandlerFactory {
private final NHttpClientEventHandler handler;
private final NHttpConnectionFactory<DefaultNHttpClientConnection> connFactory;
@@ -62,7 +59,7 @@ public class DefaultHttpClientIODispatch
* @param handler the client protocol handler.
* @param connFactory HTTP client connection factory.
*/
- public DefaultHttpClientIODispatch(
+ public DefaultHttpClientIOEventHandlerFactory(
final NHttpClientEventHandler handler,
final NHttpConnectionFactory<DefaultNHttpClientConnection> connFactory) {
super();
@@ -70,17 +67,11 @@ public class DefaultHttpClientIODispatch
this.connFactory = Args.notNull(connFactory, "HTTP client connection factory");
}
- /**
- * @since 4.3
- */
- public DefaultHttpClientIODispatch(final NHttpClientEventHandler handler, final ConnectionConfig config) {
+ public DefaultHttpClientIOEventHandlerFactory(final NHttpClientEventHandler handler, final ConnectionConfig config) {
this(handler, new DefaultNHttpClientConnectionFactory(config));
}
- /**
- * @since 4.3
- */
- public DefaultHttpClientIODispatch(
+ public DefaultHttpClientIOEventHandlerFactory(
final NHttpClientEventHandler handler,
final SSLContext sslcontext,
final SSLSetupHandler sslHandler,
@@ -88,10 +79,7 @@ public class DefaultHttpClientIODispatch
this(handler, new SSLNHttpClientConnectionFactory(sslcontext, sslHandler, config));
}
- /**
- * @since 4.3
- */
- public DefaultHttpClientIODispatch(
+ public DefaultHttpClientIOEventHandlerFactory(
final NHttpClientEventHandler handler,
final SSLContext sslcontext,
final ConnectionConfig config) {
@@ -99,47 +87,9 @@ public class DefaultHttpClientIODispatch
}
@Override
- protected DefaultNHttpClientConnection createConnection(final IOSession session) {
- return this.connFactory.createConnection(session);
- }
-
- @Override
- protected void onConnected(final DefaultNHttpClientConnection conn) {
- final Object attachment = conn.getContext().getAttribute(IOSession.ATTACHMENT_KEY);
- try {
- this.handler.connected(conn, attachment);
- } catch (final Exception ex) {
- this.handler.exception(conn, ex);
- }
- }
-
- @Override
- protected void onClosed(final DefaultNHttpClientConnection conn) {
- this.handler.closed(conn);
+ public IOEventHandler createHandler(final IOSession ioSession) {
+ final DefaultNHttpClientConnection connection = this.connFactory.createConnection(ioSession);
+ ioSession.setAttribute(AbstractIOEventHandler.CONNECTION_KEY, connection);
+ return new DefaultHttpClientIOEventHandler(this.handler);
}
-
- @Override
- protected void onException(final DefaultNHttpClientConnection conn, final IOException ex) {
- this.handler.exception(conn, ex);
- }
-
- @Override
- protected void onInputReady(final DefaultNHttpClientConnection conn) {
- conn.consumeInput(this.handler);
- }
-
- @Override
- protected void onOutputReady(final DefaultNHttpClientConnection conn) {
- conn.produceOutput(this.handler);
- }
-
- @Override
- protected void onTimeout(final DefaultNHttpClientConnection conn) {
- try {
- this.handler.timeout(conn);
- } catch (final Exception ex) {
- this.handler.exception(conn, ex);
- }
- }
-
}
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandlerFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandlerFactory.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpClientIOEventHandlerFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandler.java (from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIODispatch.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandler.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandler.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIODispatch.java&r1=1752926&r2=1752927&rev=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIODispatch.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandler.java Sat Jul 16 11:27:17 2016
@@ -29,71 +29,28 @@ package org.apache.hc.core5.http.impl.ni
import java.io.IOException;
-import javax.net.ssl.SSLContext;
-
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.http.config.ConnectionConfig;
-import org.apache.hc.core5.http.nio.NHttpConnectionFactory;
import org.apache.hc.core5.http.nio.NHttpServerEventHandler;
-import org.apache.hc.core5.reactor.AbstractIODispatch;
-import org.apache.hc.core5.reactor.IOSession;
-import org.apache.hc.core5.reactor.ssl.SSLSetupHandler;
+import org.apache.hc.core5.reactor.AbstractIOEventHandler;
import org.apache.hc.core5.util.Args;
/**
- * Default {@link org.apache.hc.core5.reactor.IOEventDispatch} implementation
+ * Default {@link org.apache.hc.core5.reactor.IOEventHandler} implementation
* that supports both plain (non-encrypted) and SSL encrypted server side HTTP
* connections.
*
- * @since 4.2
+ * @since 5.0
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
-public class DefaultHttpServerIODispatch
- extends AbstractIODispatch<DefaultNHttpServerConnection> {
+public class DefaultHttpServerIOEventHandler
+ extends AbstractIOEventHandler<DefaultNHttpServerConnection> {
private final NHttpServerEventHandler handler;
- private final NHttpConnectionFactory<? extends DefaultNHttpServerConnection> connFactory;
- public DefaultHttpServerIODispatch(
- final NHttpServerEventHandler handler,
- final NHttpConnectionFactory<? extends DefaultNHttpServerConnection> connFactory) {
+ public DefaultHttpServerIOEventHandler(final NHttpServerEventHandler handler) {
super();
- this.handler = Args.notNull(handler, "HTTP client handler");
- this.connFactory = Args.notNull(connFactory, "HTTP server connection factory");
- }
-
- /**
- * @since 4.3
- */
- public DefaultHttpServerIODispatch(final NHttpServerEventHandler handler, final ConnectionConfig config) {
- this(handler, new DefaultNHttpServerConnectionFactory(config));
- }
-
- /**
- * @since 4.3
- */
- public DefaultHttpServerIODispatch(
- final NHttpServerEventHandler handler,
- final SSLContext sslcontext,
- final SSLSetupHandler sslHandler,
- final ConnectionConfig config) {
- this(handler, new SSLNHttpServerConnectionFactory(sslcontext, sslHandler, config));
- }
-
- /**
- * @since 4.3
- */
- public DefaultHttpServerIODispatch(
- final NHttpServerEventHandler handler,
- final SSLContext sslcontext,
- final ConnectionConfig config) {
- this(handler, new SSLNHttpServerConnectionFactory(sslcontext, null, config));
- }
-
- @Override
- protected DefaultNHttpServerConnection createConnection(final IOSession session) {
- return this.connFactory.createConnection(session);
+ this.handler = Args.notNull(handler, "HTTP server handler");
}
@Override
Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandlerFactory.java (from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIODispatch.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandlerFactory.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandlerFactory.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIODispatch.java&r1=1752926&r2=1752927&rev=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIODispatch.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandlerFactory.java Sat Jul 16 11:27:17 2016
@@ -27,8 +27,6 @@
package org.apache.hc.core5.http.impl.nio;
-import java.io.IOException;
-
import javax.net.ssl.SSLContext;
import org.apache.hc.core5.annotation.Contract;
@@ -36,26 +34,25 @@ import org.apache.hc.core5.annotation.Th
import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.nio.NHttpConnectionFactory;
import org.apache.hc.core5.http.nio.NHttpServerEventHandler;
-import org.apache.hc.core5.reactor.AbstractIODispatch;
+import org.apache.hc.core5.reactor.AbstractIOEventHandler;
+import org.apache.hc.core5.reactor.IOEventHandler;
+import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ssl.SSLSetupHandler;
import org.apache.hc.core5.util.Args;
/**
- * Default {@link org.apache.hc.core5.reactor.IOEventDispatch} implementation
- * that supports both plain (non-encrypted) and SSL encrypted server side HTTP
- * connections.
+ * Factory for {@link DefaultHttpServerIOEventHandler}.
*
- * @since 4.2
+ * @since 5.0
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
-public class DefaultHttpServerIODispatch
- extends AbstractIODispatch<DefaultNHttpServerConnection> {
+public class DefaultHttpServerIOEventHandlerFactory implements IOEventHandlerFactory {
private final NHttpServerEventHandler handler;
private final NHttpConnectionFactory<? extends DefaultNHttpServerConnection> connFactory;
- public DefaultHttpServerIODispatch(
+ public DefaultHttpServerIOEventHandlerFactory(
final NHttpServerEventHandler handler,
final NHttpConnectionFactory<? extends DefaultNHttpServerConnection> connFactory) {
super();
@@ -63,17 +60,11 @@ public class DefaultHttpServerIODispatch
this.connFactory = Args.notNull(connFactory, "HTTP server connection factory");
}
- /**
- * @since 4.3
- */
- public DefaultHttpServerIODispatch(final NHttpServerEventHandler handler, final ConnectionConfig config) {
+ public DefaultHttpServerIOEventHandlerFactory(final NHttpServerEventHandler handler, final ConnectionConfig config) {
this(handler, new DefaultNHttpServerConnectionFactory(config));
}
- /**
- * @since 4.3
- */
- public DefaultHttpServerIODispatch(
+ public DefaultHttpServerIOEventHandlerFactory(
final NHttpServerEventHandler handler,
final SSLContext sslcontext,
final SSLSetupHandler sslHandler,
@@ -81,10 +72,7 @@ public class DefaultHttpServerIODispatch
this(handler, new SSLNHttpServerConnectionFactory(sslcontext, sslHandler, config));
}
- /**
- * @since 4.3
- */
- public DefaultHttpServerIODispatch(
+ public DefaultHttpServerIOEventHandlerFactory(
final NHttpServerEventHandler handler,
final SSLContext sslcontext,
final ConnectionConfig config) {
@@ -92,46 +80,9 @@ public class DefaultHttpServerIODispatch
}
@Override
- protected DefaultNHttpServerConnection createConnection(final IOSession session) {
- return this.connFactory.createConnection(session);
- }
-
- @Override
- protected void onConnected(final DefaultNHttpServerConnection conn) {
- try {
- this.handler.connected(conn);
- } catch (final Exception ex) {
- this.handler.exception(conn, ex);
- }
- }
-
- @Override
- protected void onClosed(final DefaultNHttpServerConnection conn) {
- this.handler.closed(conn);
- }
-
- @Override
- protected void onException(final DefaultNHttpServerConnection conn, final IOException ex) {
- this.handler.exception(conn, ex);
+ public IOEventHandler createHandler(final IOSession ioSession) {
+ final DefaultNHttpServerConnection connection = this.connFactory.createConnection(ioSession);
+ ioSession.setAttribute(AbstractIOEventHandler.CONNECTION_KEY, connection);
+ return new DefaultHttpServerIOEventHandler(this.handler);
}
-
- @Override
- protected void onInputReady(final DefaultNHttpServerConnection conn) {
- conn.consumeInput(this.handler);
- }
-
- @Override
- protected void onOutputReady(final DefaultNHttpServerConnection conn) {
- conn.produceOutput(this.handler);
- }
-
- @Override
- protected void onTimeout(final DefaultNHttpServerConnection conn) {
- try {
- this.handler.timeout(conn);
- } catch (final Exception ex) {
- this.handler.exception(conn, ex);
- }
- }
-
}
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandlerFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandlerFactory.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/DefaultHttpServerIOEventHandlerFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/nio/BasicNIOConnPool.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/nio/BasicNIOConnPool.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/nio/BasicNIOConnPool.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/pool/nio/BasicNIOConnPool.java Sat Jul 16 11:27:17 2016
@@ -26,6 +26,7 @@
*/
package org.apache.hc.core5.http.pool.nio;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Future;
@@ -36,12 +37,13 @@ import org.apache.hc.core5.annotation.Co
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
-import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.nio.NHttpClientConnection;
import org.apache.hc.core5.pool.nio.AbstractNIOConnPool;
import org.apache.hc.core5.pool.nio.NIOConnFactory;
import org.apache.hc.core5.pool.nio.SocketAddressResolver;
+import org.apache.hc.core5.reactor.AbstractIOEventHandler;
import org.apache.hc.core5.reactor.ConnectingIOReactor;
+import org.apache.hc.core5.reactor.IOSession;
/**
* A very basic {@link org.apache.hc.core5.pool.ConnPool} implementation that
@@ -86,25 +88,22 @@ public class BasicNIOConnPool extends Ab
}
- /**
- * @since 4.3
- */
- public BasicNIOConnPool(
- final ConnectingIOReactor ioreactor,
- final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory,
- final int connectTimeout) {
- super(ioreactor, connFactory, new BasicAddressResolver(), DEFAULT_MAX_CONNECTIONS_PER_ROUTE, DEFAULT_MAX_TOTAL_CONNECTIONS);
- this.connectTimeout = connectTimeout;
- }
+ static class BasicNIOConnFactory implements NIOConnFactory<HttpHost, NHttpClientConnection> {
+
+ public BasicNIOConnFactory() {
+ super();
+ }
+
+ @Override
+ public NHttpClientConnection create(final HttpHost route, final IOSession session) throws IOException {
+ final Object connAttribute = session.getAttribute(AbstractIOEventHandler.CONNECTION_KEY);
+ if (connAttribute instanceof NHttpClientConnection) {
+ return (NHttpClientConnection) connAttribute;
+ } else {
+ throw new IllegalStateException("I/O session has not been initialized");
+ }
+ }
- /**
- * @since 4.3
- */
- public BasicNIOConnPool(
- final ConnectingIOReactor ioreactor,
- final int connectTimeout,
- final ConnectionConfig config) {
- this(ioreactor, new BasicNIOConnFactory(config), connectTimeout);
}
/**
@@ -112,15 +111,16 @@ public class BasicNIOConnPool extends Ab
*/
public BasicNIOConnPool(
final ConnectingIOReactor ioreactor,
- final ConnectionConfig config) {
- this(ioreactor, new BasicNIOConnFactory(config), 0);
+ final int connectTimeout) {
+ super(ioreactor, new BasicNIOConnFactory(), new BasicAddressResolver(), DEFAULT_MAX_CONNECTIONS_PER_ROUTE, DEFAULT_MAX_TOTAL_CONNECTIONS);
+ this.connectTimeout = connectTimeout;
}
/**
* @since 4.3
*/
public BasicNIOConnPool(final ConnectingIOReactor ioreactor) {
- this(ioreactor, new BasicNIOConnFactory(ConnectionConfig.DEFAULT), 0);
+ this(ioreactor, 0);
}
@Override
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/nio/SocketAddressResolver.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/nio/SocketAddressResolver.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/nio/SocketAddressResolver.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/pool/nio/SocketAddressResolver.java Sat Jul 16 11:27:17 2016
@@ -32,7 +32,7 @@ import java.net.SocketAddress;
/**
* Strategy that resolves an abstract connection route to a local or a remote {@link SocketAddress}.
- * .
+ *
* @since 4.3
*/
public interface SocketAddressResolver<T> {
Copied: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOEventHandler.java (from r1752926, httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIODispatch.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOEventHandler.java?p2=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOEventHandler.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIODispatch.java&r1=1752926&r2=1752927&rev=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIODispatch.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOEventHandler.java Sat Jul 16 11:27:17 2016
@@ -33,15 +33,15 @@ import org.apache.hc.core5.reactor.ssl.S
import org.apache.hc.core5.util.Asserts;
/**
- * Abstract {@link IOEventDispatch} implementation that supports both plain (non-encrypted)
+ * Abstract {@link IOEventHandler} implementation that supports both plain (non-encrypted)
* and SSL encrypted HTTP connections.
*
* @param <T> the connection type.
* @since 4.2
*/
-public abstract class AbstractIODispatch<T> implements IOEventDispatch {
+public abstract class AbstractIOEventHandler<T> implements IOEventHandler {
- protected abstract T createConnection(IOSession session);
+ public static final String CONNECTION_KEY = "http.connection";
protected abstract void onConnected(T conn);
@@ -55,19 +55,17 @@ public abstract class AbstractIODispatch
protected abstract void onTimeout(T conn);
- private void ensureNotNull(final T conn) {
+ @SuppressWarnings("unchecked")
+ private T ensureNotNull(final IOSession session) {
+ final T conn = (T) session.getAttribute(CONNECTION_KEY);
Asserts.notNull(conn, "HTTP connection");
+ return conn;
}
@Override
public void connected(final IOSession session) {
- @SuppressWarnings("unchecked")
- T conn = (T) session.getAttribute(IOEventDispatch.CONNECTION_KEY);
+ final T conn = ensureNotNull(session);
try {
- if (conn == null) {
- conn = createConnection(session);
- session.setAttribute(IOEventDispatch.CONNECTION_KEY, conn);
- }
onConnected(conn);
final SSLIOSession ssliosession = (SSLIOSession) session.getAttribute(
SSLIOSession.SESSION_KEY);
@@ -92,8 +90,7 @@ public abstract class AbstractIODispatch
@Override
public void disconnected(final IOSession session) {
@SuppressWarnings("unchecked")
- final
- T conn = (T) session.getAttribute(IOEventDispatch.CONNECTION_KEY);
+ final T conn = (T) session.getAttribute(CONNECTION_KEY);
if (conn != null) {
onClosed(conn);
}
@@ -101,11 +98,8 @@ public abstract class AbstractIODispatch
@Override
public void inputReady(final IOSession session) {
- @SuppressWarnings("unchecked")
- final
- T conn = (T) session.getAttribute(IOEventDispatch.CONNECTION_KEY);
+ final T conn = ensureNotNull(session);
try {
- ensureNotNull(conn);
final SSLIOSession ssliosession = (SSLIOSession) session.getAttribute(
SSLIOSession.SESSION_KEY);
if (ssliosession == null) {
@@ -133,10 +127,8 @@ public abstract class AbstractIODispatch
@Override
public void outputReady(final IOSession session) {
@SuppressWarnings("unchecked")
- final
- T conn = (T) session.getAttribute(IOEventDispatch.CONNECTION_KEY);
+ final T conn = ensureNotNull(session);
try {
- ensureNotNull(conn);
final SSLIOSession ssliosession = (SSLIOSession) session.getAttribute(
SSLIOSession.SESSION_KEY);
if (ssliosession == null) {
@@ -163,13 +155,10 @@ public abstract class AbstractIODispatch
@Override
public void timeout(final IOSession session) {
- @SuppressWarnings("unchecked")
- final
- T conn = (T) session.getAttribute(IOEventDispatch.CONNECTION_KEY);
+ final T conn = ensureNotNull(session);
try {
final SSLIOSession ssliosession = (SSLIOSession) session.getAttribute(
SSLIOSession.SESSION_KEY);
- ensureNotNull(conn);
onTimeout(conn);
if (ssliosession != null) {
synchronized (ssliosession) {
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOEventHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOEventHandler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOEventHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactor.java Sat Jul 16 11:27:17 2016
@@ -35,13 +35,12 @@ import java.nio.channels.ClosedSelectorE
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Asserts;
/**
* Generic implementation of {@link IOReactor} that can used as a subclass
@@ -52,28 +51,30 @@ import org.apache.hc.core5.util.Args;
*/
public abstract class AbstractIOReactor implements IOReactor {
- private volatile IOReactorStatus status;
-
- private final Object statusMutex;
- private final long selectTimeout;
+ private final IOReactorConfig reactorConfig;
+ private final IOEventHandlerFactory eventHandlerFactory;
private final Selector selector;
- private final Set<IOSession> sessions;
private final Queue<IOSession> closedSessions;
- private final Queue<ChannelEntry> newChannels;
+ private final Queue<PendingSession> pendingSessions;
+ private final Object statusMutex;
+
+ private volatile IOReactorStatus status;
/**
* Creates new AbstractIOReactor instance.
*
- * @param selectTimeout the select timeout.
+ * @param eventHandlerFactory the event handler factory.
+ * @param reactorConfig the reactor configuration.
* @throws IOReactorException in case if a non-recoverable I/O error.
*/
- public AbstractIOReactor(final long selectTimeout) throws IOReactorException {
+ public AbstractIOReactor(
+ final IOEventHandlerFactory eventHandlerFactory,
+ final IOReactorConfig reactorConfig) throws IOReactorException {
super();
- Args.positive(selectTimeout, "Select timeout");
- this.selectTimeout = selectTimeout;
- this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
+ this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
+ this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
this.closedSessions = new ConcurrentLinkedQueue<>();
- this.newChannels = new ConcurrentLinkedQueue<>();
+ this.pendingSessions = new ConcurrentLinkedQueue<>();
try {
this.selector = Selector.open();
} catch (final IOException ex) {
@@ -136,11 +137,9 @@ public abstract class AbstractIOReactor
* <p>
* Super-classes can implement this method to react to the event.
*
- * @param key the selection key.
* @param session new I/O session.
*/
- protected void sessionCreated(final SelectionKey key, final IOSession session) {
- }
+ protected abstract void sessionCreated(final IOSession session);
/**
* Triggered when a session has been closed.
@@ -149,8 +148,7 @@ public abstract class AbstractIOReactor
*
* @param session closed I/O session.
*/
- protected void sessionClosed(final IOSession session) {
- }
+ protected abstract void sessionClosed(final IOSession session);
/**
* Triggered when a session has timed out.
@@ -159,8 +157,7 @@ public abstract class AbstractIOReactor
*
* @param session timed out I/O session.
*/
- protected void sessionTimedOut(final IOSession session) {
- }
+ protected abstract void sessionTimedOut(final IOSession session);
/**
* Obtains {@link IOSession} instance associated with the given selection
@@ -173,26 +170,35 @@ public abstract class AbstractIOReactor
return (IOSession) key.attachment();
}
+ protected IOEventHandler ensureEventHandler(final IOSession ioSession) {
+ Asserts.notNull(ioSession, "IO session");
+ final IOEventHandler handler = ioSession.getHandler();
+ Asserts.notNull(handler, "IO event handler");
+ return handler;
+ }
+
@Override
public IOReactorStatus getStatus() {
return this.status;
}
/**
- * Adds new channel entry. The channel will be asynchronously registered
+ * Enqueues pending session. The socket channel will be asynchronously registered
* with the selector.
*
- * @param channelEntry the channel entry.
+ * @param socketChannel the new socketChannel.
+ * @param sessionRequest the session request if applicable.
*/
- public void addChannel(final ChannelEntry channelEntry) {
- Args.notNull(channelEntry, "Channel entry");
- this.newChannels.add(channelEntry);
+ public void enqueuePendingSession(final SocketChannel socketChannel, final SessionRequestImpl sessionRequest) {
+ Args.notNull(socketChannel, "SocketChannel");
+ this.pendingSessions.add(new PendingSession(socketChannel, sessionRequest));
this.selector.wakeup();
}
/**
- * Activates the I/O reactor. The I/O reactor will start reacting to
- * I/O events and triggering notification methods.
+ * Activates the I/O reactor. The I/O reactor will start reacting to I/O
+ * events and and dispatch I/O event notifications to the {@link IOEventHandler}
+ * associated with the given I/O session.
* <p>
* This method will enter the infinite I/O select loop on
* the {@link Selector} instance associated with this I/O reactor.
@@ -206,21 +212,22 @@ public abstract class AbstractIOReactor
* @see #writable(SelectionKey)
* @see #timeoutCheck(SelectionKey, long)
* @see #validate(Set)
- * @see #sessionCreated(SelectionKey, IOSession)
+ * @see #sessionCreated(IOSession)
* @see #sessionClosed(IOSession)
*
* @throws InterruptedIOException if the dispatch thread is interrupted.
* @throws IOReactorException in case if a non-recoverable I/O error.
*/
- protected void execute() throws InterruptedIOException, IOReactorException {
+ public void execute() throws InterruptedIOException, IOReactorException {
this.status = IOReactorStatus.ACTIVE;
+ final long selectTimeout = this.reactorConfig.getSelectInterval();
try {
for (;;) {
final int readyCount;
try {
- readyCount = this.selector.select(this.selectTimeout);
+ readyCount = this.selector.select(selectTimeout);
} catch (final InterruptedIOException ex) {
throw ex;
} catch (final IOException ex) {
@@ -236,7 +243,7 @@ public abstract class AbstractIOReactor
// Graceful shutdown in process
// Try to close things out nicely
closeSessions();
- closeNewChannels();
+ closePendingSessions();
}
// Process selected I/O events
@@ -252,12 +259,12 @@ public abstract class AbstractIOReactor
// If active process new channels
if (this.status == IOReactorStatus.ACTIVE) {
- processNewChannels();
+ processPendingSessions();
}
// Exit select loop if graceful shutdown has been completed
if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
- && this.sessions.isEmpty()) {
+ && this.selector.keys().isEmpty()) {
break;
}
}
@@ -303,81 +310,40 @@ public abstract class AbstractIOReactor
writable(key);
}
} catch (final CancelledKeyException ex) {
- queueClosedSession(session);
- key.attach(null);
+ session.shutdown();
}
}
- /**
- * Queues the given I/O session to be processed asynchronously as closed.
- *
- * @param session the closed I/O session.
- */
- protected void queueClosedSession(final IOSession session) {
- if (session != null) {
- this.closedSessions.add(session);
- }
- }
-
- private void processNewChannels() throws IOReactorException {
- ChannelEntry entry;
- while ((entry = this.newChannels.poll()) != null) {
-
- final SocketChannel channel;
- final SelectionKey key;
+ private void processPendingSessions() throws IOReactorException {
+ PendingSession pendingSession;
+ while ((pendingSession = this.pendingSessions.poll()) != null) {
+ final IOSession session;
try {
- channel = entry.getChannel();
- channel.configureBlocking(false);
- key = channel.register(this.selector, SelectionKey.OP_READ);
+ final SocketChannel socketChannel = pendingSession.socketChannel;
+ socketChannel.configureBlocking(false);
+ final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_READ);
+ session = new IOSessionImpl(key, socketChannel, this.closedSessions);
+ session.setHandler(this.eventHandlerFactory.createHandler(session));
+ session.setSocketTimeout(this.reactorConfig.getSoTimeout());
+ key.attach(session);
} catch (final ClosedChannelException ex) {
- final SessionRequestImpl sessionRequest = entry.getSessionRequest();
+ final SessionRequestImpl sessionRequest = pendingSession.sessionRequest;
if (sessionRequest != null) {
sessionRequest.failed(ex);
}
return;
-
} catch (final IOException ex) {
- throw new IOReactorException("Failure registering channel " +
- "with the selector", ex);
+ throw new IOReactorException("Failure registering channel with the selector", ex);
}
-
- final SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
-
- @Override
- public void sessionClosed(final IOSession session) {
- queueClosedSession(session);
- }
-
- };
-
- final IOSession session;
try {
- session = new IOSessionImpl(key, sessionClosedCallback);
- int timeout = 0;
- try {
- timeout = channel.socket().getSoTimeout();
- } catch (final IOException ex) {
- // Very unlikely to happen and is not fatal
- // as the protocol layer is expected to overwrite
- // this value anyways
- }
-
- session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
- session.setSocketTimeout(timeout);
- } catch (final CancelledKeyException ex) {
- continue;
- }
- try {
- this.sessions.add(session);
- final SessionRequestImpl sessionRequest = entry.getSessionRequest();
+ final SessionRequestImpl sessionRequest = pendingSession.sessionRequest;
if (sessionRequest != null) {
+ session.setAttribute(IOSession.ATTACHMENT_KEY, sessionRequest.getAttachment());
sessionRequest.completed(session);
}
- key.attach(session);
- sessionCreated(key, session);
+ sessionCreated(session);
} catch (final CancelledKeyException ex) {
- queueClosedSession(session);
- key.attach(null);
+ session.shutdown();
}
}
}
@@ -385,12 +351,10 @@ public abstract class AbstractIOReactor
private void processClosedSessions() {
IOSession session;
while ((session = this.closedSessions.poll()) != null) {
- if (this.sessions.remove(session)) {
- try {
- sessionClosed(session);
- } catch (final CancelledKeyException ex) {
- // ignore and move on
- }
+ try {
+ sessionClosed(session);
+ } catch (final CancelledKeyException ex) {
+ // ignore and move on
}
}
}
@@ -416,30 +380,24 @@ public abstract class AbstractIOReactor
}
}
- /**
- * Closes out all I/O sessions maintained by this I/O reactor.
- */
- protected void closeSessions() {
- synchronized (this.sessions) {
- for (final IOSession session : this.sessions) {
+ private void closeSessions() {
+ final Set<SelectionKey> keys = this.selector.keys();
+ for (final SelectionKey key : keys) {
+ final IOSession session = getSession(key);
+ if (session != null) {
session.close();
}
}
}
- /**
- * Closes out all new channels pending registration with the selector of
- * this I/O reactor.
- * @throws IOReactorException - not thrown currently
- */
- protected void closeNewChannels() throws IOReactorException {
- ChannelEntry entry;
- while ((entry = this.newChannels.poll()) != null) {
- final SessionRequestImpl sessionRequest = entry.getSessionRequest();
+ private void closePendingSessions() {
+ PendingSession pendingSession;
+ while ((pendingSession = this.pendingSessions.poll()) != null) {
+ final SessionRequestImpl sessionRequest = pendingSession.sessionRequest;
if (sessionRequest != null) {
sessionRequest.cancel();
}
- final SocketChannel channel = entry.getChannel();
+ final SocketChannel channel = pendingSession.socketChannel;
try {
channel.close();
} catch (final IOException ignore) {
@@ -492,7 +450,7 @@ public abstract class AbstractIOReactor
this.status = IOReactorStatus.SHUT_DOWN;
}
- closeNewChannels();
+ closePendingSessions();
closeActiveChannels();
processClosedSessions();
}
@@ -539,4 +497,16 @@ public abstract class AbstractIOReactor
shutdown(1000);
}
+ private static class PendingSession {
+
+ final SocketChannel socketChannel;
+ final SessionRequestImpl sessionRequest;
+
+ private PendingSession(final SocketChannel socketChannel, final SessionRequestImpl sessionRequest) {
+ this.socketChannel = socketChannel;
+ this.sessionRequest = sessionRequest;
+ }
+
+ }
+
}
Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java?rev=1752927&r1=1752926&r2=1752927&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractMultiworkerIOReactor.java Sat Jul 16 11:27:17 2016
@@ -36,6 +36,7 @@ import java.nio.channels.ClosedSelectorE
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -89,11 +90,11 @@ public abstract class AbstractMultiworke
protected volatile IOReactorStatus status;
- protected final IOReactorConfig config;
+ protected final IOReactorConfig reactorConfig;
protected final Selector selector;
- protected final long selectTimeout;
private final int workerCount;
+ private final IOEventHandlerFactory eventHandlerFactory;
private final ThreadFactory threadFactory;
private final BaseIOReactor[] dispatchers;
private final Worker[] workers;
@@ -109,24 +110,26 @@ public abstract class AbstractMultiworke
/**
* Creates an instance of AbstractMultiworkerIOReactor with the given configuration.
*
- * @param config I/O reactor configuration.
+ * @param eventHandlerFactory the factory to create I/O event handlers.
+ * @param reactorConfig I/O reactor configuration.
* @param threadFactory the factory to create threads.
* Can be {@code null}.
* @throws IOReactorException in case if a non-recoverable I/O error.
*
- * @since 4.2
+ * @since 5.0
*/
public AbstractMultiworkerIOReactor(
- final IOReactorConfig config,
+ final IOEventHandlerFactory eventHandlerFactory,
+ final IOReactorConfig reactorConfig,
final ThreadFactory threadFactory) throws IOReactorException {
super();
- this.config = config != null ? config : IOReactorConfig.DEFAULT;
+ this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
+ this.reactorConfig = reactorConfig != null ? reactorConfig : IOReactorConfig.DEFAULT;
try {
this.selector = Selector.open();
} catch (final IOException ex) {
throw new IOReactorException("Failure opening selector", ex);
}
- this.selectTimeout = this.config.getSelectInterval();
this.statusLock = new Object();
if (threadFactory != null) {
this.threadFactory = threadFactory;
@@ -134,7 +137,7 @@ public abstract class AbstractMultiworke
this.threadFactory = new DefaultThreadFactory();
}
this.auditLog = new ArrayList<>();
- this.workerCount = this.config.getIoThreadCount();
+ this.workerCount = this.reactorConfig.getIoThreadCount();
this.dispatchers = new BaseIOReactor[workerCount];
this.workers = new Worker[workerCount];
this.threads = new Thread[workerCount];
@@ -148,8 +151,9 @@ public abstract class AbstractMultiworke
*
* @since 4.2
*/
- public AbstractMultiworkerIOReactor() throws IOReactorException {
- this(null, null);
+ public AbstractMultiworkerIOReactor(
+ final IOEventHandlerFactory eventHandlerFactory) throws IOReactorException {
+ this(eventHandlerFactory, null, null);
}
@Override
@@ -227,8 +231,8 @@ public abstract class AbstractMultiworke
* Activates the main I/O reactor as well as all worker I/O reactors.
* The I/O main reactor will start reacting to I/O events and triggering
* notification methods. The worker I/O reactor in their turn will start
- * reacting to I/O events and dispatch I/O event notifications to the given
- * {@link IOEventDispatch} interface.
+ * reacting to I/O events and dispatch I/O event notifications to the
+ * {@link IOEventHandler} associated with the given I/O session.
* <p>
* This method will enter the infinite I/O select loop on
* the {@link Selector} instance associated with this I/O reactor and used
@@ -246,9 +250,7 @@ public abstract class AbstractMultiworke
* @throws IOReactorException in case if a non-recoverable I/O error.
*/
@Override
- public void execute(
- final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
- Args.notNull(eventDispatch, "Event dispatcher");
+ public void execute() throws InterruptedIOException, IOReactorException {
synchronized (this.statusLock) {
if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
this.status = IOReactorStatus.SHUT_DOWN;
@@ -260,13 +262,13 @@ public abstract class AbstractMultiworke
this.status = IOReactorStatus.ACTIVE;
// Start I/O dispatchers
for (int i = 0; i < this.dispatchers.length; i++) {
- final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout);
+ final BaseIOReactor dispatcher = new BaseIOReactor(this.eventHandlerFactory, this.reactorConfig);
dispatcher.setExceptionHandler(exceptionHandler);
this.dispatchers[i] = dispatcher;
}
for (int i = 0; i < this.workerCount; i++) {
final BaseIOReactor dispatcher = this.dispatchers[i];
- this.workers[i] = new Worker(dispatcher, eventDispatch);
+ this.workers[i] = new Worker(dispatcher);
this.threads[i] = this.threadFactory.newThread(this.workers[i]);
}
}
@@ -279,10 +281,11 @@ public abstract class AbstractMultiworke
this.threads[i].start();
}
+ final long selectTimeout = this.reactorConfig.getSelectInterval();
for (;;) {
final int readyCount;
try {
- readyCount = this.selector.select(this.selectTimeout);
+ readyCount = this.selector.select(selectTimeout);
} catch (final InterruptedIOException ex) {
throw ex;
} catch (final IOException ex) {
@@ -376,7 +379,7 @@ public abstract class AbstractMultiworke
dispatcher.gracefulShutdown();
}
- final long gracePeriod = this.config.getShutdownGracePeriod();
+ final long gracePeriod = this.reactorConfig.getShutdownGracePeriod();
try {
// Force shut down I/O dispatchers if they fail to terminate
@@ -411,12 +414,13 @@ public abstract class AbstractMultiworke
/**
* Assigns the given channel entry to one of the worker I/O reactors.
*
- * @param entry the channel entry.
+ * @param channel the new channel.
+ * @param sessionRequest the session request if applicable.
*/
- protected void addChannel(final ChannelEntry entry) {
+ protected void enqueuePendingSession(final SocketChannel channel, final SessionRequestImpl sessionRequest) {
// Distribute new channels among the workers
final int i = Math.abs(this.currentWorker++ % this.workerCount);
- this.dispatchers[i].addChannel(entry);
+ this.dispatchers[i].enqueuePendingSession(channel, sessionRequest);
}
/**
@@ -439,18 +443,18 @@ public abstract class AbstractMultiworke
* @throws IOException in case of an I/O error.
*/
protected void prepareSocket(final Socket socket) throws IOException {
- socket.setTcpNoDelay(this.config.isTcpNoDelay());
- socket.setKeepAlive(this.config.isSoKeepalive());
- if (this.config.getSoTimeout() > 0) {
- socket.setSoTimeout(this.config.getSoTimeout());
+ socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
+ socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
+ if (this.reactorConfig.getSoTimeout() > 0) {
+ socket.setSoTimeout(this.reactorConfig.getSoTimeout());
}
- if (this.config.getSndBufSize() > 0) {
- socket.setSendBufferSize(this.config.getSndBufSize());
+ if (this.reactorConfig.getSndBufSize() > 0) {
+ socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
}
- if (this.config.getRcvBufSize() > 0) {
- socket.setReceiveBufferSize(this.config.getRcvBufSize());
+ if (this.reactorConfig.getRcvBufSize() > 0) {
+ socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
}
- final int linger = this.config.getSoLinger();
+ final int linger = this.reactorConfig.getSoLinger();
if (linger >= 0) {
socket.setSoLinger(true, linger);
}
@@ -517,20 +521,18 @@ public abstract class AbstractMultiworke
static class Worker implements Runnable {
final BaseIOReactor dispatcher;
- final IOEventDispatch eventDispatch;
private volatile Exception exception;
- public Worker(final BaseIOReactor dispatcher, final IOEventDispatch eventDispatch) {
+ public Worker(final BaseIOReactor dispatcher) {
super();
this.dispatcher = dispatcher;
- this.eventDispatch = eventDispatch;
}
@Override
public void run() {
try {
- this.dispatcher.execute(this.eventDispatch);
+ this.dispatcher.execute();
} catch (final Exception ex) {
this.exception = ex;
}