You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 14:43:11 UTC
[pulsar] 04/10: [Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 837cfbd9a7bb9a5c6836ae851d0b8ee43b5a46f4
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Wed Apr 20 14:11:43 2022 +0300
[Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219)
- make Netty DNS resolver settings match the JDK DNS caching settings
- with the exception that the max TTL is 60 seconds if DNS max TTL is the default (forever)
- reuse the DNS resolver instance on the Proxy
(cherry picked from commit f5adc17ed52db2da3c1c58ee4abc9b75b73464ac)
---
.../apache/pulsar/client/impl/ConnectionPool.java | 26 ++++++--
pulsar-common/pom.xml | 5 ++
.../pulsar/common/util/netty/DnsResolverUtil.java | 75 ++++++++++++++++++++++
.../pulsar/proxy/server/ProxyConnection.java | 36 ++++++-----
.../pulsar/proxy/server/ProxyConnectionPool.java | 60 -----------------
.../apache/pulsar/proxy/server/ProxyService.java | 3 +
.../proxy/server/ServiceChannelInitializer.java | 2 +-
7 files changed, 126 insertions(+), 81 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 92b4a3cda70..99492784e5f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -35,6 +35,7 @@ import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -46,6 +47,7 @@ import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,13 +63,20 @@ public class ConnectionPool implements AutoCloseable {
private final boolean isSniProxy;
protected final DnsNameResolver dnsResolver;
+ private final boolean shouldCloseDnsResolver;
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
}
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
- Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
+ Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
+ this(conf, eventLoopGroup, clientCnxSupplier, Optional.empty());
+ }
+
+ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
+ Supplier<ClientCnx> clientCnxSupplier, Optional<DnsNameResolver> dnsNameResolver)
+ throws PulsarClientException {
this.eventLoopGroup = eventLoopGroup;
this.clientConfig = conf;
this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
@@ -91,8 +100,15 @@ public class ConnectionPool implements AutoCloseable {
throw new PulsarClientException(e);
}
- this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
- .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
+ this.shouldCloseDnsResolver = !dnsNameResolver.isPresent();
+ this.dnsResolver = dnsNameResolver.orElseGet(() -> createDnsNameResolver(conf, eventLoopGroup));
+ }
+
+ private static DnsNameResolver createDnsNameResolver(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
+ DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(eventLoopGroup.next())
+ .traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup));
+ DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+ return dnsNameResolverBuilder.build();
}
private static final Random random = new Random();
@@ -314,7 +330,9 @@ public class ConnectionPool implements AutoCloseable {
@Override
public void close() throws Exception {
closeAllConnections();
- dnsResolver.close();
+ if (shouldCloseDnsResolver) {
+ dnsResolver.close();
+ }
}
private void cleanupConnection(InetSocketAddress address, int connectionKey,
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index e37022a3c74..0d746cd35e3 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -78,6 +78,11 @@
<artifactId>netty-handler</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-resolver-dns</artifactId>
+ </dependency>
+
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
new file mode 100644
index 00000000000..8b06dbf36ec
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import io.netty.resolver.dns.DnsNameResolverBuilder;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DnsResolverUtil {
+ private static final int MIN_TTL = 0;
+ private static final int TTL;
+ private static final int NEGATIVE_TTL;
+
+ // default TTL value when JDK setting is "forever" (-1)
+ private static final int DEFAULT_TTL = 60;
+
+ // default negative TTL value when JDK setting is "forever" (-1)
+ private static final int DEFAULT_NEGATIVE_TTL = 10;
+
+ static {
+ int ttl = DEFAULT_TTL;
+ int negativeTtl = DEFAULT_NEGATIVE_TTL;
+ try {
+ // use reflection to call sun.net.InetAddressCachePolicy's get and getNegative methods for getting
+ // effective JDK settings for DNS caching
+ Class<?> inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy");
+ Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get");
+ ttl = (Integer) getTTLMethod.invoke(null);
+ Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative");
+ negativeTtl = (Integer) getNegativeTTLMethod.invoke(null);
+ } catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException
+ | IllegalAccessException e) {
+ log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e);
+ }
+ TTL = useDefaultTTLWhenSetToForever(ttl, DEFAULT_TTL);
+ NEGATIVE_TTL = useDefaultTTLWhenSetToForever(negativeTtl, DEFAULT_NEGATIVE_TTL);
+ }
+
+ private static int useDefaultTTLWhenSetToForever(int ttl, int defaultTtl) {
+ return ttl < 0 ? defaultTtl : ttl;
+ }
+
+ private DnsResolverUtil() {
+ // utility class with static methods, prevent instantiation
+ }
+
+ /**
+ * Configure Netty's {@link DnsNameResolverBuilder}'s ttl and negativeTtl to match the JDK's DNS caching settings.
+ * If the JDK setting for TTL is forever (-1), the TTL will be set to 60 seconds.
+ *
+ * @param dnsNameResolverBuilder The Netty {@link DnsNameResolverBuilder} instance to apply the settings
+ */
+ public static void applyJdkDnsCacheSettings(DnsNameResolverBuilder dnsNameResolverBuilder) {
+ dnsNameResolverBuilder.ttl(MIN_TTL, TTL);
+ dnsNameResolverBuilder.negativeTtl(NEGATIVE_TTL);
+ }
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 754e33f96a8..395d16bbbd8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -22,9 +22,11 @@ import static com.google.common.base.Preconditions.checkArgument;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.resolver.dns.DnsNameResolver;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -77,6 +79,7 @@ public class ProxyConnection extends PulsarHandler {
private final AtomicLong requestIdGenerator =
new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
private final ProxyService service;
+ private final DnsNameResolver dnsNameResolver;
private Authentication clientAuthentication;
AuthenticationDataSource authenticationData;
private State state;
@@ -124,9 +127,11 @@ public class ProxyConnection extends PulsarHandler {
return connectionPool;
}
- public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
+ public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier,
+ DnsNameResolver dnsNameResolver) {
super(30, TimeUnit.SECONDS);
this.service = proxyService;
+ this.dnsNameResolver = dnsNameResolver;
this.state = State.Init;
this.sslHandlerSupplier = sslHandlerSupplier;
this.brokerProxyValidator = service.getBrokerProxyValidator();
@@ -234,27 +239,26 @@ public class ProxyConnection extends PulsarHandler {
}
private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
+ Supplier<ClientCnx> clientCnxSupplier;
if (service.getConfiguration().isAuthenticationEnabled()) {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
- if (this.connectionPool == null) {
- this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
- () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
- clientAuthMethod, protocolVersionToAdvertise));
- } else {
- LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
- remoteAddress, state, clientAuthRole);
- }
+ clientCnxSupplier =
+ () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+ clientAuthMethod, protocolVersionToAdvertise);
} else {
- if (this.connectionPool == null) {
- this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
- () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
- } else {
- LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
- remoteAddress, state);
- }
+ clientCnxSupplier =
+ () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
+ }
+
+ if (this.connectionPool == null) {
+ this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(),
+ clientCnxSupplier, Optional.of(dnsNameResolver));
+ } else {
+ LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
+ remoteAddress, state, clientAuthRole);
}
LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
deleted file mode 100644
index cd1b31d3434..00000000000
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.proxy.server;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
-
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.ConnectionPool;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.channel.EventLoopGroup;
-
-public class ProxyConnectionPool extends ConnectionPool {
- public ProxyConnectionPool(ClientConfigurationData clientConfig, EventLoopGroup eventLoopGroup,
- Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
- super(clientConfig, eventLoopGroup, clientCnxSupplier);
- }
-
- @Override
- public void close() throws IOException {
- log.info("Closing ProxyConnectionPool.");
- pool.forEach((address, clientCnxPool) -> {
- if (clientCnxPool != null) {
- clientCnxPool.forEach((identifier, clientCnx) -> {
- if (clientCnx != null && clientCnx.isDone()) {
- try {
- clientCnx.get().close();
- } catch (InterruptedException | ExecutionException e) {
- log.error("Unable to close get client connection future.", e);
- }
- }
- });
- }
- });
- dnsResolver.close();
- }
-
- private static final Logger log = LoggerFactory.getLogger(ProxyConnectionPool.class);
-}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index c9348837dcf..f24caf943b0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.proxy.stats.TopicStats;
@@ -147,6 +148,8 @@ public class ProxyService implements Closeable {
DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next())
.channelType(EventLoopUtil.getDatagramChannelClass(workerGroup));
+ DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+
dnsNameResolver = dnsNameResolverBuilder.build();
brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(),
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index a033a87912d..4aae1196fb6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -159,7 +159,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
}
ch.pipeline().addLast("handler",
- new ProxyConnection(proxyService, sslHandlerSupplier));
+ new ProxyConnection(proxyService, sslHandlerSupplier, proxyService.getDnsNameResolver()));
}
}