You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/02/09 18:30:23 UTC

[pulsar] 02/02: [Proxy] Fix port exhaustion and connection issues in Pulsar Proxy (#14078)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b3bac91a74ad0e9358c0d5c12f87b89166276c67
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Wed Feb 9 20:24:37 2022 +0200

    [Proxy] Fix port exhaustion and connection issues in Pulsar Proxy (#14078)
    
    (cherry picked from commit 640b4e6ec14d9b812da608037c58b664e6778637)
    (cherry picked from commit 3d2e6ce84b6e69667a1c2095b766d9941a258b61)
---
 distribution/server/src/assemble/LICENSE.bin.txt   |   2 +
 pom.xml                                            |   1 +
 .../ProxySaslAuthenticationTest.java               |   1 +
 .../pulsar/common/protocol/PulsarHandler.java      |   2 +-
 pulsar-proxy/pom.xml                               |  13 +-
 .../proxy/server/BrokerDiscoveryProvider.java      |  10 +
 .../pulsar/proxy/server/BrokerProxyValidator.java  | 204 +++++++++++++++++++++
 .../pulsar/proxy/server/DirectProxyHandler.java    | 148 ++++++++-------
 .../pulsar/proxy/server/ProxyConfiguration.java    |  39 ++++
 .../pulsar/proxy/server/ProxyConnection.java       | 101 ++++++++--
 .../apache/pulsar/proxy/server/ProxyService.java   |  17 ++
 .../pulsar/proxy/server/ProxyServiceStarter.java   |   3 +
 .../proxy/server/ServiceChannelInitializer.java    |   8 +
 .../proxy/server/TargetAddressDeniedException.java |  26 +++
 .../proxy/server/AuthedAdminProxyHandlerTest.java  |   1 +
 .../proxy/server/BrokerProxyValidatorTest.java     | 102 +++++++++++
 .../proxy/server/ProxyAdditionalServletTest.java   |   1 +
 .../ProxyAuthenticatedProducerConsumerTest.java    |   1 +
 .../proxy/server/ProxyAuthenticationTest.java      |   1 +
 .../pulsar/proxy/server/ProxyConnectionTest.java   |  38 ++++
 .../server/ProxyConnectionThrottlingTest.java      |   1 +
 .../server/ProxyEnableHAProxyProtocolTest.java     |   1 +
 .../proxy/server/ProxyForwardAuthDataTest.java     |   1 +
 .../proxy/server/ProxyKeyStoreTlsTestWithAuth.java |   1 +
 .../server/ProxyKeyStoreTlsTestWithoutAuth.java    |   1 +
 .../proxy/server/ProxyLookupThrottlingTest.java    |   1 +
 .../pulsar/proxy/server/ProxyParserTest.java       |   1 +
 .../proxy/server/ProxyRolesEnforcementTest.java    |   1 +
 .../proxy/server/ProxyServiceStarterTest.java      |  14 +-
 .../proxy/server/ProxyServiceTlsStarterTest.java   |  15 +-
 .../apache/pulsar/proxy/server/ProxyStatsTest.java |   1 +
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |   1 +
 .../apache/pulsar/proxy/server/ProxyTlsTest.java   |   1 +
 .../pulsar/proxy/server/ProxyTlsTestWithAuth.java  |   1 +
 .../server/ProxyWithAuthorizationNegTest.java      |   1 +
 .../proxy/server/ProxyWithAuthorizationTest.java   |   2 +
 .../server/ProxyWithJwtAuthorizationTest.java      |   1 +
 .../server/ProxyWithoutServiceDiscoveryTest.java   |   1 +
 .../SuperUserAuthedAdminProxyHandlerTest.java      |   1 +
 .../server/UnauthedAdminProxyHandlerTest.java      |   1 +
 40 files changed, 682 insertions(+), 85 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 9356988..c0b9856 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -512,6 +512,8 @@ The Apache Software License, Version 2.0
     - io.vertx-vertx-web-3.5.3.jar
   * Apache ZooKeeper
     - org.apache.zookeeper-zookeeper-jute-3.5.9.jar
+  * IPAddress
+    - com.github.seancfoley-ipaddress-5.3.3.jar
 
 BSD 3-clause "New" or "Revised" License
  * Google auth library
diff --git a/pom.xml b/pom.xml
index aee8f00..f8a7689 100644
--- a/pom.xml
+++ b/pom.xml
@@ -181,6 +181,7 @@ flexible messaging model and an intuitive client API.</description>
     <kubernetesclient.version>9.0.2</kubernetesclient.version>
     <nsq-client.version>1.0</nsq-client.version>
     <docker-java.version>3.2.7</docker-java.version>
+    <seancfoley.ipaddress.version>5.3.3</seancfoley.ipaddress.version>
 
     <!-- test dependencies -->
     <cassandra.version>3.6.0</cassandra.version>
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index ae8b925..f27503a 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -223,6 +223,7 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
 		ProxyConfiguration proxyConfig = new ProxyConfiguration();
 		proxyConfig.setAuthenticationEnabled(true);
 		proxyConfig.setServicePort(Optional.of(0));
+		proxyConfig.setBrokerProxyAllowedTargetPorts("*");
 		proxyConfig.setWebServicePort(Optional.of(0));
 		proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
 		proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 7a8d5c0..ea16251 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -112,7 +112,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
         }
     }
 
-    protected void cancelKeepAliveTask() {
+    public void cancelKeepAliveTask() {
         if (keepAliveTask != null) {
             keepAliveTask.cancel(false);
             keepAliveTask = null;
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index e1bfdf5..f3ab04d 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -19,7 +19,7 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.pulsar</groupId>
@@ -177,5 +177,16 @@
       <groupId>com.beust</groupId>
       <artifactId>jcommander</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.github.seancfoley</groupId>
+      <artifactId>ipaddress</artifactId>
+      <version>${seancfoley.ipaddress.version}</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index 57a7a02..3933880 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.apache.pulsar.proxy.server.util.ZookeeperCacheLoader;
 import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -83,6 +84,15 @@ public class BrokerDiscoveryProvider implements Closeable {
     }
 
     /**
+     * Access the list of available brokers.
+     * @return the list of available brokers
+     * @throws PulsarServerException
+     */
+    public List<? extends ServiceLookupData> getAvailableBrokers() throws PulsarServerException {
+        return localZkCache.getAvailableBrokers();
+    }
+
+    /**
      * Find next broker {@link LoadManagerReport} in round-robin fashion.
      *
      * @return
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
new file mode 100644
index 0000000..3c49653
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+import inet.ipaddr.IPAddress;
+import inet.ipaddr.IPAddressString;
+import inet.ipaddr.ipv4.IPv4Address;
+import inet.ipaddr.ipv6.IPv6Address;
+import io.netty.resolver.AddressResolver;
+import io.netty.util.concurrent.Future;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.StringTokenizer;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BrokerProxyValidator {
+    private static final String SEPARATOR = "\\s*,\\s*";
+    private static final String ALLOW_ANY = "*";
+    private final int[] allowedTargetPorts;
+    private final boolean allowAnyTargetPort;
+    private final List<IPAddress> allowedIPAddresses;
+    private final boolean allowAnyIPAddress;
+    private final AddressResolver<InetSocketAddress> inetSocketAddressResolver;
+    private final List<Pattern> allowedHostNames;
+    private final boolean allowAnyHostName;
+
+    public BrokerProxyValidator(AddressResolver<InetSocketAddress> inetSocketAddressResolver, String allowedHostNames,
+                                String allowedIPAddresses, String allowedTargetPorts) {
+        this.inetSocketAddressResolver = inetSocketAddressResolver;
+        List<String> allowedHostNamesStrings = parseCommaSeparatedConfigValue(allowedHostNames);
+        if (allowedHostNamesStrings.contains(ALLOW_ANY)) {
+            this.allowAnyHostName = true;
+            this.allowedHostNames = Collections.emptyList();
+        } else {
+            this.allowAnyHostName = false;
+            this.allowedHostNames = allowedHostNamesStrings.stream()
+                    .map(BrokerProxyValidator::parseWildcardPattern).collect(Collectors.toList());
+        }
+        List<String> allowedIPAddressesStrings = parseCommaSeparatedConfigValue(allowedIPAddresses);
+        if (allowedIPAddressesStrings.contains(ALLOW_ANY)) {
+            allowAnyIPAddress = true;
+            this.allowedIPAddresses = Collections.emptyList();
+        } else {
+            allowAnyIPAddress = false;
+            this.allowedIPAddresses = allowedIPAddressesStrings.stream().map(IPAddressString::new)
+                    .filter(ipAddressString -> {
+                        if (ipAddressString.isValid()) {
+                            return true;
+                        } else {
+                            throw new IllegalArgumentException("Invalid IP address filter '" + ipAddressString + "'",
+                                    ipAddressString.getAddressStringException());
+                        }
+                    }).map(IPAddressString::getAddress)
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+        }
+        List<String> allowedTargetPortsStrings = parseCommaSeparatedConfigValue(allowedTargetPorts);
+        if (allowedTargetPortsStrings.contains(ALLOW_ANY)) {
+            allowAnyTargetPort = true;
+            this.allowedTargetPorts = new int[0];
+        } else {
+            allowAnyTargetPort = false;
+            this.allowedTargetPorts =
+                    allowedTargetPortsStrings.stream().mapToInt(Integer::parseInt).toArray();
+        }
+    }
+
+    private static Pattern parseWildcardPattern(String wildcardPattern) {
+        String regexPattern =
+                Collections.list(new StringTokenizer(wildcardPattern, "*", true))
+                        .stream()
+                        .map(String::valueOf)
+                        .map(token -> {
+                            if ("*".equals(token)) {
+                                return ".*";
+                            } else {
+                                return Pattern.quote(token);
+                            }
+                        }).collect(Collectors.joining());
+        return Pattern.compile(
+                "^" + regexPattern + "$",
+                Pattern.CASE_INSENSITIVE);
+    }
+
+    private static List<String> parseCommaSeparatedConfigValue(String configValue) {
+        return Arrays.stream(configValue.split(SEPARATOR)).map(String::trim).filter(s -> s.length() > 0)
+                .collect(Collectors.toList());
+    }
+
+    public CompletableFuture<InetSocketAddress> resolveAndCheckTargetAddress(String hostAndPort) {
+        int pos = hostAndPort.indexOf(':');
+        String host = hostAndPort.substring(0, pos);
+        int port = Integer.parseInt(hostAndPort.substring(pos + 1));
+        if (!isPortAllowed(port)) {
+            return FutureUtil.failedFuture(
+                    new TargetAddressDeniedException("Given port in '" + hostAndPort + "' isn't allowed."));
+        } else if (!isHostAllowed(host)) {
+            return FutureUtil.failedFuture(
+                    new TargetAddressDeniedException("Given host in '" + hostAndPort + "' isn't allowed."));
+        } else {
+            return toCompletableFuture(
+                            inetSocketAddressResolver.resolve(InetSocketAddress.createUnresolved(host, port)))
+                    .thenCompose(resolvedAddress -> {
+                        CompletableFuture<InetSocketAddress> result = new CompletableFuture();
+                        if (isIPAddressAllowed(resolvedAddress)) {
+                            result.complete(resolvedAddress);
+                        } else {
+                            result.completeExceptionally(new TargetAddressDeniedException(
+                                    "The IP address of the given host and port '" + hostAndPort + "' isn't allowed."));
+                        }
+                        return result;
+                    });
+        }
+    }
+
+    // backported from NettyFutureUtil.toCompletableFuture
+    private static <V> CompletableFuture<V> toCompletableFuture(Future<V> future) {
+        Objects.requireNonNull(future, "future cannot be null");
+
+        CompletableFuture<V> adapter = new CompletableFuture<>();
+        if (future.isDone()) {
+            if (future.isSuccess()) {
+                adapter.complete(future.getNow());
+            } else {
+                adapter.completeExceptionally(future.cause());
+            }
+        } else {
+            future.addListener((Future<V> f) -> {
+                if (f.isSuccess()) {
+                    adapter.complete(f.getNow());
+                } else {
+                    adapter.completeExceptionally(f.cause());
+                }
+            });
+        }
+        return adapter;
+    }
+
+    private boolean isPortAllowed(int port) {
+        if (allowAnyTargetPort) {
+            return true;
+        }
+        for (int allowedPort : allowedTargetPorts) {
+            if (allowedPort == port) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isIPAddressAllowed(InetSocketAddress resolvedAddress) {
+        if (allowAnyIPAddress) {
+            return true;
+        }
+        byte[] addressBytes = resolvedAddress.getAddress().getAddress();
+        IPAddress candidateAddress =
+                addressBytes.length == 4 ? new IPv4Address(addressBytes) : new IPv6Address(addressBytes);
+        for (IPAddress allowedAddress : allowedIPAddresses) {
+            if (allowedAddress.contains(candidateAddress)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isHostAllowed(String host) {
+        if (allowAnyHostName) {
+            return true;
+        }
+        boolean matched = false;
+        for (Pattern allowedHostName : allowedHostNames) {
+            if (allowedHostName.matcher(host).matches()) {
+                matched = true;
+                break;
+            }
+        }
+        return matched;
+    }
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 3e3d668..f761517 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -39,6 +39,7 @@ import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
 import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.CharsetUtil;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
@@ -49,6 +50,7 @@ import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import javax.net.ssl.SSLSession;
@@ -73,25 +75,25 @@ import org.slf4j.LoggerFactory;
 public class DirectProxyHandler {
 
     @Getter
-    private Channel inboundChannel;
+    private final Channel inboundChannel;
     @Getter
     Channel outboundChannel;
     @Getter
     private final Rate inboundChannelRequestsRate;
     protected static Map<ChannelId, ChannelId> inboundOutboundChannelMap = new ConcurrentHashMap<>();
-    private String originalPrincipal;
-    private AuthData clientAuthData;
-    private String clientAuthMethod;
-    private int protocolVersion;
+    private final String originalPrincipal;
+    private final AuthData clientAuthData;
+    private final String clientAuthMethod;
     public static final String TLS_HANDLER = "tls";
 
     private final Authentication authentication;
-    private final Supplier<SslHandler> sslHandlerSupplier;
     private AuthenticationDataProvider authenticationDataProvider;
-    private ProxyService service;
+    private final ProxyService service;
+    private final Runnable onHandshakeCompleteAction;
 
     public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl,
-            int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) {
+                              InetSocketAddress targetBrokerAddress, int protocolVersion,
+                              Supplier<SslHandler> sslHandlerSupplier) {
         this.service = service;
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
@@ -99,8 +101,7 @@ public class DirectProxyHandler {
         this.originalPrincipal = proxyConnection.clientAuthRole;
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
-        this.protocolVersion = protocolVersion;
-        this.sslHandlerSupplier = sslHandlerSupplier;
+        this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -109,13 +110,22 @@ public class DirectProxyHandler {
         // switches when passing data between the 2
         // connections
         b.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
+        int brokerProxyConnectTimeoutMs = service.getConfiguration().getBrokerProxyConnectTimeoutMs();
+        if (brokerProxyConnectTimeoutMs > 0) {
+            b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, brokerProxyConnectTimeoutMs);
+        }
         b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false);
         b.handler(new ChannelInitializer<SocketChannel>() {
             @Override
-            protected void initChannel(SocketChannel ch) throws Exception {
+            protected void initChannel(SocketChannel ch) {
                 if (sslHandlerSupplier != null) {
                     ch.pipeline().addLast(TLS_HANDLER, sslHandlerSupplier.get());
                 }
+                int brokerProxyReadTimeoutMs = service.getConfiguration().getBrokerProxyReadTimeoutMs();
+                if (brokerProxyReadTimeoutMs > 0) {
+                    ch.pipeline().addLast("readTimeoutHandler",
+                            new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
+                }
                 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                     Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
                 ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion));
@@ -133,7 +143,7 @@ public class DirectProxyHandler {
             return;
         }
 
-        ChannelFuture f = b.connect(targetBroker.getHost(), targetBroker.getPort());
+        ChannelFuture f = b.connect(targetBrokerAddress);
         outboundChannel = f.channel();
         f.addListener(future -> {
             if (!future.isSuccess()) {
@@ -211,8 +221,8 @@ public class DirectProxyHandler {
         private BackendState state = BackendState.Init;
         private String remoteHostName;
         protected ChannelHandlerContext ctx;
-        private ProxyConfiguration config;
-        private int protocolVersion;
+        private final ProxyConfiguration config;
+        private final int protocolVersion;
 
         public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) {
             this.config = config;
@@ -225,7 +235,7 @@ public class DirectProxyHandler {
             // Send the Connect command to broker
             authenticationDataProvider = authentication.getAuthData(remoteHostName);
             AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-            ByteBuf command = null;
+            ByteBuf command;
             command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
                     null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
             outboundChannel.writeAndFlush(command);
@@ -293,12 +303,11 @@ public class DirectProxyHandler {
                 outboundChannel.read();
             } catch (Exception e) {
                 log.error("Error mutual verify", e);
-                return;
             }
         }
 
         @Override
-        public void operationComplete(Future<Void> future) throws Exception {
+        public void operationComplete(Future<Void> future) {
             // This is invoked when the write operation on the paired connection
             // is completed
             if (future.isSuccess()) {
@@ -317,6 +326,7 @@ public class DirectProxyHandler {
 
         @Override
         protected void handleConnected(CommandConnected connected) {
+            checkArgument(state == BackendState.Init, "Unexpected state %s. BackendState.Init was expected.", state);
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Received Connected from broker", inboundChannel, outboundChannel);
             }
@@ -332,58 +342,68 @@ public class DirectProxyHandler {
 
             state = BackendState.HandshakeCompleted;
 
-            ChannelFuture channelFuture;
-            if (connected.hasMaxMessageSize()) {
-                channelFuture = inboundChannel.writeAndFlush(
-                    Commands.newConnected(connected.getProtocolVersion(), connected.getMaxMessageSize()));
-            } else {
-                channelFuture = inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion()));
-            }
+            onHandshakeCompleteAction.run();
+            startDirectProxying(connected);
+
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(future -> {
+                        if (future.isSuccess()) {
+                            // Start reading from both connections
+                            inboundChannel.read();
+                            outboundChannel.read();
+                        } else {
+                            log.warn("[{}] [{}] Failed to write to inbound connection. Closing both connections.",
+                                    inboundChannel,
+                                    outboundChannel, future.cause());
+                            inboundChannel.close();
+                        }
+                    });
+        }
 
-            channelFuture.addListener(future -> {
+        private void startDirectProxying(CommandConnected connected) {
+            if (service.getProxyLogLevel() == 0) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Removing decoder from pipeline", inboundChannel, outboundChannel);
                 }
-                if (service.getProxyLogLevel() == 0) {
-                    // direct tcp proxy
-                    inboundChannel.pipeline().remove("frameDecoder");
-                    outboundChannel.pipeline().remove("frameDecoder");
+                // direct tcp proxy
+                inboundChannel.pipeline().remove("frameDecoder");
+                outboundChannel.pipeline().remove("frameDecoder");
+            } else {
+                // Enable parsing feature, proxyLogLevel(1 or 2)
+                // Add parser handler
+                if (connected.hasMaxMessageSize()) {
+                    inboundChannel.pipeline()
+                            .replace("frameDecoder", "newFrameDecoder",
+                                    new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
+                                            + Commands.MESSAGE_SIZE_FRAME_PADDING,
+                                            0, 4, 0, 4));
+                    outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
+                            new LengthFieldBasedFrameDecoder(
+                                    connected.getMaxMessageSize()
+                                            + Commands.MESSAGE_SIZE_FRAME_PADDING,
+                                    0, 4, 0, 4));
+
+                    inboundChannel.pipeline().addBefore("handler", "inboundParser",
+                            new ParserProxyHandler(service, inboundChannel,
+                                    ParserProxyHandler.FRONTEND_CONN,
+                                    connected.getMaxMessageSize()));
+                    outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
+                            new ParserProxyHandler(service, outboundChannel,
+                                    ParserProxyHandler.BACKEND_CONN,
+                                    connected.getMaxMessageSize()));
                 } else {
-                    // Enable parsing feature, proxyLogLevel(1 or 2)
-                    // Add parser handler
-                    if (connected.hasMaxMessageSize()) {
-                        inboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
-                                                          new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
-                                                                                           + Commands.MESSAGE_SIZE_FRAME_PADDING,
-                                                                                           0, 4, 0, 4));
-                        outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
-                                                           new LengthFieldBasedFrameDecoder(
-                                                               connected.getMaxMessageSize()
-                                                               + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
-
-                        inboundChannel.pipeline().addBefore("handler", "inboundParser",
-                                                            new ParserProxyHandler(service, inboundChannel,
-                                                                                   ParserProxyHandler.FRONTEND_CONN,
-                                                                                   connected.getMaxMessageSize()));
-                        outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
-                                                             new ParserProxyHandler(service, outboundChannel,
-                                                                                    ParserProxyHandler.BACKEND_CONN,
-                                                                                    connected.getMaxMessageSize()));
-                    } else {
-                        inboundChannel.pipeline().addBefore("handler", "inboundParser",
-                                                            new ParserProxyHandler(service, inboundChannel,
-                                                                                   ParserProxyHandler.FRONTEND_CONN,
-                                                                                   Commands.DEFAULT_MAX_MESSAGE_SIZE));
-                        outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
-                                                             new ParserProxyHandler(service, outboundChannel,
-                                                                                    ParserProxyHandler.BACKEND_CONN,
-                                                                                    Commands.DEFAULT_MAX_MESSAGE_SIZE));
-                    }
+                    inboundChannel.pipeline().addBefore("handler", "inboundParser",
+                            new ParserProxyHandler(service, inboundChannel,
+                                    ParserProxyHandler.FRONTEND_CONN,
+                                    Commands.DEFAULT_MAX_MESSAGE_SIZE));
+                    outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
+                            new ParserProxyHandler(service, outboundChannel,
+                                    ParserProxyHandler.BACKEND_CONN,
+                                    Commands.DEFAULT_MAX_MESSAGE_SIZE));
                 }
-                // Start reading from both connections
-                inboundChannel.read();
-                outboundChannel.read();
-            });
+            }
         }
 
         @Override
@@ -404,7 +424,7 @@ public class DirectProxyHandler {
         private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
             ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
 
-            SSLSession sslSession = null;
+            SSLSession sslSession;
             if (sslHandler != null) {
                 sslSession = ((SslHandler) sslHandler).engine().getSession();
                 return (new TlsHostnameVerifier()).verify(hostname, sslSession);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 3847f82..a5c44ee 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -49,6 +49,8 @@ public class ProxyConfiguration implements PulsarConfiguration {
     @Category
     private static final String CATEGORY_BROKER_DISCOVERY = "Broker Discovery";
     @Category
+    private static final String CATEGORY_BROKER_PROXY = "Broker Proxy";
+    @Category
     private static final String CATEGORY_AUTHENTICATION = "Proxy Authentication";
     @Category
     private static final String CATEGORY_AUTHORIZATION = "Proxy Authorization";
@@ -136,6 +138,43 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private String functionWorkerWebServiceURLTLS;
 
+    @FieldContext(category = CATEGORY_BROKER_PROXY,
+            doc = "When enabled, checks that the target broker is active before connecting. "
+                    + "zookeeperServers and configurationStoreServers must be configured in proxy configuration "
+                    + "for retrieving the active brokers.")
+    private boolean checkActiveBrokers = false;
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Broker proxy connect timeout.\n"
+                    + "The timeout value for Broker proxy connect timeout is in millisecond. Set to 0 to disable."
+    )
+    private int brokerProxyConnectTimeoutMs = 10000;
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Broker proxy read timeout.\n"
+                    + "The timeout value for Broker proxy read timeout is in millisecond. Set to 0 to disable."
+    )
+    private int brokerProxyReadTimeoutMs = 75000;
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Allowed broker target host names. "
+                    + "Supports multiple comma separated entries and a wildcard.")
+    private String brokerProxyAllowedHostNames = "*";
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Allowed broker target ip addresses or ip networks / netmasks. "
+                    + "Supports multiple comma separated entries.")
+    private String brokerProxyAllowedIPAddresses = "*";
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Allowed broker target ports")
+    private String brokerProxyAllowedTargetPorts = "6650,6651";
+
     @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Hostname or IP address the service binds on"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index c559120..f1e45f0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import io.netty.handler.codec.haproxy.HAProxyMessage;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -28,7 +31,7 @@ import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
-import io.netty.handler.codec.haproxy.HAProxyMessage;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -50,6 +53,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,11 +70,12 @@ import lombok.Getter;
  *
  */
 public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
     // ConnectionPool is used by the proxy to issue lookup requests
     private ConnectionPool connectionPool;
     private final AtomicLong requestIdGenerator =
             new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
-    private ProxyService service;
+    private final ProxyService service;
     private Authentication clientAuthentication;
     AuthenticationDataSource authenticationData;
     private State state;
@@ -79,6 +84,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     private LookupProxyHandler lookupProxyHandler = null;
     @Getter
     private DirectProxyHandler directProxyHandler = null;
+    private final BrokerProxyValidator brokerProxyValidator;
     String clientAuthRole;
     AuthData clientAuthData;
     String clientAuthMethod;
@@ -108,6 +114,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         // looking into it
         ProxyConnectionToBroker,
 
+        Closing,
+
         Closed,
     }
 
@@ -120,6 +128,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         this.service = proxyService;
         this.state = State.Init;
         this.sslHandlerSupplier = sslHandlerSupplier;
+        this.brokerProxyValidator = service.getBrokerProxyValidator();
     }
 
     @Override
@@ -127,6 +136,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         super.channelRegistered(ctx);
         ProxyService.activeConnections.inc();
         if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
+            state = State.Closing;
             ctx.close();
             ProxyService.rejectedConnections.inc();
         }
@@ -172,6 +182,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        state = State.Closing;
         super.exceptionCaught(ctx, cause);
         LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
                 ClientCnx.isKnownException(cause) ? null : cause);
@@ -210,7 +221,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     }
 
     @Override
-    public void operationComplete(Future<Void> future) throws Exception {
+    public void operationComplete(Future<Void> future) {
         // This is invoked when the write operation on the paired connection is
         // completed
         if (future.isSuccess()) {
@@ -246,14 +257,51 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
 
         LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
-            remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
+                remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
         if (hasProxyToBrokerUrl) {
-            // Client already knows which broker to connect. Let's open a
-            // connection there and just pass bytes in both directions
-            state = State.ProxyConnectionToBroker;
-            directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl,
-                protocolVersionToAdvertise, sslHandlerSupplier);
-            cancelKeepAliveTask();
+            // Optimize proxy connection to fail-fast if the target broker isn't active
+            // Pulsar client will retry connecting after a back off timeout
+            if (service.getConfiguration().isCheckActiveBrokers()
+                    && !isBrokerActive(proxyToBrokerUrl)) {
+                state = State.Closing;
+                LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.",
+                        remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole);
+                ctx()
+                        .writeAndFlush(
+                                Commands.newError(-1, ServerError.ServiceNotReady, "Target broker isn't available."))
+                        .addListener(future -> ctx().close());
+                return;
+            }
+
+            brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
+                    .thenAccept(address -> ctx().executor().submit(() -> {
+                        // Client already knows which broker to connect. Let's open a
+                        // connection there and just pass bytes in both directions
+                        state = State.ProxyConnectionToBroker;
+                        directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address,
+                                protocolVersionToAdvertise, sslHandlerSupplier);
+                    }))
+                    .exceptionally(throwable -> {
+                        if (throwable instanceof TargetAddressDeniedException
+                                || throwable.getCause() instanceof TargetAddressDeniedException) {
+                            TargetAddressDeniedException targetAddressDeniedException =
+                                    (TargetAddressDeniedException) (throwable instanceof TargetAddressDeniedException
+                                            ? throwable : throwable.getCause());
+
+                            LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.",
+                                    remoteAddress, proxyToBrokerUrl, targetAddressDeniedException.getMessage(),
+                                    authMethod, clientAuthRole);
+                        } else {
+                            LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.",
+                                    remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole, throwable);
+                        }
+                        ctx()
+                                .writeAndFlush(
+                                        Commands.newError(-1, ServerError.ServiceNotReady,
+                                                "Target broker cannot be validated."))
+                                .addListener(future -> ctx().close());
+                        return null;
+                    });
         } else {
             // Client is doing a lookup, we can consider the handshake complete
             // and we'll take care of just topics and
@@ -305,6 +353,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
 
         if (remoteEndpointProtocolVersion < PulsarApi.ProtocolVersion.v10_VALUE) {
             LOG.warn("[{}] Client doesn't support connecting through proxy", remoteAddress);
+            state = State.Closing;
             ctx.close();
             return;
         }
@@ -489,6 +538,36 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         return haProxyMessage;
     }
 
-    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
+    private boolean isBrokerActive(String targetBrokerHostPort) {
+        for (ServiceLookupData serviceLookupData : getAvailableBrokers()) {
+            if (matchesHostAndPort("pulsar://", serviceLookupData.getPulsarServiceUrl(), targetBrokerHostPort)
+                    || matchesHostAndPort("pulsar+ssl://", serviceLookupData.getPulsarServiceUrlTls(),
+                    targetBrokerHostPort)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private List<? extends ServiceLookupData> getAvailableBrokers() {
+        if (service.getDiscoveryProvider() == null) {
+            LOG.warn("Unable to retrieve active brokers. service.getDiscoveryProvider() is null."
+                    + "zookeeperServers and configurationStoreServers must be configured in proxy configuration "
+                    + "when checkActiveBrokers is enabled.");
+            return Collections.emptyList();
+        }
+        try {
+            return service.getDiscoveryProvider().getAvailableBrokers();
+        } catch (PulsarServerException e) {
+            LOG.error("Unable to get available brokers", e);
+            return Collections.emptyList();
+        }
+    }
 
+    static boolean matchesHostAndPort(String expectedPrefix, String pulsarServiceUrl, String brokerHostPort) {
+        return pulsarServiceUrl != null
+                && pulsarServiceUrl.length() == expectedPrefix.length() + brokerHostPort.length()
+                && pulsarServiceUrl.startsWith(expectedPrefix)
+                && pulsarServiceUrl.startsWith(brokerHostPort, expectedPrefix.length());
+    }
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 58ba3cf..c934883 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -26,6 +26,8 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.dns.DnsNameResolver;
+import io.netty.resolver.dns.DnsNameResolverBuilder;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
@@ -67,6 +69,10 @@ import com.google.common.collect.Sets;
 public class ProxyService implements Closeable {
 
     private final ProxyConfiguration proxyConfig;
+    @Getter
+    private final DnsNameResolver dnsNameResolver;
+    @Getter
+    private final BrokerProxyValidator brokerProxyValidator;
     private String serviceUrl;
     private String serviceUrlTls;
     private ConfigurationCacheService configurationCacheService;
@@ -139,6 +145,15 @@ public class ProxyService implements Closeable {
         this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
         this.authenticationService = authenticationService;
 
+        DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next())
+                .channelType(EventLoopUtil.getDatagramChannelClass(workerGroup));
+        dnsNameResolver = dnsNameResolverBuilder.build();
+
+        brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(),
+                proxyConfig.getBrokerProxyAllowedHostNames(),
+                proxyConfig.getBrokerProxyAllowedIPAddresses(),
+                proxyConfig.getBrokerProxyAllowedTargetPorts());
+
         statsExecutor = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("proxy-stats-executor"));
         statsExecutor.schedule(()->{
@@ -220,6 +235,8 @@ public class ProxyService implements Closeable {
     }
 
     public void close() throws IOException {
+        dnsNameResolver.close();
+
         if (discoveryProvider != null) {
             discoveryProvider.close();
         }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 25e0f59..2a01d5b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -27,6 +27,7 @@ import static org.slf4j.bridge.SLF4JBridgeHandler.install;
 import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
 
 import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -87,8 +88,10 @@ public class ProxyServiceStarter {
 
     private ProxyConfiguration config;
 
+    @Getter
     private ProxyService proxyService;
 
+    @Getter
     private WebServer server;
 
     public ProxyServiceStarter(String[] args) throws Exception {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 658dd87..a033a87 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.proxy.server;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 
 import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -46,6 +48,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
     private final ProxyService proxyService;
     private final boolean enableTls;
     private final boolean tlsEnabledWithKeyStore;
+    private final int brokerProxyReadTimeoutMs;
 
     private SslContextAutoRefreshBuilder<SslContext> serverSslCtxRefresher;
     private SslContextAutoRefreshBuilder<SslContext> clientSslCtxRefresher;
@@ -58,6 +61,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
         this.proxyService = proxyService;
         this.enableTls = enableTls;
         this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
+        this.brokerProxyReadTimeoutMs = serviceConfig.getBrokerProxyReadTimeoutMs();
 
         if (enableTls) {
             if (tlsEnabledWithKeyStore) {
@@ -127,6 +131,10 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
             ch.pipeline().addLast(TLS_HANDLER,
                     new SslHandler(serverSSLContextAutoRefreshBuilder.get().createSSLEngine()));
         }
+        if (brokerProxyReadTimeoutMs > 0) {
+            ch.pipeline().addLast("readTimeoutHandler",
+                    new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
+        }
         if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) {
             ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
         }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java
new file mode 100644
index 0000000..e62525f
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+class TargetAddressDeniedException extends RuntimeException {
+    public TargetAddressDeniedException(String message) {
+        super(message);
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index 7b34695..a1c6e63 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -79,6 +79,7 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
         proxyConfig.setAuthenticationEnabled(true);
         proxyConfig.setAuthorizationEnabled(true);
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
new file mode 100644
index 0000000..8e45755
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import io.netty.resolver.AddressResolver;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.SucceededFuture;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.ExecutionException;
+import org.apache.curator.shaded.com.google.common.net.InetAddresses;
+import org.testng.annotations.Test;
+
+public class BrokerProxyValidatorTest {
+
+    @Test
+    public void shouldAllowValidInput() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "myhost"
+                , "1.2.0.0/16"
+                , "6650");
+        InetSocketAddress inetSocketAddress = brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get();
+        assertNotNull(inetSocketAddress);
+        assertEquals(inetSocketAddress.getAddress().getHostAddress(), "1.2.3.4");
+        assertEquals(inetSocketAddress.getPort(), 6650);
+    }
+
+    @Test(expectedExceptions = ExecutionException.class,
+            expectedExceptionsMessageRegExp = ".*Given host in 'myhost:6650' isn't allowed.")
+    public void shouldPreventInvalidHostName() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "allowedhost"
+                , "1.2.0.0/16"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get();
+    }
+
+    @Test(expectedExceptions = ExecutionException.class,
+            expectedExceptionsMessageRegExp = ".* The IP address of the given host and port 'myhost:6650' isn't allowed.")
+    public void shouldPreventInvalidIPAddress() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "myhost"
+                , "1.3.0.0/16"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get();
+    }
+
+    @Test
+    public void shouldSupportHostNamePattern() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "*.mydomain"
+                , "1.2.0.0/16"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get();
+    }
+
+    @Test
+    public void shouldAllowAllWithWildcard() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "*"
+                , "*"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get();
+    }
+
+    private AddressResolver<InetSocketAddress> createMockedAddressResolver(String ipAddressResult) {
+        AddressResolver<InetSocketAddress> inetSocketAddressResolver = mock(AddressResolver.class);
+        when(inetSocketAddressResolver.resolve(any())).then(invocationOnMock -> {
+            InetSocketAddress address = (InetSocketAddress) invocationOnMock.getArgument(0);
+            return new SucceededFuture<SocketAddress>(mock(EventExecutor.class),
+                    new InetSocketAddress(InetAddresses.forString(ipAddressResult), address.getPort()));
+        });
+        return inetSocketAddressResolver;
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index ad28ebe..4bf30b0 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -72,6 +72,7 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index ddc90df..8bc0da4 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -104,6 +104,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         proxyConfig.setAuthenticationEnabled(true);
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index bcdd031..4bca0ea 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -212,6 +212,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
 		ProxyConfiguration proxyConfig = new ProxyConfiguration();
 		proxyConfig.setAuthenticationEnabled(true);
 		proxyConfig.setServicePort(Optional.of(0));
+		proxyConfig.setBrokerProxyAllowedTargetPorts("*");
 		proxyConfig.setWebServicePort(Optional.of(0));
 		proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
new file mode 100644
index 0000000..5f533e3
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import org.testng.annotations.Test;
+
+public class ProxyConnectionTest {
+
+    @Test
+    public void testMatchesHostAndPort() {
+        assertTrue(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:6650", "1.2.3.4:6650"));
+        assertTrue(ProxyConnection
+                .matchesHostAndPort("pulsar+ssl://", "pulsar+ssl://1.2.3.4:6650", "1.2.3.4:6650"));
+        assertFalse(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", "5.6.7.8:1234"));
+        assertFalse(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", "1.2.3.4:1234"));
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index c30ae4d..0b064f8 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -53,6 +53,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
         proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 1f0420c..4859e2c 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -57,6 +57,7 @@ public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest
         internalSetup();
 
         proxyConfig.setServicePort(Optional.ofNullable(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
         proxyConfig.setHaProxyProtocolEnabled(true);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index a39b3d0..82150ef 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -104,6 +104,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
         proxyConfig.setAuthenticationEnabled(true);
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
index a075851..444b013 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
@@ -79,6 +79,7 @@ public class ProxyKeyStoreTlsTestWithAuth extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
index b920ef4..47dcbb7 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
@@ -74,6 +74,7 @@ public class ProxyKeyStoreTlsTestWithoutAuth extends MockedPulsarServiceBaseTest
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 8949f8d..a82d40b 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -52,6 +52,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
         proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index a218068..7f64bbf 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -72,6 +72,7 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
         //enable full parsing feature
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 57018ea..f0eb11f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -209,6 +209,7 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
         proxyConfig.setAuthenticationEnabled(true);
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 6dbb663..65424f2 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -49,6 +49,8 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
     static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"};
 
     private ProxyServiceStarter serviceStarter;
+    private String serviceUrl;
+    private int webPort;
 
     @Override
     @BeforeClass
@@ -57,10 +59,14 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
         serviceStarter = new ProxyServiceStarter(ARGS);
         serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
-        serviceStarter.getConfig().setServicePort(Optional.of(11000));
+        serviceStarter.getConfig().setWebServicePort(Optional.of(0));
+        serviceStarter.getConfig().setServicePort(Optional.of(0));
         serviceStarter.getConfig().setWebSocketServiceEnabled(true);
+        serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
         CollectorRegistry.defaultRegistry.clear();
         serviceStarter.start();
+        serviceUrl = serviceStarter.getProxyService().getServiceUrl();
+        webPort = serviceStarter.getServer().getListenPortHTTP().get();
     }
 
     @Override
@@ -73,7 +79,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testProducer() throws Exception {
         @Cleanup
-        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:11000")
+        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl)
                 .build();
 
         @Cleanup
@@ -92,7 +98,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
         WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient);
         producerWebSocketClient.start();
         MyWebSocket producerSocket = new MyWebSocket();
-        String produceUri = "ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic";
+        String produceUri = "ws://localhost:" + webPort + "/ws/producer/persistent/sample/test/local/websocket-topic";
         Future<Session> producerSession = producerWebSocketClient.connect(producerSocket, URI.create(produceUri));
 
         ProducerMessage produceRequest = new ProducerMessage();
@@ -103,7 +109,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
         WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient);
         consumerWebSocketClient.start();
         MyWebSocket consumerSocket = new MyWebSocket();
-        String consumeUri = "ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
+        String consumeUri = "ws://localhost:" + webPort + "/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
         Future<Session> consumerSession = consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri));
         consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
         producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index 8198774..8cd7dee 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -53,6 +53,8 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
     private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
     private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
     private ProxyServiceStarter serviceStarter;
+    private String serviceUrl;
+    private int webPort;
 
     @Override
     @BeforeClass
@@ -65,12 +67,17 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
         serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
         serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
         serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
-        serviceStarter.getConfig().setServicePortTls(Optional.of(11043));
+        serviceStarter.getConfig().setServicePort(Optional.empty());
+        serviceStarter.getConfig().setServicePortTls(Optional.of(0));
+        serviceStarter.getConfig().setWebServicePort(Optional.of(0));
         serviceStarter.getConfig().setTlsEnabledWithBroker(true);
         serviceStarter.getConfig().setWebSocketServiceEnabled(true);
         serviceStarter.getConfig().setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
         serviceStarter.getConfig().setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+        serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
         serviceStarter.start();
+        serviceUrl = serviceStarter.getProxyService().getServiceUrlTls();
+        webPort = serviceStarter.getServer().getListenPortHTTP().get();
     }
 
     protected void doInitConf() throws Exception {
@@ -89,7 +96,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testProducer() throws Exception {
         @Cleanup
-        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar+ssl://localhost:11043")
+        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl)
                 .allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
                 .build();
 
@@ -109,7 +116,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
         WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient);
         producerWebSocketClient.start();
         MyWebSocket producerSocket = new MyWebSocket();
-        String produceUri = "ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic";
+        String produceUri = "ws://localhost:" + webPort + "/ws/producer/persistent/sample/test/local/websocket-topic";
         Future<Session> producerSession = producerWebSocketClient.connect(producerSocket, URI.create(produceUri));
 
         ProducerMessage produceRequest = new ProducerMessage();
@@ -120,7 +127,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
         WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient);
         consumerWebSocketClient.start();
         MyWebSocket consumerSocket = new MyWebSocket();
-        String consumeUri = "ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
+        String consumeUri = "ws://localhost:" + webPort + "/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
         Future<Session> consumerSession = consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri));
         consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
         producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index e246625..a9febfc 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -70,6 +70,7 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 8106ef3..4e8bfec 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -91,6 +91,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.ofNullable(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 465f4d2..942dab3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -57,6 +57,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
index 9273a63..800cd36 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
@@ -58,6 +58,7 @@ public class ProxyTlsTestWithAuth extends MockedPulsarServiceBaseTest {
         writer.close();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index 14902c3..546ed99 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -111,6 +111,7 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index 0e533a9..23a8227 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -183,6 +183,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
@@ -398,6 +399,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 4a7d0b8..830a251 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -97,6 +97,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
 
         // enable auth&auth and use JWT at proxy
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index d59b6c9..e1b30dc 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -102,6 +102,7 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index 17c0992..e048d61 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -76,6 +76,7 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBas
         proxyConfig.setAuthenticationEnabled(true);
         proxyConfig.setAuthorizationEnabled(true);
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index ce0be5b..4429f01 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -66,6 +66,7 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
 
         // start proxy service
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setBrokerWebServiceURL(brokerUrl.toString());
         proxyConfig.setStatusFilePath(STATUS_FILE_PATH);