You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2020/08/05 09:22:38 UTC
[httpcomponents-core] 02/13: Async connection listeners to support
passing attachments to endpoints
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch 5.1.x
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git
commit 5a14c29d176dd2a9a7fde95d12d9bb30e08f6f5e
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Fri Apr 10 15:39:30 2020 +0200
Async connection listeners to support passing attachments to endpoints
---
.../hc/core5/http/impl/bootstrap/AsyncServer.java | 12 +++++-
...tenerEndpointRequest.java => ChannelEntry.java} | 47 ++++++----------------
.../hc/core5/reactor/ConnectionAcceptor.java | 3 ++
...ectionAcceptor.java => ConnectionListener.java} | 7 ++--
.../core5/reactor/DefaultListeningIOReactor.java | 23 ++++++-----
.../hc/core5/reactor/ListenerEndpointImpl.java | 6 ++-
.../hc/core5/reactor/ListenerEndpointRequest.java | 4 +-
.../hc/core5/reactor/SingleCoreIOReactor.java | 20 ++++-----
.../reactor/SingleCoreListeningIOReactor.java | 33 +++++++++------
9 files changed, 82 insertions(+), 73 deletions(-)
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
index 480b592..8a5c040 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
@@ -40,6 +40,7 @@ import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.ConnectionAcceptor;
import org.apache.hc.core5.reactor.ConnectionInitiator;
+import org.apache.hc.core5.reactor.ConnectionListener;
import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
@@ -53,7 +54,8 @@ import org.apache.hc.core5.util.TimeValue;
/**
* Protocol agnostic server side I/O session handler.
*/
-public class AsyncServer extends AbstractConnectionInitiatorBase implements IOReactorService, ConnectionAcceptor {
+public class AsyncServer extends AbstractConnectionInitiatorBase
+ implements IOReactorService, ConnectionListener, ConnectionAcceptor {
private final DefaultListeningIOReactor ioReactor;
@@ -87,8 +89,14 @@ public class AsyncServer extends AbstractConnectionInitiatorBase implements IORe
}
@Override
+ public Future<ListenerEndpoint> listen(
+ final SocketAddress address, final Object attachment, final FutureCallback<ListenerEndpoint> callback) {
+ return ioReactor.listen(address, attachment, callback);
+ }
+
+ @Override
public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
- return ioReactor.listen(address, callback);
+ return listen(address, null, callback);
}
public Future<ListenerEndpoint> listen(final SocketAddress address) {
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ChannelEntry.java
similarity index 57%
copy from httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java
copy to httpcore5/src/main/java/org/apache/hc/core5/reactor/ChannelEntry.java
index 53b0c16..9451bf84 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ChannelEntry.java
@@ -27,46 +27,25 @@
package org.apache.hc.core5.reactor;
-import java.io.Closeable;
-import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
-import org.apache.hc.core5.concurrent.BasicFuture;
+final class ChannelEntry {
-final class ListenerEndpointRequest implements Closeable {
+ final SocketChannel channel;
+ final Object attachment;
- final SocketAddress address;
- final BasicFuture<ListenerEndpoint> future;
-
- ListenerEndpointRequest(final SocketAddress address, final BasicFuture<ListenerEndpoint> future) {
- this.address = address;
- this.future = future;
- }
-
- public void completed(final ListenerEndpoint endpoint) {
- if (future != null) {
- future.completed(endpoint);
- }
- }
-
- public void failed(final Exception cause) {
- if (future != null) {
- future.failed(cause);
- }
- }
-
- public void cancel() {
- if (future != null) {
- future.cancel();
- }
- }
-
- public boolean isCancelled() {
- return future != null && future.isCancelled();
+ public ChannelEntry(final SocketChannel channel, final Object attachment) {
+ super();
+ this.channel = channel;
+ this.attachment = attachment;
}
@Override
- public void close() {
- cancel();
+ public String toString() {
+ return "[" +
+ "channel=" + channel +
+ ", attachment=" + attachment +
+ ']';
}
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
index e98eb61..490287b 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
@@ -38,7 +38,10 @@ import org.apache.hc.core5.concurrent.FutureCallback;
* Non-blocking connection acceptor.
*
* @since 5.0
+ *
+ * @deprecated Use {@link ConnectionListener}
*/
+@Deprecated
public interface ConnectionAcceptor {
/**
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionListener.java
similarity index 91%
copy from httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
copy to httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionListener.java
index e98eb61..a70ac50 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionListener.java
@@ -37,9 +37,9 @@ import org.apache.hc.core5.concurrent.FutureCallback;
/**
* Non-blocking connection acceptor.
*
- * @since 5.0
+ * @since 5.1
*/
-public interface ConnectionAcceptor {
+public interface ConnectionListener {
/**
* Opens a new listener endpoint with the given socket address. Once
@@ -48,10 +48,11 @@ public interface ConnectionAcceptor {
* dispatcher.
*
* @param address the socket address to listen on.
+ * @param attachment the attachment object.
* @param callback the result callback.
* @return listener endpoint.
*/
- Future<ListenerEndpoint> listen(SocketAddress address, FutureCallback<ListenerEndpoint> callback);
+ Future<ListenerEndpoint> listen(SocketAddress address, Object attachment, FutureCallback<ListenerEndpoint> callback);
/**
* Suspends the I/O reactor preventing it from accepting new connections on
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
index 0a9a84c..b3b6a6a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
@@ -29,7 +29,6 @@ package org.apache.hc.core5.reactor;
import java.io.IOException;
import java.net.SocketAddress;
-import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
@@ -44,14 +43,14 @@ import org.apache.hc.core5.util.TimeValue;
/**
* Multi-core I/O reactor that can ask as both {@link ConnectionInitiator}
- * and {@link ConnectionAcceptor}. Internally this I/O reactor distributes newly created
+ * and {@link ConnectionListener}. Internally this I/O reactor distributes newly created
* I/O session equally across multiple I/O worker threads for a more optimal resource
* utilization and a better I/O performance. Usually it is recommended to have
* one worker I/O reactor per physical CPU core.
*
* @since 4.0
*/
-public class DefaultListeningIOReactor extends AbstractIOReactorBase implements ConnectionAcceptor {
+public class DefaultListeningIOReactor extends AbstractIOReactorBase implements ConnectionListener, ConnectionAcceptor {
private final static ThreadFactory DISPATCH_THREAD_FACTORY = new DefaultThreadFactory("I/O server dispatch", true);
private final static ThreadFactory LISTENER_THREAD_FACTORY = new DefaultThreadFactory("I/O listener", true);
@@ -98,11 +97,11 @@ public class DefaultListeningIOReactor extends AbstractIOReactorBase implements
}
final IOReactor[] ioReactors = new IOReactor[this.workerCount + 1];
System.arraycopy(this.workers, 0, ioReactors, 1, this.workerCount);
- this.listener = new SingleCoreListeningIOReactor(exceptionCallback, ioReactorConfig, new Callback<SocketChannel>() {
+ this.listener = new SingleCoreListeningIOReactor(exceptionCallback, ioReactorConfig, new Callback<ChannelEntry>() {
@Override
- public void execute(final SocketChannel channel) {
- enqueueChannel(channel);
+ public void execute(final ChannelEntry entry) {
+ enqueueChannel(entry);
}
});
@@ -147,8 +146,14 @@ public class DefaultListeningIOReactor extends AbstractIOReactorBase implements
}
@Override
+ public Future<ListenerEndpoint> listen(
+ final SocketAddress address, final Object attachment, final FutureCallback<ListenerEndpoint> callback) {
+ return listener.listen(address, attachment, callback);
+ }
+
+ @Override
public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
- return listener.listen(address, callback);
+ return listen(address, null, callback);
}
public Future<ListenerEndpoint> listen(final SocketAddress address) {
@@ -180,9 +185,9 @@ public class DefaultListeningIOReactor extends AbstractIOReactorBase implements
return workerSelector;
}
- private void enqueueChannel(final SocketChannel socketChannel) {
+ private void enqueueChannel(final ChannelEntry entry) {
try {
- workerSelector.next().enqueueChannel(socketChannel);
+ workerSelector.next().enqueueChannel(entry);
} catch (final IOReactorShutdownException ex) {
initiateShutdown();
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointImpl.java
index e16be0d..f5ac698 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointImpl.java
@@ -38,13 +38,15 @@ import org.apache.hc.core5.io.Closer;
class ListenerEndpointImpl implements ListenerEndpoint {
private final SelectionKey key;
- private final SocketAddress address;
+ final SocketAddress address;
+ final Object attachment;
private final AtomicBoolean closed;
- public ListenerEndpointImpl(final SelectionKey key, final SocketAddress address) {
+ public ListenerEndpointImpl(final SelectionKey key, final Object attachment, final SocketAddress address) {
super();
this.key = key;
this.address = address;
+ this.attachment = attachment;
this.closed = new AtomicBoolean(false);
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java
index 53b0c16..faebb22 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java
@@ -35,10 +35,12 @@ import org.apache.hc.core5.concurrent.BasicFuture;
final class ListenerEndpointRequest implements Closeable {
final SocketAddress address;
+ final Object attachment;
final BasicFuture<ListenerEndpoint> future;
- ListenerEndpointRequest(final SocketAddress address, final BasicFuture<ListenerEndpoint> future) {
+ ListenerEndpointRequest(final SocketAddress address, final Object attachment, final BasicFuture<ListenerEndpoint> future) {
this.address = address;
+ this.attachment = attachment;
this.future = future;
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
index f986030..b691f3d 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
@@ -65,7 +65,7 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
private final IOSessionListener sessionListener;
private final Callback<IOSession> sessionShutdownCallback;
private final Queue<InternalDataChannel> closedSessions;
- private final Queue<SocketChannel> channelQueue;
+ private final Queue<ChannelEntry> channelQueue;
private final Queue<IOSessionRequest> requestQueue;
private final AtomicBoolean shutdownInitiated;
private final long selectTimeoutMillis;
@@ -91,12 +91,11 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
}
- void enqueueChannel(final SocketChannel socketChannel) throws IOReactorShutdownException {
- Args.notNull(socketChannel, "SocketChannel");
+ void enqueueChannel(final ChannelEntry entry) throws IOReactorShutdownException {
if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
throw new IOReactorShutdownException("I/O reactor has been shut down");
}
- this.channelQueue.add(socketChannel);
+ this.channelQueue.add(entry);
this.selector.wakeup();
}
@@ -186,8 +185,10 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
}
private void processPendingChannels() throws IOException {
- SocketChannel socketChannel;
- for (int i = 0; i < MAX_CHANNEL_REQUESTS && (socketChannel = this.channelQueue.poll()) != null; i++) {
+ ChannelEntry entry;
+ for (int i = 0; i < MAX_CHANNEL_REQUESTS && (entry = this.channelQueue.poll()) != null; i++) {
+ final SocketChannel socketChannel = entry.channel;
+ final Object attachment = entry.attachment;
try {
prepareSocket(socketChannel.socket());
socketChannel.configureBlocking(false);
@@ -212,7 +213,7 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
null,
sessionListener,
closedSessions);
- dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, null));
+ dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, attachment));
dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
key.attach(dataChannel);
dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
@@ -384,8 +385,9 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
}
private void closePendingChannels() {
- SocketChannel socketChannel;
- while ((socketChannel = this.channelQueue.poll()) != null) {
+ ChannelEntry entry;
+ while ((entry = this.channelQueue.poll()) != null) {
+ final SocketChannel socketChannel = entry.channel;
try {
socketChannel.close();
} catch (final IOException ex) {
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
index dff280a..d3603b8 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
@@ -50,19 +50,19 @@ import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.io.Closer;
-class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implements ConnectionAcceptor {
+class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implements ConnectionListener, ConnectionAcceptor {
private final IOReactorConfig reactorConfig;
- private final Callback<SocketChannel> callback;
+ private final Callback<ChannelEntry> callback;
private final Queue<ListenerEndpointRequest> requestQueue;
- private final ConcurrentMap<ListenerEndpoint, Boolean> endpoints;
+ private final ConcurrentMap<ListenerEndpointImpl, Boolean> endpoints;
private final AtomicBoolean paused;
private final long selectTimeoutMillis;
SingleCoreListeningIOReactor(
final Callback<Exception> exceptionCallback,
final IOReactorConfig ioReactorConfig,
- final Callback<SocketChannel> callback) {
+ final Callback<ChannelEntry> callback) {
super(exceptionCallback);
this.reactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
this.callback = callback;
@@ -124,28 +124,35 @@ class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implement
if (socketChannel == null) {
break;
}
- this.callback.execute(socketChannel);
+ final ListenerEndpointRequest endpointRequest = (ListenerEndpointRequest) key.attachment();
+ this.callback.execute(new ChannelEntry(socketChannel, endpointRequest.attachment));
}
}
} catch (final CancelledKeyException ex) {
- final ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
+ final ListenerEndpointImpl endpoint = (ListenerEndpointImpl) key.attachment();
this.endpoints.remove(endpoint);
key.attach(null);
}
}
@Override
- public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
+ public Future<ListenerEndpoint> listen(
+ final SocketAddress address, final Object attachment, final FutureCallback<ListenerEndpoint> callback) {
if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
throw new IOReactorShutdownException("I/O reactor has been shut down");
}
final BasicFuture<ListenerEndpoint> future = new BasicFuture<>(callback);
- this.requestQueue.add(new ListenerEndpointRequest(address, future));
+ this.requestQueue.add(new ListenerEndpointRequest(address, attachment, future));
this.selector.wakeup();
return future;
}
+ @Override
+ public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
+ return listen(address, null, callback);
+ }
+
private void processSessionRequests() throws IOException {
ListenerEndpointRequest request;
while ((request = this.requestQueue.poll()) != null) {
@@ -174,7 +181,7 @@ class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implement
final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
key.attach(request);
- final ListenerEndpoint endpoint = new ListenerEndpointImpl(key, socket.getLocalSocketAddress());
+ final ListenerEndpointImpl endpoint = new ListenerEndpointImpl(key, request.attachment, socket.getLocalSocketAddress());
this.endpoints.put(endpoint, Boolean.TRUE);
request.completed(endpoint);
} catch (final IOException ex) {
@@ -187,7 +194,7 @@ class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implement
@Override
public Set<ListenerEndpoint> getEndpoints() {
final Set<ListenerEndpoint> set = new HashSet<>();
- final Iterator<ListenerEndpoint> it = this.endpoints.keySet().iterator();
+ final Iterator<ListenerEndpointImpl> it = this.endpoints.keySet().iterator();
while (it.hasNext()) {
final ListenerEndpoint endpoint = it.next();
if (!endpoint.isClosed()) {
@@ -202,12 +209,12 @@ class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implement
@Override
public void pause() throws IOException {
if (paused.compareAndSet(false, true)) {
- final Iterator<ListenerEndpoint> it = this.endpoints.keySet().iterator();
+ final Iterator<ListenerEndpointImpl> it = this.endpoints.keySet().iterator();
while (it.hasNext()) {
- final ListenerEndpoint endpoint = it.next();
+ final ListenerEndpointImpl endpoint = it.next();
if (!endpoint.isClosed()) {
endpoint.close();
- this.requestQueue.add(new ListenerEndpointRequest(endpoint.getAddress(), null));
+ this.requestQueue.add(new ListenerEndpointRequest(endpoint.address, endpoint.attachment, null));
}
it.remove();
}