You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/02/09 18:32:47 UTC

[pulsar] branch branch-2.8 updated (7716894 -> 3d2e6ce)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 7716894  AbstractMetadataStore: invalidate childrenCache correctly when node created (#14177)
     new 25e6b65  [Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy (#13836)
     new 1742df8  Allow config of IO and acceptor threads in proxy (#14054)
     new 3d2e6ce  [Proxy] Fix port exhaustion and connection issues in Pulsar Proxy (#14078)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 distribution/server/src/assemble/LICENSE.bin.txt   |   2 +
 pom.xml                                            |   1 +
 .../ProxySaslAuthenticationTest.java               |   1 +
 .../apache/pulsar/client/impl/ConnectionPool.java  |  62 +++----
 .../pulsar/common/protocol/PulsarHandler.java      |   2 +-
 pulsar-proxy/pom.xml                               |  13 +-
 .../proxy/server/BrokerDiscoveryProvider.java      |  10 +
 .../pulsar/proxy/server/BrokerProxyValidator.java  | 181 ++++++++++++++++++
 .../pulsar/proxy/server/DirectProxyHandler.java    | 154 ++++++++-------
 .../pulsar/proxy/server/ProxyConfiguration.java    |  53 ++++++
 .../pulsar/proxy/server/ProxyConnection.java       | 206 ++++++++++++++-------
 .../apache/pulsar/proxy/server/ProxyService.java   |  36 ++--
 .../pulsar/proxy/server/ProxyServiceStarter.java   |   2 +
 .../proxy/server/ServiceChannelInitializer.java    |   8 +
 .../proxy/server/TargetAddressDeniedException.java |  10 +-
 .../proxy/server/AuthedAdminProxyHandlerTest.java  |   1 +
 .../proxy/server/BrokerProxyValidatorTest.java     | 102 ++++++++++
 .../proxy/server/ProxyAdditionalServletTest.java   |   1 +
 .../ProxyAuthenticatedProducerConsumerTest.java    |   1 +
 .../proxy/server/ProxyAuthenticationTest.java      |   1 +
 .../pulsar/proxy/server/ProxyConnectionTest.java   |  21 ++-
 .../server/ProxyConnectionThrottlingTest.java      |   1 +
 .../server/ProxyEnableHAProxyProtocolTest.java     |   1 +
 .../proxy/server/ProxyForwardAuthDataTest.java     |   1 +
 .../proxy/server/ProxyKeyStoreTlsTestWithAuth.java |   1 +
 .../server/ProxyKeyStoreTlsTestWithoutAuth.java    |   1 +
 .../proxy/server/ProxyLookupThrottlingTest.java    |   1 +
 .../pulsar/proxy/server/ProxyParserTest.java       |   1 +
 .../proxy/server/ProxyRolesEnforcementTest.java    |   1 +
 .../proxy/server/ProxyServiceStarterTest.java      |   5 +-
 .../proxy/server/ProxyServiceTlsStarterTest.java   |  15 +-
 .../apache/pulsar/proxy/server/ProxyStatsTest.java |   1 +
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |   1 +
 .../apache/pulsar/proxy/server/ProxyTlsTest.java   |   1 +
 .../pulsar/proxy/server/ProxyTlsTestWithAuth.java  |   1 +
 .../server/ProxyWithAuthorizationNegTest.java      |   1 +
 .../proxy/server/ProxyWithAuthorizationTest.java   |   2 +
 .../server/ProxyWithJwtAuthorizationTest.java      |   1 +
 .../server/ProxyWithoutServiceDiscoveryTest.java   |   1 +
 .../SuperUserAuthedAdminProxyHandlerTest.java      |   1 +
 .../server/UnauthedAdminProxyHandlerTest.java      |   1 +
 41 files changed, 705 insertions(+), 202 deletions(-)
 create mode 100644 pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
 copy pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/StringSource.java => pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java (81%)
 create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
 copy pulsar-common/src/test/java/org/apache/pulsar/common/policies/impl/MinAvailablePolicyTest.java => pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java (59%)

[pulsar] 03/03: [Proxy] Fix port exhaustion and connection issues in Pulsar Proxy (#14078)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3d2e6ce84b6e69667a1c2095b766d9941a258b61
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Wed Feb 9 19:39:11 2022 +0200

    [Proxy] Fix port exhaustion and connection issues in Pulsar Proxy (#14078)
    
    (cherry picked from commit 640b4e6ec14d9b812da608037c58b664e6778637)
---
 distribution/server/src/assemble/LICENSE.bin.txt   |   2 +
 pom.xml                                            |   1 +
 .../ProxySaslAuthenticationTest.java               |   1 +
 .../pulsar/common/protocol/PulsarHandler.java      |   2 +-
 pulsar-proxy/pom.xml                               |  13 +-
 .../proxy/server/BrokerDiscoveryProvider.java      |  10 ++
 .../pulsar/proxy/server/BrokerProxyValidator.java  | 181 +++++++++++++++++++++
 .../pulsar/proxy/server/DirectProxyHandler.java    | 154 ++++++++++--------
 .../pulsar/proxy/server/ProxyConfiguration.java    |  39 +++++
 .../pulsar/proxy/server/ProxyConnection.java       | 101 ++++++++++--
 .../apache/pulsar/proxy/server/ProxyService.java   |  17 ++
 .../pulsar/proxy/server/ProxyServiceStarter.java   |   2 +
 .../proxy/server/ServiceChannelInitializer.java    |   8 +
 .../proxy/server/TargetAddressDeniedException.java |  26 +++
 .../proxy/server/AuthedAdminProxyHandlerTest.java  |   1 +
 .../proxy/server/BrokerProxyValidatorTest.java     | 102 ++++++++++++
 .../proxy/server/ProxyAdditionalServletTest.java   |   1 +
 .../ProxyAuthenticatedProducerConsumerTest.java    |   1 +
 .../proxy/server/ProxyAuthenticationTest.java      |   1 +
 .../pulsar/proxy/server/ProxyConnectionTest.java   |  38 +++++
 .../server/ProxyConnectionThrottlingTest.java      |   1 +
 .../server/ProxyEnableHAProxyProtocolTest.java     |   1 +
 .../proxy/server/ProxyForwardAuthDataTest.java     |   1 +
 .../proxy/server/ProxyKeyStoreTlsTestWithAuth.java |   1 +
 .../server/ProxyKeyStoreTlsTestWithoutAuth.java    |   1 +
 .../proxy/server/ProxyLookupThrottlingTest.java    |   1 +
 .../pulsar/proxy/server/ProxyParserTest.java       |   1 +
 .../proxy/server/ProxyRolesEnforcementTest.java    |   1 +
 .../proxy/server/ProxyServiceStarterTest.java      |   5 +-
 .../proxy/server/ProxyServiceTlsStarterTest.java   |  15 +-
 .../apache/pulsar/proxy/server/ProxyStatsTest.java |   1 +
 .../org/apache/pulsar/proxy/server/ProxyTest.java  |   1 +
 .../apache/pulsar/proxy/server/ProxyTlsTest.java   |   1 +
 .../pulsar/proxy/server/ProxyTlsTestWithAuth.java  |   1 +
 .../server/ProxyWithAuthorizationNegTest.java      |   1 +
 .../proxy/server/ProxyWithAuthorizationTest.java   |   2 +
 .../server/ProxyWithJwtAuthorizationTest.java      |   1 +
 .../server/ProxyWithoutServiceDiscoveryTest.java   |   1 +
 .../SuperUserAuthedAdminProxyHandlerTest.java      |   1 +
 .../server/UnauthedAdminProxyHandlerTest.java      |   1 +
 40 files changed, 655 insertions(+), 85 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index cd71b38..4653b71 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -522,6 +522,8 @@ The Apache Software License, Version 2.0
     - com.google.http-client-google-http-client-1.38.0.jar
     - com.google.auto.value-auto-value-annotations-1.7.4.jar
     - com.google.re2j-re2j-1.5.jar
+  * IPAddress
+    - com.github.seancfoley-ipaddress-5.3.3.jar
 
 BSD 3-clause "New" or "Revised" License
  * Google auth library
diff --git a/pom.xml b/pom.xml
index 1075d57..27dcb4e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -195,6 +195,7 @@ flexible messaging model and an intuitive client API.</description>
     <cron-utils.version>9.1.6</cron-utils.version>
     <spring-context.version>5.3.15</spring-context.version>
     <apache-http-client.version>4.5.13</apache-http-client.version>
+    <seancfoley.ipaddress.version>5.3.3</seancfoley.ipaddress.version>
 
     <!-- test dependencies -->
     <cassandra.version>3.6.0</cassandra.version>
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index aabdcf8..e7b2c44 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -224,6 +224,7 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
 		ProxyConfiguration proxyConfig = new ProxyConfiguration();
 		proxyConfig.setAuthenticationEnabled(true);
 		proxyConfig.setServicePort(Optional.of(0));
+		proxyConfig.setBrokerProxyAllowedTargetPorts("*");
 		proxyConfig.setWebServicePort(Optional.of(0));
 		proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
 		proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index cdf372d..48e7ffa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -118,7 +118,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
         }
     }
 
-    protected void cancelKeepAliveTask() {
+    public void cancelKeepAliveTask() {
         if (keepAliveTask != null) {
             keepAliveTask.cancel(false);
             keepAliveTask = null;
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 39aecb5..aa2f463 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -19,7 +19,7 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.pulsar</groupId>
@@ -174,6 +174,17 @@
       <groupId>com.beust</groupId>
       <artifactId>jcommander</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.github.seancfoley</groupId>
+      <artifactId>ipaddress</artifactId>
+      <version>${seancfoley.ipaddress.version}</version>
+    </dependency>
   </dependencies>
   <profiles>
     <profile>
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index ae8e134..c549b45 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,6 +78,15 @@ public class BrokerDiscoveryProvider implements Closeable {
     }
 
     /**
+     * Access the list of available brokers.
+     * @return the list of available brokers
+     * @throws PulsarServerException
+     */
+    public List<? extends ServiceLookupData> getAvailableBrokers() throws PulsarServerException {
+        return metadataStoreCacheLoader.getAvailableBrokers();
+    }
+
+    /**
      * Find next broker {@link LoadManagerReport} in round-robin fashion.
      *
      * @return
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
new file mode 100644
index 0000000..debe1f7
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+import inet.ipaddr.IPAddress;
+import inet.ipaddr.IPAddressString;
+import inet.ipaddr.ipv4.IPv4Address;
+import inet.ipaddr.ipv6.IPv6Address;
+import io.netty.resolver.AddressResolver;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.StringTokenizer;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.netty.NettyFutureUtil;
+
+@Slf4j
+public class BrokerProxyValidator {
+    private static final String SEPARATOR = "\\s*,\\s*";
+    private static final String ALLOW_ANY = "*";
+    private final int[] allowedTargetPorts;
+    private final boolean allowAnyTargetPort;
+    private final List<IPAddress> allowedIPAddresses;
+    private final boolean allowAnyIPAddress;
+    private final AddressResolver<InetSocketAddress> inetSocketAddressResolver;
+    private final List<Pattern> allowedHostNames;
+    private final boolean allowAnyHostName;
+
+    public BrokerProxyValidator(AddressResolver<InetSocketAddress> inetSocketAddressResolver, String allowedHostNames,
+                                String allowedIPAddresses, String allowedTargetPorts) {
+        this.inetSocketAddressResolver = inetSocketAddressResolver;
+        List<String> allowedHostNamesStrings = parseCommaSeparatedConfigValue(allowedHostNames);
+        if (allowedHostNamesStrings.contains(ALLOW_ANY)) {
+            this.allowAnyHostName = true;
+            this.allowedHostNames = Collections.emptyList();
+        } else {
+            this.allowAnyHostName = false;
+            this.allowedHostNames = allowedHostNamesStrings.stream()
+                    .map(BrokerProxyValidator::parseWildcardPattern).collect(Collectors.toList());
+        }
+        List<String> allowedIPAddressesStrings = parseCommaSeparatedConfigValue(allowedIPAddresses);
+        if (allowedIPAddressesStrings.contains(ALLOW_ANY)) {
+            allowAnyIPAddress = true;
+            this.allowedIPAddresses = Collections.emptyList();
+        } else {
+            allowAnyIPAddress = false;
+            this.allowedIPAddresses = allowedIPAddressesStrings.stream().map(IPAddressString::new)
+                    .filter(ipAddressString -> {
+                        if (ipAddressString.isValid()) {
+                            return true;
+                        } else {
+                            throw new IllegalArgumentException("Invalid IP address filter '" + ipAddressString + "'",
+                                    ipAddressString.getAddressStringException());
+                        }
+                    }).map(IPAddressString::getAddress)
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+        }
+        List<String> allowedTargetPortsStrings = parseCommaSeparatedConfigValue(allowedTargetPorts);
+        if (allowedTargetPortsStrings.contains(ALLOW_ANY)) {
+            allowAnyTargetPort = true;
+            this.allowedTargetPorts = new int[0];
+        } else {
+            allowAnyTargetPort = false;
+            this.allowedTargetPorts =
+                    allowedTargetPortsStrings.stream().mapToInt(Integer::parseInt).toArray();
+        }
+    }
+
+    private static Pattern parseWildcardPattern(String wildcardPattern) {
+        String regexPattern =
+                Collections.list(new StringTokenizer(wildcardPattern, "*", true))
+                        .stream()
+                        .map(String::valueOf)
+                        .map(token -> {
+                            if ("*".equals(token)) {
+                                return ".*";
+                            } else {
+                                return Pattern.quote(token);
+                            }
+                        }).collect(Collectors.joining());
+        return Pattern.compile(
+                "^" + regexPattern + "$",
+                Pattern.CASE_INSENSITIVE);
+    }
+
+    private static List<String> parseCommaSeparatedConfigValue(String configValue) {
+        return Arrays.stream(configValue.split(SEPARATOR)).map(String::trim).filter(s -> s.length() > 0)
+                .collect(Collectors.toList());
+    }
+
+    public CompletableFuture<InetSocketAddress> resolveAndCheckTargetAddress(String hostAndPort) {
+        int pos = hostAndPort.indexOf(':');
+        String host = hostAndPort.substring(0, pos);
+        int port = Integer.parseInt(hostAndPort.substring(pos + 1));
+        if (!isPortAllowed(port)) {
+            return FutureUtil.failedFuture(
+                    new TargetAddressDeniedException("Given port in '" + hostAndPort + "' isn't allowed."));
+        } else if (!isHostAllowed(host)) {
+            return FutureUtil.failedFuture(
+                    new TargetAddressDeniedException("Given host in '" + hostAndPort + "' isn't allowed."));
+        } else {
+            return NettyFutureUtil.toCompletableFuture(
+                            inetSocketAddressResolver.resolve(InetSocketAddress.createUnresolved(host, port)))
+                    .thenCompose(resolvedAddress -> {
+                        CompletableFuture<InetSocketAddress> result = new CompletableFuture();
+                        if (isIPAddressAllowed(resolvedAddress)) {
+                            result.complete(resolvedAddress);
+                        } else {
+                            result.completeExceptionally(new TargetAddressDeniedException(
+                                    "The IP address of the given host and port '" + hostAndPort + "' isn't allowed."));
+                        }
+                        return result;
+                    });
+        }
+    }
+
+    private boolean isPortAllowed(int port) {
+        if (allowAnyTargetPort) {
+            return true;
+        }
+        for (int allowedPort : allowedTargetPorts) {
+            if (allowedPort == port) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isIPAddressAllowed(InetSocketAddress resolvedAddress) {
+        if (allowAnyIPAddress) {
+            return true;
+        }
+        byte[] addressBytes = resolvedAddress.getAddress().getAddress();
+        IPAddress candidateAddress =
+                addressBytes.length == 4 ? new IPv4Address(addressBytes) : new IPv6Address(addressBytes);
+        for (IPAddress allowedAddress : allowedIPAddresses) {
+            if (allowedAddress.contains(candidateAddress)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isHostAllowed(String host) {
+        if (allowAnyHostName) {
+            return true;
+        }
+        boolean matched = false;
+        for (Pattern allowedHostName : allowedHostNames) {
+            if (allowedHostName.matcher(host).matches()) {
+                matched = true;
+                break;
+            }
+        }
+        return matched;
+    }
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index c896be5..37fc3d5 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -39,6 +39,7 @@ import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
 import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.CharsetUtil;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
@@ -49,6 +50,7 @@ import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import javax.net.ssl.SSLSession;
@@ -73,25 +75,25 @@ import org.slf4j.LoggerFactory;
 public class DirectProxyHandler {
 
     @Getter
-    private Channel inboundChannel;
+    private final Channel inboundChannel;
     @Getter
     Channel outboundChannel;
     @Getter
     private final Rate inboundChannelRequestsRate;
     protected static Map<ChannelId, ChannelId> inboundOutboundChannelMap = new ConcurrentHashMap<>();
-    private String originalPrincipal;
-    private AuthData clientAuthData;
-    private String clientAuthMethod;
-    private int protocolVersion;
+    private final String originalPrincipal;
+    private final AuthData clientAuthData;
+    private final String clientAuthMethod;
     public static final String TLS_HANDLER = "tls";
 
     private final Authentication authentication;
-    private final Supplier<SslHandler> sslHandlerSupplier;
     private AuthenticationDataProvider authenticationDataProvider;
-    private ProxyService service;
+    private final ProxyService service;
+    private final Runnable onHandshakeCompleteAction;
 
     public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl,
-            int protocolVersion, Supplier<SslHandler> sslHandlerSupplier) {
+                              InetSocketAddress targetBrokerAddress, int protocolVersion,
+                              Supplier<SslHandler> sslHandlerSupplier) {
         this.service = service;
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
@@ -99,8 +101,7 @@ public class DirectProxyHandler {
         this.originalPrincipal = proxyConnection.clientAuthRole;
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
-        this.protocolVersion = protocolVersion;
-        this.sslHandlerSupplier = sslHandlerSupplier;
+        this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -109,13 +110,22 @@ public class DirectProxyHandler {
         // switches when passing data between the 2
         // connections
         b.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
+        int brokerProxyConnectTimeoutMs = service.getConfiguration().getBrokerProxyConnectTimeoutMs();
+        if (brokerProxyConnectTimeoutMs > 0) {
+            b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, brokerProxyConnectTimeoutMs);
+        }
         b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false);
         b.handler(new ChannelInitializer<SocketChannel>() {
             @Override
-            protected void initChannel(SocketChannel ch) throws Exception {
+            protected void initChannel(SocketChannel ch) {
                 if (sslHandlerSupplier != null) {
                     ch.pipeline().addLast(TLS_HANDLER, sslHandlerSupplier.get());
                 }
+                int brokerProxyReadTimeoutMs = service.getConfiguration().getBrokerProxyReadTimeoutMs();
+                if (brokerProxyReadTimeoutMs > 0) {
+                    ch.pipeline().addLast("readTimeoutHandler",
+                            new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
+                }
                 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                     Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
                 ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion));
@@ -133,7 +143,7 @@ public class DirectProxyHandler {
             return;
         }
 
-        ChannelFuture f = b.connect(targetBroker.getHost(), targetBroker.getPort());
+        ChannelFuture f = b.connect(targetBrokerAddress);
         outboundChannel = f.channel();
         f.addListener(future -> {
             if (!future.isSuccess()) {
@@ -211,8 +221,8 @@ public class DirectProxyHandler {
         private BackendState state = BackendState.Init;
         private String remoteHostName;
         protected ChannelHandlerContext ctx;
-        private ProxyConfiguration config;
-        private int protocolVersion;
+        private final ProxyConfiguration config;
+        private final int protocolVersion;
 
         public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) {
             this.config = config;
@@ -225,7 +235,7 @@ public class DirectProxyHandler {
             // Send the Connect command to broker
             authenticationDataProvider = authentication.getAuthData(remoteHostName);
             AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-            ByteBuf command = null;
+            ByteBuf command;
             command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
                     null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
             outboundChannel.writeAndFlush(command);
@@ -293,12 +303,11 @@ public class DirectProxyHandler {
                 outboundChannel.read();
             } catch (Exception e) {
                 log.error("Error mutual verify", e);
-                return;
             }
         }
 
         @Override
-        public void operationComplete(Future<Void> future) throws Exception {
+        public void operationComplete(Future<Void> future) {
             // This is invoked when the write operation on the paired connection
             // is completed
             if (future.isSuccess()) {
@@ -317,6 +326,7 @@ public class DirectProxyHandler {
 
         @Override
         protected void handleConnected(CommandConnected connected) {
+            checkArgument(state == BackendState.Init, "Unexpected state %s. BackendState.Init was expected.", state);
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Received Connected from broker", inboundChannel, outboundChannel);
             }
@@ -332,58 +342,68 @@ public class DirectProxyHandler {
 
             state = BackendState.HandshakeCompleted;
 
-            ChannelFuture channelFuture;
-            if (connected.hasMaxMessageSize()) {
-                channelFuture = inboundChannel.writeAndFlush(
-                    Commands.newConnected(connected.getProtocolVersion(), connected.getMaxMessageSize()));
-            } else {
-                channelFuture = inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion()));
-            }
+            onHandshakeCompleteAction.run();
+            startDirectProxying(connected);
+
+            int maxMessageSize =
+                    connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+            inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize))
+                    .addListener(future -> {
+                        if (future.isSuccess()) {
+                            // Start reading from both connections
+                            inboundChannel.read();
+                            outboundChannel.read();
+                        } else {
+                            log.warn("[{}] [{}] Failed to write to inbound connection. Closing both connections.",
+                                    inboundChannel,
+                                    outboundChannel, future.cause());
+                            inboundChannel.close();
+                        }
+                    });
+        }
 
-            channelFuture.addListener(future -> {
-                if (service.getProxyLogLevel() == 0) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] [{}] Removing decoder from pipeline", inboundChannel, outboundChannel);
-                    }
-                    // direct tcp proxy
-                    inboundChannel.pipeline().remove("frameDecoder");
-                    outboundChannel.pipeline().remove("frameDecoder");
+        private void startDirectProxying(CommandConnected connected) {
+            if (service.getProxyLogLevel() == 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] [{}] Removing decoder from pipeline", inboundChannel, outboundChannel);
+                }
+                // direct tcp proxy
+                inboundChannel.pipeline().remove("frameDecoder");
+                outboundChannel.pipeline().remove("frameDecoder");
+            } else {
+                // Enable parsing feature, proxyLogLevel(1 or 2)
+                // Add parser handler
+                if (connected.hasMaxMessageSize()) {
+                    inboundChannel.pipeline()
+                            .replace("frameDecoder", "newFrameDecoder",
+                                    new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
+                                            + Commands.MESSAGE_SIZE_FRAME_PADDING,
+                                            0, 4, 0, 4));
+                    outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
+                            new LengthFieldBasedFrameDecoder(
+                                    connected.getMaxMessageSize()
+                                            + Commands.MESSAGE_SIZE_FRAME_PADDING,
+                                    0, 4, 0, 4));
+
+                    inboundChannel.pipeline().addBefore("handler", "inboundParser",
+                            new ParserProxyHandler(service, inboundChannel,
+                                    ParserProxyHandler.FRONTEND_CONN,
+                                    connected.getMaxMessageSize()));
+                    outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
+                            new ParserProxyHandler(service, outboundChannel,
+                                    ParserProxyHandler.BACKEND_CONN,
+                                    connected.getMaxMessageSize()));
                 } else {
-                    // Enable parsing feature, proxyLogLevel(1 or 2)
-                    // Add parser handler
-                    if (connected.hasMaxMessageSize()) {
-                        inboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
-                                                          new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize()
-                                                                                           + Commands.MESSAGE_SIZE_FRAME_PADDING,
-                                                                                           0, 4, 0, 4));
-                        outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder",
-                                                           new LengthFieldBasedFrameDecoder(
-                                                               connected.getMaxMessageSize()
-                                                               + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
-
-                        inboundChannel.pipeline().addBefore("handler", "inboundParser",
-                                                            new ParserProxyHandler(service, inboundChannel,
-                                                                                   ParserProxyHandler.FRONTEND_CONN,
-                                                                                   connected.getMaxMessageSize()));
-                        outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
-                                                             new ParserProxyHandler(service, outboundChannel,
-                                                                                    ParserProxyHandler.BACKEND_CONN,
-                                                                                    connected.getMaxMessageSize()));
-                    } else {
-                        inboundChannel.pipeline().addBefore("handler", "inboundParser",
-                                                            new ParserProxyHandler(service, inboundChannel,
-                                                                                   ParserProxyHandler.FRONTEND_CONN,
-                                                                                   Commands.DEFAULT_MAX_MESSAGE_SIZE));
-                        outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
-                                                             new ParserProxyHandler(service, outboundChannel,
-                                                                                    ParserProxyHandler.BACKEND_CONN,
-                                                                                    Commands.DEFAULT_MAX_MESSAGE_SIZE));
-                    }
+                    inboundChannel.pipeline().addBefore("handler", "inboundParser",
+                            new ParserProxyHandler(service, inboundChannel,
+                                    ParserProxyHandler.FRONTEND_CONN,
+                                    Commands.DEFAULT_MAX_MESSAGE_SIZE));
+                    outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
+                            new ParserProxyHandler(service, outboundChannel,
+                                    ParserProxyHandler.BACKEND_CONN,
+                                    Commands.DEFAULT_MAX_MESSAGE_SIZE));
                 }
-                // Start reading from both connections
-                inboundChannel.read();
-                outboundChannel.read();
-            });
+            }
         }
 
         @Override
@@ -404,7 +424,7 @@ public class DirectProxyHandler {
         private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
             ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
 
-            SSLSession sslSession = null;
+            SSLSession sslSession;
             if (sslHandler != null) {
                 sslSession = ((SslHandler) sslHandler).engine().getSession();
                 return (new TlsHostnameVerifier()).verify(hostname, sslSession);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index a50ac70..fd56e18 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -49,6 +49,8 @@ public class ProxyConfiguration implements PulsarConfiguration {
     @Category
     private static final String CATEGORY_BROKER_DISCOVERY = "Broker Discovery";
     @Category
+    private static final String CATEGORY_BROKER_PROXY = "Broker Proxy";
+    @Category
     private static final String CATEGORY_AUTHENTICATION = "Proxy Authentication";
     @Category
     private static final String CATEGORY_AUTHORIZATION = "Proxy Authorization";
@@ -136,6 +138,43 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private String functionWorkerWebServiceURLTLS;
 
+    @FieldContext(category = CATEGORY_BROKER_PROXY,
+            doc = "When enabled, checks that the target broker is active before connecting. "
+                    + "zookeeperServers and configurationStoreServers must be configured in proxy configuration "
+                    + "for retrieving the active brokers.")
+    private boolean checkActiveBrokers = false;
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Broker proxy connect timeout.\n"
+                    + "The timeout value for Broker proxy connect timeout is in millisecond. Set to 0 to disable."
+    )
+    private int brokerProxyConnectTimeoutMs = 10000;
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Broker proxy read timeout.\n"
+                    + "The timeout value for Broker proxy read timeout is in millisecond. Set to 0 to disable."
+    )
+    private int brokerProxyReadTimeoutMs = 75000;
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Allowed broker target host names. "
+                    + "Supports multiple comma separated entries and a wildcard.")
+    private String brokerProxyAllowedHostNames = "*";
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Allowed broker target ip addresses or ip networks / netmasks. "
+                    + "Supports multiple comma separated entries.")
+    private String brokerProxyAllowedIPAddresses = "*";
+
+    @FieldContext(
+            category = CATEGORY_BROKER_PROXY,
+            doc = "Allowed broker target ports")
+    private String brokerProxyAllowedTargetPorts = "6650,6651";
+
     @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Hostname or IP address the service binds on"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index ff392ca..df99aca 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.proxy.server;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import io.netty.handler.codec.haproxy.HAProxyMessage;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -28,7 +31,7 @@ import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
-import io.netty.handler.codec.haproxy.HAProxyMessage;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -50,6 +53,7 @@ import org.apache.pulsar.common.api.proto.CommandGetSchema;
 import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,11 +70,12 @@ import lombok.Getter;
  *
  */
 public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
     // ConnectionPool is used by the proxy to issue lookup requests
     private ConnectionPool connectionPool;
     private final AtomicLong requestIdGenerator =
             new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
-    private ProxyService service;
+    private final ProxyService service;
     AuthenticationDataSource authenticationData;
     private State state;
     private final Supplier<SslHandler> sslHandlerSupplier;
@@ -78,6 +83,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     private LookupProxyHandler lookupProxyHandler = null;
     @Getter
     private DirectProxyHandler directProxyHandler = null;
+    private final BrokerProxyValidator brokerProxyValidator;
     String clientAuthRole;
     AuthData clientAuthData;
     String clientAuthMethod;
@@ -109,6 +115,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         // looking into it
         ProxyConnectionToBroker,
 
+        Closing,
+
         Closed,
     }
 
@@ -121,6 +129,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         this.service = proxyService;
         this.state = State.Init;
         this.sslHandlerSupplier = sslHandlerSupplier;
+        this.brokerProxyValidator = service.getBrokerProxyValidator();
     }
 
     @Override
@@ -128,6 +137,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         super.channelRegistered(ctx);
         ProxyService.activeConnections.inc();
         if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
+            state = State.Closing;
             ctx.close();
             ProxyService.rejectedConnections.inc();
         }
@@ -173,6 +183,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        state = State.Closing;
         super.exceptionCaught(ctx, cause);
         LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(),
                 ClientCnx.isKnownException(cause) ? null : cause);
@@ -211,7 +222,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     }
 
     @Override
-    public void operationComplete(Future<Void> future) throws Exception {
+    public void operationComplete(Future<Void> future) {
         // This is invoked when the write operation on the paired connection is
         // completed
         if (future.isSuccess()) {
@@ -247,14 +258,51 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
 
         LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
-            remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
+                remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
         if (hasProxyToBrokerUrl) {
-            // Client already knows which broker to connect. Let's open a
-            // connection there and just pass bytes in both directions
-            state = State.ProxyConnectionToBroker;
-            directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl,
-                protocolVersionToAdvertise, sslHandlerSupplier);
-            cancelKeepAliveTask();
+            // Optimize proxy connection to fail-fast if the target broker isn't active
+            // Pulsar client will retry connecting after a back off timeout
+            if (service.getConfiguration().isCheckActiveBrokers()
+                    && !isBrokerActive(proxyToBrokerUrl)) {
+                state = State.Closing;
+                LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.",
+                        remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole);
+                ctx()
+                        .writeAndFlush(
+                                Commands.newError(-1, ServerError.ServiceNotReady, "Target broker isn't available."))
+                        .addListener(future -> ctx().close());
+                return;
+            }
+
+            brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl)
+                    .thenAccept(address -> ctx().executor().submit(() -> {
+                        // Client already knows which broker to connect. Let's open a
+                        // connection there and just pass bytes in both directions
+                        state = State.ProxyConnectionToBroker;
+                        directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address,
+                                protocolVersionToAdvertise, sslHandlerSupplier);
+                    }))
+                    .exceptionally(throwable -> {
+                        if (throwable instanceof TargetAddressDeniedException
+                                || throwable.getCause() instanceof TargetAddressDeniedException) {
+                            TargetAddressDeniedException targetAddressDeniedException =
+                                    (TargetAddressDeniedException) (throwable instanceof TargetAddressDeniedException
+                                            ? throwable : throwable.getCause());
+
+                            LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.",
+                                    remoteAddress, proxyToBrokerUrl, targetAddressDeniedException.getMessage(),
+                                    authMethod, clientAuthRole);
+                        } else {
+                            LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.",
+                                    remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole, throwable);
+                        }
+                        ctx()
+                                .writeAndFlush(
+                                        Commands.newError(-1, ServerError.ServiceNotReady,
+                                                "Target broker cannot be validated."))
+                                .addListener(future -> ctx().close());
+                        return null;
+                    });
         } else {
             // Client is doing a lookup, we can consider the handshake complete
             // and we'll take care of just topics and
@@ -306,6 +354,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
 
         if (getRemoteEndpointProtocolVersion() < ProtocolVersion.v10.getValue()) {
             LOG.warn("[{}] Client doesn't support connecting through proxy", remoteAddress);
+            state = State.Closing;
             ctx.close();
             return;
         }
@@ -485,6 +534,36 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         return haProxyMessage;
     }
 
-    private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
+    private boolean isBrokerActive(String targetBrokerHostPort) {
+        for (ServiceLookupData serviceLookupData : getAvailableBrokers()) {
+            if (matchesHostAndPort("pulsar://", serviceLookupData.getPulsarServiceUrl(), targetBrokerHostPort)
+                    || matchesHostAndPort("pulsar+ssl://", serviceLookupData.getPulsarServiceUrlTls(),
+                    targetBrokerHostPort)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private List<? extends ServiceLookupData> getAvailableBrokers() {
+        if (service.getDiscoveryProvider() == null) {
+            LOG.warn("Unable to retrieve active brokers. service.getDiscoveryProvider() is null."
+                    + "zookeeperServers and configurationStoreServers must be configured in proxy configuration "
+                    + "when checkActiveBrokers is enabled.");
+            return Collections.emptyList();
+        }
+        try {
+            return service.getDiscoveryProvider().getAvailableBrokers();
+        } catch (PulsarServerException e) {
+            LOG.error("Unable to get available brokers", e);
+            return Collections.emptyList();
+        }
+    }
 
+    static boolean matchesHostAndPort(String expectedPrefix, String pulsarServiceUrl, String brokerHostPort) {
+        return pulsarServiceUrl != null
+                && pulsarServiceUrl.length() == expectedPrefix.length() + brokerHostPort.length()
+                && pulsarServiceUrl.startsWith(expectedPrefix)
+                && pulsarServiceUrl.startsWith(brokerHostPort, expectedPrefix.length());
+    }
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 75f922d..302ceba 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -26,6 +26,8 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.dns.DnsNameResolver;
+import io.netty.resolver.dns.DnsNameResolverBuilder;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
@@ -73,6 +75,10 @@ public class ProxyService implements Closeable {
 
     private final ProxyConfiguration proxyConfig;
     private final Authentication proxyClientAuthentication;
+    @Getter
+    private final DnsNameResolver dnsNameResolver;
+    @Getter
+    private final BrokerProxyValidator brokerProxyValidator;
     private String serviceUrl;
     private String serviceUrlTls;
     private ConfigurationMetadataCacheService configurationCacheService;
@@ -147,6 +153,15 @@ public class ProxyService implements Closeable {
                 false, workersThreadFactory);
         this.authenticationService = authenticationService;
 
+        DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next())
+                .channelType(EventLoopUtil.getDatagramChannelClass(workerGroup));
+        dnsNameResolver = dnsNameResolverBuilder.build();
+
+        brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(),
+                proxyConfig.getBrokerProxyAllowedHostNames(),
+                proxyConfig.getBrokerProxyAllowedIPAddresses(),
+                proxyConfig.getBrokerProxyAllowedTargetPorts());
+
         statsExecutor = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("proxy-stats-executor"));
         statsExecutor.schedule(()->{
@@ -234,6 +249,8 @@ public class ProxyService implements Closeable {
     }
 
     public void close() throws IOException {
+        dnsNameResolver.close();
+
         if (discoveryProvider != null) {
             discoveryProvider.close();
         }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 86ce236..6bbcc88 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -27,6 +27,7 @@ import static org.slf4j.bridge.SLF4JBridgeHandler.install;
 import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
 
 import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -88,6 +89,7 @@ public class ProxyServiceStarter {
 
     private ProxyConfiguration config;
 
+    @Getter
     private ProxyService proxyService;
 
     private WebServer server;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 658dd87..a033a87 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.proxy.server;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 
 import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -46,6 +48,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
     private final ProxyService proxyService;
     private final boolean enableTls;
     private final boolean tlsEnabledWithKeyStore;
+    private final int brokerProxyReadTimeoutMs;
 
     private SslContextAutoRefreshBuilder<SslContext> serverSslCtxRefresher;
     private SslContextAutoRefreshBuilder<SslContext> clientSslCtxRefresher;
@@ -58,6 +61,7 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
         this.proxyService = proxyService;
         this.enableTls = enableTls;
         this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
+        this.brokerProxyReadTimeoutMs = serviceConfig.getBrokerProxyReadTimeoutMs();
 
         if (enableTls) {
             if (tlsEnabledWithKeyStore) {
@@ -127,6 +131,10 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
             ch.pipeline().addLast(TLS_HANDLER,
                     new SslHandler(serverSSLContextAutoRefreshBuilder.get().createSSLEngine()));
         }
+        if (brokerProxyReadTimeoutMs > 0) {
+            ch.pipeline().addLast("readTimeoutHandler",
+                    new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS));
+        }
         if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) {
             ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
         }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java
new file mode 100644
index 0000000..e62525f
--- /dev/null
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+class TargetAddressDeniedException extends RuntimeException {
+    public TargetAddressDeniedException(String message) {
+        super(message);
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index 545912d..88de4b3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -83,6 +83,7 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
         proxyConfig.setAuthenticationEnabled(true);
         proxyConfig.setAuthorizationEnabled(true);
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
new file mode 100644
index 0000000..8e45755
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.proxy.server;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import io.netty.resolver.AddressResolver;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.SucceededFuture;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.ExecutionException;
+import org.apache.curator.shaded.com.google.common.net.InetAddresses;
+import org.testng.annotations.Test;
+
+public class BrokerProxyValidatorTest {
+
+    @Test
+    public void shouldAllowValidInput() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "myhost"
+                , "1.2.0.0/16"
+                , "6650");
+        InetSocketAddress inetSocketAddress = brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get();
+        assertNotNull(inetSocketAddress);
+        assertEquals(inetSocketAddress.getAddress().getHostAddress(), "1.2.3.4");
+        assertEquals(inetSocketAddress.getPort(), 6650);
+    }
+
+    @Test(expectedExceptions = ExecutionException.class,
+            expectedExceptionsMessageRegExp = ".*Given host in 'myhost:6650' isn't allowed.")
+    public void shouldPreventInvalidHostName() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "allowedhost"
+                , "1.2.0.0/16"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get();
+    }
+
+    @Test(expectedExceptions = ExecutionException.class,
+            expectedExceptionsMessageRegExp = ".* The IP address of the given host and port 'myhost:6650' isn't allowed.")
+    public void shouldPreventInvalidIPAddress() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "myhost"
+                , "1.3.0.0/16"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get();
+    }
+
+    @Test
+    public void shouldSupportHostNamePattern() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "*.mydomain"
+                , "1.2.0.0/16"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get();
+    }
+
+    @Test
+    public void shouldAllowAllWithWildcard() throws Exception {
+        BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator(
+                createMockedAddressResolver("1.2.3.4"),
+                "*"
+                , "*"
+                , "6650");
+        brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get();
+    }
+
+    private AddressResolver<InetSocketAddress> createMockedAddressResolver(String ipAddressResult) {
+        AddressResolver<InetSocketAddress> inetSocketAddressResolver = mock(AddressResolver.class);
+        when(inetSocketAddressResolver.resolve(any())).then(invocationOnMock -> {
+            InetSocketAddress address = (InetSocketAddress) invocationOnMock.getArgument(0);
+            return new SucceededFuture<SocketAddress>(mock(EventExecutor.class),
+                    new InetSocketAddress(InetAddresses.forString(ipAddressResult), address.getPort()));
+        });
+        return inetSocketAddressResolver;
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index a909a9f..94009c8 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -72,6 +72,7 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index e63d3ae..b37dedf 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -106,6 +106,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         proxyConfig.setAuthenticationEnabled(true);
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 57aa781..25eee72 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -216,6 +216,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
 		ProxyConfiguration proxyConfig = new ProxyConfiguration();
 		proxyConfig.setAuthenticationEnabled(true);
 		proxyConfig.setServicePort(Optional.of(0));
+		proxyConfig.setBrokerProxyAllowedTargetPorts("*");
 		proxyConfig.setWebServicePort(Optional.of(0));
 		proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
new file mode 100644
index 0000000..5f533e3
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import org.testng.annotations.Test;
+
+public class ProxyConnectionTest {
+
+    @Test
+    public void testMatchesHostAndPort() {
+        assertTrue(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:6650", "1.2.3.4:6650"));
+        assertTrue(ProxyConnection
+                .matchesHostAndPort("pulsar+ssl://", "pulsar+ssl://1.2.3.4:6650", "1.2.3.4:6650"));
+        assertFalse(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", "5.6.7.8:1234"));
+        assertFalse(ProxyConnection
+                .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", "1.2.3.4:1234"));
+    }
+}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 062db18..128d33f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -53,6 +53,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
         proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 44403fb..496b3ca 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -56,6 +56,7 @@ public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest
         internalSetup();
 
         proxyConfig.setServicePort(Optional.ofNullable(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
         proxyConfig.setHaProxyProtocolEnabled(true);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index cf61dac..aa84755 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -104,6 +104,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
         proxyConfig.setAuthenticationEnabled(true);
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
         proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
index af76bfa..f1cb69f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java
@@ -78,6 +78,7 @@ public class ProxyKeyStoreTlsTestWithAuth extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
index 9b0e9b4..03d0b2b 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java
@@ -73,6 +73,7 @@ public class ProxyKeyStoreTlsTestWithoutAuth extends MockedPulsarServiceBaseTest
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index fa3c485..5145026 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -52,6 +52,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
         proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index 905ca20..654686d 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -71,6 +71,7 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
         //enable full parsing feature
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 9ae3fbc..39446af 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -209,6 +209,7 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
         proxyConfig.setAuthenticationEnabled(true);
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index bdba8d3..62b65d3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -51,6 +51,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
     static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"};
 
     private ProxyServiceStarter serviceStarter;
+    private String serviceUrl;
 
     @Override
     @BeforeClass
@@ -62,7 +63,9 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
         serviceStarter.getConfig().setWebServicePort(Optional.of(0));
         serviceStarter.getConfig().setServicePort(Optional.of(0));
         serviceStarter.getConfig().setWebSocketServiceEnabled(true);
+        serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
         serviceStarter.start();
+        serviceUrl = serviceStarter.getProxyService().getServiceUrl();
     }
 
     @Override
@@ -92,7 +95,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testProducer() throws Exception {
         @Cleanup
-        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + this.pulsar.getBrokerService().getListenPort().get())
+        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl)
                 .build();
 
         @Cleanup
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index 7e6c0f5..742cfbb 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -52,6 +52,8 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
     private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem";
     private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem";
     private ProxyServiceStarter serviceStarter;
+    private String serviceUrl;
+    private int webPort;
 
     @Override
     @BeforeClass
@@ -62,12 +64,17 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
         serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
         serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
         serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
-        serviceStarter.getConfig().setServicePortTls(Optional.of(11043));
+        serviceStarter.getConfig().setServicePort(Optional.empty());
+        serviceStarter.getConfig().setServicePortTls(Optional.of(0));
+        serviceStarter.getConfig().setWebServicePort(Optional.of(0));
         serviceStarter.getConfig().setTlsEnabledWithBroker(true);
         serviceStarter.getConfig().setWebSocketServiceEnabled(true);
         serviceStarter.getConfig().setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
         serviceStarter.getConfig().setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+        serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
         serviceStarter.start();
+        serviceUrl = serviceStarter.getProxyService().getServiceUrlTls();
+        webPort = serviceStarter.getServer().getListenPortHTTP().get();
     }
 
     protected void doInitConf() throws Exception {
@@ -86,7 +93,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testProducer() throws Exception {
         @Cleanup
-        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar+ssl://localhost:11043")
+        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl)
                 .allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
                 .build();
 
@@ -106,7 +113,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
         WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient);
         producerWebSocketClient.start();
         MyWebSocket producerSocket = new MyWebSocket();
-        String produceUri = "ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic";
+        String produceUri = "ws://localhost:" + webPort + "/ws/producer/persistent/sample/test/local/websocket-topic";
         Future<Session> producerSession = producerWebSocketClient.connect(producerSocket, URI.create(produceUri));
 
         ProducerMessage produceRequest = new ProducerMessage();
@@ -117,7 +124,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
         WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient);
         consumerWebSocketClient.start();
         MyWebSocket consumerSocket = new MyWebSocket();
-        String consumeUri = "ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
+        String consumeUri = "ws://localhost:" + webPort + "/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
         Future<Session> consumerSession = consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri));
         consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
         producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 2b1c22c..1859c24 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -67,6 +67,7 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 92f6a63..a90243f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -90,6 +90,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.ofNullable(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setZookeeperServers(DUMMY_VALUE);
         proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
 
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 59beb94..5081d0e 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -56,6 +56,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
         internalSetup();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
index 0d3d3a0..ece35cf 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java
@@ -58,6 +58,7 @@ public class ProxyTlsTestWithAuth extends MockedPulsarServiceBaseTest {
         writer.close();
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index 5d05867..b9d9b04 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -113,6 +113,7 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index 14c7288..d813777 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -184,6 +184,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
@@ -402,6 +403,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 693e4ca..6178454 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -98,6 +98,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
         proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
 
         // enable auth&auth and use JWT at proxy
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index f20401c..59c50de 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -104,6 +104,7 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase {
         proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
 
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index 7dc927a..342df28 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -80,6 +80,7 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBas
         proxyConfig.setAuthenticationEnabled(true);
         proxyConfig.setAuthorizationEnabled(true);
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setServicePortTls(Optional.of(0));
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setWebServicePortTls(Optional.of(0));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index 628f7cc..ee18b60 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -68,6 +68,7 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest {
 
         // start proxy service
         proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
         proxyConfig.setWebServicePort(Optional.of(0));
         proxyConfig.setBrokerWebServiceURL(brokerUrl.toString());
         proxyConfig.setStatusFilePath(STATUS_FILE_PATH);

[pulsar] 01/03: [Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy (#13836)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 25e6b65e1e4bc20a1d1fc3c875f73982854954da
Author: Lari Hotari <lh...@apache.org>
AuthorDate: Wed Feb 9 15:34:48 2022 +0200

    [Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy (#13836)
    
    (cherry picked from commit 324aa1bf14d89a93b66ed3613a16c118cf9d4c0f)
---
 .../apache/pulsar/client/impl/ConnectionPool.java  |  62 ++++++------
 .../pulsar/proxy/server/ProxyConnection.java       | 105 +++++++++++----------
 .../apache/pulsar/proxy/server/ProxyService.java   |  11 ---
 3 files changed, 81 insertions(+), 97 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 8e28c87..a0ffe5d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -18,11 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
 import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
-
+import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
 import com.google.common.annotations.VisibleForTesting;
-
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelException;
@@ -31,9 +29,6 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.resolver.dns.DnsNameResolver;
 import io.netty.resolver.dns.DnsNameResolverBuilder;
 import io.netty.util.concurrent.Future;
-
-import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -45,9 +40,7 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -58,7 +51,7 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ConnectionPool implements Closeable {
+public class ConnectionPool implements AutoCloseable {
     protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
 
     private final Bootstrap bootstrap;
@@ -222,7 +215,7 @@ public class ConnectionPool implements Closeable {
     }
 
     /**
-     * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server
+     * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server.
      */
     private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
         int port;
@@ -247,27 +240,32 @@ public class ConnectionPool implements Closeable {
     }
 
     /**
-     * Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is
-     * working
+     * Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no
+     * address is working.
      */
-    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port, InetSocketAddress sniHost) {
+    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses,
+                                                                  int port,
+                                                                  InetSocketAddress sniHost) {
         CompletableFuture<Channel> future = new CompletableFuture<>();
 
         // Successfully connected to server
-        connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(future::complete).exceptionally(exception -> {
-            if (unresolvedAddresses.hasNext()) {
-                // Try next IP address
-                connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete).exceptionally(ex -> {
-                    // This is already unwinding the recursive call
-                    future.completeExceptionally(ex);
+        connectToAddress(unresolvedAddresses.next(), port, sniHost)
+                .thenAccept(future::complete)
+                .exceptionally(exception -> {
+                    if (unresolvedAddresses.hasNext()) {
+                        // Try next IP address
+                        connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete)
+                                .exceptionally(ex -> {
+                                    // This is already unwinding the recursive call
+                                    future.completeExceptionally(ex);
+                                    return null;
+                                });
+                    } else {
+                        // Failed to connect to any IP address
+                        future.completeExceptionally(exception);
+                    }
                     return null;
                 });
-            } else {
-                // Failed to connect to any IP address
-                future.completeExceptionally(exception);
-            }
-            return null;
-        });
 
         return future;
     }
@@ -285,7 +283,7 @@ public class ConnectionPool implements Closeable {
     }
 
     /**
-     * Attempt to establish a TCP connection to an already resolved single IP address
+     * Attempt to establish a TCP connection to an already resolved single IP address.
      */
     private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
         InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
@@ -303,7 +301,7 @@ public class ConnectionPool implements Closeable {
         if (maxConnectionsPerHosts == 0) {
             //Disable pooling
             if (cnx.channel().isActive()) {
-                if(log.isDebugEnabled()) {
+                if (log.isDebugEnabled()) {
                     log.debug("close connection due to pooling disabled.");
                 }
                 cnx.close();
@@ -312,14 +310,8 @@ public class ConnectionPool implements Closeable {
     }
 
     @Override
-    public void close() throws IOException {
-        try {
-            if (!eventLoopGroup.isShutdown()) {
-                eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
-            }
-        } catch (InterruptedException e) {
-            log.warn("EventLoopGroup shutdown was interrupted", e);
-        }
+    public void close() throws Exception {
+        closeAllConnections();
         dnsResolver.close();
     }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index f1b7807..ff392ca 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -21,8 +21,9 @@ package org.apache.pulsar.proxy.server;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.net.SocketAddress;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
@@ -34,11 +35,9 @@ import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarChannelInitializer;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.protocol.Commands;
@@ -68,8 +67,9 @@ import lombok.Getter;
  */
 public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
     // ConnectionPool is used by the proxy to issue lookup requests
-    private PulsarClientImpl client;
     private ConnectionPool connectionPool;
+    private final AtomicLong requestIdGenerator =
+            new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
     private ProxyService service;
     AuthenticationDataSource authenticationData;
     private State state;
@@ -113,7 +113,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     }
 
     ConnectionPool getConnectionPool() {
-        return client.getCnxPool();
+        return connectionPool;
     }
 
     public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
@@ -130,7 +130,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
             ctx.close();
             ProxyService.rejectedConnections.inc();
-            return;
         }
     }
 
@@ -149,26 +148,27 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
     }
 
     @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
         if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
             directProxyHandler.outboundChannel.close();
+            directProxyHandler = null;
         }
 
-        if (client != null) {
-            client.close();
-        }
         service.getClientCnxs().remove(this);
         LOG.info("[{}] Connection closed", remoteAddress);
 
         if (connectionPool != null) {
             try {
                 connectionPool.close();
+                connectionPool = null;
             } catch (Exception e) {
                 LOG.error("Failed to close connection pool {}", e.getMessage(), e);
             }
         }
+
+        state = State.Closed;
     }
 
     @Override
@@ -222,7 +222,30 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
     }
 
-    private void completeConnect() {
+    private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
+        if (service.getConfiguration().isAuthenticationEnabled()) {
+            if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                this.clientAuthData = clientData;
+                this.clientAuthMethod = authMethod;
+            }
+            if (this.connectionPool == null) {
+                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                        () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+                                clientAuthMethod, protocolVersionToAdvertise));
+            } else {
+                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
+                        remoteAddress, state, clientAuthRole);
+            }
+        } else {
+            if (this.connectionPool == null) {
+                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
+            } else {
+                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
+                        remoteAddress, state);
+            }
+        }
+
         LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
             remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
         if (hasProxyToBrokerUrl) {
@@ -242,17 +265,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
     }
 
-    private void createClientAndCompleteConnect(AuthData clientData)
-        throws PulsarClientException {
-        if (service.getConfiguration().isForwardAuthorizationCredentials()) {
-            this.clientAuthData = clientData;
-            this.clientAuthMethod = authMethod;
-        }
-        this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise);
-
-        completeConnect();
-    }
-
     // According to auth result, send newConnected or newAuthChallenge command.
     private void doAuthentication(AuthData clientData) throws Exception {
         AuthData brokerData = authState.authenticate(clientData);
@@ -263,7 +275,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 LOG.debug("[{}] Client successfully authenticated with {} role {}",
                     remoteAddress, authMethod, clientAuthRole);
             }
-            createClientAndCompleteConnect(clientData);
+            completeConnect(clientData);
             return;
         }
 
@@ -274,7 +286,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                 remoteAddress, authMethod);
         }
         state = State.Connecting;
-        return;
     }
 
     @Override
@@ -302,16 +313,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         try {
             // init authn
             this.clientConf = createClientConfiguration();
-            int protocolVersion = getProtocolVersionToAdvertise(connect);
 
             // authn not enabled, complete
             if (!service.getConfiguration().isAuthenticationEnabled()) {
-                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
-                this.client =
-                        new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
-
-                completeConnect();
+                completeConnect(null);
                 return;
             }
 
@@ -336,7 +341,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
                     .orElseThrow(() ->
                         new AuthenticationException("No anonymous role, and no authentication provider configured"));
 
-                createClientAndCompleteConnect(clientData);
+                completeConnect(clientData);
                 return;
             }
 
@@ -354,7 +359,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
             LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
             ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
             close();
-            return;
         }
     }
 
@@ -409,19 +413,26 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         lookupProxyHandler.handleLookup(lookup);
     }
 
-    private void close() {
-        state = State.Closed;
-        ctx.close();
-        try {
-            if (client != null) {
-                client.close();
+    private synchronized void close() {
+        if (state != State.Closed) {
+            state = State.Closed;
+            if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
+                directProxyHandler.outboundChannel.close();
+                directProxyHandler = null;
+            }
+            if (connectionPool != null) {
+                try {
+                    connectionPool.close();
+                    connectionPool = null;
+                } catch (Exception e) {
+                    LOG.error("Error closing connection pool", e);
+                }
             }
-        } catch (PulsarClientException e) {
-            LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
+            ctx.close();
         }
     }
 
-    ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException {
+    ClientConfigurationData createClientConfiguration() {
         ClientConfigurationData clientConf = new ClientConfigurationData();
         clientConf.setServiceUrl(service.getServiceUrl());
         ProxyConfiguration proxyConfig = service.getConfiguration();
@@ -441,20 +452,12 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         return clientConf;
     }
 
-    private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
-            final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
-        this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
-                        clientAuthMethod, protocolVersion));
-        return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
-    }
-
     private static int getProtocolVersionToAdvertise(CommandConnect connect) {
         return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
     }
 
     long newRequestId() {
-        return client.newRequestId();
+        return requestIdGenerator.getAndIncrement();
     }
 
     public Authentication getClientAuthentication() {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index abc7a5f..c5e04b6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -27,8 +27,6 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timer;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import lombok.Getter;
@@ -75,7 +73,6 @@ public class ProxyService implements Closeable {
 
     private final ProxyConfiguration proxyConfig;
     private final Authentication proxyClientAuthentication;
-    private final Timer timer;
     private String serviceUrl;
     private String serviceUrlTls;
     private ConfigurationMetadataCacheService configurationCacheService;
@@ -135,7 +132,6 @@ public class ProxyService implements Closeable {
                         AuthenticationService authenticationService) throws IOException {
         checkNotNull(proxyConfig);
         this.proxyConfig = proxyConfig;
-        this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS);
         this.clientCnxs = Sets.newConcurrentHashSet();
         this.topicStats = Maps.newConcurrentMap();
 
@@ -275,9 +271,6 @@ public class ProxyService implements Closeable {
         }
         acceptorGroup.shutdownGracefully();
         workerGroup.shutdownGracefully();
-        if (timer != null) {
-            timer.stop();
-        }
     }
 
     public String getServiceUrl() {
@@ -292,10 +285,6 @@ public class ProxyService implements Closeable {
         return proxyConfig;
     }
 
-    public Timer getTimer() {
-        return timer;
-    }
-
     public AuthenticationService getAuthenticationService() {
         return authenticationService;
     }

[pulsar] 02/03: Allow config of IO and acceptor threads in proxy (#14054)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1742df8fa7d5f98976af02cd643c56c2a928bffd
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Sun Jan 30 13:04:26 2022 -0700

    Allow config of IO and acceptor threads in proxy (#14054)
    
    * Allow config of IO and acceptor threads in proxy
    
    Previously, the Pulasr Proxy did not allow configuration of the number
    of IO threads and acceptor threads in the proxy.
    
    These options can be very important to tune, as is tuneable in the
    broker, so this change simply matches the brokers perspective.
    
    Also, we increase the default number of IO threads to 2x number of
    processors instead of 1x, as in a single CPU config, it still makes
    sense to have 2 threads, at least for now, where some blocking
    operatings can happen (such as authn/authz plugins)
    
    * fix checkstyle
    
    (cherry picked from commit f455418c8e1efcd3895d4dab593aadb36a682165)
---
 .../org/apache/pulsar/proxy/server/ProxyConfiguration.java | 14 ++++++++++++++
 .../java/org/apache/pulsar/proxy/server/ProxyService.java  |  8 ++++----
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 69b4480..a50ac70 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -495,6 +495,20 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private int httpNumThreads = Math.max(8, 2 * Runtime.getRuntime().availableProcessors());
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Number of threads to use for Netty IO."
+                    + " Default is set to `2 * Runtime.getRuntime().availableProcessors()`"
+    )
+    private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors();
+
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Number of threads to use for Netty Acceptor."
+                    + " Default is set to `1`"
+    )
+    private int numAcceptorThreads = 1;
+
     @Deprecated
     @FieldContext(
             category = CATEGORY_PLUGIN,
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index c5e04b6..75f922d 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -101,8 +101,6 @@ public class ProxyService implements Closeable {
 
     private final ScheduledExecutorService statsExecutor;
 
-    private static final int numThreads = Runtime.getRuntime().availableProcessors();
-
     static final Gauge activeConnections = Gauge
             .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create()
             .register();
@@ -143,8 +141,10 @@ public class ProxyService implements Closeable {
         } else {
             proxyLogLevel = 0;
         }
-        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, false, acceptorThreadFactory);
-        this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, false, workersThreadFactory);
+        this.acceptorGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getNumAcceptorThreads(),
+                false, acceptorThreadFactory);
+        this.workerGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getNumIOThreads(),
+                false, workersThreadFactory);
         this.authenticationService = authenticationService;
 
         statsExecutor = Executors