You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 14:43:11 UTC

[pulsar] 04/10: [Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 837cfbd9a7bb9a5c6836ae851d0b8ee43b5a46f4
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Wed Apr 20 14:11:43 2022 +0300

    [Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219)
    
    - make Netty DNS resolver settings match the JDK DNS caching settings
      - with the exception that the max TTL is 60 seconds if DNS max TTL is the default (forever)
    - reuse the DNS resolver instance on the Proxy
    
    (cherry picked from commit f5adc17ed52db2da3c1c58ee4abc9b75b73464ac)
---
 .../apache/pulsar/client/impl/ConnectionPool.java  | 26 ++++++--
 pulsar-common/pom.xml                              |  5 ++
 .../pulsar/common/util/netty/DnsResolverUtil.java  | 75 ++++++++++++++++++++++
 .../pulsar/proxy/server/ProxyConnection.java       | 36 ++++++-----
 .../pulsar/proxy/server/ProxyConnectionPool.java   | 60 -----------------
 .../apache/pulsar/proxy/server/ProxyService.java   |  3 +
 .../proxy/server/ServiceChannelInitializer.java    |  2 +-
 7 files changed, 126 insertions(+), 81 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 92b4a3cda70..99492784e5f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -35,6 +35,7 @@ import java.net.URISyntaxException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -46,6 +47,7 @@ import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,13 +63,20 @@ public class ConnectionPool implements AutoCloseable {
     private final boolean isSniProxy;
 
     protected final DnsNameResolver dnsResolver;
+    private final boolean shouldCloseDnsResolver;
 
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
         this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
     }
 
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
-            Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
+                          Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
+        this(conf, eventLoopGroup, clientCnxSupplier, Optional.empty());
+    }
+
+    public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
+                             Supplier<ClientCnx> clientCnxSupplier, Optional<DnsNameResolver> dnsNameResolver)
+            throws PulsarClientException {
         this.eventLoopGroup = eventLoopGroup;
         this.clientConfig = conf;
         this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
@@ -91,8 +100,15 @@ public class ConnectionPool implements AutoCloseable {
             throw new PulsarClientException(e);
         }
 
-        this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
-                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
+        this.shouldCloseDnsResolver = !dnsNameResolver.isPresent();
+        this.dnsResolver = dnsNameResolver.orElseGet(() -> createDnsNameResolver(conf, eventLoopGroup));
+    }
+
+    private static DnsNameResolver createDnsNameResolver(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
+        DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(eventLoopGroup.next())
+                .traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup));
+        DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+        return dnsNameResolverBuilder.build();
     }
 
     private static final Random random = new Random();
@@ -314,7 +330,9 @@ public class ConnectionPool implements AutoCloseable {
     @Override
     public void close() throws Exception {
         closeAllConnections();
-        dnsResolver.close();
+        if (shouldCloseDnsResolver) {
+            dnsResolver.close();
+        }
     }
 
     private void cleanupConnection(InetSocketAddress address, int connectionKey,
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index e37022a3c74..0d746cd35e3 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -78,6 +78,11 @@
       <artifactId>netty-handler</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-resolver-dns</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-transport-native-epoll</artifactId>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
new file mode 100644
index 00000000000..8b06dbf36ec
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import io.netty.resolver.dns.DnsNameResolverBuilder;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DnsResolverUtil {
+    private static final int MIN_TTL = 0;
+    private static final int TTL;
+    private static final int NEGATIVE_TTL;
+
+    // default TTL value when JDK setting is "forever" (-1)
+    private static final int DEFAULT_TTL = 60;
+
+    // default negative TTL value when JDK setting is "forever" (-1)
+    private static final int DEFAULT_NEGATIVE_TTL = 10;
+
+    static {
+        int ttl = DEFAULT_TTL;
+        int negativeTtl = DEFAULT_NEGATIVE_TTL;
+        try {
+            // use reflection to call sun.net.InetAddressCachePolicy's get and getNegative methods for getting
+            // effective JDK settings for DNS caching
+            Class<?> inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy");
+            Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get");
+            ttl = (Integer) getTTLMethod.invoke(null);
+            Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative");
+            negativeTtl = (Integer) getNegativeTTLMethod.invoke(null);
+        } catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException
+                 | IllegalAccessException e) {
+            log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e);
+        }
+        TTL = useDefaultTTLWhenSetToForever(ttl, DEFAULT_TTL);
+        NEGATIVE_TTL = useDefaultTTLWhenSetToForever(negativeTtl, DEFAULT_NEGATIVE_TTL);
+    }
+
+    private static int useDefaultTTLWhenSetToForever(int ttl, int defaultTtl) {
+        return ttl < 0 ? defaultTtl : ttl;
+    }
+
+    private DnsResolverUtil() {
+        // utility class with static methods, prevent instantiation
+    }
+
+    /**
+     * Configure Netty's {@link DnsNameResolverBuilder}'s ttl and negativeTtl to match the JDK's DNS caching settings.
+     * If the JDK setting for TTL is forever (-1), the TTL will be set to 60 seconds.
+     *
+     * @param dnsNameResolverBuilder The Netty {@link DnsNameResolverBuilder} instance to apply the settings
+     */
+    public static void applyJdkDnsCacheSettings(DnsNameResolverBuilder dnsNameResolverBuilder) {
+        dnsNameResolverBuilder.ttl(MIN_TTL, TTL);
+        dnsNameResolverBuilder.negativeTtl(NEGATIVE_TTL);
+    }
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 754e33f96a8..395d16bbbd8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -22,9 +22,11 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import io.netty.channel.ChannelFutureListener;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.resolver.dns.DnsNameResolver;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -77,6 +79,7 @@ public class ProxyConnection extends PulsarHandler {
     private final AtomicLong requestIdGenerator =
             new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
     private final ProxyService service;
+    private final DnsNameResolver dnsNameResolver;
     private Authentication clientAuthentication;
     AuthenticationDataSource authenticationData;
     private State state;
@@ -124,9 +127,11 @@ public class ProxyConnection extends PulsarHandler {
         return connectionPool;
     }
 
-    public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
+    public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier,
+                           DnsNameResolver dnsNameResolver) {
         super(30, TimeUnit.SECONDS);
         this.service = proxyService;
+        this.dnsNameResolver = dnsNameResolver;
         this.state = State.Init;
         this.sslHandlerSupplier = sslHandlerSupplier;
         this.brokerProxyValidator = service.getBrokerProxyValidator();
@@ -234,27 +239,26 @@ public class ProxyConnection extends PulsarHandler {
     }
 
     private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
+        Supplier<ClientCnx> clientCnxSupplier;
         if (service.getConfiguration().isAuthenticationEnabled()) {
             if (service.getConfiguration().isForwardAuthorizationCredentials()) {
                 this.clientAuthData = clientData;
                 this.clientAuthMethod = authMethod;
             }
-            if (this.connectionPool == null) {
-                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                        () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
-                                clientAuthMethod, protocolVersionToAdvertise));
-            } else {
-                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
-                        remoteAddress, state, clientAuthRole);
-            }
+            clientCnxSupplier =
+                    () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+                            clientAuthMethod, protocolVersionToAdvertise);
         } else {
-            if (this.connectionPool == null) {
-                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
-            } else {
-                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
-                        remoteAddress, state);
-            }
+            clientCnxSupplier =
+                    () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
+        }
+
+        if (this.connectionPool == null) {
+            this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(),
+                    clientCnxSupplier, Optional.of(dnsNameResolver));
+        } else {
+            LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
+                    remoteAddress, state, clientAuthRole);
         }
 
         LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
deleted file mode 100644
index cd1b31d3434..00000000000
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.proxy.server;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
-
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.ConnectionPool;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.channel.EventLoopGroup;
-
-public class ProxyConnectionPool extends ConnectionPool {
-    public ProxyConnectionPool(ClientConfigurationData clientConfig, EventLoopGroup eventLoopGroup,
-            Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
-        super(clientConfig, eventLoopGroup, clientCnxSupplier);
-    }
-
-    @Override
-    public void close() throws IOException {
-        log.info("Closing ProxyConnectionPool.");
-        pool.forEach((address, clientCnxPool) -> {
-            if (clientCnxPool != null) {
-                clientCnxPool.forEach((identifier, clientCnx) -> {
-                    if (clientCnx != null && clientCnx.isDone()) {
-                        try {
-                            clientCnx.get().close();
-                        } catch (InterruptedException | ExecutionException e) {
-                            log.error("Unable to close get client connection future.", e);
-                        }
-                    }
-                });
-            }
-        });
-        dnsResolver.close();
-    }
-
-    private static final Logger log = LoggerFactory.getLogger(ProxyConnectionPool.class);
-}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index c9348837dcf..f24caf943b0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
 import org.apache.pulsar.proxy.stats.TopicStats;
@@ -147,6 +148,8 @@ public class ProxyService implements Closeable {
 
         DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next())
                 .channelType(EventLoopUtil.getDatagramChannelClass(workerGroup));
+        DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+
         dnsNameResolver = dnsNameResolverBuilder.build();
 
         brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(),
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index a033a87912d..4aae1196fb6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -159,7 +159,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
         }
 
         ch.pipeline().addLast("handler",
-                new ProxyConnection(proxyService, sslHandlerSupplier));
+                new ProxyConnection(proxyService, sslHandlerSupplier, proxyService.getDnsNameResolver()));
 
     }
 }