You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hc.apache.org by "arturobernalg (via GitHub)" <gi...@apache.org> on 2023/04/04 17:09:26 UTC

[GitHub] [httpcomponents-client] arturobernalg commented on a diff in pull request #428: Implement HappyEyeballsV2AsyncClientConnectionOperator

arturobernalg commented on code in PR #428:
URL: https://github.com/apache/httpcomponents-client/pull/428#discussion_r1157540582


##########
httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/HappyEyeballsV2AsyncClientConnectionOperator.java:
##########
@@ -0,0 +1,680 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.client5.http.impl.nio;
+
+import org.apache.hc.client5.http.DnsResolver;
+import org.apache.hc.client5.http.SchemePortResolver;
+import org.apache.hc.client5.http.SystemDefaultDnsResolver;
+import org.apache.hc.client5.http.impl.ConnPoolSupport;
+import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
+import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
+import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.config.Lookup;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.reactor.ConnectionInitiator;
+import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+/**
+ * The {@link AsyncClientConnectionOperator} implementation that uses Happy Eyeballs V2 algorithm to connect
+ * to the target server. Happy Eyeballs V2 (HEV2) algorithm is used to connect to the target server by concurrently
+ * attempting to establish multiple connections to different IP addresses. The first connection to complete
+ * successfully is selected and the others are closed. If all connections fail, the last error is rethrown.
+ * The algorithm also applies a configurable delay before subsequent connection attempts. HEV2 was introduced
+ * as a means to mitigate the latency issues caused by IPv4 and IPv6 co-existence in the Internet. HEV2 is defined
+ * in RFC 8305.
+ *
+ * <p>
+ * This connection operator maintains a connection pool for each unique route (combination of target host and
+ * target port) and selects the next connection from the pool to establish a new connection or reuse an
+ * existing connection. The connection pool uses a First-In-First-Out (FIFO) queue and has a configurable limit
+ * on the maximum number of connections that can be kept alive in the pool. Once the maximum number of connections
+ * has been reached, the oldest connection in the pool is closed to make room for a new one.
+ * </p>
+ *
+ * <p>
+ * This class is thread-safe and can be used in a multi-threaded environment.
+ * </p>
+ *
+ * <p>
+ * The HEV2 algorithm is configurable through the following parameters:
+ * <ul>
+ *   <li>{@code dualStackEnabled}: Whether to enable dual-stack connectivity. When set to {@code true},
+ *   the operator attempts to connect to both IPv4 and IPv6 addresses concurrently. When set to {@code false},
+ *   only IPv4 or IPv6 addresses are attempted depending on the address type of the target server.</li>
+ *   <li>{@code maxAttempts}: The maximum number of connection attempts to be made before failing. If all
+ *   attempts fail, the last error is rethrown.</li>
+ *   <li>{@code delay}: The delay (in milliseconds) to apply before subsequent connection attempts.</li>
+ *   <li>{@code connectTimeout}: The connection timeout (in milliseconds) for each attempt.</li>
+ * </ul>
+ * </p>
+ *
+ *
+ * <p>
+ * This class can be used with any {@link org.apache.hc.core5.http.nio.AsyncClientEndpoint} implementation
+ * that supports HTTP/1.1 or HTTP/2 protocols.
+ * </p>
+ *
+ * @since 5.3
+ */
+public class HappyEyeballsV2AsyncClientConnectionOperator implements AsyncClientConnectionOperator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncClientConnectionOperator.class);
+
+    /**
+     * The underlying {@link AsyncClientConnectionOperator} that is used to establish connections
+     * to the target server.
+     */
+    private final AsyncClientConnectionOperator connectionOperator;
+
+    /**
+     * The DNS resolver used to resolve hostnames to IP addresses.
+     */
+    private final DnsResolver dnsResolver;
+
+    /**
+     * A lookup table used to determine the {@link TlsStrategy} to use for a given connection route.
+     */
+    private final Lookup<TlsStrategy> tlsStrategyLookup;
+
+    /**
+     * The default timeout for connection establishment attempts. If a connection cannot be established
+     * within this timeout, the attempt is considered failed.
+     */
+    private final Timeout timeout;
+
+    /**
+     * The minimum delay between connection establishment attempts.
+     */
+    private final Timeout minimumConnectionAttemptDelay;
+
+    /**
+     * The maximum delay between connection establishment attempts.
+     */
+    private final Timeout maximumConnectionAttemptDelay;
+
+    /**
+     * The current delay between connection establishment attempts.
+     */
+    private final Timeout connectionAttemptDelay;
+
+    /**
+     * The delay before resolution is started.
+     */
+    private final Timeout resolution_delay;
+
+    /**
+     * The number of IP addresses of each address family to include in the initial list of
+     * IP addresses to attempt connections to. This value is set to 2 by default, but can be
+     * increased to more aggressively favor a particular address family (e.g. set to 4 for IPv6).
+     */
+    private final int firstAddressFamilyCount;
+
+    /**
+     * The address family to use for establishing connections. This can be set to either
+     * {@link AddressFamily#IPv4} or {@link AddressFamily#IPv6}.
+     */
+    private final AddressFamily addressFamily;
+
+
+    /**
+     * The AddressFamily enum represents the possible address families that can be used when attempting to establish
+     * <p>
+     * connections using the Happy Eyeballs V2 algorithm.
+     *
+     * <p>
+     * The Happy Eyeballs V2 algorithm allows for concurrent connection attempts to be made to different IP addresses,
+     * <p>
+     * so this enum specifies whether connections should be attempted using IPv4 or IPv6 addresses.
+     *
+     * </p>
+     */
+    public enum AddressFamily {
+        IPv4, IPv6
+    }
+
+    /**
+     * Constructs a new {@link HappyEyeballsV2AsyncClientConnectionOperator} with the specified parameters.
+     *
+     * @param tlsStrategyLookup             the lookup object used to retrieve a {@link TlsStrategy} for a given {@link Route}
+     * @param connectionOperator            the underlying {@link AsyncClientConnectionOperator} to use for establishing connections
+     * @param dnsResolver                   the {@link DnsResolver} to use for resolving target hostnames
+     * @param timeout                       the timeout duration for establishing a connection
+     * @param resolution_delay              the configurable delay before subsequent DNS resolution attempts
+     * @param minimumConnectionAttemptDelay the minimum configurable delay between connection attempts
+     * @param maximumConnectionAttemptDelay the maximum configurable delay between connection attempts
+     * @param connectionAttemptDelay        the configurable delay before attempting to establish a connection
+     * @param firstAddressFamilyCount       the number of initial address families to use for establishing a connection
+     * @param addressFamily                 the preferred address family to use for establishing a connection
+     * @throws IllegalArgumentException if {@code firstAddressFamilyCount} is not positive
+     */
+    public HappyEyeballsV2AsyncClientConnectionOperator(final Lookup<TlsStrategy> tlsStrategyLookup,
+                                                        final AsyncClientConnectionOperator connectionOperator,
+                                                        final DnsResolver dnsResolver,
+                                                        final Timeout timeout,
+                                                        final Timeout resolution_delay,
+                                                        final Timeout minimumConnectionAttemptDelay,
+                                                        final Timeout maximumConnectionAttemptDelay,
+                                                        final Timeout connectionAttemptDelay,
+                                                        final int firstAddressFamilyCount,
+                                                        final AddressFamily addressFamily) {
+        this.tlsStrategyLookup = Args.notNull(tlsStrategyLookup, "TLS strategy lookup");
+        this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
+        this.dnsResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE;
+        this.timeout = timeout != null ? timeout : Timeout.ofMilliseconds(250);
+        this.resolution_delay = resolution_delay != null ? resolution_delay : Timeout.ofMilliseconds(50);
+        this.minimumConnectionAttemptDelay = minimumConnectionAttemptDelay != null ? minimumConnectionAttemptDelay : Timeout.ofMilliseconds(100);
+        this.maximumConnectionAttemptDelay = maximumConnectionAttemptDelay != null ? maximumConnectionAttemptDelay : Timeout.ofSeconds(2);
+        this.connectionAttemptDelay = connectionAttemptDelay != null ? connectionAttemptDelay : Timeout.ofMilliseconds(250);
+        this.firstAddressFamilyCount = Args.positive(firstAddressFamilyCount, "firstAddressFamilyCount");
+        this.addressFamily = addressFamily;
+    }
+
+    /**
+     * Constructs a new instance of {@link HappyEyeballsV2AsyncClientConnectionOperator} using the specified
+     * {@link Lookup} for {@link TlsStrategy} and {@link SchemePortResolver} and {@link DnsResolver}.
+     * <p>
+     * The constructor internally creates a new instance of {@link DefaultAsyncClientConnectionOperator} with the
+     * specified {@link Lookup} for {@link TlsStrategy}, {@link SchemePortResolver} and {@link DnsResolver}. The
+     * created {@link AsyncClientConnectionOperator} is then passed to the main constructor along with default values
+     * for other parameters.
+     * </p>
+     *
+     * @param tlsStrategyLookup  The {@link Lookup} for {@link TlsStrategy}.
+     * @param schemePortResolver The {@link SchemePortResolver} to use for resolving scheme ports.
+     * @param dnsResolver        The {@link DnsResolver} to use for resolving hostnames to IP addresses.
+     * @throws IllegalArgumentException if the {@code tlsStrategyLookup} or {@code schemePortResolver} or {@code dnsResolver} parameter is {@code null}.
+     */
+    public HappyEyeballsV2AsyncClientConnectionOperator(
+            final Lookup<TlsStrategy> tlsStrategyLookup,
+            final SchemePortResolver schemePortResolver,
+            final DnsResolver dnsResolver) {
+        this(tlsStrategyLookup,
+                new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
+                dnsResolver,
+                null,
+                null,
+                null,
+                null,
+                null,
+                1,
+                AddressFamily.IPv6);
+    }
+
+    /**
+     * Creates a new instance of {@link HappyEyeballsV2AsyncClientConnectionOperator} using the provided TLS strategy lookup
+     * and scheme-port resolver. The DNS resolver will be set to the system default resolver.
+     *
+     * @param tlsStrategyLookup  The lookup instance for {@link TlsStrategy} to be used for establishing connections.
+     * @param schemePortResolver The resolver instance for mapping scheme names to default port numbers.
+     * @throws IllegalArgumentException if {@code tlsStrategyLookup} is {@code null}.
+     */
+    public HappyEyeballsV2AsyncClientConnectionOperator(
+            final Lookup<TlsStrategy> tlsStrategyLookup,
+            final SchemePortResolver schemePortResolver) {
+        this(tlsStrategyLookup, schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE, null);
+    }
+
+    /**
+     * Creates a new instance of {@link HappyEyeballsV2AsyncClientConnectionOperator} using the provided TLS strategy lookup.
+     * The scheme-port resolver and DNS resolver will be set to their default instances.
+     *
+     * @param tlsStrategyLookup The lookup instance for {@link TlsStrategy} to be used for establishing connections.
+     * @throws IllegalArgumentException if {@code tlsStrategyLookup} is {@code null}.
+     */
+    public HappyEyeballsV2AsyncClientConnectionOperator(
+            final Lookup<TlsStrategy> tlsStrategyLookup) {
+        this(tlsStrategyLookup, DefaultSchemePortResolver.INSTANCE, null);
+    }
+
+
+    /**
+     * Attempts to connect to the given host and returns a Future that will be completed when the connection is established
+     * or when an error occurs. This method may attempt to connect to multiple IP addresses associated with the host,
+     * depending on the address family and the number of connection attempts to execute. The address family and number of
+     * connection attempts can be configured by calling the corresponding setters on this class.
+     *
+     * @param connectionInitiator the connection initiator to use when creating the connection
+     * @param host                the host to connect to
+     * @param localAddress        the local address to bind to when connecting, or null to use any available local address
+     * @param connectTimeout      the timeout to use when connecting, or null to use the default timeout
+     * @param attachment          the attachment to associate with the connection, or null if no attachment is needed
+     * @param callback            the callback to invoke when the connection is established or an error occurs, or null if no callback is needed
+     * @return a Future that will be completed when the connection is established or when an error occurs
+     */
+    @Override
+    public Future<ManagedAsyncClientConnection> connect(
+            final ConnectionInitiator connectionInitiator,
+            final HttpHost host,
+            final SocketAddress localAddress,
+            final Timeout connectTimeout,
+            final Object attachment,
+            final FutureCallback<ManagedAsyncClientConnection> callback) {
+
+        final CompletableFuture<ManagedAsyncClientConnection> connectionFuture = new CompletableFuture<>();
+
+        final Timeout conTimeout = connectTimeout != null ? connectTimeout : timeout;
+
+        resolveDnsAsync(host.getHostName())
+                .thenCompose(inetAddresses -> {
+                    final List<InetAddress> ipv4Addresses = new ArrayList<>();
+                    final List<InetAddress> ipv6Addresses = new ArrayList<>();
+
+                    for (final InetAddress inetAddress : inetAddresses) {
+                        if (inetAddress instanceof Inet4Address) {
+                            ipv4Addresses.add(inetAddress);
+                        } else if (inetAddress instanceof Inet6Address) {
+                            ipv6Addresses.add(inetAddress);
+                        }
+                    }
+
+                    sortAndInterleave(inetAddresses);
+
+                    final List<CompletableFuture<ManagedAsyncClientConnection>> connectionFutures = new ArrayList<>();
+
+                    // Create a list of connection attempts to execute
+                    final List<CompletableFuture<ManagedAsyncClientConnection>> attempts = new ArrayList<>();
+
+                    // Create a list of connection attempts to execute
+                    if (addressFamily == AddressFamily.IPv4 && !ipv4Addresses.isEmpty()) {
+                        for (int i = 0; i < firstAddressFamilyCount && i < ipv4Addresses.size(); i++) {
+                            attempts.add(connectAttempt(connectionInitiator, host, conTimeout, attachment,
+                                    Collections.singletonList(ipv4Addresses.get(i)), localAddress));
+                        }
+                    } else if (addressFamily == AddressFamily.IPv6 && !ipv6Addresses.isEmpty()) {
+                        for (int i = 0; i < firstAddressFamilyCount && i < ipv6Addresses.size(); i++) {
+                            attempts.add(connectAttempt(connectionInitiator, host, conTimeout, attachment,
+                                    Collections.singletonList(ipv6Addresses.get(i)), localAddress));
+                        }
+                    } else {
+                        if (!ipv4Addresses.isEmpty()) {
+                            for (int i = 0; i < firstAddressFamilyCount && i < ipv4Addresses.size(); i++) {
+                                attempts.add(connectAttempt(connectionInitiator, host, conTimeout, attachment,
+                                        Collections.singletonList(ipv4Addresses.get(i)), localAddress));
+                            }
+                        }
+                        if (!ipv6Addresses.isEmpty()) {
+                            for (int i = 0; i < firstAddressFamilyCount && i < ipv6Addresses.size(); i++) {
+                                attempts.add(connectAttempt(connectionInitiator, host, conTimeout, attachment,
+                                        Collections.singletonList(ipv6Addresses.get(i)), localAddress));
+                            }
+                        }
+                    }
+
+                    // Execute the connection attempts concurrently using CompletableFuture.anyOf
+                    return CompletableFuture.anyOf(attempts.toArray(new CompletableFuture[0]))
+                            .thenCompose(result -> {
+                                if (result instanceof ManagedAsyncClientConnection) {
+                                    // If there is a result, cancel all other attempts and complete the connectionFuture
+                                    connectionFutures.forEach(future -> future.cancel(true));
+                                    connectionFuture.complete((ManagedAsyncClientConnection) result);
+                                } else {
+                                    // If there is an exception, complete the connectionFuture exceptionally with the exception
+                                    connectionFuture.completeExceptionally(new ConnectException("Failed to connect to any address for " + host));
+                                }
+                                // Invoke the callback if provided
+                                if (callback != null) {
+                                    connectionFuture.whenComplete((conn, ex) -> {
+                                        if (ex != null) {
+                                            callback.failed(new Exception(ex));
+                                        } else {
+                                            callback.completed(conn);
+                                        }
+                                    });
+                                }
+                                return connectionFuture;
+                            });
+                })
+                .exceptionally(e -> {
+                    connectionFuture.completeExceptionally(e);
+                    if (callback != null) {
+                        callback.failed(new Exception(e));
+                    }
+                    return null;
+                });
+
+        return connectionFuture;
+    }
+
+    /**
+     * Asynchronously resolves the DNS for the given host name and returns a CompletableFuture that will be completed
+     * with an array of InetAddress objects representing the IP addresses of the host.
+     * The resolution of AAAA records is delayed by the configured resolution delay to allow for a chance for A records to be
+     * returned first.
+     *
+     * @param host the host name to resolve DNS for
+     * @return a CompletableFuture that will be completed with an array of InetAddress objects representing the IP addresses
+     */
+    private CompletableFuture<InetAddress[]> resolveDnsAsync(final String host) {
+        final CompletableFuture<InetAddress[]> dnsFuture = new CompletableFuture<>();
+        CompletableFuture.runAsync(() -> {
+            try {
+                final InetAddress[] inetAddresses = dnsResolver.resolve(host);
+                // Introduce a delay before resolving AAAA records after receiving A records
+                resolution_delay.sleep();

Review Comment:
   Instead of using Thread.sleep(), i could use non-blocking methods(ScheduledExecutorService) to delay the execution of the next connection attempt. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@hc.apache.org
For additional commands, e-mail: dev-help@hc.apache.org