You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2020/09/12 12:38:55 UTC

[httpcomponents-core] 02/18: Async connection listeners to support passing attachments to endpoints

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit 4e52bcf62fd35527e3bbce3db34a4f131d729905
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Fri Apr 10 15:39:30 2020 +0200

    Async connection listeners to support passing attachments to endpoints
---
 .../hc/core5/http/impl/bootstrap/AsyncServer.java  | 12 +++++-
 ...tenerEndpointRequest.java => ChannelEntry.java} | 47 ++++++----------------
 .../hc/core5/reactor/ConnectionAcceptor.java       |  3 ++
 ...ectionAcceptor.java => ConnectionListener.java} |  7 ++--
 .../core5/reactor/DefaultListeningIOReactor.java   | 23 ++++++-----
 .../hc/core5/reactor/ListenerEndpointImpl.java     |  6 ++-
 .../hc/core5/reactor/ListenerEndpointRequest.java  |  4 +-
 .../hc/core5/reactor/SingleCoreIOReactor.java      | 20 ++++-----
 .../reactor/SingleCoreListeningIOReactor.java      | 33 +++++++++------
 9 files changed, 82 insertions(+), 73 deletions(-)

diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
index 480b592..8a5c040 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java
@@ -40,6 +40,7 @@ import org.apache.hc.core5.function.Decorator;
 import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.reactor.ConnectionAcceptor;
 import org.apache.hc.core5.reactor.ConnectionInitiator;
+import org.apache.hc.core5.reactor.ConnectionListener;
 import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
@@ -53,7 +54,8 @@ import org.apache.hc.core5.util.TimeValue;
 /**
  * Protocol agnostic server side I/O session handler.
  */
-public class AsyncServer extends AbstractConnectionInitiatorBase implements IOReactorService, ConnectionAcceptor {
+public class AsyncServer extends AbstractConnectionInitiatorBase
+        implements IOReactorService, ConnectionListener, ConnectionAcceptor {
 
     private final DefaultListeningIOReactor ioReactor;
 
@@ -87,8 +89,14 @@ public class AsyncServer extends AbstractConnectionInitiatorBase implements IORe
     }
 
     @Override
+    public Future<ListenerEndpoint> listen(
+            final SocketAddress address, final Object attachment, final FutureCallback<ListenerEndpoint> callback) {
+        return ioReactor.listen(address, attachment, callback);
+    }
+
+    @Override
     public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
-        return ioReactor.listen(address, callback);
+        return listen(address, null, callback);
     }
 
     public Future<ListenerEndpoint> listen(final SocketAddress address) {
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ChannelEntry.java
similarity index 57%
copy from httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java
copy to httpcore5/src/main/java/org/apache/hc/core5/reactor/ChannelEntry.java
index 53b0c16..9451bf84 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ChannelEntry.java
@@ -27,46 +27,25 @@
 
 package org.apache.hc.core5.reactor;
 
-import java.io.Closeable;
-import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
 
-import org.apache.hc.core5.concurrent.BasicFuture;
+final class ChannelEntry {
 
-final class ListenerEndpointRequest implements Closeable {
+    final SocketChannel channel;
+    final Object attachment;
 
-    final SocketAddress address;
-    final BasicFuture<ListenerEndpoint> future;
-
-    ListenerEndpointRequest(final SocketAddress address, final BasicFuture<ListenerEndpoint> future) {
-        this.address = address;
-        this.future = future;
-    }
-
-    public void completed(final ListenerEndpoint endpoint) {
-        if (future != null) {
-            future.completed(endpoint);
-        }
-    }
-
-    public void failed(final Exception cause) {
-        if (future != null) {
-            future.failed(cause);
-        }
-    }
-
-    public void cancel() {
-        if (future != null) {
-            future.cancel();
-        }
-    }
-
-    public boolean isCancelled() {
-        return future != null && future.isCancelled();
+    public ChannelEntry(final SocketChannel channel, final Object attachment) {
+        super();
+        this.channel = channel;
+        this.attachment = attachment;
     }
 
     @Override
-    public void close() {
-        cancel();
+    public String toString() {
+        return "[" +
+                "channel=" + channel +
+                ", attachment=" + attachment +
+                ']';
     }
 
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
index e98eb61..490287b 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
@@ -38,7 +38,10 @@ import org.apache.hc.core5.concurrent.FutureCallback;
  * Non-blocking connection acceptor.
  *
  * @since 5.0
+ *
+ * @deprecated Use {@link ConnectionListener}
  */
+@Deprecated
 public interface ConnectionAcceptor {
 
     /**
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionListener.java
similarity index 91%
copy from httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
copy to httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionListener.java
index e98eb61..a70ac50 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionAcceptor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ConnectionListener.java
@@ -37,9 +37,9 @@ import org.apache.hc.core5.concurrent.FutureCallback;
 /**
  * Non-blocking connection acceptor.
  *
- * @since 5.0
+ * @since 5.1
  */
-public interface ConnectionAcceptor {
+public interface ConnectionListener {
 
     /**
      * Opens a new listener endpoint with the given socket address. Once
@@ -48,10 +48,11 @@ public interface ConnectionAcceptor {
      * dispatcher.
      *
      * @param address the socket address to listen on.
+     * @param attachment the attachment object.
      * @param callback the result callback.
      * @return listener endpoint.
      */
-    Future<ListenerEndpoint> listen(SocketAddress address, FutureCallback<ListenerEndpoint> callback);
+    Future<ListenerEndpoint> listen(SocketAddress address, Object attachment, FutureCallback<ListenerEndpoint> callback);
 
     /**
      * Suspends the I/O reactor preventing it from accepting new connections on
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
index 0a9a84c..b3b6a6a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java
@@ -29,7 +29,6 @@ package org.apache.hc.core5.reactor;
 
 import java.io.IOException;
 import java.net.SocketAddress;
-import java.nio.channels.SocketChannel;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
@@ -44,14 +43,14 @@ import org.apache.hc.core5.util.TimeValue;
 
 /**
  * Multi-core I/O reactor that can ask as both {@link ConnectionInitiator}
- * and {@link ConnectionAcceptor}. Internally this I/O reactor distributes newly created
+ * and {@link ConnectionListener}. Internally this I/O reactor distributes newly created
  * I/O session equally across multiple I/O worker threads for a more optimal resource
  * utilization and a better I/O performance. Usually it is recommended to have
  * one worker I/O reactor per physical CPU core.
  *
  * @since 4.0
  */
-public class DefaultListeningIOReactor extends AbstractIOReactorBase implements ConnectionAcceptor {
+public class DefaultListeningIOReactor extends AbstractIOReactorBase implements ConnectionListener, ConnectionAcceptor {
 
     private final static ThreadFactory DISPATCH_THREAD_FACTORY = new DefaultThreadFactory("I/O server dispatch", true);
     private final static ThreadFactory LISTENER_THREAD_FACTORY = new DefaultThreadFactory("I/O listener", true);
@@ -98,11 +97,11 @@ public class DefaultListeningIOReactor extends AbstractIOReactorBase implements
         }
         final IOReactor[] ioReactors = new IOReactor[this.workerCount + 1];
         System.arraycopy(this.workers, 0, ioReactors, 1, this.workerCount);
-        this.listener = new SingleCoreListeningIOReactor(exceptionCallback, ioReactorConfig, new Callback<SocketChannel>() {
+        this.listener = new SingleCoreListeningIOReactor(exceptionCallback, ioReactorConfig, new Callback<ChannelEntry>() {
 
             @Override
-            public void execute(final SocketChannel channel) {
-                enqueueChannel(channel);
+            public void execute(final ChannelEntry entry) {
+                enqueueChannel(entry);
             }
 
         });
@@ -147,8 +146,14 @@ public class DefaultListeningIOReactor extends AbstractIOReactorBase implements
     }
 
     @Override
+    public Future<ListenerEndpoint> listen(
+            final SocketAddress address, final Object attachment, final FutureCallback<ListenerEndpoint> callback) {
+        return listener.listen(address, attachment, callback);
+    }
+
+    @Override
     public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
-        return listener.listen(address, callback);
+        return listen(address, null, callback);
     }
 
     public Future<ListenerEndpoint> listen(final SocketAddress address) {
@@ -180,9 +185,9 @@ public class DefaultListeningIOReactor extends AbstractIOReactorBase implements
         return workerSelector;
     }
 
-    private void enqueueChannel(final SocketChannel socketChannel) {
+    private void enqueueChannel(final ChannelEntry entry) {
         try {
-            workerSelector.next().enqueueChannel(socketChannel);
+            workerSelector.next().enqueueChannel(entry);
         } catch (final IOReactorShutdownException ex) {
             initiateShutdown();
         }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointImpl.java
index e16be0d..f5ac698 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointImpl.java
@@ -38,13 +38,15 @@ import org.apache.hc.core5.io.Closer;
 class ListenerEndpointImpl implements ListenerEndpoint {
 
     private final SelectionKey key;
-    private final SocketAddress address;
+    final SocketAddress address;
+    final Object attachment;
     private final AtomicBoolean closed;
 
-    public ListenerEndpointImpl(final SelectionKey key, final SocketAddress address) {
+    public ListenerEndpointImpl(final SelectionKey key, final Object attachment, final SocketAddress address) {
         super();
         this.key = key;
         this.address = address;
+        this.attachment = attachment;
         this.closed = new AtomicBoolean(false);
     }
 
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java
index 53b0c16..faebb22 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListenerEndpointRequest.java
@@ -35,10 +35,12 @@ import org.apache.hc.core5.concurrent.BasicFuture;
 final class ListenerEndpointRequest implements Closeable {
 
     final SocketAddress address;
+    final Object attachment;
     final BasicFuture<ListenerEndpoint> future;
 
-    ListenerEndpointRequest(final SocketAddress address, final BasicFuture<ListenerEndpoint> future) {
+    ListenerEndpointRequest(final SocketAddress address, final Object attachment, final BasicFuture<ListenerEndpoint> future) {
         this.address = address;
+        this.attachment = attachment;
         this.future = future;
     }
 
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
index f986030..b691f3d 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
@@ -65,7 +65,7 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
     private final IOSessionListener sessionListener;
     private final Callback<IOSession> sessionShutdownCallback;
     private final Queue<InternalDataChannel> closedSessions;
-    private final Queue<SocketChannel> channelQueue;
+    private final Queue<ChannelEntry> channelQueue;
     private final Queue<IOSessionRequest> requestQueue;
     private final AtomicBoolean shutdownInitiated;
     private final long selectTimeoutMillis;
@@ -91,12 +91,11 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
         this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
     }
 
-    void enqueueChannel(final SocketChannel socketChannel) throws IOReactorShutdownException {
-        Args.notNull(socketChannel, "SocketChannel");
+    void enqueueChannel(final ChannelEntry entry) throws IOReactorShutdownException {
         if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
             throw new IOReactorShutdownException("I/O reactor has been shut down");
         }
-        this.channelQueue.add(socketChannel);
+        this.channelQueue.add(entry);
         this.selector.wakeup();
     }
 
@@ -186,8 +185,10 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
     }
 
     private void processPendingChannels() throws IOException {
-        SocketChannel socketChannel;
-        for (int i = 0; i < MAX_CHANNEL_REQUESTS && (socketChannel = this.channelQueue.poll()) != null; i++) {
+        ChannelEntry entry;
+        for (int i = 0; i < MAX_CHANNEL_REQUESTS && (entry = this.channelQueue.poll()) != null; i++) {
+            final SocketChannel socketChannel = entry.channel;
+            final Object attachment = entry.attachment;
             try {
                 prepareSocket(socketChannel.socket());
                 socketChannel.configureBlocking(false);
@@ -212,7 +213,7 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
                     null,
                     sessionListener,
                     closedSessions);
-            dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, null));
+            dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, attachment));
             dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
             key.attach(dataChannel);
             dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
@@ -384,8 +385,9 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
     }
 
     private void closePendingChannels() {
-        SocketChannel socketChannel;
-        while ((socketChannel = this.channelQueue.poll()) != null) {
+        ChannelEntry entry;
+        while ((entry = this.channelQueue.poll()) != null) {
+            final SocketChannel socketChannel = entry.channel;
             try {
                 socketChannel.close();
             } catch (final IOException ex) {
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
index dff280a..d3603b8 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
@@ -50,19 +50,19 @@ import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.io.Closer;
 
-class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implements ConnectionAcceptor {
+class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implements ConnectionListener, ConnectionAcceptor {
 
     private final IOReactorConfig reactorConfig;
-    private final Callback<SocketChannel> callback;
+    private final Callback<ChannelEntry> callback;
     private final Queue<ListenerEndpointRequest> requestQueue;
-    private final ConcurrentMap<ListenerEndpoint, Boolean> endpoints;
+    private final ConcurrentMap<ListenerEndpointImpl, Boolean> endpoints;
     private final AtomicBoolean paused;
     private final long selectTimeoutMillis;
 
     SingleCoreListeningIOReactor(
             final Callback<Exception> exceptionCallback,
             final IOReactorConfig ioReactorConfig,
-            final Callback<SocketChannel> callback) {
+            final Callback<ChannelEntry> callback) {
         super(exceptionCallback);
         this.reactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
         this.callback = callback;
@@ -124,28 +124,35 @@ class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implement
                     if (socketChannel == null) {
                         break;
                     }
-                    this.callback.execute(socketChannel);
+                    final ListenerEndpointRequest endpointRequest = (ListenerEndpointRequest) key.attachment();
+                    this.callback.execute(new ChannelEntry(socketChannel, endpointRequest.attachment));
                 }
             }
 
         } catch (final CancelledKeyException ex) {
-            final ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
+            final ListenerEndpointImpl endpoint = (ListenerEndpointImpl) key.attachment();
             this.endpoints.remove(endpoint);
             key.attach(null);
         }
     }
 
     @Override
-    public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
+    public Future<ListenerEndpoint> listen(
+            final SocketAddress address, final Object attachment, final FutureCallback<ListenerEndpoint> callback) {
         if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
             throw new IOReactorShutdownException("I/O reactor has been shut down");
         }
         final BasicFuture<ListenerEndpoint> future = new BasicFuture<>(callback);
-        this.requestQueue.add(new ListenerEndpointRequest(address, future));
+        this.requestQueue.add(new ListenerEndpointRequest(address, attachment, future));
         this.selector.wakeup();
         return future;
     }
 
+    @Override
+    public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint> callback) {
+        return listen(address, null, callback);
+    }
+
     private void processSessionRequests() throws IOException {
         ListenerEndpointRequest request;
         while ((request = this.requestQueue.poll()) != null) {
@@ -174,7 +181,7 @@ class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implement
 
                 final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
                 key.attach(request);
-                final ListenerEndpoint endpoint = new ListenerEndpointImpl(key, socket.getLocalSocketAddress());
+                final ListenerEndpointImpl endpoint = new ListenerEndpointImpl(key, request.attachment, socket.getLocalSocketAddress());
                 this.endpoints.put(endpoint, Boolean.TRUE);
                 request.completed(endpoint);
             } catch (final IOException ex) {
@@ -187,7 +194,7 @@ class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implement
     @Override
     public Set<ListenerEndpoint> getEndpoints() {
         final Set<ListenerEndpoint> set = new HashSet<>();
-        final Iterator<ListenerEndpoint> it = this.endpoints.keySet().iterator();
+        final Iterator<ListenerEndpointImpl> it = this.endpoints.keySet().iterator();
         while (it.hasNext()) {
             final ListenerEndpoint endpoint = it.next();
             if (!endpoint.isClosed()) {
@@ -202,12 +209,12 @@ class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implement
     @Override
     public void pause() throws IOException {
         if (paused.compareAndSet(false, true)) {
-            final Iterator<ListenerEndpoint> it = this.endpoints.keySet().iterator();
+            final Iterator<ListenerEndpointImpl> it = this.endpoints.keySet().iterator();
             while (it.hasNext()) {
-                final ListenerEndpoint endpoint = it.next();
+                final ListenerEndpointImpl endpoint = it.next();
                 if (!endpoint.isClosed()) {
                     endpoint.close();
-                    this.requestQueue.add(new ListenerEndpointRequest(endpoint.getAddress(), null));
+                    this.requestQueue.add(new ListenerEndpointRequest(endpoint.address, endpoint.attachment, null));
                 }
                 it.remove();
             }