You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/29 14:21:53 UTC
[pulsar] 01/03: [Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit af741a79383466a9966621eddb8c11b705627478
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Apr 20 14:47:25 2022 +0300
[Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219)
* Align Netty DNS resolver cache settings with Java DNS cache settings
- Netty DNS resolver caches forever by default
- this could cause problems with Kubernetes deployments
* Share Netty DnsNameResolver in proxy
* Remove overriding of ConnectionPool.close since it's not necessary
* Address review comment: remove ProxyConnectionPool
(cherry picked from commit 39b186269c6ccdfa94412a28cc3719a4338368ac)
---
.../apache/pulsar/client/impl/ConnectionPool.java | 26 ++++++--
pulsar-common/pom.xml | 5 ++
.../pulsar/common/util/netty/DnsResolverUtil.java | 75 ++++++++++++++++++++++
.../pulsar/proxy/server/ProxyConnection.java | 36 ++++++-----
.../pulsar/proxy/server/ProxyConnectionPool.java | 60 -----------------
.../apache/pulsar/proxy/server/ProxyService.java | 3 +
.../proxy/server/ServiceChannelInitializer.java | 2 +-
7 files changed, 126 insertions(+), 81 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index a0ffe5d3feb..7a077b2e45b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -36,6 +36,7 @@ import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -47,6 +48,7 @@ import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,13 +64,20 @@ public class ConnectionPool implements AutoCloseable {
private final boolean isSniProxy;
protected final DnsNameResolver dnsResolver;
+ private final boolean shouldCloseDnsResolver;
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
}
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
- Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
+ Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
+ this(conf, eventLoopGroup, clientCnxSupplier, Optional.empty());
+ }
+
+ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
+ Supplier<ClientCnx> clientCnxSupplier, Optional<DnsNameResolver> dnsNameResolver)
+ throws PulsarClientException {
this.eventLoopGroup = eventLoopGroup;
this.clientConfig = conf;
this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
@@ -92,8 +101,15 @@ public class ConnectionPool implements AutoCloseable {
throw new PulsarClientException(e);
}
- this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
- .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
+ this.shouldCloseDnsResolver = !dnsNameResolver.isPresent();
+ this.dnsResolver = dnsNameResolver.orElseGet(() -> createDnsNameResolver(conf, eventLoopGroup));
+ }
+
+ private static DnsNameResolver createDnsNameResolver(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
+ DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(eventLoopGroup.next())
+ .traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup));
+ DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+ return dnsNameResolverBuilder.build();
}
private static final Random random = new Random();
@@ -312,7 +328,9 @@ public class ConnectionPool implements AutoCloseable {
@Override
public void close() throws Exception {
closeAllConnections();
- dnsResolver.close();
+ if (shouldCloseDnsResolver) {
+ dnsResolver.close();
+ }
}
private void cleanupConnection(InetSocketAddress address, int connectionKey,
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 8855a634c81..2fcd625d808 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -72,6 +72,11 @@
<artifactId>netty-handler</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-resolver-dns</artifactId>
+ </dependency>
+
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
new file mode 100644
index 00000000000..8b06dbf36ec
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import io.netty.resolver.dns.DnsNameResolverBuilder;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DnsResolverUtil {
+ private static final int MIN_TTL = 0;
+ private static final int TTL;
+ private static final int NEGATIVE_TTL;
+
+ // TTL in seconds used when the JDK setting is negative, i.e. "forever" (-1), or cannot be read
+ private static final int DEFAULT_TTL = 60;
+
+ // negative TTL in seconds used when the JDK setting is negative, i.e. "forever" (-1), or cannot be read
+ private static final int DEFAULT_NEGATIVE_TTL = 10;
+
+ static {
+ int ttl = DEFAULT_TTL;
+ int negativeTtl = DEFAULT_NEGATIVE_TTL;
+ try {
+ // read the effective JDK DNS cache settings reflectively via sun.net.InetAddressCachePolicy's static
+ // get and getNegative methods; on failure a warning is logged and values not yet read keep their defaults
+ Class<?> inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy");
+ Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get");
+ ttl = (Integer) getTTLMethod.invoke(null);
+ Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative");
+ negativeTtl = (Integer) getNegativeTTLMethod.invoke(null);
+ } catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException
+ | IllegalAccessException e) {
+ log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e);
+ }
+ TTL = useDefaultTTLWhenSetToForever(ttl, DEFAULT_TTL);
+ NEGATIVE_TTL = useDefaultTTLWhenSetToForever(negativeTtl, DEFAULT_NEGATIVE_TTL);
+ }
+
+ private static int useDefaultTTLWhenSetToForever(int ttl, int defaultTtl) {
+ return ttl < 0 ? defaultTtl : ttl;
+ }
+
+ private DnsResolverUtil() {
+ // static utility class; the private constructor prevents instantiation
+ }
+
+ /**
+ * Configures the ttl and negativeTtl of Netty's {@link DnsNameResolverBuilder} to match the JDK's DNS caching
+ * settings. A JDK TTL of forever (-1) becomes 60 seconds and a JDK negative TTL of forever becomes 10 seconds;
+ * the minimum TTL passed to the builder is always 0.
+ * @param dnsNameResolverBuilder the Netty {@link DnsNameResolverBuilder} instance the settings are applied to
+ */
+ public static void applyJdkDnsCacheSettings(DnsNameResolverBuilder dnsNameResolverBuilder) {
+ dnsNameResolverBuilder.ttl(MIN_TTL, TTL);
+ dnsNameResolverBuilder.negativeTtl(NEGATIVE_TTL);
+ }
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 29060faeff2..5d81c4b803a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -22,9 +22,11 @@ import static com.google.common.base.Preconditions.checkArgument;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.resolver.dns.DnsNameResolver;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -77,6 +79,7 @@ public class ProxyConnection extends PulsarHandler {
private final AtomicLong requestIdGenerator =
new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
private final ProxyService service;
+ private final DnsNameResolver dnsNameResolver;
AuthenticationDataSource authenticationData;
private State state;
private final Supplier<SslHandler> sslHandlerSupplier;
@@ -125,9 +128,11 @@ public class ProxyConnection extends PulsarHandler {
return connectionPool;
}
- public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
+ public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier,
+ DnsNameResolver dnsNameResolver) {
super(30, TimeUnit.SECONDS);
this.service = proxyService;
+ this.dnsNameResolver = dnsNameResolver;
this.state = State.Init;
this.sslHandlerSupplier = sslHandlerSupplier;
this.brokerProxyValidator = service.getBrokerProxyValidator();
@@ -235,27 +240,26 @@ public class ProxyConnection extends PulsarHandler {
}
private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
+ Supplier<ClientCnx> clientCnxSupplier;
if (service.getConfiguration().isAuthenticationEnabled()) {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
- if (this.connectionPool == null) {
- this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
- () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
- clientAuthMethod, protocolVersionToAdvertise));
- } else {
- LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
- remoteAddress, state, clientAuthRole);
- }
+ clientCnxSupplier =
+ () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+ clientAuthMethod, protocolVersionToAdvertise);
} else {
- if (this.connectionPool == null) {
- this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
- () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
- } else {
- LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
- remoteAddress, state);
- }
+ clientCnxSupplier =
+ () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
+ }
+
+ if (this.connectionPool == null) {
+ this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(),
+ clientCnxSupplier, Optional.of(dnsNameResolver));
+ } else {
+ LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
+ remoteAddress, state, clientAuthRole);
}
LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
deleted file mode 100644
index cd1b31d3434..00000000000
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.proxy.server;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
-
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.ConnectionPool;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.channel.EventLoopGroup;
-
-public class ProxyConnectionPool extends ConnectionPool {
- public ProxyConnectionPool(ClientConfigurationData clientConfig, EventLoopGroup eventLoopGroup,
- Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
- super(clientConfig, eventLoopGroup, clientCnxSupplier);
- }
-
- @Override
- public void close() throws IOException {
- log.info("Closing ProxyConnectionPool.");
- pool.forEach((address, clientCnxPool) -> {
- if (clientCnxPool != null) {
- clientCnxPool.forEach((identifier, clientCnx) -> {
- if (clientCnx != null && clientCnx.isDone()) {
- try {
- clientCnx.get().close();
- } catch (InterruptedException | ExecutionException e) {
- log.error("Unable to close get client connection future.", e);
- }
- }
- });
- }
- });
- dnsResolver.close();
- }
-
- private static final Logger log = LoggerFactory.getLogger(ProxyConnectionPool.class);
-}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 302ceba044d..31c07299bf3 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -155,6 +156,8 @@ public class ProxyService implements Closeable {
DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next())
.channelType(EventLoopUtil.getDatagramChannelClass(workerGroup));
+ DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+
dnsNameResolver = dnsNameResolverBuilder.build();
brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(),
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 12abf871b50..4f423c1f5d5 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -174,7 +174,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
}
ch.pipeline().addLast("handler",
- new ProxyConnection(proxyService, sslHandlerSupplier));
+ new ProxyConnection(proxyService, sslHandlerSupplier, proxyService.getDnsNameResolver()));
}
}