You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/29 14:28:51 UTC

[pulsar] branch branch-2.9 updated (2fbdb174718 -> 278531f2f92)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 2fbdb174718 [improve][broker] Support shrink for ConcurrentSortedLongPairSet  (#15354)
     new 687911344a5 [Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219)
     new 5a94b19fdfd [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366)
     new 278531f2f92 [Broker/Client] Close connection if a ping or pong message cannot be sent (#15382)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/client/impl/ConnectionPool.java  |  26 +++-
 pulsar-common/pom.xml                              |   5 +
 .../pulsar/common/protocol/PulsarHandler.java      |  18 ++-
 .../pulsar/common/util/netty/DnsResolverUtil.java  |  75 +++++++++++
 .../pulsar/proxy/server/DirectProxyHandler.java    |  19 +--
 .../pulsar/proxy/server/ProxyConnection.java       | 139 +++++++++++++++------
 .../pulsar/proxy/server/ProxyConnectionPool.java   |  60 ---------
 .../apache/pulsar/proxy/server/ProxyService.java   |   3 +
 .../proxy/server/ServiceChannelInitializer.java    |   2 +-
 9 files changed, 234 insertions(+), 113 deletions(-)
 create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
 delete mode 100644 pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java


[pulsar] 02/03: [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5a94b19fdfdb6f7a72409d38ff4b718713786dca
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 29 14:59:32 2022 +0300

    [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress (#15366)
    
    * [Proxy] Fix proxy connection leak when inbound connection closes while connecting is in progress
    
    * Add logging when execution is rejected
    
    (cherry picked from commit 4ae070c5b26be3bf92c007ca309bd4adbae5ce93)
---
 .../pulsar/proxy/server/DirectProxyHandler.java    |  19 ++--
 .../pulsar/proxy/server/ProxyConnection.java       | 103 ++++++++++++++++-----
 2 files changed, 92 insertions(+), 30 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index e04ac1e4ae8..24802f60a3d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -80,9 +80,7 @@ public class DirectProxyHandler {
     private final ProxyService service;
     private final Runnable onHandshakeCompleteAction;
 
-    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String brokerHostAndPort,
-                              InetSocketAddress targetBrokerAddress, int protocolVersion,
-                              Supplier<SslHandler> sslHandlerSupplier) {
+    public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) {
         this.service = service;
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
@@ -92,6 +90,10 @@ public class DirectProxyHandler {
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
         this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
+    }
+
+    public void connect(String brokerHostAndPort, InetSocketAddress targetBrokerAddress,
+                           int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) {
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -208,6 +210,12 @@ public class DirectProxyHandler {
             (byte) 'Y',
     };
 
+    public void close() {
+        if (outboundChannel != null) {
+            outboundChannel.close();
+        }
+    }
+
     enum BackendState {
         Init, HandshakeCompleted
     }
@@ -344,10 +352,7 @@ public class DirectProxyHandler {
             onHandshakeCompleteAction.run();
             startDirectProxying(connected);
 
-            int maxMessageSize =
-                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
-            inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            proxyConnection.brokerConnected(DirectProxyHandler.this, connected);
         }
 
         private void startDirectProxying(CommandConnected connected) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 5d81c4b803a..39870f62af8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -19,14 +19,16 @@
 package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
+import static com.google.common.base.Preconditions.checkState;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.resolver.dns.DnsNameResolver;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -50,6 +52,7 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.CommandConnect;
+import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.CommandGetSchema;
@@ -114,6 +117,9 @@ public class ProxyConnection extends PulsarHandler {
         // Follow redirects
         ProxyLookupRequests,
 
+        // Connecting to the broker
+        ProxyConnectingToBroker,
+
         // If we are proxying a connection to a specific broker, we
         // are just forwarding data between the 2 connections, without
         // looking into it
@@ -167,8 +173,8 @@ public class ProxyConnection extends PulsarHandler {
     public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
-        if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
-            directProxyHandler.outboundChannel.close();
+        if (directProxyHandler != null) {
+            directProxyHandler.close();
             directProxyHandler = null;
         }
 
@@ -189,11 +195,22 @@ public class ProxyConnection extends PulsarHandler {
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        state = State.Closing;
         super.exceptionCaught(ctx, cause);
-        LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
+        LOG.warn("[{}] Got exception {} : Message: {} State: {}", remoteAddress, cause.getClass().getSimpleName(),
+                cause.getMessage(), state,
                 ClientCnx.isKnownException(cause) ? null : cause);
-        ctx.close();
+        if (state != State.Closed) {
+            state = State.Closing;
+        }
+        if (ctx.channel().isOpen()) {
+            ctx.close();
+        } else {
+            // close connection to broker if that is present
+            if (directProxyHandler != null) {
+                directProxyHandler.close();
+                directProxyHandler = null;
+            }
+        }
     }
 
     @Override
@@ -222,18 +239,26 @@ public class ProxyConnection extends PulsarHandler {
             break;
 
         case ProxyConnectionToBroker:
-            // Pass the buffer to the outbound connection and schedule next read
-            // only if we can write on the connection
-            ProxyService.opsCounter.inc();
-            if (msg instanceof ByteBuf) {
-                int bytes = ((ByteBuf) msg).readableBytes();
-                directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
-                ProxyService.bytesCounter.inc(bytes);
+            if (directProxyHandler != null) {
+                ProxyService.opsCounter.inc();
+                if (msg instanceof ByteBuf) {
+                    int bytes = ((ByteBuf) msg).readableBytes();
+                    directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+                    ProxyService.bytesCounter.inc(bytes);
+                }
+                directProxyHandler.outboundChannel.writeAndFlush(msg)
+                        .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            } else {
+                LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+                                + "Dropping the input message (readable bytes={}).", msg.getClass(), state,
+                        msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
             }
-            directProxyHandler.outboundChannel.writeAndFlush(msg)
-                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
             break;
-
+        case ProxyConnectingToBroker:
+            LOG.warn("Received message of type {} while connecting to broker. "
+                            + "Dropping the input message (readable bytes={}).", msg.getClass(),
+                    msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
+            break;
         default:
             break;
         }
@@ -279,14 +304,9 @@ public class ProxyConnection extends PulsarHandler {
                 return;
             }
 
+            state = State.ProxyConnectingToBroker;
             brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
-                    .thenAcceptAsync(address -> {
-                        // Client already knows which broker to connect. Let's open a
-                        // connection there and just pass bytes in both directions
-                        state = State.ProxyConnectionToBroker;
-                        directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address,
-                                protocolVersionToAdvertise, sslHandlerSupplier);
-                    }, ctx.executor())
+                    .thenAcceptAsync(this::connectToBroker, ctx.executor())
                     .exceptionally(throwable -> {
                         if (throwable instanceof TargetAddressDeniedException
                                 || throwable.getCause() instanceof TargetAddressDeniedException) {
@@ -319,6 +339,43 @@ public class ProxyConnection extends PulsarHandler {
         }
     }
 
+    private void handleBrokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen() && this.directProxyHandler == null) {
+            this.directProxyHandler = directProxyHandler;
+            state = State.ProxyConnectionToBroker;
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        } else {
+            LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+                            + "Closing connection to broker '{}'.",
+                    remoteAddress, ctx.channel().isOpen() ? "open" : "already closed",
+                    state != State.ProxyConnectingToBroker ? "invalid state " + state : "state " + state,
+                    proxyToBrokerUrl);
+            directProxyHandler.close();
+            ctx.close();
+        }
+    }
+
+    private void connectToBroker(InetSocketAddress brokerAddress) {
+        checkState(ctx.executor().inEventLoop(), "This method should be called in the event loop");
+        DirectProxyHandler directProxyHandler = new DirectProxyHandler(service, this);
+        directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
+                protocolVersionToAdvertise, sslHandlerSupplier);
+    }
+
+    public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
+        try {
+            final CommandConnected finalConnected = new CommandConnected().copyFrom(connected);
+            ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected));
+        } catch (RejectedExecutionException e) {
+            LOG.error("Event loop was already closed. Closing broker connection.", e);
+            directProxyHandler.close();
+        }
+    }
+
     // According to auth result, send newConnected or newAuthChallenge command.
     private void doAuthentication(AuthData clientData) throws Exception {
         AuthData brokerData = authState.authenticate(clientData);


[pulsar] 03/03: [Broker/Client] Close connection if a ping or pong message cannot be sent (#15382)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 278531f2f923c1c0c31364c804e6650a0628dafa
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 29 17:07:11 2022 +0300

    [Broker/Client] Close connection if a ping or pong message cannot be sent (#15382)
    
    * [Broker/Client] Close connection if a ping message cannot be sent
    
    - the connection should be closed if a ping message cannot be sent
    
    * Handle write errors for pong messages
    
    (cherry picked from commit 2ddef95f31ce37486f3f76b4d59730361a77bf6e)
---
 .../apache/pulsar/common/protocol/PulsarHandler.java   | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 48e7ffa7b88..6a0b7bb1fa5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -83,7 +83,14 @@ public abstract class PulsarHandler extends PulsarDecoder {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Replying back to ping message", ctx.channel());
         }
-        ctx.writeAndFlush(Commands.newPong());
+        ctx.writeAndFlush(Commands.newPong())
+                .addListener(future -> {
+                    if (!future.isSuccess()) {
+                        log.warn("[{}] Forcing connection to close since cannot send a pong message.",
+                                ctx.channel(), future.cause());
+                        ctx.close();
+                    }
+                });
     }
 
     @Override
@@ -110,7 +117,14 @@ public abstract class PulsarHandler extends PulsarDecoder {
                 log.debug("[{}] Sending ping message", ctx.channel());
             }
             waitingForPingResponse = true;
-            ctx.writeAndFlush(Commands.newPing());
+            ctx.writeAndFlush(Commands.newPing())
+                    .addListener(future -> {
+                        if (!future.isSuccess()) {
+                            log.warn("[{}] Forcing connection to close since cannot send a ping message.",
+                                    ctx.channel(), future.cause());
+                            ctx.close();
+                        }
+                    });
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Peer doesn't support keep-alive", ctx.channel());


[pulsar] 01/03: [Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 687911344a5477462cfcc905611b648c13521825
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Fri Apr 29 17:24:15 2022 +0300

    [Proxy & Client] Configure Netty DNS resolver to match JDK DNS caching setting, share DNS resolver instance in Proxy (#15219)
    
    * Align Netty DNS resolver cache settings with Java DNS cache settings
    
    - Netty DNS resolver caches forever by default
      - this could cause problems with Kubernetes deployments
    
    * Share Netty DNSNameResolver in proxy
    
    * Remove overriding of ConnectionPool.close since it's not necessary
    
    * Address review comment: remove ProxyConnectionPool
    
    (cherry picked from commit 39b186269c6ccdfa94412a28cc3719a4338368ac)
---
 .../apache/pulsar/client/impl/ConnectionPool.java  | 26 ++++++--
 pulsar-common/pom.xml                              |  5 ++
 .../pulsar/common/util/netty/DnsResolverUtil.java  | 75 ++++++++++++++++++++++
 .../pulsar/proxy/server/ProxyConnection.java       | 36 ++++++-----
 .../pulsar/proxy/server/ProxyConnectionPool.java   | 60 -----------------
 .../apache/pulsar/proxy/server/ProxyService.java   |  3 +
 .../proxy/server/ServiceChannelInitializer.java    |  2 +-
 7 files changed, 126 insertions(+), 81 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 3df52eaed21..59aaac0477b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -36,6 +36,7 @@ import java.net.URISyntaxException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,6 +48,7 @@ import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,13 +64,20 @@ public class ConnectionPool implements AutoCloseable {
     private final boolean isSniProxy;
 
     protected final DnsNameResolver dnsResolver;
+    private final boolean shouldCloseDnsResolver;
 
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
         this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
     }
 
     public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
-            Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
+                          Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
+        this(conf, eventLoopGroup, clientCnxSupplier, Optional.empty());
+    }
+
+    public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
+                             Supplier<ClientCnx> clientCnxSupplier, Optional<DnsNameResolver> dnsNameResolver)
+            throws PulsarClientException {
         this.eventLoopGroup = eventLoopGroup;
         this.clientConfig = conf;
         this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
@@ -92,8 +101,15 @@ public class ConnectionPool implements AutoCloseable {
             throw new PulsarClientException(e);
         }
 
-        this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
-                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
+        this.shouldCloseDnsResolver = !dnsNameResolver.isPresent();
+        this.dnsResolver = dnsNameResolver.orElseGet(() -> createDnsNameResolver(conf, eventLoopGroup));
+    }
+
+    private static DnsNameResolver createDnsNameResolver(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
+        DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(eventLoopGroup.next())
+                .traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup));
+        DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+        return dnsNameResolverBuilder.build();
     }
 
     private static final Random random = new Random();
@@ -315,7 +331,9 @@ public class ConnectionPool implements AutoCloseable {
     @Override
     public void close() throws Exception {
         closeAllConnections();
-        dnsResolver.close();
+        if (shouldCloseDnsResolver) {
+            dnsResolver.close();
+        }
     }
 
     private void cleanupConnection(InetSocketAddress address, int connectionKey,
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 42c823a7f86..e8b5c595790 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -72,6 +72,11 @@
       <artifactId>netty-handler</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-resolver-dns</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-transport-native-epoll</artifactId>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
new file mode 100644
index 00000000000..8b06dbf36ec
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import io.netty.resolver.dns.DnsNameResolverBuilder;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DnsResolverUtil {
+    private static final int MIN_TTL = 0;
+    private static final int TTL;
+    private static final int NEGATIVE_TTL;
+
+    // default TTL value when JDK setting is "forever" (-1)
+    private static final int DEFAULT_TTL = 60;
+
+    // default negative TTL value when JDK setting is "forever" (-1)
+    private static final int DEFAULT_NEGATIVE_TTL = 10;
+
+    static {
+        int ttl = DEFAULT_TTL;
+        int negativeTtl = DEFAULT_NEGATIVE_TTL;
+        try {
+            // use reflection to call sun.net.InetAddressCachePolicy's get and getNegative methods for getting
+            // effective JDK settings for DNS caching
+            Class<?> inetAddressCachePolicyClass = Class.forName("sun.net.InetAddressCachePolicy");
+            Method getTTLMethod = inetAddressCachePolicyClass.getMethod("get");
+            ttl = (Integer) getTTLMethod.invoke(null);
+            Method getNegativeTTLMethod = inetAddressCachePolicyClass.getMethod("getNegative");
+            negativeTtl = (Integer) getNegativeTTLMethod.invoke(null);
+        } catch (NoSuchMethodException | ClassNotFoundException | InvocationTargetException
+                 | IllegalAccessException e) {
+            log.warn("Cannot get DNS TTL settings from sun.net.InetAddressCachePolicy class", e);
+        }
+        TTL = useDefaultTTLWhenSetToForever(ttl, DEFAULT_TTL);
+        NEGATIVE_TTL = useDefaultTTLWhenSetToForever(negativeTtl, DEFAULT_NEGATIVE_TTL);
+    }
+
+    private static int useDefaultTTLWhenSetToForever(int ttl, int defaultTtl) {
+        return ttl < 0 ? defaultTtl : ttl;
+    }
+
+    private DnsResolverUtil() {
+        // utility class with static methods, prevent instantiation
+    }
+
+    /**
+     * Configure Netty's {@link DnsNameResolverBuilder}'s ttl and negativeTtl to match the JDK's DNS caching settings.
+     * If the JDK setting for TTL is forever (-1), the TTL will be set to 60 seconds.
+     *
+     * @param dnsNameResolverBuilder The Netty {@link DnsNameResolverBuilder} instance to apply the settings
+     */
+    public static void applyJdkDnsCacheSettings(DnsNameResolverBuilder dnsNameResolverBuilder) {
+        dnsNameResolverBuilder.ttl(MIN_TTL, TTL);
+        dnsNameResolverBuilder.negativeTtl(NEGATIVE_TTL);
+    }
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 29060faeff2..5d81c4b803a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -22,9 +22,11 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import io.netty.channel.ChannelFutureListener;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.resolver.dns.DnsNameResolver;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -77,6 +79,7 @@ public class ProxyConnection extends PulsarHandler {
     private final AtomicLong requestIdGenerator =
             new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
     private final ProxyService service;
+    private final DnsNameResolver dnsNameResolver;
     AuthenticationDataSource authenticationData;
     private State state;
     private final Supplier<SslHandler> sslHandlerSupplier;
@@ -125,9 +128,11 @@ public class ProxyConnection extends PulsarHandler {
         return connectionPool;
     }
 
-    public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
+    public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier,
+                           DnsNameResolver dnsNameResolver) {
         super(30, TimeUnit.SECONDS);
         this.service = proxyService;
+        this.dnsNameResolver = dnsNameResolver;
         this.state = State.Init;
         this.sslHandlerSupplier = sslHandlerSupplier;
         this.brokerProxyValidator = service.getBrokerProxyValidator();
@@ -235,27 +240,26 @@ public class ProxyConnection extends PulsarHandler {
     }
 
     private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
+        Supplier<ClientCnx> clientCnxSupplier;
         if (service.getConfiguration().isAuthenticationEnabled()) {
             if (service.getConfiguration().isForwardAuthorizationCredentials()) {
                 this.clientAuthData = clientData;
                 this.clientAuthMethod = authMethod;
             }
-            if (this.connectionPool == null) {
-                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                        () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
-                                clientAuthMethod, protocolVersionToAdvertise));
-            } else {
-                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
-                        remoteAddress, state, clientAuthRole);
-            }
+            clientCnxSupplier =
+                    () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+                            clientAuthMethod, protocolVersionToAdvertise);
         } else {
-            if (this.connectionPool == null) {
-                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
-            } else {
-                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
-                        remoteAddress, state);
-            }
+            clientCnxSupplier =
+                    () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
+        }
+
+        if (this.connectionPool == null) {
+            this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(),
+                    clientCnxSupplier, Optional.of(dnsNameResolver));
+        } else {
+            LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
+                    remoteAddress, state, clientAuthRole);
         }
 
         LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
deleted file mode 100644
index cd1b31d3434..00000000000
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnectionPool.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.proxy.server;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
-
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.ClientCnx;
-import org.apache.pulsar.client.impl.ConnectionPool;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.channel.EventLoopGroup;
-
-public class ProxyConnectionPool extends ConnectionPool {
-    public ProxyConnectionPool(ClientConfigurationData clientConfig, EventLoopGroup eventLoopGroup,
-            Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
-        super(clientConfig, eventLoopGroup, clientCnxSupplier);
-    }
-
-    @Override
-    public void close() throws IOException {
-        log.info("Closing ProxyConnectionPool.");
-        pool.forEach((address, clientCnxPool) -> {
-            if (clientCnxPool != null) {
-                clientCnxPool.forEach((identifier, clientCnx) -> {
-                    if (clientCnx != null && clientCnx.isDone()) {
-                        try {
-                            clientCnx.get().close();
-                        } catch (InterruptedException | ExecutionException e) {
-                            log.error("Unable to close get client connection future.", e);
-                        }
-                    }
-                });
-            }
-        });
-        dnsResolver.close();
-    }
-
-    private static final Logger log = LoggerFactory.getLogger(ProxyConnectionPool.class);
-}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index abefe49f059..0f4ea152b50 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -153,6 +154,8 @@ public class ProxyService implements Closeable {
 
         DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next())
                 .channelType(EventLoopUtil.getDatagramChannelClass(workerGroup));
+        DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+
         dnsNameResolver = dnsNameResolverBuilder.build();
 
         brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(),
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 12abf871b50..4f423c1f5d5 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -174,7 +174,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
         }
 
         ch.pipeline().addLast("handler",
-                new ProxyConnection(proxyService, sslHandlerSupplier));
+                new ProxyConnection(proxyService, sslHandlerSupplier, proxyService.getDnsNameResolver()));
 
     }
 }